feat: migrate task queues to nats jetstream and add nats broker integration#244
Open
feat: migrate task queues to nats jetstream and add nats broker integration#244
Conversation
**Added:** - Introduced NATS JetStream as the primary broker for all task, tool, and result queues across orchestrator and worker agents - Implemented new `ares-core/src/nats.rs` with subject taxonomy, stream definitions, and broker abstraction for JetStream - Added Ansible `nats` role to provision, configure, and manage NATS JetStream server (with docs and systemd integration) - Updated EC2 setup scripts to install, configure, and manage `nats-server` alongside Redis - Added NATS connection URLs to environment, templates, and container docs - Documented NATS deployment, configuration, and usage in infra and agent docs **Changed:** - All core task, tool, and blue-team queues now use NATS JetStream subjects, replacing Redis List/BRPOP patterns for durable work queues - Orchestrator, workers, and blue agents now require and connect to both Redis (state) and NATS (queues) - Tool dispatch and result collection now use NATS request/reply with inbox subjects, removing the need for dedicated TCP connections for blocking calls - Blue investigation queue and results moved to NATS JetStream subjects - Task status, heartbeats, operation locks, and persistent state remain on Redis - Updated orchestrator, worker, and tool-executor modules to poll and publish via JetStream consumers/producers with explicit acks and bounded redelivery - Refactored orchestrator config, state, and queue code to thread NATS broker handles throughout and ensure streams on startup - Updated all container and agent documentation to mention NATS as required infra - Updated Ansible playbooks and role templates to deploy NATS and wire up environment variables for all agents - Updated diagrams, markdown, and infrastructure docs to show NATS as the broker - Updated Cargo manifests to include `async-nats`, `futures`, and `bytes` dependencies in all crates **Removed:** - Redis-backed work queue code paths, including BRPOP/LPUSH for tasks and tools - Obsolete Redis-only queue length and result-polling implementations - Legacy Redis-only tool dispatcher and result handler logic - All Redis pubsub notification usage for state updates (now NATS core pub) - Unused Redis key prefix constants and result queue definitions in code and docs
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #244 +/- ##
==========================================
+ Coverage 75.10% 75.69% +0.59%
==========================================
Files 383 384 +1
Lines 81465 83028 +1563
==========================================
+ Hits 61187 62852 +1665
+ Misses 20278 20176 -102 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
…er, and blue queue **Added:** - Unit tests for credential extraction logic in tool dispatcher, including cases for various tools, username/domain presence, and field aliases - Tests for traceparent and operation_id serialization in ToolExecRequest - Tests for ToolExecResponse discovery field handling and default behaviors - Extensive integration tests for push_realtime_discoveries, covering host, credential, hash, vulnerability, share, user, trust, and various error cases - Unit tests for AuthThrottle covering limits, credential separation, and window expiry logic - Tests for set_task_status in result_handler, including overwriting, merging, and handling non-object extras - Unit tests for BlueTaskQueueCore covering serialization, heartbeat, active investigation, alert/model retrieval, and error handling for missing NATS **Changed:** - push_realtime_discoveries and set_task_status made generic over Redis connection type to support mock connections in tests - BlueTaskQueue refactored to use a generic BlueTaskQueueCore with production and test implementations, enabling better unit testability - Imports updated to include ConnectionLike and support new generic types
**Added:** - Added extensive unit tests for `task_queue` covering result checks, batch error handling, lock extension, status management, and serialization behaviors - Added tests for `redis_dispatcher.rs` helper functions, including dispatch error and timeout result formatting, and subject/stream configuration - Added tests for `is_transient_broker_error` logic, task status TTL, message priority overrides, and task result serialization in `task_loop` - Added tests for `tool_executor` helpers: unavailable tool responses, error classification, and discoveries serialization logic - Added tests for NATS subject and stream formatting, retention, and uniqueness in the core NATS module, including environment variable fallback handling **Changed:** - Refactored tool dispatcher to use helper functions for error and timeout result construction, ensuring consistent formatting and easier testability - Replaced inline connection error detection in worker task loop with `is_transient_broker_error` helper for improved maintainability - Refactored tool executor to use helper functions for unavailable tool responses and discoveries serialization, improving clarity and test coverage **Removed:** - Removed inline duplicate logic for error result and discoveries handling in tool dispatcher and tool executor, consolidating into reusable functions
…or testability **Added:** - Introduced helper functions in `task_queue.rs` to build task messages, select task subjects based on priority, and determine final status strings, allowing unit testing of wire message shapes and subject routing logic - Added public functions in `blue_task_queue.rs` to serialize/deserialize task and result messages, enabling easier unit testing without a broker - Implemented builder functions in `redis_dispatcher.rs` for call IDs, tool exec requests, and tool result conversions for improved testability - Added free functions in `result_handler.rs` for building task results from agent outcomes, supporting test coverage of branching logic - Provided construction helpers in `tool_executor.rs` for tool exit errors and response objects, allowing isolated unit tests of response shape logic - Added comprehensive unit tests for all new helper functions and message builders in affected modules **Changed:** - Refactored `TaskQueueCore::submit_task` and related logic to use extracted helper functions for message building and subject selection, improving clarity and testability - Updated `RedisToolDispatcher` to use new builder functions for call IDs and tool exec requests, reducing duplication and improving unit test coverage - Changed `process_task` in `result_handler.rs` to delegate result building and status computation to an extracted function, simplifying main logic - Refactored tool execution response construction in `tool_executor.rs` to use dedicated builder functions, clarifying error and success handling - Modified `BlueTaskQueueCore` methods to use new serialization/deserialization helpers, increasing code clarity and maintainability **Removed:** - Eliminated inlined message construction, subject routing, and status logic from main queue, dispatcher, and worker flows in favor of extracted functions - Removed duplicate code for serializing/deserializing messages within queue implementations, consolidating in free functions for testability
…ility **Added:** - Introduced `build_running_status_extra` and `build_final_status_extra` helper functions to encapsulate construction of status "extra_fields" payloads and ensure field consistency between producer and consumer - Added `busy_current_task` function to standardize formatting of `WorkerStatus.current_task` field - Added `count_discovery_entries` function to count non-empty discovery arrays per type, supporting clearer and unit-testable discovery reporting logic - Implemented comprehensive unit tests for new helper functions to verify payload structure, metadata consistency, and edge case handling **Changed:** - Refactored `process_task` in `result_handler.rs` to use new helper functions for status "extra_fields" payloads, improving maintainability and reducing field duplication - Updated `run_tool_exec_loop` in `tool_executor.rs` to use the new `busy_current_task` helper, enforcing consistent task status formatting - Modified discovery trace emission to use `count_discovery_entries`, replacing inline logic with reusable, tested function for clarity and correctness **Removed:** - Eliminated repeated manual construction of status payload objects in favor of the new helper functions, reducing code duplication and risk of inconsistency
4811845 to
ff10d2b
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Key Changes:
ares_core::natsmodule for broker/subject taxonomy and connectionAdded:
ares_core::natsmodule for all agentqueues, tool RPC, results, and investigation requests
provisioning (new Ansible role
nats)NATS_URL/ARES_NATS_URLsupport to all orchestrator, worker, and blue agentconfigs and environment files
and result channels
startup, with automatic retry and error handling
Changed:
and pull consumers (both in orchestrator and worker)
Redis BRPOP/LPUSH
heartbeats, task status, and deferred throttling state
and container templates to require and use NATS
without explicit policy or override, adds safety buffer to prevent AD lockouts
NATS_URL in environment or config
Removed: