Examples

This page contains practical examples of using django-agents.

Email Notification Agent

from django.core.mail import send_mail
from django_agents.base import BaseAgent
from typing import Any, Dict

class EmailNotificationAgent(BaseAgent):
    """Send email notifications."""

    name = "email_notification"
    description = "Sends email notifications to users"
    version = "1.0.0"

    def validate_context(self, context: Dict[str, Any]) -> bool:
        """Validate required fields."""
        super().validate_context(context)

        required = ['recipient', 'subject', 'message']
        for field in required:
            if field not in context:
                raise AgentValidationError(f"Missing required field: {field}")

        return True

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Send email."""
        send_mail(
            subject=context['subject'],
            message=context['message'],
            from_email='noreply@example.com',
            recipient_list=[context['recipient']],
        )

        return {'success': True}

Async Report Generation Agent

from django_agents.base import AsyncAgent
from typing import Any, Dict
import asyncio

class ReportGeneratorAgent(AsyncAgent):
    """Generate reports asynchronously."""

    name = "report_generator"
    description = "Generates PDF reports"
    version = "1.0.0"
    queue_name = "reports"
    priority = 7

    async def async_execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Generate report asynchronously."""
        report_type = context.get('report_type', 'monthly')

        self.log_info(f"Generating {report_type} report")

        # Simulate report generation
        await asyncio.sleep(2)

        return {
            'success': True,
            'report_type': report_type,
            'file_path': f'/reports/{report_type}_report.pdf'
        }

Order Processing Workflow

from django_agents.base import WorkflowAgent
from typing import Any, Dict

class OrderProcessingWorkflow(WorkflowAgent):
    """Process orders through multiple steps."""

    name = "order_processing"
    description = "Complete order processing workflow"
    version = "1.0.0"

    def __init__(self):
        super().__init__()

        # Add workflow steps
        self.add_step(ValidateOrderAgent())
        self.add_step(ProcessPaymentAgent())
        self.add_step(UpdateInventoryAgent())
        self.add_step(SendConfirmationAgent())

# Usage
workflow = OrderProcessingWorkflow()
result = workflow.run({
    'order_id': 12345,
    'items': [...],
    'customer': {...}
})

REST API Integration Agent

import requests
from django_agents.base import BaseAgent
from typing import Any, Dict

class WeatherAPIAgent(BaseAgent):
    """Fetch weather data from external API."""

    name = "weather_api"
    description = "Fetches weather information"
    version = "1.0.0"

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Fetch weather data."""
        city = context.get('city', 'London')
        api_key = context.get('api_key')

        self.log_info(f"Fetching weather for {city}")

        try:
            response = requests.get(
                f'https://api.weather.com/v1/weather',
                params={'city': city, 'key': api_key},
                timeout=10
            )
            response.raise_for_status()

            return {
                'success': True,
                'data': response.json()
            }
        except requests.RequestException as e:
            self.log_error(f"API request failed: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }

Data Processing Agent

import pandas as pd
from django_agents.base import BaseAgent
from typing import Any, Dict

class CSVProcessorAgent(BaseAgent):
    """Process CSV files."""

    name = "csv_processor"
    description = "Processes and transforms CSV data"
    version = "1.0.0"

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Process CSV file."""
        file_path = context.get('file_path')

        self.log_info(f"Processing CSV: {file_path}")

        try:
            # Read CSV
            df = pd.read_csv(file_path)

            # Process data
            df = self.clean_data(df)
            df = self.transform_data(df)

            # Save results
            output_path = file_path.replace('.csv', '_processed.csv')
            df.to_csv(output_path, index=False)

            return {
                'success': True,
                'rows_processed': len(df),
                'output_path': output_path
            }
        except Exception as e:
            self.log_error(f"Processing failed: {str(e)}")
            return {
                'success': False,
                'error': str(e)
            }

    def clean_data(self, df):
        """Clean data."""
        df = df.dropna()
        df = df.drop_duplicates()
        return df

    def transform_data(self, df):
        """Transform data."""
        # Apply transformations
        return df

Database Migration Agent

from django.db import transaction
from django_agents.base import BaseAgent
from typing import Any, Dict
from myapp.models import OldModel, NewModel

class DataMigrationAgent(BaseAgent):
    """Migrate data between models."""

    name = "data_migration"
    description = "Migrates data from old to new schema"
    version = "1.0.0"

    @transaction.atomic
    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Migrate data."""
        batch_size = context.get('batch_size', 1000)
        migrated_count = 0

        self.log_info("Starting data migration")

        old_records = OldModel.objects.all()
        total = old_records.count()

        for i in range(0, total, batch_size):
            batch = old_records[i:i + batch_size]

            for record in batch:
                NewModel.objects.create(
                    field1=record.old_field1,
                    field2=record.old_field2,
                    # ... map fields
                )
                migrated_count += 1

            self.log_info(f"Migrated {migrated_count}/{total} records")

        return {
            'success': True,
            'migrated_count': migrated_count,
            'total': total
        }

Scheduled Task Agent

from django_agents.base import BaseAgent
from typing import Any, Dict
from datetime import datetime, timedelta

class CleanupAgent(BaseAgent):
    """Clean up old records."""

    name = "cleanup_agent"
    description = "Removes expired records"
    version = "1.0.0"

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Clean up old records."""
        days_old = context.get('days_old', 30)
        cutoff_date = datetime.now() - timedelta(days=days_old)

        self.log_info(f"Cleaning records older than {cutoff_date}")

        from myapp.models import TempRecord

        deleted = TempRecord.objects.filter(
            created_at__lt=cutoff_date
        ).delete()

        return {
            'success': True,
            'deleted_count': deleted[0],
            'cutoff_date': cutoff_date.isoformat()
        }

Using Agents in Django Views

from django.http import JsonResponse
from django.views import View
from django_agents import get_registry

class ProcessOrderView(View):
    """Process order using agent."""

    def post(self, request):
        # Get agent from registry
        registry = get_registry()
        agent = registry.get_agent('order_processing')

        # Execute agent
        result = agent.run({
            'order_id': request.POST.get('order_id'),
            'items': request.POST.get('items'),
        })

        return JsonResponse(result)

Using Agents in Celery Tasks

from celery import shared_task
from django_agents import get_registry

@shared_task
def process_report(report_id):
    """Process report using agent."""
    registry = get_registry()
    agent = registry.get_agent('report_generator')

    result = agent.run({
        'report_id': report_id
    })

    return result

Testing Agents

from django.test import TestCase
from myapp.agents import EmailNotificationAgent

class EmailNotificationAgentTest(TestCase):
    """Test email notification agent."""

    def setUp(self):
        self.agent = EmailNotificationAgent()

    def test_agent_execution(self):
        """Test successful execution."""
        context = {
            'recipient': 'test@example.com',
            'subject': 'Test',
            'message': 'Test message'
        }

        result = self.agent.run(context)

        self.assertTrue(result['success'])

    def test_validation(self):
        """Test context validation."""
        with self.assertRaises(AgentValidationError):
            self.agent.run({})  # Missing required fields

    def test_statistics(self):
        """Test agent statistics."""
        self.agent.run({'recipient': 'test@example.com'})

        stats = self.agent.get_stats()
        self.assertEqual(stats['execution_count'], 1)
        self.assertIsNotNone(stats['last_execution'])