rApps Development Guide
Complete guide to building Network Applications (rApps) for the NeuAIs AI Network Layer.
Overview
rApps (Network Applications) automate network management tasks using ML-powered intelligence from RIC and orchestration from SMO.
Built-in rApps
1. Anomaly Detection rApp
Monitors network metrics and identifies unusual patterns using ML.
Features
- Real-time network monitoring
- ML-powered anomaly detection
- Intelligent alerting with webhooks
- Automatic remediation suggestions
Configuration
export RIC_ENDPOINT="http://localhost:8081"
export SMO_ENDPOINT="http://localhost:8080"
export ANOMALY_THRESHOLD="0.7"
export CHECK_INTERVAL="60s"
export WEBHOOK_URL="https://alerts.example.com/webhook"
Usage
cd services/ai-network/rapps/anomaly-detector
go run main.go
Alert Example
{
"id": "alert_abc123",
"node_id": "node-1",
"severity": "high",
"anomaly_score": 0.87,
"timestamp": "2024-01-15T10:00:00Z",
"contributing_factors": [
{"metric": "latency", "value": 250.5, "normal_range": "10-100"},
{"metric": "cpu_usage", "value": 95.2, "normal_range": "0-80"}
],
"suggested_actions": ["reroute_traffic", "scale_resources"]
}
2. Traffic Optimization rApp
AI-powered route optimization for network traffic.
Features
- Multi-objective scoring (latency, throughput, cost)
- Automatic route updates via SMO
- Local fallback when RIC unavailable
- Tracks optimization improvements
Configuration
export RIC_ENDPOINT="http://localhost:8081"
export SMO_ENDPOINT="http://localhost:8080"
export OPTIMIZATION_INTERVAL="300s"
export LATENCY_WEIGHT="0.4"
export THROUGHPUT_WEIGHT="0.4"
export COST_WEIGHT="0.2"
export SCORE_THRESHOLD="0.7"
Usage
cd services/ai-network/rapps/traffic-optimizer
go run main.go
Optimization Example
{
"route": {
"source": "node-1",
"destination": "node-5",
"path": ["node-1", "node-3", "node-5"]
},
"metrics": {
"latency": 45.2,
"throughput": 1024.5,
"cost": 0.05
},
"score": 0.85,
"improvement": 0.15
}
Creating Custom rApps
Step 1: Implement the Interface
package myrapps
import (
"context"
"github.com/neuais/ai-network/rapps/framework"
)
type MyRApp struct {
*framework.BaseRApp
config MyConfig
}
type MyConfig struct {
Threshold float64
Interval time.Duration
}
func NewMyRApp() *MyRApp {
base := framework.NewBaseRApp(
"my-rapp",
"1.0.0",
"Description of my rApp",
)
return &MyRApp{
BaseRApp: base,
config: MyConfig{
Threshold: 0.8,
Interval: 60 * time.Second,
},
}
}
func (r *MyRApp) Initialize(ctx context.Context, config map[string]interface{}) error {
// Parse config
if threshold, ok := config["threshold"].(float64); ok {
r.config.Threshold = threshold
}
// Initialize resources
return nil
}
func (r *MyRApp) Start(ctx context.Context) error {
ticker := time.NewTicker(r.config.Interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
if err := r.process(ctx); err != nil {
return err
}
}
}
}
func (r *MyRApp) ProcessEvent(ctx context.Context, event framework.NetworkEvent) ([]framework.NetworkAction, error) {
// Process network events
if event.Priority > 8 {
return []framework.NetworkAction{
{
Type: "alert",
Target: "admin",
Operation: "send_notification",
Parameters: map[string]interface{}{
"message": event.Data,
},
Reason: "High priority event detected",
},
}, nil
}
return nil, nil
}
func (r *MyRApp) GetStatus() framework.RAppStatus {
return framework.RAppStatus{
State: framework.RAppStateRunning,
Health: framework.RAppHealthHealthy,
Uptime: time.Since(r.StartTime),
}
}
func (r *MyRApp) GetMetrics() framework.RAppMetrics {
return framework.RAppMetrics{
EventsProcessed: r.EventCount,
ActionsGenerated: r.ActionCount,
ErrorCount: r.ErrorCount,
}
}
func (r *MyRApp) Stop(ctx context.Context) error {
// Cleanup resources
return nil
}
func (r *MyRApp) process(ctx context.Context) error {
// Your processing logic
return nil
}
Step 2: Create Main Entry Point
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"github.com/neuais/ai-network/rapps/myrapps"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create rApp
rapp := myrapps.NewMyRApp()
// Initialize
config := map[string]interface{}{
"threshold": 0.8,
"interval": "60s",
}
if err := rapp.Initialize(ctx, config); err != nil {
log.Fatal(err)
}
// Start
go func() {
if err := rapp.Start(ctx); err != nil {
log.Fatal(err)
}
}()
// Wait for signal
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
// Stop
if err := rapp.Stop(ctx); err != nil {
log.Fatal(err)
}
}
Step 3: Register with SMO
curl -X POST http://localhost:8080/api/v1/rapps \
-H "Content-Type: application/json" \
-d '{
"name": "my-rapp",
"type": "custom",
"version": "1.0.0",
"endpoint": "http://localhost:8082"
}'
RIC Integration
Making Inference Requests
type RICClient struct {
endpoint string
client *http.Client
}
func (c *RICClient) Infer(modelID string, features map[string]float64) (*InferenceResult, error) {
req := InferenceRequest{
ModelID: modelID,
Features: features,
}
resp, err := c.client.Post(
c.endpoint+"/api/v1/infer",
"application/json",
marshalJSON(req),
)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var result InferenceResult
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, err
}
return &result, nil
}
SMO Integration
Publishing Events
func (r *MyRApp) publishEvent(ctx context.Context, eventType string, data interface{}) error {
event := Event{
Type: eventType,
Source: r.Name,
Severity: "medium",
Timestamp: time.Now(),
Data: data,
}
return r.smo.PublishEvent(ctx, event)
}
Generating Actions
func (r *MyRApp) ProcessEvent(ctx context.Context, event framework.NetworkEvent) ([]framework.NetworkAction, error) {
actions := []framework.NetworkAction{}
if event.Type == "high_latency" {
actions = append(actions, framework.NetworkAction{
Type: "reroute",
Target: event.NodeID,
Operation: "find_alternate_path",
Priority: 8,
Reason: "High latency detected",
})
}
return actions, nil
}
Testing
Unit Tests
func TestMyRApp(t *testing.T) {
rapp := NewMyRApp()
config := map[string]interface{}{
"threshold": 0.7,
}
ctx := context.Background()
if err := rapp.Initialize(ctx, config); err != nil {
t.Fatal(err)
}
event := framework.NetworkEvent{
Type: "test_event",
Priority: 9,
}
actions, err := rapp.ProcessEvent(ctx, event)
if err != nil {
t.Fatal(err)
}
if len(actions) == 0 {
t.Error("Expected actions to be generated")
}
}
Integration Tests
# Start RIC and SMO
docker-compose up -d ric smo
# Start rApp
go run main.go &
# Send test event
curl -X POST http://localhost:8080/api/v1/events \
-H "Content-Type: application/json" \
-d '{
"type": "test_event",
"source": "test",
"data": {"node_id": "node-1"}
}'
# Check actions were generated
curl http://localhost:8080/api/v1/events | jq '.events[] | select(.type == "action_generated")'
Best Practices
- Error Handling: Always return errors, never panic
- Context: Respect context cancellation
- Metrics: Emit metrics for monitoring
- Logging: Use structured logging
- Configuration: Make everything configurable
- Testing: Write unit and integration tests
- Documentation: Document your rApp’s behavior
Next Steps
- AI Network Service - AI Network overview
- Services Overview - All backend services
- Agent Development - Create agents