The application processes user prompts through a pipeline of workers, queues, and services. Each component is responsible for a single concern, following the Single Responsibility Principle.
- **Consumer** (`consumer.py`) – Supervises worker processes and tracks shared state. Receives raw prompts via the REST API entry point and enqueues them into the prompt queue.
- **Prompt Queue** (`services/prompt_queue.py`) – A Redis-backed queue that holds prompt jobs. Workers consume jobs from this queue.
- **Digest Worker** (`workers/digest_worker.py`) – Primary worker that:
  - stores exchanges in the `ThreadConversationService` (Redis-backed);
  - updates the `SessionService` and triggers episodic memory generation when a topic switch is detected;
  - selects a response mode via the `ModeRouterService`;
  - generates the response via the `FrontalCortexService`;
  - enqueues a follow-up job into the memory-chunker-queue.
- **Memory Chunker Worker** (`workers/memory_chunker_worker.py`) – Enriches individual exchanges with memory chunks and updates the `WorldStateService`.
- **Episodic Memory Worker** (`workers/episodic_memory_worker.py`) – Builds episodes from a sequence of exchanges and persists them via the `EpisodicStorageService`.
- **Cognitive Drift Engine** (`services/cognitive_drift_engine.py`) – Default Mode Network service. During idle periods (all queues empty), generates spontaneous thoughts via spreading activation and LLM synthesis, stored as drift gists that surface in frontal cortex context.
- **Routing Stability Regulator** (`services/routing_stability_regulator_service.py`) – Single authority for mode router weight mutation. Runs on a 24h cycle, reads pressure signals, and applies bounded corrections.
- **Routing Reflection Service** (`services/routing_reflection_service.py`) – Idle-time peer review of routing decisions via a strong LLM. Feeds dimensional analysis into the pressure signals consumed by the regulator.

Design notes:

- `WorkerBase._update_shared_state` merges per-worker updates into a shared dictionary managed by the `WorkerManager`. This avoids global locks and keeps the worker pool lightweight.
- Only the `RoutingStabilityRegulator` (24h cycle) mutates weights, preventing a tug-of-war between independent monitors.
- `ThreadConversationService._calculate_new_confidence` uses a bounded reinforcement formula: `new = current + (new_confidence - current) * 0.5`.

Failure handling by stage:

| Stage | Failure Mode | Recovery |
|---|---|---|
| Classification | Embedding service down | Falls back to default topic (“general”) with low confidence |
| Mode Routing | LLM tiebreaker fails | Uses deterministic scores only (no tiebreak) |
| Generation | LLM timeout / malformed JSON | Retries once; if still fails, returns a generic “I had trouble thinking” message |
| Memory Chunker | LLM returns invalid JSON | Re-raises JSONDecodeError; exchange is stored without memory enrichment |
| Episodic Memory | Not all chunks available | Waits with backoff; proceeds with available chunks after deadline |
| Tool Worker | Tool container crash | Critic evaluates partial result; escalates to user if consequential |
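As an illustration of the first row above, here is a minimal sketch of the classification fallback. All names and the confidence floor are hypothetical, not taken from the codebase: the point is only that an unreachable embedding service degrades to the default topic instead of failing the request.

```python
DEFAULT_TOPIC = "general"
LOW_CONFIDENCE = 0.1  # assumed floor; the real value is not specified


class EmbeddingServiceDown(Exception):
    """Raised when the embedding backend cannot be reached."""


def classify_topic(text, embed):
    """Classify `text`; `embed` is a callable that may raise EmbeddingServiceDown."""
    try:
        vector = embed(text)
    except EmbeddingServiceDown:
        # Embedding service down → fall back to the default topic (table above)
        return DEFAULT_TOPIC, LOW_CONFIDENCE
    # A nearest-centroid lookup over topic embeddings would go here;
    # return a placeholder match for the sketch.
    return "smalltalk", 0.9
```

The request keeps flowing either way; only the topic label and its confidence change.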
Latency targets:

| Component | Target | Notes |
|---|---|---|
| Topic Classification | <5ms | Embedding lookup, deterministic |
| Mode Routing | <5ms | Mathematical scoring, no LLM |
| LLM Tiebreaker (rare) | ~200ms | Only when top-2 modes within margin |
| Response Generation | 2–15s | Depends on model and context length |
| Memory Chunker | 1–5s | Background, non-blocking |
| Episodic Memory | 2–8s | Background, non-blocking |
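The routing targets above depend on a purely deterministic scorer, with the LLM consulted only on near-ties. A hedged sketch of that margin check (the margin value and function names are assumptions, not the actual implementation):

```python
TIEBREAK_MARGIN = 0.05  # assumed; the real margin is configuration-dependent


def route(scores, llm_tiebreak=None):
    """Return the winning mode; defer to an LLM only when the top two are within the margin."""
    ranked = sorted(scores.items(), key=lambda kv: kv[1], reverse=True)
    (top_mode, top_score), (_, runner_up) = ranked[0], ranked[1]
    if top_score - runner_up >= TIEBREAK_MARGIN or llm_tiebreak is None:
        # Deterministic path (<5 ms) — also the recovery path when the
        # tiebreaker fails (see the failure-mode table)
        return top_mode
    return llm_tiebreak(ranked[:2])  # rare, ~200 ms LLM call
```

Because the deterministic branch is also the fallback, tiebreaker outages degrade latency behaviour gracefully rather than blocking routing.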
When the mode router selects ACT, the digest worker enters an autonomous action loop:
1. `ActDispatcherService` routes each action to the registered innate skill handler (recall, memorize, schedule, etc.).
2. `CriticService` evaluates the result via a lightweight LLM call. Safe actions (recall, memorize) get silent correction; consequential actions (schedule, persistent_task) pause for user confirmation if confidence is low.

The `/chat` endpoint uses Server-Sent Events for real-time streaming:
1. The client sends `POST /chat` with `{text, source}`.
2. The server responds with `text/event-stream` and an `X-Request-ID` header.
3. The prompt reaches the `digest_worker` together with the request UUID.
4. The worker publishes events on the `sse:{uuid}` Redis pub/sub channel.
5. Event sequence: `status:processing` → `status:thinking` → `message:{response}` → `done`.
6. The response is also written to the `output:{request_id}` key as a fallback.

End-to-end flow:

```
[Listener] → [Consumer] → [Prompt Queue] → [Digest Worker] →
  |--(classification)--> [TopicConversationService] → [Conversation JSON]
  |--(routing)---------> [ModeRouterService] → {mode, confidence, scores} (~5ms)
  |--(generation)------> [FrontalCortexService] → mode-specific prompt → LLM
  |                      (ACT mode → action loop → re-route → terminal)
  |--(memory_chunk_job)→ [Memory Chunker Queue] → [Memory Chunker Worker] →
  |                      [Conversation JSON] (enriched)
  |--(episode_job)-----> [Episodic Memory Queue] → [Episodic Memory Worker] →
                         PostgreSQL Episodes Table

[Routing Stability Regulator] ← reads routing_decisions (24h cycle)
                              → adjusts configs/generated/mode_router_config.json

[Routing Reflection Service]  ← reads reflection-queue (idle-time)
                              → writes routing_decisions.reflection → feeds pressure to regulator
```
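The event sequence in the streaming flow maps onto the standard SSE wire format. A minimal sketch of the server-side framing (the exact event and field names the service emits are an assumption; only the `text/event-stream` frame layout is standard):

```python
def sse_frame(event, data):
    """Format one Server-Sent Events frame per the text/event-stream format."""
    return f"event: {event}\ndata: {data}\n\n"


def stream_response(response_text):
    """Yield the frame sequence described above for a single request."""
    yield sse_frame("status", "processing")
    yield sse_frame("status", "thinking")
    yield sse_frame("message", response_text)
    yield sse_frame("done", "")
```

A client reading the stream sees the two status frames first, so it can render a "thinking" indicator before the full response arrives; the `output:{request_id}` key covers clients that disconnect mid-stream.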