Source code for django_agents.base

"""
Base module for Django agents
"""
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Dict, Optional

logger = logging.getLogger(__name__)


[docs] class AgentException(Exception): """Base exception for agents""" pass
[docs] class AgentValidationError(AgentException): """Validation error for the execution context""" pass
[docs] class AgentExecutionError(AgentException): """Error raised when the agent execution fails""" pass
[docs] class BaseAgent(ABC): """ Base class for every Django agent. All agents must inherit from this class and implement the execute() method. Attributes: name: Unique agent name description: Agent description version: Agent version enabled: Whether the agent is enabled """ name: str = "base_agent" description: str = "Base agent" version: str = "1.0.0" enabled: bool = True
[docs] def __init__(self): """Initializes the agent""" self.logger = logging.getLogger(f"django_agents.{self.name}") self._execution_count = 0 self._last_execution = None self._context = {}
[docs] @abstractmethod def execute(self, context: Dict[str, Any]) -> Dict[str, Any]: """ Executes the main agent logic Args: context: Dictionary containing the execution context Returns: Execution result as a dictionary Raises: AgentExecutionError: If something goes wrong during execution """ pass
[docs] def validate_context(self, context: Dict[str, Any]) -> bool: """ Validates the provided context Args: context: Context to validate Returns: True when the context is valid Raises: AgentValidationError: If the context is not valid """ if not isinstance(context, dict): raise AgentValidationError("Context must be a dictionary") return True
[docs] def pre_execute(self, context: Dict[str, Any]) -> Dict[str, Any]: """ Hook executed before the main execution Args: context: Execution context Returns: Updated or original context """ self._context = context self._execution_count += 1 self._last_execution = datetime.now() self.log_info(f"Starting execution #{self._execution_count}") return context
[docs] def post_execute(self, result: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: """ Hook executed after the main execution Args: result: Execution result context: Execution context Returns: Updated or original result """ self.log_info(f"Execution #{self._execution_count} completed") return result
[docs] def run(self, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Main entry point to run the agent Args: context: Execution context Returns: Execution result Raises: AgentValidationError: If the context is invalid AgentExecutionError: If an error occurs during execution """ if context is None: context = {} try: # Validation self.validate_context(context) # Pre-execution context = self.pre_execute(context) # Execution result = self.execute(context) # Post-execution result = self.post_execute(result, context) return result except AgentValidationError as e: self.log_error(f"Validation error: {str(e)}") raise except Exception as e: self.log_error(f"Execution error: {str(e)}") raise AgentExecutionError(f"Error executing {self.name}: {str(e)}") from e
[docs] def log_info(self, message: str): """Log an info message""" self.logger.info(f"[{self.name}] {message}")
[docs] def log_warning(self, message: str): """Log a warning message""" self.logger.warning(f"[{self.name}] {message}")
[docs] def log_error(self, message: str): """Log an error message""" self.logger.error(f"[{self.name}] {message}")
[docs] def log_debug(self, message: str): """Log a debug message""" self.logger.debug(f"[{self.name}] {message}")
[docs] def get_stats(self) -> Dict[str, Any]: """ Returns agent statistics Returns: Dictionary containing agent statistics """ return { 'name': self.name, 'version': self.version, 'enabled': self.enabled, 'execution_count': self._execution_count, 'last_execution': self._last_execution.isoformat() if self._last_execution else None, }
[docs] def reset(self): """Reset the agent state""" self._execution_count = 0 self._last_execution = None self._context = {} self.log_info("Agent reset")
[docs] def __repr__(self) -> str: return f"<{self.__class__.__name__}(name='{self.name}', version='{self.version}')>"
[docs] def __str__(self) -> str: return f"{self.name} v{self.version}"
[docs] class AsyncAgent(BaseAgent): """ Base class for asynchronous agents. Extends BaseAgent and adds support for asynchronous execution through Celery or other task queues. """ async_enabled: bool = True queue_name: str = "default" priority: int = 5 # 0-10, 10 is the highest priority
[docs] async def async_execute(self, context: Dict[str, Any]) -> Dict[str, Any]: """ Asynchronous version of execute() Args: context: Execution context Returns: Execution result """ # By default, fall back to the synchronous version return self.execute(context)
[docs] def enqueue(self, context: Optional[Dict[str, Any]] = None): """ Queue the agent for asynchronous execution Args: context: Execution context Returns: Task ID or task reference """ if context is None: context = {} self.log_info(f"Enqueuing to queue '{self.queue_name}' with priority {self.priority}") # Integrate with Celery or your queueing system here. # Example with Celery (to implement): # from django_agents.tasks import execute_agent_task # return execute_agent_task.apply_async( # args=[self.name, context], # queue=self.queue_name, # priority=self.priority # ) # Placeholder return value for now return f"task_{self.name}_{datetime.now().timestamp()}"
[docs] class WorkflowAgent(BaseAgent): """ Agent specialized in managing workflows. A workflow is a sequence of agents executed in a defined order. """
[docs] def __init__(self): super().__init__() self.steps = [] self.current_step = 0
[docs] def add_step(self, agent: BaseAgent, config: Optional[Dict[str, Any]] = None): """ Add a step to the workflow Args: agent: Agent to add config: Optional configuration for the step """ if config is None: config = {} self.steps.append({ 'agent': agent, 'config': config, 'status': 'pending' }) self.log_info(f"Added step: {agent.name}")
[docs] def execute(self, context: Dict[str, Any]) -> Dict[str, Any]: """ Execute the workflow Args: context: Initial context Returns: Workflow result """ results = [] workflow_context = context.copy() for i, step in enumerate(self.steps): self.current_step = i agent = step['agent'] config = step['config'] self.log_info(f"Executing step {i+1}/{len(self.steps)}: {agent.name}") try: # Merge step configuration into the workflow context step_context = {**workflow_context, **config} # Execute the agent result = agent.run(step_context) # Update the step status step['status'] = 'completed' # Store the execution result results.append({ 'step': i, 'agent': agent.name, 'result': result, 'status': 'success' }) # Update the context for the next step if isinstance(result, dict): workflow_context.update(result) except Exception as e: step['status'] = 'failed' self.log_error(f"Step {i+1} failed: {str(e)}") results.append({ 'step': i, 'agent': agent.name, 'error': str(e), 'status': 'failed' }) # Decide whether to continue or stop if config.get('stop_on_error', True): break return { 'success': all(r['status'] == 'success' for r in results), 'steps': results, 'final_context': workflow_context }