Hey everyone, Alex here from agntai.net. Hope you’re all having a productive week. It’s May 12th, 2026, and I’ve been wrestling with a particular problem in agent architecture lately that I think many of you working in this space might relate to. We’re all building increasingly complex AI agents, right? Agents that need to do more than just execute a single task; they need to adapt, learn, and maintain context across multiple interactions and even across different tools. The old “prompt-in, response-out” model just doesn’t cut it anymore for anything beyond basic applications.
Specifically, I’ve been thinking a lot about how we design agents that need to manage long-running tasks, especially those that involve human feedback or external systems with non-instantaneous responses. Think about an agent designed to onboard a new employee, which might involve coordinating with HR, IT, and the manager over several days, or an agent managing a complex software deployment that hits various roadblocks requiring human intervention. The traditional approach often leads to agents that lose their way, forget previous steps, or simply fall over when an unexpected delay happens. It’s frustrating for developers and even more so for the end-users.
So, today, I want to talk about something I’ve been experimenting with: Event-Driven Architectures for Stateful AI Agents. It’s not a new concept in software engineering, by any means, but applying it deliberately to AI agent design, especially with the current capabilities of large language models (LLMs) and their ability to reason about events, feels like a really powerful direction. I believe this can significantly improve an agent’s resilience, its ability to maintain context over time, and its overall reliability when dealing with complex, multi-stage processes.
Why Traditional Agent Architectures Struggle with State and Time
Most basic AI agent frameworks, whether they’re based on LangChain, LlamaIndex, or even custom orchestrators, often treat a “run” as a relatively atomic operation. You give it a goal, it plans, executes, and hopefully, achieves the goal. If it needs to interact with a human, it often pauses, waits for input, and then continues. This works fine for short, synchronous loops. But what happens when:
- A human takes 3 hours to respond to an agent’s query?
- An external API call fails and needs to be retried after a delay?
- The agent needs to perform a series of actions over several days, like monitoring a system, reporting status, and only acting when a specific condition is met?
- Multiple humans need to provide input at different stages, possibly asynchronously?
In these scenarios, a simple synchronous loop either blocks indefinitely, times out, or, worst of all, loses its internal state and forgets what it was doing before the interruption. I’ve seen countless internal tools where an agent, after a minor hiccup, just restarts the entire process, frustrating everyone involved. My own attempt at building an agent to help manage my blog’s content pipeline – from idea generation to draft review – initially suffered from this exact issue. If I didn’t review a draft within a few hours, the agent would often lose track of where we were in the feedback loop.
The core problem is that many agent designs don’t inherently model time or external asynchronous interactions well. They’re often designed for immediate gratification, not for long-term persistence and reactive behavior.
Enter Event-Driven Architectures
An event-driven architecture (EDA) flips this script. Instead of the agent actively polling or blocking, it reacts to events. An event is an immutable record of the fact that something happened. When an event occurs, it can trigger a reaction in one or more parts of the system. For an AI agent, this means:
- The agent isn’t constantly running; it’s mostly waiting.
- Its state is explicitly managed and persisted.
- It reacts to external stimuli (events) and potentially generates new events.
- It can handle long-running, asynchronous processes much more gracefully.
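To make "reacting to events" concrete, here's a minimal sketch of the reactive core (names are illustrative, not from any particular framework): a dispatcher that maps event types to handler functions, where handlers may themselves return follow-up events.

```python
# A minimal event dispatcher: handlers register for event types,
# and dispatch() routes each incoming event to its handlers.
from collections import defaultdict

class Dispatcher:
    def __init__(self):
        self.handlers = defaultdict(list)

    def on(self, event_type, handler):
        self.handlers[event_type].append(handler)

    def dispatch(self, event):
        # Each handler may return new events, which are dispatched in turn.
        for handler in self.handlers[event["event_type"]]:
            for follow_up in handler(event) or []:
                self.dispatch(follow_up)
```

A handler returning follow-up events gives you the "event begets event" chain without any handler knowing who consumes its output.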
The Core Components for an Event-Driven Agent
Here’s how I’ve been thinking about breaking down an event-driven agent system:
- Event Store/Bus: This is the central nervous system. It could be a simple pub/sub system (like Redis Pub/Sub, Kafka, RabbitMQ) or even a database where events are logged. All interactions, both internal and external, are recorded as events.
- Agent State Manager: This component is responsible for persisting and retrieving the agent’s current state. This isn’t just the LLM’s context window, but a structured representation of where the agent is in its overall process, what it has done, what it’s waiting for, and what its goals are. A simple relational database or a document store like MongoDB works well here.
- Event Processors/Reactors: These are the “brains” that listen for specific events and decide what to do. An LLM can be one of these processors, reasoning about the event and the current state to determine the next action.
- External System Adapters: Components that translate external actions (e.g., an email received, a database record updated, a Slack message) into internal events, and translate agent actions into external system commands.
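To make the adapter idea concrete, here's a hedged sketch of translating an external payload into an internal event. The Slack payload fields and the `HUMAN_MESSAGE_RECEIVED` event type are hypothetical, chosen just for illustration:

```python
import uuid
from datetime import datetime, timezone

def slack_message_to_event(slack_payload, agent_id):
    """Translate a (hypothetical) Slack webhook payload into an internal event.

    The adapter's only job is normalization: whatever shape the external
    system uses, the rest of the system sees a uniform event record.
    """
    return {
        "event_id": str(uuid.uuid4()),
        "event_type": "HUMAN_MESSAGE_RECEIVED",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "agent_id": agent_id,
        "payload": {
            "channel": slack_payload.get("channel"),
            "text": slack_payload.get("text", ""),
            "user": slack_payload.get("user"),
        },
    }
```

The key design choice: adapters contain all the external-system-specific knowledge, so swapping Slack for email only means writing a new adapter, not touching any processor.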
A Practical Example: The “Content Pipeline Assistant”
Let’s revisit my blog content pipeline agent. Instead of a linear script, I reimagined it with an EDA. Here’s a simplified flow:
Initial State and Event Trigger
The process starts when I submit a new article idea. This generates an ARTICLE_IDEA_SUBMITTED event.
```json
{
  "event_type": "ARTICLE_IDEA_SUBMITTED",
  "timestamp": "2026-05-12T10:00:00Z",
  "agent_id": "content_agent_001",
  "payload": {
    "idea_id": "a_123",
    "title": "Event-Driven AI Agents",
    "keywords": ["AI agent", "architecture", "stateful", "event-driven"],
    "author_id": "alex_petrov"
  }
}
```
Agent Reaction and State Update
An event processor (which could be an LLM call orchestrated by a small Python script) listens for ARTICLE_IDEA_SUBMITTED. It retrieves the agent’s current overall state (which might be “idle” for this idea). It then reasons:
- “Okay, a new idea. I need to generate an outline.”
- It updates the agent’s state for a_123 to “OUTLINE_GENERATION_PENDING”.
- It then generates a new event: OUTLINE_GENERATION_REQUESTED.
Another processor picks up OUTLINE_GENERATION_REQUESTED. This processor’s job is to actually call the LLM to create the outline. Once the outline is generated, it emits OUTLINE_GENERATED:
```json
{
  "event_type": "OUTLINE_GENERATED",
  "timestamp": "2026-05-12T10:05:00Z",
  "agent_id": "content_agent_001",
  "payload": {
    "idea_id": "a_123",
    "outline": "..." // The actual outline content
  }
}
```
Human Interaction and Asynchronicity
Upon receiving OUTLINE_GENERATED, the agent’s state for a_123 is updated to “WAITING_FOR_OUTLINE_REVIEW”. Simultaneously, a notification event (e.g., NOTIFICATION_SENT_TO_AUTHOR) is triggered, which an adapter turns into an email or a Slack message to me.
Now, I might take an hour, a day, or even two days to review the outline. The agent isn’t blocked. It’s just waiting. Its state is persisted. Other article ideas can be processed. When I finally click an “Approve Outline” button in an internal tool, or reply to an email, that action is translated by an adapter into an OUTLINE_REVIEWED event:
```json
{
  "event_type": "OUTLINE_REVIEWED",
  "timestamp": "2026-05-14T09:30:00Z",
  "agent_id": "content_agent_001",
  "payload": {
    "idea_id": "a_123",
    "review_status": "APPROVED",
    "feedback": "Looks good, proceed!"
  }
}
```
When this event hits the system, the agent (or rather, the event processor for OUTLINE_REVIEWED) wakes up. It checks the state for a_123 (“WAITING_FOR_OUTLINE_REVIEW”). Seeing that it’s approved, it updates the state to “DRAFT_GENERATION_PENDING” and emits DRAFT_GENERATION_REQUESTED. The process continues.
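The transition just described can be written as a pure function over (current status, event), which keeps the decision logic trivially testable. This is a sketch using the status names from this walkthrough; the function shape itself is my own simplification:

```python
def next_transition(status, event):
    """Return (new_status, events_to_emit) for an OUTLINE_REVIEWED event.

    Pure function: no I/O, so it can be unit-tested without Redis or an LLM.
    """
    if event["event_type"] != "OUTLINE_REVIEWED":
        return status, []
    if status != "WAITING_FOR_OUTLINE_REVIEW":
        return status, []  # Ignore stale or out-of-order reviews
    if event["payload"]["review_status"] == "APPROVED":
        return "DRAFT_GENERATION_PENDING", [{
            "event_type": "DRAFT_GENERATION_REQUESTED",
            "payload": {"idea_id": event["payload"]["idea_id"]},
        }]
    return "OUTLINE_REJECTED", []
```

Keeping transitions pure like this also makes out-of-order events (a review arriving twice, say) a no-op rather than a corruption.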
Benefits I’ve Seen
- Resilience: If any part of the system goes down (e.g., the LLM API is temporarily unavailable), the agent’s state is preserved. When the system comes back up, it can simply pick up from the last event it processed.
- Asynchronous Handling: Long waits for human input or external systems are no longer blockers. The agent can context switch to other tasks or simply remain dormant until the relevant event arrives.
- Auditability: The event log provides a complete, immutable history of everything the agent has done and reacted to. This is invaluable for debugging and understanding agent behavior.
- Extensibility: New event processors can be added easily without modifying existing logic. Want to add a spell-checker after draft generation? Just add a new listener for DRAFT_GENERATED that emits SPELL_CHECK_REQUESTED.
- Decoupling: Different parts of the agent system (e.g., the LLM call, the database interaction, the email sender) are loosely coupled, making development and maintenance easier.
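That spell-checker listener could be as small as this sketch (the event names match the ones above; the handler-returns-events shape is an assumption, not a fixed API):

```python
def on_draft_generated(event):
    """New listener: reacts to DRAFT_GENERATED, emits SPELL_CHECK_REQUESTED.

    Nothing in the existing pipeline changes; we only subscribe one more
    handler to an event that is already being published.
    """
    return [{
        "event_type": "SPELL_CHECK_REQUESTED",
        "agent_id": event["agent_id"],
        "payload": {"task_id": event["payload"]["task_id"],
                    "text": event["payload"].get("draft", "")},
    }]
```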
Implementing a Basic Event-Driven Agent
For a basic implementation, you don’t need a full-blown Kafka cluster. Here’s a simplified Python approach using Redis Pub/Sub for the event bus and Redis key-value storage for state management.
1. Agent State Management
Let’s keep agent state in Redis for simplicity. In a real application, this would likely be a proper database with transactional guarantees.
```python
# agent_state_manager.py
import json
import redis

class AgentStateManager:
    def __init__(self, redis_host='localhost', redis_port=6379):
        self.r = redis.Redis(host=redis_host, port=redis_port, db=0)

    def get_state(self, agent_id, task_id):
        key = f"agent:{agent_id}:task:{task_id}"
        state = self.r.get(key)
        return json.loads(state) if state else {"status": "NEW", "history": []}

    def update_state(self, agent_id, task_id, new_state):
        key = f"agent:{agent_id}:task:{task_id}"
        self.r.set(key, json.dumps(new_state))

    def add_event_to_history(self, agent_id, task_id, event):
        state = self.get_state(agent_id, task_id)
        state["history"].append(event)
        self.update_state(agent_id, task_id, state)
```
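Since the class above talks to Redis, a handy trick for unit tests is an in-memory stand-in with the same interface. This is my own addition, not part of the pipeline itself:

```python
import json

class InMemoryStateManager:
    """Drop-in replacement for AgentStateManager that needs no Redis.

    Same get_state / update_state / add_event_to_history interface, backed
    by a plain dict, so processors can be tested in isolation.
    """
    def __init__(self):
        self.store = {}

    def get_state(self, agent_id, task_id):
        state = self.store.get(f"agent:{agent_id}:task:{task_id}")
        return json.loads(state) if state else {"status": "NEW", "history": []}

    def update_state(self, agent_id, task_id, new_state):
        self.store[f"agent:{agent_id}:task:{task_id}"] = json.dumps(new_state)

    def add_event_to_history(self, agent_id, task_id, event):
        state = self.get_state(agent_id, task_id)
        state["history"].append(event)
        self.update_state(agent_id, task_id, state)
```

Serializing to JSON even in memory keeps the fake honest: anything that wouldn’t survive a round-trip through Redis won’t survive here either.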
2. Event Bus (Redis Pub/Sub)
We’ll use Redis for publishing and subscribing to events.
```python
# event_bus.py
import json
import redis

class EventBus:
    def __init__(self, channel="agent_events", redis_host='localhost', redis_port=6379):
        self.r = redis.Redis(host=redis_host, port=redis_port, db=0)
        self.channel = channel

    def publish(self, event):
        message = json.dumps(event)
        self.r.publish(self.channel, message)
        print(f"Published event: {event['event_type']} for {event['payload'].get('task_id', 'N/A')}")

    def subscribe(self, callback):
        pubsub = self.r.pubsub()
        pubsub.subscribe(self.channel)
        print(f"Subscribed to channel: {self.channel}")
        for message in pubsub.listen():
            if message['type'] == 'message':
                try:
                    event = json.loads(message['data'])
                    callback(event)
                except json.JSONDecodeError:
                    print(f"Error decoding message: {message['data']}")
```
3. Event Processor (Simplified LLM Interaction)
This is where your agent’s logic lives. For this example, we’ll simulate an LLM call.
```python
# agent_processor.py
import time

from event_bus import EventBus
from agent_state_manager import AgentStateManager

class AgentProcessor:
    def __init__(self, agent_id):
        self.agent_id = agent_id
        self.event_bus = EventBus()
        self.state_manager = AgentStateManager()

    def _simulate_llm_call(self, prompt):
        # In a real scenario, this would be an actual LLM API call
        print(f"Simulating LLM call for: {prompt[:50]}...")
        time.sleep(2)  # Simulate API latency
        if "generate outline" in prompt.lower():
            return "Generated Outline: Introduction, Key Concepts, Benefits, Conclusion."
        elif "generate draft" in prompt.lower():
            return "Generated Draft: This is a long draft about event-driven agents..."
        return "Simulated LLM response."

    def process_event(self, event):
        if event["agent_id"] != self.agent_id:
            return  # Not for this agent
        task_id = event["payload"]["task_id"]
        current_state = self.state_manager.get_state(self.agent_id, task_id)
        print(f"Agent {self.agent_id} received event: {event['event_type']} for task {task_id}. Current state: {current_state['status']}")
        self.state_manager.add_event_to_history(self.agent_id, task_id, event)

        # Logic based on event type and current state
        if event["event_type"] == "TASK_STARTED":
            if current_state["status"] == "NEW":
                current_state["status"] = "OUTLINE_PENDING"
                self.state_manager.update_state(self.agent_id, task_id, current_state)
                self.event_bus.publish({
                    "event_type": "REQUEST_OUTLINE_GEN",
                    "agent_id": self.agent_id,
                    "payload": {"task_id": task_id, "prompt": "Generate outline for the task."}
                })
        elif event["event_type"] == "REQUEST_OUTLINE_GEN":
            if current_state["status"] == "OUTLINE_PENDING":
                outline = self._simulate_llm_call(event["payload"]["prompt"])
                current_state["outline"] = outline
                current_state["status"] = "OUTLINE_GENERATED"
                self.state_manager.update_state(self.agent_id, task_id, current_state)
                self.event_bus.publish({
                    "event_type": "OUTLINE_READY_FOR_REVIEW",
                    "agent_id": self.agent_id,
                    "payload": {"task_id": task_id, "outline": outline}
                })
        elif event["event_type"] == "OUTLINE_REVIEWED":
            if current_state["status"] == "OUTLINE_GENERATED" and event["payload"]["review_status"] == "APPROVED":
                current_state["status"] = "DRAFT_PENDING"
                self.state_manager.update_state(self.agent_id, task_id, current_state)
                self.event_bus.publish({
                    "event_type": "REQUEST_DRAFT_GEN",
                    "agent_id": self.agent_id,
                    "payload": {"task_id": task_id, "prompt": "Generate draft based on outline: " + current_state.get("outline", "")}
                })
            elif event["payload"]["review_status"] == "REJECTED":
                current_state["status"] = "OUTLINE_REJECTED"
                self.state_manager.update_state(self.agent_id, task_id, current_state)
                print(f"Task {task_id}: Outline rejected. Human feedback: {event['payload']['feedback']}")
                # Optionally, publish an event to notify the human or restart outline generation
        elif event["event_type"] == "REQUEST_DRAFT_GEN":
            if current_state["status"] == "DRAFT_PENDING":
                draft = self._simulate_llm_call(event["payload"]["prompt"])
                current_state["draft"] = draft
                current_state["status"] = "DRAFT_GENERATED"
                self.state_manager.update_state(self.agent_id, task_id, current_state)
                self.event_bus.publish({
                    "event_type": "DRAFT_READY_FOR_FINAL_REVIEW",
                    "agent_id": self.agent_id,
                    "payload": {"task_id": task_id, "draft": draft}
                })
        # ... and so on for other stages
```
4. Running the System
You’d typically have separate processes for the event publisher (initiating tasks) and the event processor (the agent itself). Ensure you have Redis running.
```python
# main.py
import threading
import time

from event_bus import EventBus
from agent_processor import AgentProcessor

def start_processor(agent_id):
    processor = AgentProcessor(agent_id)
    # The subscribe method blocks, so run it in a separate thread
    processor.event_bus.subscribe(processor.process_event)

if __name__ == "__main__":
    event_bus = EventBus()

    # Start the agent processor in a separate thread
    agent_thread = threading.Thread(target=start_processor, args=("content_agent_001",))
    agent_thread.daemon = True  # Allow main program to exit even if thread is running
    agent_thread.start()
    time.sleep(1)  # Give the subscriber a moment to connect

    # Simulate starting a new task
    print("\n--- Initiating Task 1 ---")
    event_bus.publish({
        "event_type": "TASK_STARTED",
        "agent_id": "content_agent_001",
        "payload": {"task_id": "task_abc_001", "description": "Write article on EDA"}
    })
    time.sleep(10)  # Let events process

    # Simulate human review for Task 1
    print("\n--- Simulating Human Review for Task 1 (Approved) ---")
    event_bus.publish({
        "event_type": "OUTLINE_REVIEWED",
        "agent_id": "content_agent_001",
        "payload": {"task_id": "task_abc_001", "review_status": "APPROVED", "feedback": "Good to go!"}
    })
    time.sleep(10)  # Let events process

    print("\n--- Initiating Task 2 (another task in parallel) ---")
    event_bus.publish({
        "event_type": "TASK_STARTED",
        "agent_id": "content_agent_001",
        "payload": {"task_id": "task_xyz_002", "description": "Review Q2 Marketing Plan"}
    })
    time.sleep(5)  # Let events process for Task 2

    print("\n--- Simulating Human Review for Task 2 (Rejected) ---")
    event_bus.publish({
        "event_type": "OUTLINE_REVIEWED",
        "agent_id": "content_agent_001",
        "payload": {"task_id": "task_xyz_002", "review_status": "REJECTED", "feedback": "Needs more detail on budget."}
    })
    time.sleep(10)

    print("\n--- Simulation Complete ---")
```
This setup allows the agent to process tasks in parallel and handle human feedback asynchronously. You can see how task_abc_001 moves to draft generation, while task_xyz_002 gets rejected, all without blocking the main agent logic. The agent’s “memory” (its state) is external and persistent.
Actionable Takeaways
If you’re building AI agents that need to handle complex, multi-stage, or long-running processes, consider an event-driven architecture. Here’s how to start:
- Model your agent’s interactions as events: Every significant action, internal decision, or external trigger should be an event. Think about what happens, not just what the agent does.
- Decouple state from LLM context: Your agent’s long-term memory and process state should live in a persistent store, separate from the LLM’s current conversation window. The LLM gets fed relevant snippets of state and event history when it needs to make a decision.
- Choose an event bus: For prototyping, Redis Pub/Sub is great. For production, consider Kafka or RabbitMQ for more robustness, message guarantees, and scalability.
- Design clear event contracts: Define the structure and meaning of your events. This helps keep your system organized.
- Start simple: Don’t try to build a monolithic event-driven system from day one. Identify a single complex workflow in your agent that struggles with state or asynchronicity, and refactor it using an EDA.
- Embrace asynchronicity: Your agent should largely be reactive, not constantly active. It waits for events and acts upon them, making it inherently more resilient to delays and external system outages.
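For the "clear event contracts" point, even a small dataclass goes a long way. This is one way to pin down the envelope used throughout this post; treat it as a sketch, not a standard:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any

@dataclass(frozen=True)
class Event:
    """Immutable event envelope matching the JSON shape used in this post."""
    event_type: str
    agent_id: str
    payload: dict[str, Any]
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat())

    def __post_init__(self):
        # Fail fast on malformed events instead of at the consumer.
        if not self.event_type or not self.agent_id:
            raise ValueError("event_type and agent_id are required")
```

`frozen=True` enforces the "events are immutable facts" rule at the type level, and validating in `__post_init__` catches malformed events at the producer rather than deep inside a processor.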
I’m still refining my own approach, but this shift to thinking in terms of events has dramatically improved the reliability and maintainability of my more complex agents. It’s a bit more upfront design work, but the payoff in terms of agent robustness and clarity is absolutely worth it.
Let me know your thoughts in the comments! Have you tried something similar? What challenges did you face? I’m always keen to hear about real-world experiences.
Until next time,
Alex Petrov
agntai.net