Examples
This page contains practical examples of using django-agents.
Email Notification Agent
from typing import Any, Dict

from django.core.mail import send_mail
from django_agents.base import BaseAgent
from django_agents.exceptions import AgentValidationError  # TODO(review): confirm module path
class EmailNotificationAgent(BaseAgent):
    """Send a single email notification.

    The context must provide ``recipient``, ``subject`` and ``message``.
    """

    name = "email_notification"
    description = "Sends email notifications to users"
    version = "1.0.0"

    def validate_context(self, context: Dict[str, Any]) -> bool:
        """Check that every field needed to send the email is present.

        The base-class validation runs first, then the required keys are
        checked in order.

        Args:
            context: Execution context supplied to the agent.

        Returns:
            ``True`` when all required keys are present.

        Raises:
            AgentValidationError: If a required key is missing; the message
                names the first missing key.
        """
        super().validate_context(context)
        required = ('recipient', 'subject', 'message')
        for field in required:
            if field not in context:
                raise AgentValidationError(f"Missing required field: {field}")
        return True

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Send the email and report whether it was actually delivered.

        Args:
            context: Must contain ``recipient``, ``subject`` and ``message``.

        Returns:
            A dict with ``success`` (``True`` only if a message was
            delivered) and ``sent_count`` (the value returned by
            ``send_mail``).
        """
        # send_mail returns the number of successfully delivered messages
        # (0 or 1 for a single message), so success is derived from it
        # instead of being reported unconditionally.
        sent_count = send_mail(
            subject=context['subject'],
            message=context['message'],
            from_email='noreply@example.com',
            recipient_list=[context['recipient']],
        )
        return {'success': sent_count > 0, 'sent_count': sent_count}
Async Report Generation Agent
from django_agents.base import AsyncAgent
from typing import Any, Dict
import asyncio
class ReportGeneratorAgent(AsyncAgent):
    """Asynchronous agent that produces a report of a requested type."""

    name = "report_generator"
    description = "Generates PDF reports"
    version = "1.0.0"
    queue_name = "reports"
    priority = 7

    async def async_execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Build the report named by ``context['report_type']``.

        Falls back to ``'monthly'`` when no report type is supplied.

        Returns:
            A dict with ``success``, the ``report_type`` used and the
            ``file_path`` of the report.
        """
        kind = context.get('report_type', 'monthly')
        self.log_info(f"Generating {kind} report")

        # Stand-in for the real generation work: pause for two seconds.
        await asyncio.sleep(2)

        outcome: Dict[str, Any] = {'success': True}
        outcome['report_type'] = kind
        outcome['file_path'] = f'/reports/{kind}_report.pdf'
        return outcome
Order Processing Workflow
from django_agents.base import WorkflowAgent
from typing import Any, Dict
class OrderProcessingWorkflow(WorkflowAgent):
    """Workflow that sends an order through a fixed sequence of agents."""

    name = "order_processing"
    description = "Complete order processing workflow"
    version = "1.0.0"

    def __init__(self):
        """Register the workflow steps in the order they should be added."""
        super().__init__()
        # Each step agent is instantiated and added in listed order.
        step_classes = (
            ValidateOrderAgent,
            ProcessPaymentAgent,
            UpdateInventoryAgent,
            SendConfirmationAgent,
        )
        for step_cls in step_classes:
            self.add_step(step_cls())
# Usage: create the workflow, then run it with the order's context.
workflow = OrderProcessingWorkflow()

# `[...]` and `{...}` are placeholders -- substitute the real order data.
order_context = {
    'order_id': 12345,
    'items': [...],
    'customer': {...},
}
result = workflow.run(order_context)
REST API Integration Agent
import requests
from django_agents.base import BaseAgent
from typing import Any, Dict
class WeatherAPIAgent(BaseAgent):
    """Fetch weather data from an external HTTP API."""

    name = "weather_api"
    description = "Fetches weather information"
    version = "1.0.0"

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Request weather data for a city and return the decoded response.

        Args:
            context: May contain ``city`` (defaults to ``'London'``) and
                ``api_key`` (sent as the ``key`` query parameter).

        Returns:
            ``{'success': True, 'data': <decoded JSON>}`` on success, or
            ``{'success': False, 'error': <message>}`` when the request
            fails or the body cannot be decoded as JSON.
        """
        city = context.get('city', 'London')
        api_key = context.get('api_key')
        self.log_info(f"Fetching weather for {city}")

        try:
            response = requests.get(
                'https://api.weather.com/v1/weather',
                params={'city': city, 'key': api_key},
                timeout=10,
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body: older requests releases
            # raise a decode error that is not a RequestException subclass.
            self.log_error(f"API request failed: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }

        return {
            'success': True,
            'data': data
        }
Data Processing Agent
import pandas as pd
from django_agents.base import BaseAgent
from typing import Any, Dict
class CSVProcessorAgent(BaseAgent):
    """Read a CSV file, clean and transform it, and write the result."""

    name = "csv_processor"
    description = "Processes and transforms CSV data"
    version = "1.0.0"

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Process the CSV file named by ``context['file_path']``.

        The output is written next to the input with ``_processed``
        inserted before the file extension.

        Returns:
            ``{'success': True, 'rows_processed': int, 'output_path': str}``
            on success, or ``{'success': False, 'error': <message>}`` if any
            step raises.
        """
        import os

        file_path = context.get('file_path')
        self.log_info(f"Processing CSV: {file_path}")

        try:
            # Read CSV
            df = pd.read_csv(file_path)

            # Process data
            df = self.clean_data(df)
            df = self.transform_data(df)

            # Split off only the final extension. A plain str.replace would
            # rewrite every '.csv' in the path and, for any other extension,
            # leave the path unchanged and overwrite the input file.
            root, ext = os.path.splitext(file_path)
            output_path = f"{root}_processed{ext}"
            df.to_csv(output_path, index=False)

            return {
                'success': True,
                'rows_processed': len(df),
                'output_path': output_path
            }
        except Exception as e:
            # Agent boundary: log the failure and report it in the result.
            self.log_error(f"Processing failed: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }

    def clean_data(self, df):
        """Drop rows with missing values, then drop duplicate rows."""
        df = df.dropna()
        df = df.drop_duplicates()
        return df

    def transform_data(self, df):
        """Hook for transformations; currently returns ``df`` unchanged."""
        # Apply transformations
        return df
Database Migration Agent
from django.db import transaction
from django_agents.base import BaseAgent
from typing import Any, Dict
from myapp.models import OldModel, NewModel
class DataMigrationAgent(BaseAgent):
    """Copy records from ``OldModel`` into ``NewModel`` in batches."""

    name = "data_migration"
    description = "Migrates data from old to new schema"
    version = "1.0.0"

    @transaction.atomic
    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Migrate every ``OldModel`` row, logging progress per batch.

        The whole run is wrapped in ``transaction.atomic``, so an exception
        rolls back all records created so far.

        Args:
            context: May contain ``batch_size`` (defaults to 1000).

        Returns:
            A dict with ``success``, ``migrated_count`` and ``total``.

        Raises:
            ValueError: If ``batch_size`` is not positive.
        """
        batch_size = context.get('batch_size', 1000)
        if batch_size <= 0:
            # A zero step already made range() raise; a negative one
            # silently migrated nothing while reporting success.
            raise ValueError("batch_size must be a positive integer")

        migrated_count = 0
        self.log_info("Starting data migration")

        # Slicing is only stable on an ordered queryset; without an explicit
        # order, consecutive slices may overlap or skip rows.
        old_records = OldModel.objects.order_by('pk')
        total = old_records.count()

        for start in range(0, total, batch_size):
            batch = old_records[start:start + batch_size]
            for record in batch:
                NewModel.objects.create(
                    field1=record.old_field1,
                    field2=record.old_field2,
                    # ... map fields
                )
                migrated_count += 1
            self.log_info(f"Migrated {migrated_count}/{total} records")

        return {
            'success': True,
            'migrated_count': migrated_count,
            'total': total
        }
Scheduled Task Agent
from django_agents.base import BaseAgent
from typing import Any, Dict
from datetime import datetime, timedelta
class CleanupAgent(BaseAgent):
    """Delete ``TempRecord`` rows older than a configurable age."""

    name = "cleanup_agent"
    description = "Removes expired records"
    version = "1.0.0"

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Delete records created before the cutoff date.

        Args:
            context: May contain ``days_old`` (defaults to 30).

        Returns:
            A dict with ``success``, ``deleted_count`` (the total reported
            by ``QuerySet.delete()``) and ``cutoff_date`` in ISO format.
        """
        from django.utils import timezone
        from myapp.models import TempRecord

        days_old = context.get('days_old', 30)
        # timezone.now() is aware when USE_TZ is enabled and naive otherwise,
        # so the comparison with the DateTimeField is correct in both cases.
        cutoff_date = timezone.now() - timedelta(days=days_old)
        self.log_info(f"Cleaning records older than {cutoff_date}")

        # delete() returns (total_deleted, per_model_counts).
        deleted_count, _ = TempRecord.objects.filter(
            created_at__lt=cutoff_date
        ).delete()

        return {
            'success': True,
            'deleted_count': deleted_count,
            'cutoff_date': cutoff_date.isoformat()
        }
Using Agents in Django Views
from django.http import JsonResponse
from django.views import View
from django_agents import get_registry
class ProcessOrderView(View):
    """Handle POST requests by delegating to the ``order_processing`` agent."""

    def post(self, request):
        """Run the agent on the submitted form data and return JSON."""
        # Look the agent up by its registered name.
        agent = get_registry().get_agent('order_processing')

        # Build the agent context from the POSTed form fields.
        form = request.POST
        payload = {
            'order_id': form.get('order_id'),
            'items': form.get('items'),
        }

        return JsonResponse(agent.run(payload))
Using Agents in Celery Tasks
from celery import shared_task
from django_agents import get_registry
@shared_task
def process_report(report_id):
    """Run the ``report_generator`` agent for one report; return its result."""
    agent = get_registry().get_agent('report_generator')
    return agent.run({'report_id': report_id})
Testing Agents
from django.test import TestCase
from myapp.agents import EmailNotificationAgent
class EmailNotificationAgentTest(TestCase):
    """Tests for ``EmailNotificationAgent``."""

    def setUp(self):
        """Create a fresh agent and a context with every required field."""
        self.agent = EmailNotificationAgent()
        self.valid_context = {
            'recipient': 'test@example.com',
            'subject': 'Test',
            'message': 'Test message'
        }

    def test_agent_execution(self):
        """A complete context yields a successful result."""
        result = self.agent.run(self.valid_context)
        self.assertTrue(result['success'])

    def test_validation(self):
        """Running with an empty context raises a validation error."""
        with self.assertRaises(AgentValidationError):
            self.agent.run({})  # Missing required fields

    def test_statistics(self):
        """One run is reflected in the agent statistics."""
        # Use a complete context: a partial one fails validation (see
        # test_validation), so no execution would be recorded.
        self.agent.run(self.valid_context)
        stats = self.agent.get_stats()
        self.assertEqual(stats['execution_count'], 1)
        self.assertIsNotNone(stats['last_execution'])