Salut tout le monde, Alex ici d’agntai.net ! Nous sommes en mars 2026, et j’ai passé beaucoup trop de temps ces derniers temps à réfléchir à la manière dont nous construisons des agents IA. Plus précisément, je me suis battu avec le « glue code » – les éléments qui relient toutes les sorties LLM sophistiquées, les appels d’outils et la gestion d’état. Nous avons tous vu les démos impressionnantes, non ? Des agents réalisant des choses incroyables. Mais ensuite, vous essayez d’en construire un pour un problème réel, et vous vous heurtez à un mur de rappels, de logique conditionnelle et de mises à jour d’état. On a l’impression de construire un système intelligent, mais plutôt de gérer une usine de spaghetti très complexe.
Aujourd’hui, je veux parler de quelque chose qui a gagné en traction discrètement et, franchement, qui a sauvé ma santé mentale : les Architectures Orientées Événements pour les Agents IA. Ce n’est pas un nouveau concept en ingénierie logicielle, loin de là, mais l’appliquer de manière réfléchie aux agents IA, en particulier ceux qui orchestrent plusieurs interactions LLM et outils externes, semble être une bouffée d’air frais. Oublions un moment la pensée linéaire et étape par étape. Pensons aux systèmes réactifs.
Mon Combat Personnel avec les Monolithes d’Agents
Il y a quelques mois, je travaillais sur un agent conçu pour m’aider à gérer mon flux de travail d’écriture freelance. L’idée était simple : il surveillerait ma boîte de réception pour de nouvelles demandes, rédigerait des réponses initiales, suggérerait des articles passés pertinents de ma base de connaissances, et même aiderait à programmer des appels de suivi. Cela semblait assez simple.
Mon approche initiale était assez typique : une boucle principale. Recevoir un email. Analyser l’email. Décider de l’action (rédiger, programmer, rechercher). Appeler le LLM. Traiter la sortie du LLM. Appeler un outil (API calendrier, API email, API base de connaissances). Mettre à jour l’état interne. Répéter.
Tout a bien commencé, mais à mesure que j’ai ajouté plus d’« intelligence » et plus d’outils, c’est devenu un cauchemar. Que se passerait-il si l’appel à l’API calendrier échouait ? Que se passerait-il si le LLM halluciné un contact qui n’existait pas ? Que faire si je devais faire une pause et demander une input humain pour une décision critique ? Mon script d’agent unique et monolithique s’est rapidement transformé en un labyrinthe imbriqué d’`if/else` avec des blocs `try/except` partout. Le débogage était un cauchemar. Modifier une partie brisait souvent une autre. On avait l’impression de constamment réparer des fuites dans un navire qui coule.
Je me souviens d’une nuit tardive, essayant de comprendre pourquoi mon agent continuait à rédiger des réponses pour des emails qu’il avait déjà traités. Il s’est avéré que la mise à jour de l’état pour « email traité » se produisait *après* une nouvelle exécution potentielle du LLM dans un chemin d’échec. C’était une condition de course classique dans un système qui n’était pas conçu pour gérer gracieusement des opérations asynchrones et non déterministes. C’est à ce moment-là que j’ai commencé à chercher un meilleur moyen.
Pourquoi les Agents Orientés Événements Ont du Sens
Pensez à la façon dont les humains fonctionnent. Nous ne suivons généralement pas un script strict et prédéfini pour chaque interaction. Nous réagissons aux choses. Quelqu’un pose une question – c’est un événement. Nous le traitons et répondons – c’est un autre événement. Nous recevons une nouvelle information – événement. Nous décidons d’utiliser un outil (comme ouvrir un navigateur) – événement. Notre « état » interne change constamment en fonction de ces événements.
Une architecture orientée événements (AOE) pour les agents IA reflète ce schéma d’interaction naturel. Au lieu d’un flux de contrôle rigide, les composants émettent des événements lorsque quelque chose de significatif se produit. D’autres composants (écouteurs, gestionnaires) réagissent à ces événements. Cela apporte plusieurs avantages clés :
- Modularité : Les composants deviennent faiblement couplés. Un exécuteur d’outils n’a pas besoin de savoir qui l’a appelé ou ce qui va se passer ensuite ; il émet simplement un événement comme « tool_call_succeeded » ou « tool_call_failed ».
- Flexibilité : Il est beaucoup plus facile d’ajouter de nouvelles fonctionnalités ou de modifier celles qui existent. Vous voulez un nouvel outil ? Il suffit d’ajouter un gestionnaire qui écoute un événement d’intention spécifique. Vous devez enregistrer chaque appel LLM ? Ajoutez un enregistreur qui écoute « llm_response_received ».
- Résilience : Si un composant échoue, il est moins probable qu’il fasse tomber tout le système. Un événement peut être réessayé, ou un gestionnaire alternatif peut le reprendre. Vous pouvez intégrer des files d’attente d’erreurs pour les événements qui ne peuvent pas être traités.
- Concurrence : De nombreux événements peuvent être traités en parallèle, soit par des gestionnaires différents, soit par le même gestionnaire sur différentes instances d’événements. Cela est crucial pour les agents qui doivent gérer plusieurs tâches en cours.
- Observabilité : Le flux d’événements fournit un journal clair et auditable de tout ce que fait l’agent. Vous pouvez facilement suivre le flux d’informations et de décisions.
L’Idée Centrale : Événements, Dispatchers et Gestionnaires
Au cœur d’une AOE, il vous faut trois choses :
- Événements : Structures de données simples qui décrivent quelque chose qui s’est produit (par exemple, `ToolCalled`, `LLMResponseReceived`, `UserQueryReceived`).
- Un Dispatchers d’Événements : Un mécanisme central qui prend un événement et le dirige vers toutes les parties intéressées.
- Gestionnaires d’Événements : Fonctions ou classes qui « écoutent » des types spécifiques d’événements et exécutent une logique lorsqu’elles en reçoivent un.
Examinons un exemple simplifié. Imaginez notre agent de flux d’écriture. Au lieu d’une fonction géante, nous avons :
- Un événement `UserQueryReceived` (lorsqu’un nouvel email arrive).
- Un événement `LLMInputGenerated` (lorsque nous avons créé un prompt pour le LLM).
- Un événement `LLMResponseReceived` (lorsque le LLM envoie sa sortie).
- Un événement `ToolCallRequested` (lorsque le LLM suggère d’utiliser un outil).
- Un événement `ToolCallSucceeded` / `ToolCallFailed` (après une interaction avec un outil).
- Un événement `DraftResponseReady` (lorsqu’un brouillon est prêt pour révision).
Chacun de ces événements transporte des données pertinentes – le contenu de l’email, le prompt/réponse du LLM, le nom et les arguments de l’outil, etc.
Éléments de Base : Une Approche Pythonique
Vous n’avez pas besoin d’une file de messages lourde comme Kafka pour des systèmes d’agents simples (bien que, pour des agents distribués en production, vous pourriez en avoir besoin !). Pour un agent à processus unique, un simple dispatchers d’événements en mémoire fonctionne à merveille.
Étape 1 : Définissez Vos Événements
J’aime utiliser `dataclasses` pour les événements car ils sont propres et explicites.
from dataclasses import dataclass
from typing import Any, Dict, Optional
@dataclass
class AgentEvent:
"""Classe de base pour tous les événements d'agent."""
timestamp: float # Ajouter un timestamp pour l'ordre et le débogage
metadata: Dict[str, Any] = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
@dataclass
class UserQueryReceived(AgentEvent):
query_id: str
content: str
source: str = "email"
@dataclass
class LLMRequestSent(AgentEvent):
query_id: str
model_name: str
prompt: str
@dataclass
class LLMResponseReceived(AgentEvent):
query_id: str
model_name: str
response_text: str
tool_calls: Optional[list[Dict[str, Any]]] = None
@dataclass
class ToolCallRequested(AgentEvent):
query_id: str
tool_name: str
tool_args: Dict[str, Any]
@dataclass
class ToolCallSucceeded(AgentEvent):
query_id: str
tool_name: str
tool_args: Dict[str, Any]
result: Any
@dataclass
class ToolCallFailed(AgentEvent):
query_id: str
tool_name: str
tool_args: Dict[str, Any]
error_message: str
@dataclass
class AgentThoughtEvent(AgentEvent):
query_id: str
thought: str
@dataclass
class FinalResponseReady(AgentEvent):
query_id: str
response_content: str
action_taken: str
Remarquez le `query_id`. C’est essentiel ! Cela nous permet de corréler les événements appartenant à la même interaction ou tâche utilisateur globale. Sans cela, votre flux d’événements devient un désordre chaotique.
Étape 2 : Créez un Dispatchers d’Événements
C’est ici que les événements sont routés. Un simple dictionnaire associant les types d’événements à des listes de gestionnaires fonctionne bien.
import time
from collections import defaultdict
from typing import Callable, Type, List, Union
class EventDispatcher:
def __init__(self):
self._handlers: defaultdict[Type[AgentEvent], List[Callable[[AgentEvent], None]]] = defaultdict(list)
def register_handler(self, event_type: Type[AgentEvent], handler: Callable[[AgentEvent], None]):
"""Enregistrer une fonction pour gérer un type d'événement spécifique."""
self._handlers[event_type].append(handler)
def dispatch(self, event: AgentEvent):
"""Envoyer un événement à tous les gestionnaires enregistrés."""
# Assurez-vous que le timestamp est défini s'il ne l'est pas déjà
if not hasattr(event, 'timestamp') or event.timestamp is None:
event.timestamp = time.time()
# Dispatch aux gestionnaires spécifiques au type d'événement
for handler in self._handlers[type(event)]:
try:
handler(event)
except Exception as e:
print(f"Erreur dans le gestionnaire {handler.__name__} pour l'événement {type(event).__name__}: {e}")
# Il pourrait être judicieux de dispatch un événement d'erreur ici pour la solidité
# Dispatch également aux gestionnaires enregistrés pour le type de base AgentEvent
# Cela permet un journalisation ou une surveillance générique
for handler in self._handlers[AgentEvent]:
try:
handler(event)
except Exception as e:
print(f"Erreur dans le gestionnaire générique {handler.__name__} pour l'événement {type(event).__name__}: {e}")
Étape 3 : Définissez Vos Gestionnaires
Chaque gestionnaire est une fonction simple qui prend un objet d’événement. Il effectue sa tâche spécifique et, crucialement, peut dispatch de nouveaux événements.
Esquissons quelques gestionnaires pour notre agent d’écriture :
# En supposant que 'dispatcher' soit une instance de EventDispatcher
# --- Gestionnaire pour la requête utilisateur initiale ---
def handle_user_query(event: UserQueryReceived):
print(f"[{event.query_id}] Requête utilisateur reçue : {event.content[:50]}...")
# Ici, nous utiliserions normalement un LLM pour décider de l'intention initiale
# Pour simplifier, supposons qu'il va toujours au LLM pour le brouillon
prompt = f"Vous êtes un assistant utile pour un écrivain freelance. Rédigez une réponse initiale et polie à la demande du client suivante, et suggérez une action de suivi (par exemple, 'schedule_call', 'search_knowledge_base'):\n\n{event.content}\n\nSortie en JSON avec les champs 'draft_response' et 'suggested_action'."
# Émet une requête pour l'envoyer au LLM
dispatcher.dispatch(LLMRequestSent(
query_id=event.query_id,
model_name="gpt-4",
prompt=prompt,
metadata={"previous_event": type(event).__name__}
))
# --- Gestionnaire pour les réponses LLM ---
def handle_llm_response(event: LLMResponseReceived):
print(f"[{event.query_id}] Réponse LLM reçue : {event.response_text[:50]}...")
# Analyser la réponse LLM (cela serait plus solide avec Pydantic)
try:
llm_output = json.loads(event.response_text)
draft = llm_output.get("draft_response")
action = llm_output.get("suggested_action")
dispatcher.dispatch(AgentThoughtEvent(
query_id=event.query_id,
thought=f"Action suggérée par LLM : {action}"
))
if draft:
dispatcher.dispatch(DraftResponseReady(
query_id=event.query_id,
response_content=draft,
action_taken="drafted_initial_response"
))
if action == "schedule_call":
# Supposer que le LLM a également fourni les détails d'appel si nécessaire
dispatcher.dispatch(ToolCallRequested(
query_id=event.query_id,
tool_name="calendar_scheduler",
tool_args={"client_email": "[email protected]", "duration": "30min"} # Placeholder
))
elif action == "search_knowledge_base":
# Supposer que le LLM a fourni la requête de recherche
dispatcher.dispatch(ToolCallRequested(
query_id=event.query_id,
tool_name="knowledge_base_search",
tool_args={"query": "articles liés sur les agents IA"} # Placeholder
))
except json.JSONDecodeError:
print(f"[{event.query_id}] Réponse LLM non valide JSON. Envoi pour révision humaine.")
dispatcher.dispatch(FinalResponseReady(
query_id=event.query_id,
response_content="Échec de l'analyse de la sortie LLM, nécessite un humain. Réponse originale du LLM : " + event.response_text,
action_taken="human_review_needed"
))
# --- Gestionnaire pour les appels d'outils ---
def handle_tool_call_request(event: ToolCallRequested):
print(f"[{event.query_id}] Demande d'appel d'outil : {event.tool_name} avec args {event.tool_args}")
# Simuler l'exécution de l'outil
if event.tool_name == "calendar_scheduler":
# Dans un système réel, cela appellerait une API réelle
print(f"Planification de l'appel pour {event.tool_args.get('client_email')}...")
time.sleep(1) # Simuler un délai réseau
if random.random() > 0.1: # Taux de succès de 90%
dispatcher.dispatch(ToolCallSucceeded(
query_id=event.query_id,
tool_name=event.tool_name,
tool_args=event.tool_args,
result={"status": "scheduled", "meeting_link": "https://meet.google.com/abc-xyz"}
))
else:
dispatcher.dispatch(ToolCallFailed(
query_id=event.query_id,
tool_name=event.tool_name,
tool_args=event.tool_args,
error_message="Erreur API Calendrier ou occupé"
))
# ... autres outils ...
# --- Gestionnaire générique de journalisation ---
def log_all_events(event: AgentEvent):
print(f"LOG : {type(event).__name__} - {event.query_id} - {event.timestamp}")
# --- Inscription des gestionnaires ---
dispatcher = EventDispatcher()
dispatcher.register_handler(UserQueryReceived, handle_user_query)
dispatcher.register_handler(LLMResponseReceived, handle_llm_response)
dispatcher.register_handler(ToolCallRequested, handle_tool_call_request)
# ... autres gestionnaires pour ToolCallSucceeded, ToolCallFailed, etc.
dispatcher.register_handler(AgentEvent, log_all_events) # Gestionnaire générique pour tous les événements
Ceci est un exemple très simplifié, mais vous pouvez voir comment chaque pièce est indépendante. Le `handle_user_query` ne sait pas *comment* la requête LLM sera envoyée, juste qu’il doit émettre un événement `LLMRequestSent`. De même, le `handle_llm_response` ne se soucie pas de qui a envoyé le prompt original ; il traite simplement la réponse et décide de la suite à donner.
Simulation des appels LLM et d’outils
Pour un système réel, `LLMRequestSent` déclencherait un composant qui appelle effectivement l’API LLM, puis émet `LLMResponseReceived` lorsque le résultat revient. C’est là que `asyncio` ou un simple pool de threads peut être utile pour des appels LLM ou des exécutions d’outils simultanés sans bloquer la boucle d’événements.
import asyncio
import json
import random
import time
# ... (Définitions d'événements et EventDispatcher ci-dessus) ...
# Mock de l'API LLM
async def mock_llm_call(prompt: str) -> str:
print(f" [Mock LLM] Traitement du prompt : {prompt[:80]}...")
await asyncio.sleep(random.uniform(1.0, 3.0)) # Simuler la latence du LLM
# Logique de mock très basique pour notre cas d'utilisation
if "schedule_call" in prompt:
return json.dumps({
"draft_response": "Merci pour votre demande ! J'aimerais discuter davantage. Que diriez-vous de planifier un rapide appel la semaine prochaine ?",
"suggested_action": "schedule_call"
})
elif "search_knowledge_base" in prompt:
return json.dumps({
"draft_response": "Excellente question ! J'ai rédigé une réponse et j'ai également recherché quelques articles pertinents.",
"suggested_action": "search_knowledge_base"
})
else:
return json.dumps({
"draft_response": "Merci de nous avoir contactés ! J'ai examiné votre demande et rédigé une réponse initiale.",
"suggested_action": "none"
})
# Composant Agent LLM (écoute LLMRequestSent, émet LLMResponseReceived)
async def llm_agent_component(event: LLMRequestSent, dispatcher: EventDispatcher):
response_text = await mock_llm_call(event.prompt)
# Dans un système réel, vous analyseriez les appels d'outils de la réponse LLM
tool_calls = [] # Placeholder
dispatcher.dispatch(LLMResponseReceived(
query_id=event.query_id,
model_name=event.model_name,
response_text=response_text,
tool_calls=tool_calls,
metadata={"original_prompt_event": event.timestamp}
))
# Inscription du gestionnaire asynchrone
dispatcher.register_handler(LLMRequestSent, lambda e: asyncio.create_task(llm_agent_component(e, dispatcher)))
# ... (autres gestionnaires ci-dessus) ...
# Pour exécuter un exemple :
async def main():
query_id = "user_email_123"
dispatcher.dispatch(UserQueryReceived(
query_id=query_id,
content="J'ai besoin d'un article sur les agents IA orientés événements et d'un appel de suivi.",
timestamp=time.time()
))
# Donnez un peu de temps pour traiter les événements
await asyncio.sleep(10)
print("\n--- Traitement terminé pour user_email_123 ---\n")
query_id_2 = "user_email_456"
dispatcher.dispatch(UserQueryReceived(
query_id=query_id_2,
content="Peux-tu résumer mes articles passés sur les architectures d'apprentissage profond ?",
timestamp=time.time()
))
await asyncio.sleep(10)
print("\n--- Traitement terminé pour user_email_456 ---\n")
if __name__ == "__main__":
asyncio.run(main())
J’ai introduit `asyncio.create_task` pour permettre au `llm_agent_component` de s’exécuter simultanément avec d’autres gestionnaires ou dispatches ultérieurs. C’est ici que les architectures orientées événements vraiment brillent pour la performance et la réactivité dans les agents IA.
Prise de conscience actionnable pour votre prochain projet d’agent
- Commencez simple, pensez événements : Même pour un petit agent, esquissez les événements clés qui se produisent. Qu’est-ce qui déclenche quoi ? Quelle information doit être transmise ?
- Définissez des schémas d’événements clairs : Utilisez des `dataclasses` ou des modèles Pydantic pour vos événements. Cela garantit la cohérence et facilite le débogage. Incluez toujours un `query_id` ou `correlation_id`.
- Séparez les préoccupations : Chaque gestionnaire doit faire une seule chose bien. Ne tentez pas de condenser trop de logique dans un seul gestionnaire. Si un gestionnaire doit faire un appel externe, il doit émettre un événement de requête et attendre l’événement de réponse correspondant.
- Adoptez l’asynchronicité : Les interactions des agents IA (appels LLM, exécution d’outils) sont intrinsèquement asynchrones. Utilisez `asyncio` ou un cadre similaire pour gérer ces actions de manière simultanée sans bloquer votre boucle d’événements.
- Intégrez l’observabilité : Un journal d’événements générique (comme mon `log_all_events`) est incroyablement précieux. Vous pouvez facilement transmettre ces événements à un système de surveillance ou simplement les imprimer pour le développement. Ce flux d’événements devient le journal du « processus de pensée » interne de votre agent.
- Gestion des erreurs avec des événements : Au lieu de `try/except` profondément imbriqués, émettez des événements `ErrorEvent` ou `ToolCallFailed`. D’autres gestionnaires peuvent alors écouter spécifiquement ces événements pour mettre en œuvre une logique de réessai, des solutions de secours ou des demandes d’intervention humaine.
Passer à un modèle orienté événements a complètement changé ma façon de penser à la construction d’agents. Cela m’a éloigné de l’anticipation de chaque chemin possible dans un flux linéaire et m’a amené à construire un système qui réagit intelligemment à son environnement et à ses propres opérations internes. C’est une façon plus résiliente, évolutive et franchement, plus agréable de créer des agents IA complexes.
Essayez-le pour votre prochain projet d’agent. Vous pourriez vous retrouver à dénouer ce code spaghetti plus vite que vous ne le pensez !
🕒 Published: