"""
Base module for Django agents
"""
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
[docs]
class AgentException(Exception):
"""Base exception for agents"""
pass
[docs]
class AgentValidationError(AgentException):
"""Validation error for the execution context"""
pass
[docs]
class AgentExecutionError(AgentException):
"""Error raised when the agent execution fails"""
pass
[docs]
class BaseAgent(ABC):
"""
Base class for every Django agent.
All agents must inherit from this class and implement the execute() method.
Attributes:
name: Unique agent name
description: Agent description
version: Agent version
enabled: Whether the agent is enabled
"""
name: str = "base_agent"
description: str = "Base agent"
version: str = "1.0.0"
enabled: bool = True
[docs]
def __init__(self):
"""Initializes the agent"""
self.logger = logging.getLogger(f"django_agents.{self.name}")
self._execution_count = 0
self._last_execution = None
self._context = {}
[docs]
@abstractmethod
def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""
Executes the main agent logic
Args:
context: Dictionary containing the execution context
Returns:
Execution result as a dictionary
Raises:
AgentExecutionError: If something goes wrong during execution
"""
pass
[docs]
def validate_context(self, context: Dict[str, Any]) -> bool:
"""
Validates the provided context
Args:
context: Context to validate
Returns:
True when the context is valid
Raises:
AgentValidationError: If the context is not valid
"""
if not isinstance(context, dict):
raise AgentValidationError("Context must be a dictionary")
return True
[docs]
def pre_execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""
Hook executed before the main execution
Args:
context: Execution context
Returns:
Updated or original context
"""
self._context = context
self._execution_count += 1
self._last_execution = datetime.now()
self.log_info(f"Starting execution #{self._execution_count}")
return context
[docs]
def post_execute(self, result: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
"""
Hook executed after the main execution
Args:
result: Execution result
context: Execution context
Returns:
Updated or original result
"""
self.log_info(f"Execution #{self._execution_count} completed")
return result
[docs]
def run(self, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Main entry point to run the agent
Args:
context: Execution context
Returns:
Execution result
Raises:
AgentValidationError: If the context is invalid
AgentExecutionError: If an error occurs during execution
"""
if context is None:
context = {}
try:
# Validation
self.validate_context(context)
# Pre-execution
context = self.pre_execute(context)
# Execution
result = self.execute(context)
# Post-execution
result = self.post_execute(result, context)
return result
except AgentValidationError as e:
self.log_error(f"Validation error: {str(e)}")
raise
except Exception as e:
self.log_error(f"Execution error: {str(e)}")
raise AgentExecutionError(f"Error executing {self.name}: {str(e)}") from e
[docs]
def log_info(self, message: str):
"""Log an info message"""
self.logger.info(f"[{self.name}] {message}")
[docs]
def log_warning(self, message: str):
"""Log a warning message"""
self.logger.warning(f"[{self.name}] {message}")
[docs]
def log_error(self, message: str):
"""Log an error message"""
self.logger.error(f"[{self.name}] {message}")
[docs]
def log_debug(self, message: str):
"""Log a debug message"""
self.logger.debug(f"[{self.name}] {message}")
[docs]
def get_stats(self) -> Dict[str, Any]:
"""
Returns agent statistics
Returns:
Dictionary containing agent statistics
"""
return {
'name': self.name,
'version': self.version,
'enabled': self.enabled,
'execution_count': self._execution_count,
'last_execution': self._last_execution.isoformat() if self._last_execution else None,
}
[docs]
def reset(self):
"""Reset the agent state"""
self._execution_count = 0
self._last_execution = None
self._context = {}
self.log_info("Agent reset")
[docs]
def __repr__(self) -> str:
return f"<{self.__class__.__name__}(name='{self.name}', version='{self.version}')>"
[docs]
def __str__(self) -> str:
return f"{self.name} v{self.version}"
[docs]
class AsyncAgent(BaseAgent):
"""
Base class for asynchronous agents.
Extends BaseAgent and adds support for asynchronous execution
through Celery or other task queues.
"""
async_enabled: bool = True
queue_name: str = "default"
priority: int = 5 # 0-10, 10 is the highest priority
[docs]
async def async_execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""
Asynchronous version of execute()
Args:
context: Execution context
Returns:
Execution result
"""
# By default, fall back to the synchronous version
return self.execute(context)
[docs]
def enqueue(self, context: Optional[Dict[str, Any]] = None):
"""
Queue the agent for asynchronous execution
Args:
context: Execution context
Returns:
Task ID or task reference
"""
if context is None:
context = {}
self.log_info(f"Enqueuing to queue '{self.queue_name}' with priority {self.priority}")
# Integrate with Celery or your queueing system here.
# Example with Celery (to implement):
# from django_agents.tasks import execute_agent_task
# return execute_agent_task.apply_async(
# args=[self.name, context],
# queue=self.queue_name,
# priority=self.priority
# )
# Placeholder return value for now
return f"task_{self.name}_{datetime.now().timestamp()}"
[docs]
class WorkflowAgent(BaseAgent):
"""
Agent specialized in managing workflows.
A workflow is a sequence of agents executed in a defined order.
"""
[docs]
def __init__(self):
super().__init__()
self.steps = []
self.current_step = 0
[docs]
def add_step(self, agent: BaseAgent, config: Optional[Dict[str, Any]] = None):
"""
Add a step to the workflow
Args:
agent: Agent to add
config: Optional configuration for the step
"""
if config is None:
config = {}
self.steps.append({
'agent': agent,
'config': config,
'status': 'pending'
})
self.log_info(f"Added step: {agent.name}")
[docs]
def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""
Execute the workflow
Args:
context: Initial context
Returns:
Workflow result
"""
results = []
workflow_context = context.copy()
for i, step in enumerate(self.steps):
self.current_step = i
agent = step['agent']
config = step['config']
self.log_info(f"Executing step {i+1}/{len(self.steps)}: {agent.name}")
try:
# Merge step configuration into the workflow context
step_context = {**workflow_context, **config}
# Execute the agent
result = agent.run(step_context)
# Update the step status
step['status'] = 'completed'
# Store the execution result
results.append({
'step': i,
'agent': agent.name,
'result': result,
'status': 'success'
})
# Update the context for the next step
if isinstance(result, dict):
workflow_context.update(result)
except Exception as e:
step['status'] = 'failed'
self.log_error(f"Step {i+1} failed: {str(e)}")
results.append({
'step': i,
'agent': agent.name,
'error': str(e),
'status': 'failed'
})
# Decide whether to continue or stop
if config.get('stop_on_error', True):
break
return {
'success': all(r['status'] == 'success' for r in results),
'steps': results,
'final_context': workflow_context
}