Drop rust-Kathy (#713)
parent
ae5db869a5
commit
02f9589fdc
@ -1,29 +0,0 @@ |
||||
[package] |
||||
name = "kathy" |
||||
version = "0.1.0" |
||||
edition = "2021" |
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html |
||||
|
||||
[dependencies] |
||||
async-trait = { version = "0.1", default-features = false } |
||||
eyre = "0.6" |
||||
color-eyre = { version = "0.6", optional = true } |
||||
config = "0.13" |
||||
ethers = { git = "https://github.com/abacus-network/ethers-rs", tag = "2022-08-19-01" } |
||||
futures-util = "0.3" |
||||
serde = {version = "1.0", features = ["derive"]} |
||||
serde_json = {version = "1.0", default-features = false} |
||||
thiserror = {version = "1.0", default-features = false} |
||||
tokio = {version = "1", features = ["rt", "macros"]} |
||||
tracing = "0.1" |
||||
tracing-futures = "0.2" |
||||
tracing-subscriber = "0.3" |
||||
rand = "0.8.3" |
||||
|
||||
abacus-base = {path = "../../abacus-base"} |
||||
abacus-core = {path = "../../abacus-core"} |
||||
|
||||
[features] |
||||
default = ["color-eyre"] |
||||
oneline-errors = ["abacus-base/oneline-eyre"] |
@ -1,190 +0,0 @@ |
||||
use std::{sync::Arc, time::Duration}; |
||||
|
||||
use ethers::core::types::H256; |
||||
use eyre::{Result, WrapErr}; |
||||
use rand::distributions::Alphanumeric; |
||||
use rand::{thread_rng, Rng}; |
||||
use tokio::{sync::Mutex, task::JoinHandle, time::sleep}; |
||||
use tracing::instrument::Instrumented; |
||||
use tracing::{info, Instrument}; |
||||
|
||||
use abacus_base::{decl_agent, run_all, AbacusAgentCore, Agent, BaseAgent, CachingInbox}; |
||||
use abacus_core::{AbacusCommon, Message, Outbox}; |
||||
|
||||
decl_agent!(Kathy { |
||||
duration: u64, |
||||
generator: ChatGenerator, |
||||
outbox_lock: Arc<Mutex<()>>, |
||||
}); |
||||
|
||||
impl Kathy { |
||||
pub fn new(duration: u64, generator: ChatGenerator, core: AbacusAgentCore) -> Self { |
||||
Self { |
||||
duration, |
||||
generator, |
||||
core, |
||||
outbox_lock: Arc::new(Mutex::new(())), |
||||
} |
||||
} |
||||
} |
||||
|
||||
#[async_trait::async_trait] |
||||
impl BaseAgent for Kathy { |
||||
const AGENT_NAME: &'static str = "kathy"; |
||||
|
||||
type Settings = crate::settings::KathySettings; |
||||
|
||||
async fn from_settings(settings: Self::Settings) -> Result<Self> { |
||||
Ok(Self::new( |
||||
settings.interval.parse().expect("invalid u64"), |
||||
settings.chat.into(), |
||||
settings |
||||
.base |
||||
.try_into_abacus_core(Self::AGENT_NAME, true) |
||||
.await?, |
||||
)) |
||||
} |
||||
|
||||
#[allow(clippy::async_yields_async)] |
||||
async fn run(&self) -> Instrumented<JoinHandle<Result<()>>> { |
||||
let inbox_tasks: Vec<Instrumented<JoinHandle<Result<()>>>> = self |
||||
.inboxes() |
||||
.iter() |
||||
.map(|(inbox_name, inbox_contracts)| { |
||||
self.wrap_inbox_run(inbox_name, inbox_contracts.inbox.clone()) |
||||
}) |
||||
.collect(); |
||||
run_all(inbox_tasks) |
||||
} |
||||
} |
||||
|
||||
impl Kathy { |
||||
#[tracing::instrument] |
||||
fn run_inbox(&self, inbox: Arc<CachingInbox>) -> Instrumented<JoinHandle<Result<()>>> { |
||||
let outbox = self.outbox(); |
||||
let outbox_lock = self.outbox_lock.clone(); |
||||
|
||||
let mut generator = self.generator.clone(); |
||||
let duration = Duration::from_secs(self.duration); |
||||
|
||||
tokio::spawn(async move { |
||||
let destination = inbox.local_domain(); |
||||
|
||||
loop { |
||||
let msg = generator.gen_chat(); |
||||
let recipient = generator.gen_recipient(); |
||||
|
||||
match msg { |
||||
Some(body) => { |
||||
let message = Message { |
||||
destination, |
||||
recipient, |
||||
body, |
||||
}; |
||||
info!( |
||||
target: "outgoing_messages", |
||||
"Enqueuing message of length {} to {}::{}", |
||||
length = message.body.len(), |
||||
destination = message.destination, |
||||
recipient = message.recipient |
||||
); |
||||
|
||||
let guard = outbox_lock.lock().await; |
||||
outbox.dispatch(&message).await?; |
||||
drop(guard); |
||||
} |
||||
_ => { |
||||
info!("Reached the end of the static message queue. Shutting down."); |
||||
return Ok(()); |
||||
} |
||||
} |
||||
|
||||
sleep(duration).await; |
||||
} |
||||
}) |
||||
.in_current_span() |
||||
} |
||||
|
||||
fn wrap_inbox_run( |
||||
&self, |
||||
inbox_name: &str, |
||||
inbox: Arc<CachingInbox>, |
||||
) -> Instrumented<JoinHandle<Result<()>>> { |
||||
let m = format!("Task for inbox named {} failed", inbox_name); |
||||
let handle = self.run_inbox(inbox).in_current_span(); |
||||
let fut = async move { handle.await?.wrap_err(m) }; |
||||
|
||||
tokio::spawn(fut).in_current_span() |
||||
} |
||||
} |
||||
|
||||
/// Generators for messages
|
||||
#[derive(Debug, Clone)] |
||||
pub enum ChatGenerator { |
||||
Static { |
||||
recipient: H256, |
||||
message: String, |
||||
}, |
||||
OrderedList { |
||||
messages: Vec<String>, |
||||
counter: usize, |
||||
}, |
||||
Random { |
||||
length: usize, |
||||
}, |
||||
Default, |
||||
} |
||||
|
||||
impl Default for ChatGenerator { |
||||
fn default() -> Self { |
||||
Self::Default |
||||
} |
||||
} |
||||
|
||||
impl ChatGenerator { |
||||
fn rand_string(length: usize) -> String { |
||||
thread_rng() |
||||
.sample_iter(&Alphanumeric) |
||||
.take(length) |
||||
.map(char::from) |
||||
.collect() |
||||
} |
||||
|
||||
pub fn gen_recipient(&mut self) -> H256 { |
||||
match self { |
||||
ChatGenerator::Default => Default::default(), |
||||
ChatGenerator::Static { |
||||
recipient, |
||||
message: _, |
||||
} => *recipient, |
||||
ChatGenerator::OrderedList { |
||||
messages: _, |
||||
counter: _, |
||||
} => Default::default(), |
||||
ChatGenerator::Random { length: _ } => H256::random(), |
||||
} |
||||
} |
||||
|
||||
pub fn gen_chat(&mut self) -> Option<Vec<u8>> { |
||||
match self { |
||||
ChatGenerator::Default => Some(Default::default()), |
||||
ChatGenerator::Static { |
||||
recipient: _, |
||||
message, |
||||
} => Some(message.as_bytes().to_vec()), |
||||
ChatGenerator::OrderedList { messages, counter } => { |
||||
if *counter >= messages.len() { |
||||
return None; |
||||
} |
||||
|
||||
let msg = messages[*counter].clone().into(); |
||||
|
||||
// Increment counter to next message in list
|
||||
*counter += 1; |
||||
|
||||
Some(msg) |
||||
} |
||||
ChatGenerator::Random { length } => Some(Self::rand_string(*length).into()), |
||||
} |
||||
} |
||||
} |
@ -1,41 +0,0 @@ |
||||
//! Kathy is chatty. She sends random messages to random recipients
|
||||
|
||||
#![forbid(unsafe_code)] |
||||
#![warn(missing_docs)] |
||||
#![warn(unused_extern_crates)] |
||||
|
||||
use eyre::Result; |
||||
|
||||
use abacus_base::{Agent, BaseAgent}; |
||||
|
||||
use crate::kathy::Kathy; |
||||
|
||||
mod kathy; |
||||
mod settings; |
||||
|
||||
async fn _main() -> Result<()> { |
||||
#[cfg(feature = "oneline-errors")] |
||||
abacus_base::oneline_eyre::install()?; |
||||
#[cfg(not(feature = "oneline-errors"))] |
||||
color_eyre::install()?; |
||||
|
||||
let settings = settings::KathySettings::new()?; |
||||
let agent = Kathy::from_settings(settings).await?; |
||||
|
||||
agent |
||||
.as_ref() |
||||
.settings |
||||
.tracing |
||||
.start_tracing(&agent.metrics())?; |
||||
let _ = agent.metrics().run_http_server(); |
||||
|
||||
agent.run().await.await? |
||||
} |
||||
|
||||
fn main() -> Result<()> { |
||||
tokio::runtime::Builder::new_current_thread() |
||||
.enable_all() |
||||
.build() |
||||
.unwrap() |
||||
.block_on(_main()) |
||||
} |
@ -1,54 +0,0 @@ |
||||
//! Configuration
|
||||
|
||||
use ethers::core::types::H256; |
||||
|
||||
use crate::kathy::ChatGenerator; |
||||
|
||||
use abacus_base::decl_settings; |
||||
|
||||
#[derive(Debug, serde::Deserialize)] |
||||
#[serde(tag = "type", rename_all = "camelCase")] |
||||
pub enum ChatGenConfig { |
||||
Static { |
||||
recipient: H256, |
||||
message: String, |
||||
}, |
||||
OrderedList { |
||||
messages: Vec<String>, |
||||
}, |
||||
Random { |
||||
length: usize, |
||||
}, |
||||
#[serde(other)] |
||||
Default, |
||||
} |
||||
|
||||
impl Default for ChatGenConfig { |
||||
fn default() -> Self { |
||||
Self::Default |
||||
} |
||||
} |
||||
|
||||
impl From<ChatGenConfig> for ChatGenerator { |
||||
fn from(conf: ChatGenConfig) -> ChatGenerator { |
||||
match conf { |
||||
ChatGenConfig::Static { recipient, message } => { |
||||
ChatGenerator::Static { recipient, message } |
||||
} |
||||
ChatGenConfig::OrderedList { messages } => ChatGenerator::OrderedList { |
||||
messages, |
||||
counter: 0, |
||||
}, |
||||
ChatGenConfig::Random { length } => ChatGenerator::Random { length }, |
||||
ChatGenConfig::Default => ChatGenerator::Default, |
||||
} |
||||
} |
||||
} |
||||
|
||||
decl_settings!(Kathy { |
||||
/// The message interval (in seconds)
|
||||
interval: String, |
||||
/// Chat generation configuration
|
||||
#[serde(default)] |
||||
chat: ChatGenConfig, |
||||
}); |
@ -1,50 +0,0 @@ |
||||
{{- if .Values.abacus.kathy.enabled }} |
||||
apiVersion: external-secrets.io/v1beta1 |
||||
kind: ExternalSecret |
||||
metadata: |
||||
name: {{ include "abacus-agent.fullname" . }}-kathy-external-secret |
||||
labels: |
||||
{{- include "abacus-agent.labels" . | nindent 4 }} |
||||
annotations: |
||||
update-on-redeploy: "{{ now }}" |
||||
spec: |
||||
secretStoreRef: |
||||
name: {{ include "abacus-agent.cluster-secret-store.name" . }} |
||||
kind: ClusterSecretStore |
||||
refreshInterval: "1h" |
||||
# The secret that will be created |
||||
target: |
||||
name: {{ include "abacus-agent.fullname" . }}-kathy-secret |
||||
template: |
||||
type: Opaque |
||||
metadata: |
||||
labels: |
||||
{{- include "abacus-agent.labels" . | nindent 10 }} |
||||
data: |
||||
{{- range .Values.abacus.kathy.signers }} |
||||
{{- if eq .keyConfig.type "hexKey" }} |
||||
ABC_BASE_SIGNERS_{{ .name | upper }}_KEY: {{ printf "'{{ .%s_signer_key | toString }}'" .name }} |
||||
{{- end }} |
||||
{{- end }} |
||||
{{- if .Values.abacus.kathy.aws }} |
||||
AWS_ACCESS_KEY_ID: {{ print "'{{ .aws_access_key_id | toString }}'" }} |
||||
AWS_SECRET_ACCESS_KEY: {{ print "'{{ .aws_secret_access_key | toString }}'" }} |
||||
{{- end }} |
||||
data: |
||||
{{- range .Values.abacus.kathy.signers }} |
||||
{{- if eq .keyConfig.type "hexKey" }} |
||||
- secretKey: {{ printf "%s_signer_key" .name }} |
||||
remoteRef: |
||||
key: {{ printf "%s-%s-key-kathy" $.Values.abacus.context $.Values.abacus.runEnv }} |
||||
property: privateKey |
||||
{{- end }} |
||||
{{- end }} |
||||
{{- if .Values.abacus.kathy.aws }} |
||||
- secretKey: aws_access_key_id |
||||
remoteRef: |
||||
key: {{ printf "%s-%s-kathy-aws-access-key-id" .Values.abacus.context .Values.abacus.runEnv }} |
||||
- secretKey: aws_secret_access_key |
||||
remoteRef: |
||||
key: {{ printf "%s-%s-kathy-aws-secret-access-key" .Values.abacus.context .Values.abacus.runEnv }} |
||||
{{- end }} |
||||
{{- end }} |
@ -1,101 +0,0 @@ |
||||
{{- if .Values.abacus.kathy.enabled }} |
||||
apiVersion: apps/v1 |
||||
kind: StatefulSet |
||||
metadata: |
||||
name: {{ include "abacus-agent.fullname" . }}-kathy |
||||
labels: |
||||
{{- include "abacus-agent.labels" . | nindent 4 }} |
||||
app.kubernetes.io/component: kathy |
||||
spec: |
||||
selector: |
||||
matchLabels: |
||||
{{- include "abacus-agent.selectorLabels" . | nindent 6 }} |
||||
app.kubernetes.io/component: kathy |
||||
replicas: 1 |
||||
serviceName: {{ include "abacus-agent.fullname" . }}-kathy |
||||
template: |
||||
metadata: |
||||
annotations: |
||||
checksum/configmap: {{ include (print $.Template.BasePath "/configmap.yaml") . | sha256sum }} |
||||
{{- with .Values.podAnnotations }} |
||||
{{- toYaml . | nindent 8 }} |
||||
{{- end }} |
||||
{{- with .Values.abacus.kathy.podAnnotations }} |
||||
{{ toYaml . | nindent 8 }} |
||||
{{- end }} |
||||
labels: |
||||
{{- include "abacus-agent.labels" . | nindent 8 }} |
||||
app.kubernetes.io/component: kathy |
||||
{{- with .Values.podCommonLabels }} |
||||
{{ toYaml . | nindent 8 }} |
||||
{{- end }} |
||||
{{- with .Values.abacus.kathy.podLabels }} |
||||
{{ toYaml . | nindent 8 }} |
||||
{{- end }} |
||||
spec: |
||||
{{- with .Values.imagePullSecrets }} |
||||
imagePullSecrets: |
||||
{{- toYaml . | nindent 8 }} |
||||
{{- end }} |
||||
terminationGracePeriodSeconds: 10 |
||||
securityContext: |
||||
{{- toYaml .Values.podSecurityContext | nindent 8 }} |
||||
containers: |
||||
- name: {{ .Chart.Name }} |
||||
securityContext: |
||||
{{- toYaml .Values.securityContext | nindent 10 }} |
||||
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" |
||||
imagePullPolicy: {{ .Values.image.pullPolicy }} |
||||
command: ["./kathy"] |
||||
envFrom: |
||||
- configMapRef: |
||||
name: {{ include "abacus-agent.fullname" . }} |
||||
- secretRef: |
||||
name: {{ include "abacus-agent.fullname" . }}-secret |
||||
- secretRef: |
||||
name: {{ include "abacus-agent.fullname" . }}-kathy-secret |
||||
env: |
||||
{{- include "abacus-agent.config-env-vars" (dict "config" .Values.abacus.kathy.config "agent_name" "kathy") | indent 10 }} |
||||
{{- range .Values.abacus.kathy.signers }} |
||||
{{- include "abacus-agent.config-env-vars" (dict "config" .keyConfig "agent_name" "base" "key_name_prefix" (printf "SIGNERS_%s_" (.name | upper))) | indent 10 }} |
||||
{{- end }} |
||||
{{- if .Values.abacus.tracing.uri }} |
||||
- name: ABC_BASE_TRACING_JAEGER_NAME |
||||
value: {{ include "abacus-agent.fullname" . }}-kathy |
||||
{{- end }} |
||||
resources: |
||||
{{- toYaml .Values.abacus.kathy.resources | nindent 10 }} |
||||
volumeMounts: |
||||
- name: state |
||||
mountPath: {{ .Values.abacus.dbPath }} |
||||
ports: |
||||
- name: metrics |
||||
containerPort: {{ .Values.abacus.metrics.port }} |
||||
{{- with .Values.nodeSelector }} |
||||
nodeSelector: |
||||
{{- toYaml . | nindent 8 }} |
||||
{{- end }} |
||||
{{- with .Values.affinity }} |
||||
affinity: |
||||
{{- toYaml . | nindent 8 }} |
||||
{{- end }} |
||||
{{- with .Values.tolerations }} |
||||
tolerations: |
||||
{{- toYaml . | nindent 8 }} |
||||
{{- end }} |
||||
volumeClaimTemplates: |
||||
- metadata: |
||||
name: state |
||||
spec: |
||||
storageClassName: {{ .Values.storage.storageClass }} |
||||
accessModes: [ {{ .Values.storage.accessModes }} ] |
||||
{{- if .Values.abacus.kathy.storage.snapshot.enabled }} |
||||
dataSource: |
||||
name: {{ .Values.abacus.kathy.storage.snapshot.name }} |
||||
kind: VolumeSnapshot |
||||
apiGroup: snapshot.storage.k8s.io |
||||
{{- end }} |
||||
resources: |
||||
requests: |
||||
storage: {{ .Values.abacus.kathy.storage.size }} |
||||
{{- end }} |
Loading…
Reference in new issue