Creating Agents
Learn how to build custom agents in Rust, Go, Python, or TypeScript.
Agent Structure
All agents follow the same basic structure:
- Initialize: Set up resources and connections
- Run: Main execution loop
- Health: Report health status
- Shutdown: Clean up resources
Rust Agent
Setup
[dependencies]
neuais-sdk = "0.1"
tokio = { version = "1", features = ["full"] }
anyhow = "1"
Basic Agent
use neuais_sdk::prelude::*;
use anyhow::Result;
/// Example agent whose only state is a counter of completed run-loop
/// iterations plus the configuration it was initialized with.
#[agent(name = "my-rust-agent")]
pub struct MyAgent {
    /// Number of run-loop iterations completed; set to 0 by `initialize`.
    counter: u64,
    /// Configuration passed to `initialize`.
    config: AgentConfig,
}
// Lifecycle hooks for `MyAgent`: initialize, run, health, shutdown.
#[async_trait]
impl Agent for MyAgent {
    /// Builds the agent with a zeroed counter and the supplied configuration.
    async fn initialize(config: AgentConfig) -> Result<Self> {
        Ok(Self {
            counter: 0,
            config,
        })
    }

    /// Main loop: every five seconds, increments the counter, logs it, and
    /// emits it as the `counter` metric.
    ///
    /// # Errors
    /// Returns the first error produced by `ctx.log` or `ctx.emit_metric`.
    /// The loop itself never exits normally.
    async fn run(&mut self, ctx: &Context) -> Result<()> {
        loop {
            self.counter += 1;
            ctx.log(format!("Processing: {}", self.counter)).await?;
            ctx.emit_metric("counter", self.counter as f64).await?;
            tokio::time::sleep(Duration::from_secs(5)).await;
        }
    }

    /// Reports `Starting` until the run loop has completed at least one
    /// iteration, then `Healthy`.
    async fn health(&self) -> HealthStatus {
        if self.counter > 0 {
            HealthStatus::Healthy
        } else {
            HealthStatus::Starting
        }
    }

    /// Announces shutdown. This example holds no resources to release.
    async fn shutdown(&mut self) -> Result<()> {
        // `shutdown` receives no `Context`, so `ctx.log` is not available
        // here; write the message to stderr instead.
        eprintln!("Shutting down");
        Ok(())
    }
}
/// Entry point: obtains the configuration via `AgentConfig::from_env`,
/// initializes the agent with it, and hands control to `start`.
#[tokio::main]
async fn main() -> Result<()> {
    let agent = MyAgent::initialize(AgentConfig::from_env()?).await?;
    agent.start().await
}
Advanced Features
#![allow(unused)]
fn main() {
// HTTP endpoint
// Declared for GET /status. Builds a JSON object containing the current
// counter and the value of `self.uptime()`.
#[endpoint(path = "/status", method = "GET")]
async fn status(&self) -> Response {
    json!({
        "counter": self.counter,
        "uptime": self.uptime()
    })
}
// Background task
// Declared with a "30s" interval. Each invocation logs one message and
// propagates any logging error to the caller.
#[task(interval = "30s")]
async fn cleanup(&mut self, ctx: &Context) -> Result<()> {
    ctx.log("Running cleanup").await?;
    Ok(())
}
// Event handler
#[event(type = "user.created")]
async fn on_user_created(&mut self, event: Event) -> Result<()> {
let user_id = event.data["user_id"].as_str()?;
ctx.log(format!("New user: {}", user_id)).await?;
Ok(())
}
}
Go Agent
Setup
go get github.com/neuais/sdk-go
Basic Agent
package main
import (
"context"
"log"
"time"
"github.com/neuais/sdk-go/neuais"
)
// MyAgent is a minimal example agent that counts ticks of its run loop.
type MyAgent struct {
	counter int64          // ticks processed so far; incremented in Run
	config  *neuais.Config // configuration stored by Initialize
}
// Initialize resets the tick counter and remembers the supplied
// configuration. It always succeeds.
func (a *MyAgent) Initialize(config *neuais.Config) error {
	a.counter = 0
	a.config = config
	return nil
}
// Run ticks every five seconds until ctx is cancelled. On each tick it
// increments the counter, logs it, and emits it as the "counter" metric.
// Cancellation is treated as a normal exit and returns nil.
func (a *MyAgent) Run(ctx context.Context) error {
	tick := time.NewTicker(5 * time.Second)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			a.counter++
			log.Printf("Processing: %d", a.counter)
			neuais.EmitMetric("counter", float64(a.counter))
		case <-ctx.Done():
			return nil
		}
	}
}
// Health reports Starting until at least one tick has been processed,
// and Healthy afterwards.
func (a *MyAgent) Health() neuais.HealthStatus {
	if a.counter <= 0 {
		return neuais.HealthStatusStarting
	}
	return neuais.HealthStatusHealthy
}
// Shutdown logs a shutdown message and reports no error; this example
// holds no resources that need releasing.
func (a *MyAgent) Shutdown() error {
	log.Println("Shutting down")
	return nil
}
func main() {
config := neuais.LoadConfig()
agent := &MyAgent{}
if err := agent.Initialize(config); err != nil {
log.Fatal(err)
}
if err := neuais.Run(agent); err != nil {
log.Fatal(err)
}
}
Python Agent
Setup
pip install neuais
Basic Agent
from neuais import Agent, Context, HealthStatus
import asyncio
class MyAgent(Agent):
    """Example agent that counts loop iterations and reports them as a metric."""

    def __init__(self, config):
        # Initialize the base class before adding our own state; `shutdown`
        # below relies on `self.ctx`, which this class never assigns itself.
        # NOTE(review): assumes Agent.__init__ accepts the config, as the
        # TypeScript example's `new MyAgent(config)` suggests -- confirm.
        super().__init__(config)
        self.counter = 0
        self.config = config

    async def run(self, ctx: Context):
        """Every five seconds, bump the counter, log it, and emit it as a metric."""
        while True:
            self.counter += 1
            await ctx.log(f"Processing: {self.counter}")
            await ctx.emit_metric("counter", self.counter)
            await asyncio.sleep(5)

    async def health(self) -> HealthStatus:
        """Return STARTING until the loop has run at least once, then HEALTHY."""
        if self.counter > 0:
            return HealthStatus.HEALTHY
        return HealthStatus.STARTING

    async def shutdown(self):
        """Log a shutdown message."""
        await self.ctx.log("Shutting down")
if __name__ == "__main__":
    # Load the configuration, build the agent with it, and start it.
    config = Agent.load_config()
    agent = MyAgent(config)
    agent.start()
TypeScript Agent
Setup
npm install @neuais/sdk
Basic Agent
import { Agent, Context, HealthStatus } from '@neuais/sdk';
/** Example agent that counts loop iterations and reports them as a metric. */
class MyAgent extends Agent {
  private counter = 0;

  /** Every five seconds: bump the counter, log it, and emit it as a metric. */
  async run(ctx: Context): Promise<void> {
    for (;;) {
      this.counter += 1;
      await ctx.log(`Processing: ${this.counter}`);
      await ctx.emitMetric('counter', this.counter);
      await new Promise(wake => setTimeout(wake, 5000));
    }
  }

  /** Starting until the loop has run at least once, Healthy afterwards. */
  async health(): Promise<HealthStatus> {
    if (this.counter > 0) {
      return HealthStatus.Healthy;
    }
    return HealthStatus.Starting;
  }

  /** Logs a shutdown message through the agent's own context. */
  async shutdown(): Promise<void> {
    await this.ctx.log('Shutting down');
  }
}
// Load the configuration, construct the agent with it, and start it.
const config = Agent.loadConfig();
const agent = new MyAgent(config);
agent.start();
Configuration
Agent Config File
agent.toml:
# Agent identity.
# NOTE(review): `runtime` presumably selects the SDK language (this guide
# covers Rust, Go, Python and TypeScript) -- confirm the accepted values.
[agent]
name = "my-agent"
version = "1.0.0"
runtime = "rust"

# NOTE(review): `replicas` here and `min_replicas`/`max_replicas` under
# [scaling] both describe instance counts -- confirm how they interact.
[resources]
cpu = "1.0"
memory = "1Gi"
replicas = 3

# Health-check settings; durations are written as strings with a unit suffix.
[health]
endpoint = "/health"
interval = "30s"
timeout = "5s"
retries = 3

# NOTE(review): `target_cpu`/`target_memory` look like percentages -- confirm units.
[scaling]
min_replicas = 1
max_replicas = 10
target_cpu = 70
target_memory = 80

# Key/value pairs; every value is a string, including the port number.
[environment]
LOG_LEVEL = "info"
METRICS_PORT = "9090"
DATABASE_URL = "postgresql://localhost/db"

# One entry per route, keyed by path. "/status" is the path used by the
# #[endpoint] example in this guide.
[endpoints]
"/status" = { method = "GET", public = true }
"/metrics" = { method = "GET", public = false }
Best Practices
Error Handling
#![allow(unused)]
fn main() {
// Good: Return errors
// Each fallible step uses `?`, so a failure in `fetch_data` or
// `process_data` is handed back to the caller as an `Err` instead of
// aborting the agent.
async fn process(&self, ctx: &Context) -> Result<()> {
    let data = fetch_data().await?;
    process_data(data)?;
    Ok(())
}
// Bad: Panic
// `unwrap()` panics on any error, and because this version returns `()`
// the caller has no way to observe or recover from the failure.
async fn process(&self, ctx: &Context) {
    let data = fetch_data().await.unwrap();
    process_data(data).unwrap();
}
}
Logging
#![allow(unused)]
fn main() {
// Structured logging: pass a JSON payload (built with `json!`) alongside
// the message so the fields travel with the log call.
ctx.log_info("Processing started", json!({
    "user_id": user_id,
    "batch_size": batch.len()
})).await?;
// Log levels: one method per severity. Every call is awaited and its
// error propagated with `?`.
// NOTE(review): `log_info` is called with two arguments above and one
// below -- confirm the SDK accepts both forms.
ctx.log_debug("Debug info").await?;
ctx.log_info("Info message").await?;
ctx.log_warn("Warning").await?;
ctx.log_error("Error occurred").await?;
}
Metrics
#![allow(unused)]
fn main() {
// All three examples below go through the same `emit_metric(name, f64)` call.
// NOTE(review): how the metric kind (counter/gauge/histogram) is
// distinguished is not shown in this guide -- confirm.
// Counter
ctx.emit_metric("requests_total", 1.0).await?;
// Gauge
ctx.emit_metric("queue_size", queue.len() as f64).await?;
// Histogram
ctx.emit_metric("request_duration_ms", duration.as_millis() as f64).await?;
// With labels: label pairs are passed as a slice of (key, value) string tuples.
ctx.emit_metric_with_labels(
    "requests_total",
    1.0,
    &[("method", "GET"), ("status", "200")]
).await?;
}
Graceful Shutdown
#![allow(unused)]
fn main() {
// Run loop that stops cleanly: each iteration waits for either the shutdown
// signal or the next batch, whichever completes first. On shutdown it runs
// cleanup and leaves the loop; a batch error is returned to the caller.
async fn run(&mut self, ctx: &Context) -> Result<()> {
    loop {
        select! {
            _ = ctx.shutdown_signal() => {
                // `cleanup` as defined in this guide takes the context, so
                // pass it through.
                self.cleanup(ctx).await?;
                break;
            }
            result = self.process_batch() => {
                result?;
            }
        }
    }
    Ok(())
}
}
Testing
Unit Tests
#![allow(unused)]
fn main() {
#[cfg(test)]
mod tests {
    use super::*;

    // A freshly initialized agent starts counting from zero.
    #[tokio::test]
    async fn test_agent_initialization() {
        let config = AgentConfig::default();
        let agent = MyAgent::initialize(config).await.unwrap();
        assert_eq!(agent.counter, 0);
    }

    // `health()` depends only on the counter: `Starting` while it is zero,
    // `Healthy` once it is positive. Both states are checked here.
    #[tokio::test]
    async fn test_health_check() {
        let mut agent = MyAgent::initialize(AgentConfig::default())
            .await
            .unwrap();
        assert_eq!(agent.health().await, HealthStatus::Starting);

        // Simulate one completed run-loop iteration.
        agent.counter = 1;
        assert_eq!(agent.health().await, HealthStatus::Healthy);
    }
}
}
Integration Tests
#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_agent_deployment() {
let client = NeuaisClient::new()?;
let agent_id = client.deploy_agent(
"test-agent",
"./target/release/test-agent"
).await?;
// Wait for agent to start
tokio::time::sleep(Duration::from_secs(5)).await;
let status = client.get_agent_status(&agent_id).await?;
assert_eq!(status, "running");
client.delete_agent(&agent_id).await?;
}
}
Next Steps
- Agent Lifecycle - Understand agent lifecycle
- Deployment - Deploy agents to production
- Scaling - Auto-scaling configuration
- Monitoring - Metrics and observability