Hey everyone, Alex here from agntai.net! It’s March 2026, and I’ve been spending way too much time lately thinking about how we build AI agents. Specifically, I’ve been wrestling with the “glue code” – the stuff that connects all the fancy LLM outputs, tool calls, and state management. We’ve all seen the impressive demos, right? Agents doing amazing things. But then you try to build one for a real-world problem, and you hit a wall of callbacks, conditional logic, and state updates. It feels less like building an intelligent system and more like managing a very complex spaghetti factory.
So, today, I want to talk about something that’s been quietly gaining traction and, frankly, saving my sanity: Event-Driven Architectures for AI Agents. It’s not a new concept in software engineering, by any stretch, but applying it thoughtfully to AI agents, especially those orchestrating multiple LLM interactions and external tools, feels like a breath of fresh air. Forget the linear, step-by-step thinking for a moment. Let’s think about reactive systems.
My Personal Struggle with Agent Monoliths
A few months ago, I was working on an agent designed to help me manage my freelance writing pipeline. The idea was simple: it would monitor my inbox for new inquiries, draft initial responses, suggest relevant past articles from my knowledge base, and even help schedule follow-up calls. Seemed straightforward enough.
My initial approach was pretty typical: a main loop. Get email. Parse email. Decide action (draft, schedule, search). Call LLM. Process LLM output. Call tool (calendar API, email API, knowledge base API). Update internal state. Repeat.
It started okay, but as I added more “intelligence” and more tools, it became unmanageable. What if the calendar API call failed? What if the LLM hallucinated a contact that didn’t exist? What if I needed to pause and ask for human input on a critical decision? My single, monolithic agent script quickly turned into a nested `if/else` labyrinth with `try/except` blocks everywhere. Debugging was a nightmare. Modifying one part often broke another. It felt like I was constantly patching leaks in a sinking ship.
I remember one late night, trying to figure out why my agent kept drafting responses for emails it had already processed. Turned out, the state update for “email processed” was happening *after* a potential LLM re-run in a failure path. It was a classic race condition in a system that wasn’t designed to handle asynchronous, non-deterministic operations gracefully. That’s when I started looking for a better way.
Why Event-Driven Agents Make Sense
Think about how humans work. We don’t usually follow a strict, predefined script for every interaction. We react to things. Someone asks a question – that’s an event. We process it and respond – that’s another event. We get a new piece of information – event. We decide to use a tool (like opening a browser) – event. Our internal “state” changes constantly based on these events.
An event-driven architecture (EDA) for AI agents mirrors this natural interaction pattern. Instead of a rigid control flow, components emit events when something significant happens. Other components (listeners, handlers) react to these events. This brings several key benefits:
- Modularity: Components become loosely coupled. A tool executor doesn’t need to know who called it or what will happen next; it just emits an event like “tool_call_succeeded” or “tool_call_failed.”
- Flexibility: It’s much easier to add new features or modify existing ones. Want a new tool? Just add a handler that listens for a specific intent event. Need to log every LLM call? Add a logger that listens for “llm_response_received.”
- Resilience: If one component fails, it’s less likely to bring down the whole system. An event can be retried, or an alternative handler can pick it up. You can build in dead-letter queues for events that can’t be processed.
- Concurrency: Many events can be processed in parallel, either by different handlers or by the same handler on different event instances. This is crucial for agents that need to manage multiple ongoing tasks.
- Observability: The stream of events provides a clear, auditable log of everything the agent is doing. You can easily trace the flow of information and decisions.
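To make the resilience point concrete, here’s a minimal sketch of the dead-letter idea: if every handler for an event fails, the event is parked for later inspection or retry instead of being silently dropped. The names (`DeadLetter`, `safe_dispatch`, `flaky_handler`) are mine, not from any framework, and events are plain dicts here just to keep the toy short.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

@dataclass
class DeadLetter:
    """An event that no handler could process, plus why it failed."""
    event: Dict[str, Any]
    error: str

dead_letters: List[DeadLetter] = []

def safe_dispatch(event: Dict[str, Any],
                  handlers: List[Callable[[Dict[str, Any]], None]]) -> None:
    """Try each handler; if every one raises, park the event as a dead letter."""
    last_error = None
    for handler in handlers:
        try:
            handler(event)
            return  # one handler succeeded; we're done
        except Exception as e:
            last_error = e
    dead_letters.append(DeadLetter(event=event, error=str(last_error)))

# A handler that always fails, to exercise the dead-letter path
def flaky_handler(event):
    raise RuntimeError("calendar API unreachable")

safe_dispatch({"type": "tool_call_requested", "tool": "calendar"}, [flaky_handler])
print(len(dead_letters))  # 1 — the event is parked, not lost
```

In a production system the dead-letter list would be a persistent queue that an operator (or a retry worker) drains.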
The Core Idea: Events, Dispatchers, and Handlers
At its heart, an EDA needs three things:
- Events: Simple data structures that describe something that happened (e.g., `ToolCalled`, `LLMResponseReceived`, `UserQueryReceived`).
- An Event Dispatcher: A central mechanism that takes an event and routes it to all interested parties.
- Event Handlers: Functions or classes that “listen” for specific types of events and execute some logic when they receive one.
Let’s look at a simplified example. Imagine our writing pipeline agent. Instead of a giant function, we have:
- A `UserQueryReceived` event (when a new email comes in).
- A `LLMInputGenerated` event (when we’ve crafted a prompt for the LLM).
- A `LLMResponseReceived` event (when the LLM sends back its output).
- A `ToolCallRequested` event (when the LLM suggests using a tool).
- A `ToolCallSucceeded` / `ToolCallFailed` event (after a tool interaction).
- A `DraftResponseReady` event (when a draft is ready for review).
Each of these events carries relevant data – the email content, the LLM prompt/response, tool name and arguments, etc.
Building Blocks: A Pythonic Approach
You don’t need a heavy-duty message queue like Kafka for simple agent systems (though for production, distributed agents, you definitely might!). For a single-process agent, a simple in-memory event dispatcher works wonders.
Step 1: Define Your Events
I like using `dataclasses` for events because they’re clean and explicit.
```python
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

# kw_only=True (Python 3.10+) keeps the defaulted base fields out of the
# positional signature, so subclasses can still declare required fields
# like query_id without a "non-default argument follows default" error.
@dataclass(kw_only=True)
class AgentEvent:
    """Base class for all agent events."""
    timestamp: float = field(default_factory=time.time)  # for ordering and debugging
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class UserQueryReceived(AgentEvent):
    query_id: str
    content: str
    source: str = "email"

@dataclass
class LLMRequestSent(AgentEvent):
    query_id: str
    model_name: str
    prompt: str

@dataclass
class LLMResponseReceived(AgentEvent):
    query_id: str
    model_name: str
    response_text: str
    tool_calls: Optional[List[Dict[str, Any]]] = None

@dataclass
class ToolCallRequested(AgentEvent):
    query_id: str
    tool_name: str
    tool_args: Dict[str, Any]

@dataclass
class ToolCallSucceeded(AgentEvent):
    query_id: str
    tool_name: str
    tool_args: Dict[str, Any]
    result: Any

@dataclass
class ToolCallFailed(AgentEvent):
    query_id: str
    tool_name: str
    tool_args: Dict[str, Any]
    error_message: str

@dataclass
class AgentThoughtEvent(AgentEvent):
    query_id: str
    thought: str

@dataclass
class DraftResponseReady(AgentEvent):
    query_id: str
    response_content: str
    action_taken: str

@dataclass
class FinalResponseReady(AgentEvent):
    query_id: str
    response_content: str
    action_taken: str
```
Notice the `query_id`. This is critical! It allows us to correlate events belonging to the same overall user interaction or task. Without it, your event stream becomes a chaotic mess.
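A quick illustration of why the correlation ID matters: given a flat, interleaved stream of events from two concurrent tasks, grouping by `query_id` recovers one coherent trace per task. This toy uses dicts instead of the dataclasses above, just to keep it self-contained.

```python
from collections import defaultdict

# A flat, interleaved event stream from two concurrent tasks (made-up data)
event_stream = [
    {"query_id": "email_123", "type": "UserQueryReceived"},
    {"query_id": "email_456", "type": "UserQueryReceived"},
    {"query_id": "email_123", "type": "LLMRequestSent"},
    {"query_id": "email_456", "type": "LLMRequestSent"},
    {"query_id": "email_123", "type": "LLMResponseReceived"},
]

# Group by query_id to recover one ordered trace per task
traces = defaultdict(list)
for ev in event_stream:
    traces[ev["query_id"]].append(ev["type"])

print(traces["email_123"])  # ['UserQueryReceived', 'LLMRequestSent', 'LLMResponseReceived']
```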
Step 2: Create an Event Dispatcher
This is where events get routed. A simple dictionary mapping event types to lists of handlers works well.
```python
import time
from collections import defaultdict
from typing import Callable, Dict, List, Type

class EventDispatcher:
    def __init__(self):
        self._handlers: Dict[Type[AgentEvent], List[Callable[[AgentEvent], None]]] = defaultdict(list)

    def register_handler(self, event_type: Type[AgentEvent], handler: Callable[[AgentEvent], None]):
        """Register a function to handle a specific event type."""
        self._handlers[event_type].append(handler)

    def dispatch(self, event: AgentEvent):
        """Send an event to all registered handlers."""
        # Backfill the timestamp if the event was somehow created without one
        if getattr(event, "timestamp", None) is None:
            event.timestamp = time.time()
        # Dispatch to handlers registered for this exact event type
        for handler in self._handlers[type(event)]:
            try:
                handler(event)
            except Exception as e:
                name = getattr(handler, "__name__", repr(handler))
                print(f"Error in handler {name} for event {type(event).__name__}: {e}")
                # You could dispatch a dedicated error event here for robustness
        # Also dispatch to handlers registered for the base AgentEvent type;
        # this allows generic logging or monitoring of every event
        for handler in self._handlers[AgentEvent]:
            try:
                handler(event)
            except Exception as e:
                name = getattr(handler, "__name__", repr(handler))
                print(f"Error in generic handler {name} for event {type(event).__name__}: {e}")
```
Step 3: Define Your Handlers
Each handler is a simple function that takes an event object. It performs its specific task and, crucially, can dispatch new events.
Let’s sketch out some handlers for our writing agent:
```python
import json
import random
import time

# Assuming 'dispatcher' is an instance of EventDispatcher

# --- Handler for initial user query ---
def handle_user_query(event: UserQueryReceived):
    print(f"[{event.query_id}] User query received: {event.content[:50]}...")
    # Here, we'd typically use an LLM to decide the initial intent.
    # For simplicity, let's assume it always goes to the LLM for drafting.
    prompt = (
        "You are a helpful assistant for a freelance writer. Draft an initial, "
        "polite response to the following client inquiry, and suggest a follow-up "
        "action (e.g., 'schedule_call', 'search_knowledge_base'):\n\n"
        f"{event.content}\n\n"
        "Output in JSON with 'draft_response' and 'suggested_action' fields."
    )
    # Dispatch an event to send to the LLM
    dispatcher.dispatch(LLMRequestSent(
        query_id=event.query_id,
        model_name="gpt-4",
        prompt=prompt,
        metadata={"previous_event": type(event).__name__},
    ))

# --- Handler for LLM responses ---
def handle_llm_response(event: LLMResponseReceived):
    print(f"[{event.query_id}] LLM response received: {event.response_text[:50]}...")
    # Parse the LLM response (this would be more robust with Pydantic)
    try:
        llm_output = json.loads(event.response_text)
        draft = llm_output.get("draft_response")
        action = llm_output.get("suggested_action")
        dispatcher.dispatch(AgentThoughtEvent(
            query_id=event.query_id,
            thought=f"LLM suggested action: {action}",
        ))
        if draft:
            dispatcher.dispatch(DraftResponseReady(
                query_id=event.query_id,
                response_content=draft,
                action_taken="drafted_initial_response",
            ))
        if action == "schedule_call":
            # Assume the LLM also provided call details if needed
            dispatcher.dispatch(ToolCallRequested(
                query_id=event.query_id,
                tool_name="calendar_scheduler",
                tool_args={"client_email": "[email protected]", "duration": "30min"},  # Placeholder
            ))
        elif action == "search_knowledge_base":
            # Assume the LLM provided a search query
            dispatcher.dispatch(ToolCallRequested(
                query_id=event.query_id,
                tool_name="knowledge_base_search",
                tool_args={"query": "related articles on AI agents"},  # Placeholder
            ))
    except json.JSONDecodeError:
        print(f"[{event.query_id}] LLM response not valid JSON. Sending for human review.")
        dispatcher.dispatch(FinalResponseReady(
            query_id=event.query_id,
            response_content="LLM output parsing failed, needs human review. "
                             "Original LLM response: " + event.response_text,
            action_taken="human_review_needed",
        ))

# --- Handler for tool calls ---
def handle_tool_call_request(event: ToolCallRequested):
    print(f"[{event.query_id}] Tool call requested: {event.tool_name} with args {event.tool_args}")
    # Simulate tool execution
    if event.tool_name == "calendar_scheduler":
        # In a real system, this would call an actual API
        print(f"Scheduling call for {event.tool_args.get('client_email')}...")
        time.sleep(1)  # Simulate network delay
        if random.random() > 0.1:  # 90% success rate
            dispatcher.dispatch(ToolCallSucceeded(
                query_id=event.query_id,
                tool_name=event.tool_name,
                tool_args=event.tool_args,
                result={"status": "scheduled", "meeting_link": "https://meet.google.com/abc-xyz"},
            ))
        else:
            dispatcher.dispatch(ToolCallFailed(
                query_id=event.query_id,
                tool_name=event.tool_name,
                tool_args=event.tool_args,
                error_message="Calendar API error or busy",
            ))
    # ... other tools ...

# --- Generic logger handler ---
def log_all_events(event: AgentEvent):
    query_id = getattr(event, "query_id", "-")
    print(f"LOG: {type(event).__name__} - {query_id} - {event.timestamp}")

# --- Register handlers ---
dispatcher = EventDispatcher()
dispatcher.register_handler(UserQueryReceived, handle_user_query)
dispatcher.register_handler(LLMResponseReceived, handle_llm_response)
dispatcher.register_handler(ToolCallRequested, handle_tool_call_request)
# ... other handlers for ToolCallSucceeded, ToolCallFailed, etc.
dispatcher.register_handler(AgentEvent, log_all_events)  # Generic handler for all events
```
This is a very simplified example, but you can see how each piece is independent. The `handle_user_query` doesn’t know *how* the LLM request will be sent, only that it needs to emit an `LLMRequestSent` event. Similarly, `handle_llm_response` doesn’t care who sent the original prompt; it just processes the response and decides what to do next.
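One way to see the decoupling in miniature: a second listener for the same event can be added without touching the first. Here’s a self-contained toy with a dict-based dispatcher (the handler and event names are illustrative, not from the agent above):

```python
from collections import defaultdict
from typing import Callable, Dict, List

handlers: Dict[str, List[Callable[[dict], None]]] = defaultdict(list)

def register(event_type: str, handler: Callable[[dict], None]) -> None:
    handlers[event_type].append(handler)

def dispatch(event_type: str, payload: dict) -> None:
    for h in handlers[event_type]:
        h(payload)

seen = []

# Original handler: the component that would call the LLM
register("llm_request_sent", lambda p: seen.append(("llm_client", p["prompt"])))
# Added later, independently: a logger for the same event
register("llm_request_sent", lambda p: seen.append(("logger", p["prompt"])))

dispatch("llm_request_sent", {"prompt": "Draft a reply"})
print(seen)  # [('llm_client', 'Draft a reply'), ('logger', 'Draft a reply')]
```

Neither handler knows the other exists; the dispatcher is the only shared dependency.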
Simulating LLM and Tool Calls
For a real system, `LLMRequestSent` would trigger a component that actually calls the LLM API, and then dispatches `LLMResponseReceived` when the result comes back. This is where `asyncio` or a simple thread pool can come in handy for concurrent LLM calls or tool executions without blocking the event loop.
```python
import asyncio
import json
import random
import time

# ... (Event definitions, EventDispatcher, and handlers from above) ...

# Mock LLM API
async def mock_llm_call(prompt: str) -> str:
    print(f"  [Mock LLM] Processing prompt: {prompt[:80]}...")
    await asyncio.sleep(random.uniform(1.0, 3.0))  # Simulate LLM latency
    # Very basic mock logic for our use case
    if "schedule_call" in prompt:
        return json.dumps({
            "draft_response": "Thanks for your inquiry! I'd love to chat more. How about we schedule a quick call next week?",
            "suggested_action": "schedule_call",
        })
    elif "search_knowledge_base" in prompt:
        return json.dumps({
            "draft_response": "Great question! I've drafted a response and also looked up some relevant articles.",
            "suggested_action": "search_knowledge_base",
        })
    else:
        return json.dumps({
            "draft_response": "Thanks for reaching out! I've reviewed your request and drafted an initial response.",
            "suggested_action": "none",
        })

# LLM agent component (listens for LLMRequestSent, dispatches LLMResponseReceived)
async def llm_agent_component(event: LLMRequestSent, dispatcher: EventDispatcher):
    response_text = await mock_llm_call(event.prompt)
    # In a real system, you'd parse tool calls out of the LLM response
    tool_calls = []  # Placeholder
    dispatcher.dispatch(LLMResponseReceived(
        query_id=event.query_id,
        model_name=event.model_name,
        response_text=response_text,
        tool_calls=tool_calls,
        metadata={"original_prompt_event": event.timestamp},
    ))

# Register the async handler. Note that create_task requires a running event
# loop, so dispatching must happen inside asyncio.run().
dispatcher.register_handler(
    LLMRequestSent,
    lambda e: asyncio.create_task(llm_agent_component(e, dispatcher)),
)

# To run an example:
async def main():
    query_id = "user_email_123"
    dispatcher.dispatch(UserQueryReceived(
        query_id=query_id,
        content="I need an article about event-driven AI agents and a follow-up call.",
        timestamp=time.time(),
    ))
    # Give the events some time to work through the system
    await asyncio.sleep(10)
    print("\n--- Processing complete for user_email_123 ---\n")

    query_id_2 = "user_email_456"
    dispatcher.dispatch(UserQueryReceived(
        query_id=query_id_2,
        content="Can you summarize my past articles on deep learning architectures?",
        timestamp=time.time(),
    ))
    await asyncio.sleep(10)
    print("\n--- Processing complete for user_email_456 ---\n")

if __name__ == "__main__":
    asyncio.run(main())
```
I introduced `asyncio.create_task` to allow the `llm_agent_component` to run concurrently with other handlers or subsequent dispatches. This is where event-driven architectures really shine for performance and responsiveness in AI agents.
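To see the gain in isolation, compare two simulated LLM calls run as overlapping tasks: the total wall time is roughly the duration of the longest call, not the sum. This is a standalone sketch with made-up latencies, independent of the agent code above.

```python
import asyncio
import time

async def mock_llm_call(name: str, latency: float) -> str:
    await asyncio.sleep(latency)  # stand-in for network + inference time
    return f"{name} done"

async def main() -> float:
    start = time.perf_counter()
    # Launch both calls concurrently so they overlap instead of queueing
    results = await asyncio.gather(
        mock_llm_call("draft", 0.2),
        mock_llm_call("search", 0.3),
    )
    elapsed = time.perf_counter() - start
    print(results, f"{elapsed:.2f}s")  # ~0.3s total, not ~0.5s
    return elapsed

elapsed = asyncio.run(main())
```

Run sequentially (two awaits, one after the other), the same pair of calls would take about 0.5s.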
Actionable Takeaways for Your Next Agent Project
- Start Simple, Think Events: Even for a small agent, sketch out the key events that happen. What triggers what? What information needs to be passed along?
- Define Clear Event Schemas: Use `dataclasses` or Pydantic models for your events. This ensures consistency and makes debugging easier. Always include a `query_id` or `correlation_id`.
- Separate Concerns: Each handler should do one thing well. Don’t try to cram too much logic into a single handler. If a handler needs to make an external call, it should dispatch a request event and wait for a corresponding response event.
- Embrace Asynchronicity: AI agent interactions (LLM calls, tool execution) are inherently asynchronous. Use `asyncio` or a similar framework to handle these concurrently without blocking your event loop.
- Build Observability In: A generic event logger (like my `log_all_events`) is incredibly valuable. You can easily pipe these events to a monitoring system or simply print them for development. This event stream becomes your agent’s internal “thought process” log.
- Error Handling with Events: Instead of deep nested `try/except`, dispatch `ErrorEvent` or `ToolCallFailed` events. Other handlers can then specifically listen for these to implement retry logic, fallbacks, or human intervention requests.
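As a sketch of that last point: a retry handler can listen for failure events and re-issue the request with an attempt counter, escalating to a human once a limit is hit. Event and field names mirror the ones above, but this is a standalone toy; in the real agent, `retry_tool_call` would dispatch a fresh `ToolCallRequested` rather than calling itself.

```python
from dataclasses import dataclass
from typing import Any, Dict

MAX_ATTEMPTS = 3
log = []

@dataclass
class ToolCallFailed:
    tool_name: str
    tool_args: Dict[str, Any]
    error_message: str
    attempt: int = 1

def retry_tool_call(event: ToolCallFailed) -> None:
    if event.attempt < MAX_ATTEMPTS:
        log.append(f"retrying {event.tool_name} (attempt {event.attempt + 1})")
        # Simulate the retried call failing again, to walk the whole retry path
        retry_tool_call(ToolCallFailed(
            tool_name=event.tool_name,
            tool_args=event.tool_args,
            error_message=event.error_message,
            attempt=event.attempt + 1,
        ))
    else:
        log.append(f"{event.tool_name} failed after {MAX_ATTEMPTS} attempts; escalating to human")

retry_tool_call(ToolCallFailed("calendar_scheduler", {}, "API timeout"))
print(log)
```

The retry policy lives entirely in one handler, so backoff, jitter, or per-tool limits can be added without touching the tool executor.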
Moving to an event-driven model completely changed how I think about building agents. It moved me away from trying to anticipate every possible path in a linear flow and towards building a system that reacts intelligently to its environment and its own internal operations. It’s a more resilient, scalable, and frankly, more enjoyable way to build complex AI agents.
Give it a try for your next agent project. You might find yourself untangling that spaghetti code faster than you think!
Originally published: March 16, 2026