From 48b02e39e0ff83c05b942244d7f188ba99e24b93 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 3 Jun 2026 11:49:42 +0200 Subject: [PATCH 1/7] Minor refactor for config and admin client creation --- src/config.rs | 191 +++++++++++++++++++++++------------ src/kafka/deserialize.rs | 13 ++- src/kafka/deserialize_raw.rs | 48 ++++----- 3 files changed, 165 insertions(+), 87 deletions(-) diff --git a/src/config.rs b/src/config.rs index 5231b2d4..ea864609 100644 --- a/src/config.rs +++ b/src/config.rs @@ -89,6 +89,35 @@ impl ClusterConfig { || self.ssl_certificate_location.is_some() || self.ssl_key_location.is_some() } + + /// Apply this cluster's `bootstrap.servers` and any configured sasl/ssl + /// auth onto an rdkafka `ClientConfig`. Shared by the consumer, producer and + /// admin config builders so they all authenticate identically. + fn apply_to(&self, config: &mut ClientConfig) { + config.set("bootstrap.servers", self.address.clone()); + + if let Some(ref sasl_mechanism) = self.sasl_mechanism { + config.set("sasl.mechanism", sasl_mechanism); + } + if let Some(ref sasl_username) = self.sasl_username { + config.set("sasl.username", sasl_username); + } + if let Some(ref sasl_password) = self.sasl_password { + config.set("sasl.password", sasl_password); + } + if let Some(ref security_protocol) = self.security_protocol { + config.set("security.protocol", security_protocol); + } + if let Some(ref ssl_ca_location) = self.ssl_ca_location { + config.set("ssl.ca.location", ssl_ca_location); + } + if let Some(ref ssl_certificate_location) = self.ssl_certificate_location { + config.set("ssl.certificate.location", ssl_certificate_location); + } + if let Some(ref ssl_private_key_location) = self.ssl_key_location { + config.set("ssl.key.location", ssl_private_key_location); + } + } } #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Deserialize, Serialize)] @@ -618,8 +647,9 @@ impl Config { let uses_new_format = !self.kafka_topics.is_empty() || !self.kafka_clusters.is_empty(); // Any explicitly-set deprecated field describing a cluster (the main - // consumed cluster or the deadletter cluster). kafka_deadletter_topic is - // NOT deprecated and is intentionally excluded. + // consumed cluster or the deadletter cluster) or the deprecated global + // raw mode. kafka_deadletter_topic is NOT deprecated and is intentionally + // excluded. let uses_legacy = self.kafka_topic.is_some() || self.kafka_cluster.is_some() || self.kafka_consumer_group.is_some() @@ -637,7 +667,10 @@ impl Config { || self.kafka_deadletter_sasl_password.is_some() || self.kafka_deadletter_ssl_ca_location.is_some() || self.kafka_deadletter_ssl_certificate_location.is_some() - || self.kafka_deadletter_ssl_key_location.is_some(); + || self.kafka_deadletter_ssl_key_location.is_some() + // Global raw mode is a deprecated legacy field; in the new format raw + // mode is configured per topic via kafka_topics..raw. + || self.raw_mode; if uses_new_format && uses_legacy { return Err(anyhow!( @@ -819,8 +852,25 @@ impl Config { })?; } - // Validate exactly one consumable topic - self.consumable_topic()?; + // Validate at least one consumable topic. + let consumable = self.consumable_topics()?; + + // Multi-topic consumption is only supported on the sqlite adapter for + // now. The postgres adapter filters claims by a single shared partition + // list, but those partition numbers aren't unique across topics, so the + // filter would mix partitions from different topics together. Note this + // filtering exists only to avoid lock contention between brokers, not for + // correctness; supporting multi-topic on postgres means reworking how we + // avoid that contention (e.g. filtering by (topic, partition) or another + // mechanism entirely). Reject the combination here, before any consumer + // spawns. + if consumable.len() > 1 && self.database_adapter == DatabaseAdapter::Postgres { + return Err(anyhow!( + "multi-topic consumption ({} consumable topics) is not supported with the \ + postgres database adapter; use the sqlite adapter or a single consumable topic", + consumable.len() + )); + } // The deadletter topic must be a declared topic so the producer can // resolve its cluster. In legacy mode it was added above; in the new @@ -840,8 +890,22 @@ impl Config { // would be published to the wrong brokers. In the legacy format the // retry topic is registered above; in the new format the user must // declare it in kafka_topics. - let (consumed_topic, _) = self.consumable_topic()?; - let retry_target = self.kafka_retry_topic.as_deref().unwrap_or(consumed_topic); + // With a single consumable topic, retries fall back to that topic when + // no retry topic is configured. With multiple consumable topics there is + // no unambiguous fallback, so a dedicated retry topic is mandatory (the + // design doc specifies a single retry topic shared across all consumed + // topics rather than per-topic retry routing). + let retry_target = match self.kafka_retry_topic.as_deref() { + Some(retry_topic) => retry_topic, + None if consumable.len() == 1 => consumable[0].0, + None => { + return Err(anyhow!( + "kafka_retry_topic is required when consuming from multiple topics ({} \ + consumable topics configured)", + consumable.len() + )); + } + }; let retry_topic_config = self.kafka_topics.get(retry_target).ok_or_else(|| { Box::new(figment::Error::from(format!( "kafka_retry_topic '{retry_target}' is not defined in kafka_topics" @@ -865,6 +929,28 @@ impl Config { Ok(()) } + /// Get all consumable (non-`produce_only`) topics and their configs, in + /// `kafka_topics` (BTreeMap) order. Returns an error only when there are no + /// consumable topics. This is the multi-topic entry point; prefer it over + /// [`Config::consumable_topic`] for anything that should support more than + /// one consumed topic. + pub fn consumable_topics(&self) -> Result, Box> { + let consumable: Vec<(&str, &TopicConfig)> = self + .kafka_topics + .iter() + .filter(|(_, cfg)| !cfg.produce_only) + .map(|(name, cfg)| (name.as_str(), cfg)) + .collect(); + + if consumable.is_empty() { + return Err(Box::new(figment::Error::from( + "no consumable topic configured (all topics have produce_only: true)".to_owned(), + ))); + } + + Ok(consumable) + } + /// Get the single consumable topic and its config. /// Returns an error if there are zero or multiple consumable topics. pub fn consumable_topic(&self) -> Result<(&str, &TopicConfig), Box> { @@ -902,13 +988,25 @@ impl Config { .ok_or_else(|| Box::new(figment::Error::from(format!("unknown cluster: {}", name)))) } - /// Convert the application Config into rdkafka::ClientConfig for consumer. - /// Uses the single consumable topic's cluster. + /// Convert the application Config into rdkafka::ClientConfig for the consumer + /// of the single consumable topic. /// Panics if config wasn't validated (call from_args, not extract directly). pub fn kafka_consumer_config(&self) -> ClientConfig { - let (_, topic_config) = self + let (topic_name, _) = self .consumable_topic() .expect("consumable_topic failed - was config validated?"); + self.kafka_consumer_config_for(topic_name) + } + + /// Convert the application Config into rdkafka::ClientConfig for the consumer + /// of a specific topic. Each consumed topic has its own consumer (own + /// `group.id` and cluster), so multi-topic spawns one consumer per topic. + /// Panics if `topic_name` isn't a declared topic (call from_args first). + pub fn kafka_consumer_config_for(&self, topic_name: &str) -> ClientConfig { + let topic_config = self + .kafka_topics + .get(topic_name) + .unwrap_or_else(|| panic!("unknown topic '{topic_name}' - was config validated?")); let cluster = self .cluster(&topic_config.cluster) .expect("cluster lookup failed - was config validated?"); @@ -925,9 +1023,9 @@ impl Config { .clone() .unwrap_or_else(|| self.kafka_auto_offset_reset.clone()); - let mut new_config = ClientConfig::new(); - let config = new_config - .set("bootstrap.servers", cluster.address.clone()) + let mut config = ClientConfig::new(); + cluster.apply_to(&mut config); + config .set("group.id", topic_config.consumer_group.clone()) .set("session.timeout.ms", session_timeout_ms.to_string()) .set("enable.partition.eof", "false") @@ -939,29 +1037,21 @@ impl Config { .set("auto.offset.reset", auto_offset_reset) .set("enable.auto.offset.store", "false"); - if let Some(ref sasl_mechanism) = cluster.sasl_mechanism { - config.set("sasl.mechanism", sasl_mechanism); - } - if let Some(ref sasl_username) = cluster.sasl_username { - config.set("sasl.username", sasl_username); - } - if let Some(ref sasl_password) = cluster.sasl_password { - config.set("sasl.password", sasl_password); - } - if let Some(ref security_protocol) = cluster.security_protocol { - config.set("security.protocol", security_protocol); - } - if let Some(ref ssl_ca_location) = cluster.ssl_ca_location { - config.set("ssl.ca.location", ssl_ca_location); - } - if let Some(ref ssl_certificate_location) = cluster.ssl_certificate_location { - config.set("ssl.certificate.location", ssl_certificate_location); - } - if let Some(ref ssl_private_key_location) = cluster.ssl_key_location { - config.set("ssl.key.location", ssl_private_key_location); - } + config + } + + /// Build an rdkafka::ClientConfig for an admin client on a named cluster. + /// Carries only `bootstrap.servers` + that cluster's auth (no consumer + /// settings), so topic creation targets the correct brokers. + /// Panics if the cluster isn't declared (call from_args first). + pub fn kafka_admin_config(&self, cluster_name: &str) -> ClientConfig { + let cluster = self + .cluster(cluster_name) + .expect("cluster lookup failed - was config validated?"); - config.clone() + let mut config = ClientConfig::new(); + cluster.apply_to(&mut config); + config } /// The cluster the deadletter / forwarding producer connects to: the @@ -985,33 +1075,10 @@ impl Config { pub fn kafka_producer_config(&self) -> ClientConfig { let cluster = self.kafka_producer_cluster(); - let mut new_config = ClientConfig::new(); - let config = new_config - .set("bootstrap.servers", cluster.address.clone()) - .set("message.max.bytes", format!("{}", self.max_message_size)); - if let Some(ref sasl_mechanism) = cluster.sasl_mechanism { - config.set("sasl.mechanism", sasl_mechanism); - } - if let Some(ref sasl_username) = cluster.sasl_username { - config.set("sasl.username", sasl_username); - } - if let Some(ref sasl_password) = cluster.sasl_password { - config.set("sasl.password", sasl_password); - } - if let Some(ref security_protocol) = cluster.security_protocol { - config.set("security.protocol", security_protocol); - } - if let Some(ref ssl_ca_location) = cluster.ssl_ca_location { - config.set("ssl.ca.location", ssl_ca_location); - } - if let Some(ref ssl_certificate_location) = cluster.ssl_certificate_location { - config.set("ssl.certificate.location", ssl_certificate_location); - } - if let Some(ref ssl_private_key_location) = cluster.ssl_key_location { - config.set("ssl.key.location", ssl_private_key_location); - } - - config.clone() + let mut config = ClientConfig::new(); + cluster.apply_to(&mut config); + config.set("message.max.bytes", format!("{}", self.max_message_size)); + config } } diff --git a/src/kafka/deserialize.rs b/src/kafka/deserialize.rs index 0ab18d0e..61f484b1 100644 --- a/src/kafka/deserialize.rs +++ b/src/kafka/deserialize.rs @@ -18,10 +18,19 @@ pub struct DeserializeConfig { } impl DeserializeConfig { - pub fn from_config(config: &Config) -> Self { + /// Build the deserializer config for a single consumed topic. Raw mode is + /// taken from that topic's `kafka_topics..raw`, so each consumer + /// (legacy single-topic included) deserializes according to its own topic. + pub fn from_topic(config: &Config, topic_name: &str) -> Self { + let raw_config = config + .kafka_topics + .get(topic_name) + .and_then(|topic| topic.raw.as_ref()) + .map(|raw| RawConfig::from_topic(config, topic_name, raw)); + Self { activation_config: DeserializeActivationConfig::from_config(config), - raw_config: RawConfig::from_config(config), + raw_config, retry_topic: config.kafka_retry_topic.clone(), } } diff --git a/src/kafka/deserialize_raw.rs b/src/kafka/deserialize_raw.rs index 89577850..34f1b505 100644 --- a/src/kafka/deserialize_raw.rs +++ b/src/kafka/deserialize_raw.rs @@ -10,7 +10,7 @@ use rdkafka::message::OwnedMessage; use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, TaskActivation}; use uuid::Uuid; -use crate::config::Config; +use crate::config::{Config, RawModeConfig}; use crate::store::activation::{Activation, ActivationStatus}; use super::deserialize_activation::bucket_from_id; @@ -29,43 +29,45 @@ pub struct RawConfig { } impl RawConfig { - pub fn from_config(config: &Config) -> Option { - if !config.raw_mode { - return None; - } - let application = config - .raw_application + /// Build the raw-mode deserializer config for a single topic from that + /// topic's [`RawModeConfig`]. The legacy global `raw_*` fields are migrated + /// into the topic's `raw` during config normalization, so this is the single + /// runtime source of truth for both legacy and multi-topic formats. + pub fn from_topic(config: &Config, topic_name: &str, raw: &RawModeConfig) -> Self { + let application = raw + .application .clone() - .expect("raw_application required when raw_mode is enabled"); + .expect("raw application required when a topic enables raw mode"); assert!( config.worker_map.contains_key(&application), - "raw_application '{}' must exist in worker_map", + "raw application '{}' must exist in worker_map", application ); + // A raw topic's messages aren't activations, so its retries must go to a + // separate (activation-encoded) retry topic, never back to itself. if let Some(ref retry_topic) = config.kafka_retry_topic { - let (main_topic, _) = config - .consumable_topic() - .expect("no consumable topic configured"); assert!( - retry_topic != main_topic, - "kafka_retry_topic cannot equal kafka_topic when raw_mode is enabled" + retry_topic != topic_name, + "kafka_retry_topic cannot equal raw topic '{topic_name}'" ); } - Some(Self { - namespace: config - .raw_namespace + Self { + namespace: raw + .namespace .clone() - .expect("raw_namespace required when raw_mode is enabled"), + .expect("raw namespace required when a topic enables raw mode"), application, - taskname: config - .raw_taskname + taskname: raw + .taskname .clone() - .expect("raw_taskname required when raw_mode is enabled"), - processing_deadline_duration: config.raw_processing_deadline_duration, - }) + .expect("raw taskname required when a topic enables raw mode"), + processing_deadline_duration: raw + .processing_deadline_duration + .expect("raw processing_deadline_duration required when a topic enables raw mode"), + } } } From a0e0351a6cc7274f43d94cdda46ab0199d203fe8 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 3 Jun 2026 12:28:08 +0200 Subject: [PATCH 2/7] more plumbing, spawn many consumers --- src/config.rs | 135 ++++++++++++++++---------------- src/kafka/activation_batcher.rs | 25 +++--- src/kafka/admin.rs | 59 ++++++++++---- src/main.rs | 74 ++++++++--------- src/test_utils.rs | 6 +- src/upkeep.rs | 46 ++++++----- 6 files changed, 194 insertions(+), 151 deletions(-) diff --git a/src/config.rs b/src/config.rs index ea864609..aa26df2d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -890,22 +890,39 @@ impl Config { // would be published to the wrong brokers. In the legacy format the // retry topic is registered above; in the new format the user must // declare it in kafka_topics. - // With a single consumable topic, retries fall back to that topic when - // no retry topic is configured. With multiple consumable topics there is - // no unambiguous fallback, so a dedicated retry topic is mandatory (the - // design doc specifies a single retry topic shared across all consumed - // topics rather than per-topic retry routing). - let retry_target = match self.kafka_retry_topic.as_deref() { - Some(retry_topic) => retry_topic, - None if consumable.len() == 1 => consumable[0].0, - None => { + // Normalize the retry topic so downstream code can always rely on it + // being set: + // - explicitly configured: used as-is. + // - single non-raw consumed topic: retries loop back to that topic. + // - raw mode: raw messages aren't activations, so retries must go to a + // separate activation-encoded topic; a retry topic is mandatory. + // - multiple consumed topics: no unambiguous fallback, so a single + // shared retry topic is mandatory. + if self.kafka_retry_topic.is_none() { + let has_raw = consumable.iter().any(|(_, cfg)| cfg.raw.is_some()); + let count = consumable.len(); + let single_topic = (count == 1).then(|| consumable[0].0.to_owned()); + + if has_raw { return Err(anyhow!( - "kafka_retry_topic is required when consuming from multiple topics ({} \ - consumable topics configured)", - consumable.len() + "kafka_retry_topic must be set explicitly when a consumed topic uses raw mode" )); } - }; + match single_topic { + Some(topic) => self.kafka_retry_topic = Some(topic), + None => { + return Err(anyhow!( + "kafka_retry_topic is required when consuming from multiple topics ({} \ + consumable topics configured)", + count + )); + } + } + } + let retry_target = self + .kafka_retry_topic + .as_deref() + .expect("kafka_retry_topic is set above"); let retry_topic_config = self.kafka_topics.get(retry_target).ok_or_else(|| { Box::new(figment::Error::from(format!( "kafka_retry_topic '{retry_target}' is not defined in kafka_topics" @@ -931,9 +948,9 @@ impl Config { /// Get all consumable (non-`produce_only`) topics and their configs, in /// `kafka_topics` (BTreeMap) order. Returns an error only when there are no - /// consumable topics. This is the multi-topic entry point; prefer it over - /// [`Config::consumable_topic`] for anything that should support more than - /// one consumed topic. + /// consumable topics. This is the sole accessor for consumed topics; callers + /// that only handle a single topic should iterate and select explicitly + /// rather than assuming exactly one. pub fn consumable_topics(&self) -> Result, Box> { let consumable: Vec<(&str, &TopicConfig)> = self .kafka_topics @@ -951,33 +968,14 @@ impl Config { Ok(consumable) } - /// Get the single consumable topic and its config. - /// Returns an error if there are zero or multiple consumable topics. - pub fn consumable_topic(&self) -> Result<(&str, &TopicConfig), Box> { - let mut consumable = self - .kafka_topics - .iter() - .filter(|(_, cfg)| !cfg.produce_only); - - let first = consumable.next().ok_or_else(|| { - Box::new(figment::Error::from( - "no consumable topic configured (all topics have produce_only: true)".to_owned(), - )) - })?; - - if consumable.next().is_some() { - let count = self - .kafka_topics - .values() - .filter(|t| !t.produce_only) - .count(); - return Err(Box::new(figment::Error::from(format!( - "multi-topic consumption is not yet supported: {} consumable topics configured, maximum is 1", - count - )))); - } - - Ok((first.0.as_str(), first.1)) + /// The topic retries are produced to. `normalize_and_validate` always sets + /// `kafka_retry_topic` (it defaults to the single consumed topic when only + /// one non-raw topic is configured), so this is infallible after validation. + /// Panics if config wasn't validated (call from_args, not extract directly). + pub fn retry_topic(&self) -> &str { + self.kafka_retry_topic + .as_deref() + .expect("kafka_retry_topic unset - was config validated?") } /// Get cluster config by name. @@ -988,16 +986,6 @@ impl Config { .ok_or_else(|| Box::new(figment::Error::from(format!("unknown cluster: {}", name)))) } - /// Convert the application Config into rdkafka::ClientConfig for the consumer - /// of the single consumable topic. - /// Panics if config wasn't validated (call from_args, not extract directly). - pub fn kafka_consumer_config(&self) -> ClientConfig { - let (topic_name, _) = self - .consumable_topic() - .expect("consumable_topic failed - was config validated?"); - self.kafka_consumer_config_for(topic_name) - } - /// Convert the application Config into rdkafka::ClientConfig for the consumer /// of a specific topic. Each consumed topic has its own consumer (own /// `group.id` and cluster), so multi-topic spawns one consumer per topic. @@ -1269,7 +1257,7 @@ mod tests { // kafka_consumer_group is unset in the yaml, so the legacy field // stays None and normalization applies the "taskworker" default. assert_eq!(config.kafka_consumer_group, None); - let (topic_name, topic_config) = config.consumable_topic().unwrap(); + let (topic_name, topic_config) = config.consumable_topics().unwrap()[0]; assert_eq!(topic_name, "error-tasks"); assert_eq!(topic_config.consumer_group, "taskworker"); assert_eq!(config.kafka_auto_offset_reset, "earliest".to_owned()); @@ -1331,7 +1319,7 @@ mod tests { // Zero-config: legacy fields stay None, but normalization applies // the historical "taskworker" default as the consumable topic. assert_eq!(config.kafka_topic, None); - let (topic_name, topic_config) = config.consumable_topic().unwrap(); + let (topic_name, topic_config) = config.consumable_topics().unwrap()[0]; assert_eq!(topic_name, "taskworker"); assert_eq!( config.cluster(&topic_config.cluster).unwrap().address, @@ -1384,7 +1372,7 @@ mod tests { fn test_kafka_consumer_config() { let args = Args { config: None }; let config = Config::from_args(&args).unwrap(); - let consumer_config = config.kafka_consumer_config(); + let consumer_config = config.kafka_consumer_config_for("taskworker"); assert_eq!( consumer_config.get("bootstrap.servers").unwrap(), @@ -1404,7 +1392,7 @@ mod tests { let args = Args { config: None }; let config = Config::from_args(&args).unwrap(); - let consumer_config = config.kafka_consumer_config(); + let consumer_config = config.kafka_consumer_config_for("taskworker"); assert_eq!( consumer_config.get("security.protocol").unwrap(), @@ -1439,7 +1427,7 @@ mod tests { let args = Args { config: None }; let config = Config::from_args(&args).unwrap(); - let consumer_config = config.kafka_consumer_config(); + let consumer_config = config.kafka_consumer_config_for("taskworker"); assert_eq!( consumer_config.get("ssl.ca.location").unwrap(), @@ -1585,6 +1573,7 @@ mod tests { "config.yaml", r#" kafka_deadletter_topic: profiles-dlq +kafka_retry_topic: profiles-retry kafka_topics: profiles: @@ -1652,7 +1641,7 @@ kafka_clusters: assert!(!clusters.contains_key("default")); // Test consumable_topic() and cluster() helpers - let (topic_name, topic_config) = config.consumable_topic().unwrap(); + let (topic_name, topic_config) = config.consumable_topics().unwrap()[0]; assert_eq!(topic_name, "profiles"); assert_eq!(topic_config.cluster, "profiles-cluster"); @@ -1711,7 +1700,7 @@ kafka_clusters: assert!(!clusters.contains_key("default")); // Test consumable_topic() helper - let (topic_name, _) = config.consumable_topic().unwrap(); + let (topic_name, _) = config.consumable_topics().unwrap()[0]; assert_eq!(topic_name, "profiles"); Ok(()) @@ -1812,12 +1801,18 @@ kafka_clusters: } #[test] - fn test_multi_topic_rejects_multiple_consumable_topics() { + fn test_multi_topic_rejected_on_postgres() { Jail::expect_with(|jail| { - // Two consumable topics - the guard for this PR rejects this. + // Multiple consumable topics are allowed on sqlite but rejected on + // postgres, whose claim filtering can't distinguish partitions + // across topics. jail.create_file( "config.yaml", r#" +database_adapter: postgres +kafka_deadletter_topic: tasks-dlq +kafka_retry_topic: tasks-retry + kafka_topics: profiles: cluster: my-cluster @@ -1825,6 +1820,14 @@ kafka_topics: subscriptions: cluster: my-cluster consumer_group: taskbroker-subscriptions + tasks-retry: + cluster: my-cluster + consumer_group: taskbroker-retry + produce_only: true + tasks-dlq: + cluster: my-cluster + consumer_group: taskbroker-dlq + produce_only: true kafka_clusters: my-cluster: @@ -1838,7 +1841,7 @@ kafka_clusters: let err = Config::from_args(&args).unwrap_err(); assert!( err.to_string() - .contains("multi-topic consumption is not yet supported"), + .contains("not supported with the postgres database adapter"), "unexpected error: {}", err ); @@ -1889,7 +1892,7 @@ kafka_clusters: assert!(topics.get("profiles-dlq").unwrap().produce_only); // consumable_topic() returns the one consumable topic - let (topic_name, _) = config.consumable_topic().unwrap(); + let (topic_name, _) = config.consumable_topics().unwrap()[0]; assert_eq!(topic_name, "profiles"); Ok(()) @@ -2043,7 +2046,7 @@ kafka_clusters: config: Some("config.yaml".to_owned()), }; let config = Config::from_args(&args).unwrap(); - let consumer_config = config.kafka_consumer_config(); + let consumer_config = config.kafka_consumer_config_for("profiles"); // Per-topic values win over the globals. assert_eq!(consumer_config.get("session.timeout.ms").unwrap(), "12000"); @@ -2090,7 +2093,7 @@ kafka_clusters: config: Some("config.yaml".to_owned()), }; let config = Config::from_args(&args).unwrap(); - let consumer_config = config.kafka_consumer_config(); + let consumer_config = config.kafka_consumer_config_for("profiles"); // No per-topic overrides, so the globals are used. assert_eq!(consumer_config.get("session.timeout.ms").unwrap(), "7000"); diff --git a/src/kafka/activation_batcher.rs b/src/kafka/activation_batcher.rs index a5a1ba73..f5748f5a 100644 --- a/src/kafka/activation_batcher.rs +++ b/src/kafka/activation_batcher.rs @@ -29,11 +29,14 @@ pub struct ActivationBatcherConfig { } impl ActivationBatcherConfig { - /// Convert from application configuration into ActivationBatcher config. - pub fn from_config(config: &Config) -> Self { - let (topic_name, topic_config) = config - .consumable_topic() - .expect("no consumable topic configured"); + /// Convert from application configuration into ActivationBatcher config for a + /// single consumed topic. Each consumer has its own batcher, so the topic is + /// passed explicitly rather than derived from "the" consumable topic. + pub fn from_topic(config: &Config, topic_name: &str) -> Self { + let topic_config = config + .kafka_topics + .get(topic_name) + .unwrap_or_else(|| panic!("unknown topic '{topic_name}'")); let cluster = config .cluster(&topic_config.cluster) .expect("cluster not found"); @@ -252,7 +255,7 @@ demoted_namespaces: config.normalize_and_validate().unwrap(); let config = Arc::new(config); let mut batcher = ActivationBatcher::new( - ActivationBatcherConfig::from_config(&config), + ActivationBatcherConfig::from_topic(&config, config.consumable_topics().unwrap()[0].0), runtime_config, ); @@ -275,7 +278,7 @@ demoted_namespaces: config.normalize_and_validate().unwrap(); let config = Arc::new(config); let mut batcher = ActivationBatcher::new( - ActivationBatcherConfig::from_config(&config), + ActivationBatcherConfig::from_topic(&config, config.consumable_topics().unwrap()[0].0), runtime_config, ); @@ -304,7 +307,7 @@ demoted_namespaces: let config = Arc::new(config); let mut batcher = ActivationBatcher::new( - ActivationBatcherConfig::from_config(&config), + ActivationBatcherConfig::from_topic(&config, config.consumable_topics().unwrap()[0].0), runtime_config, ); @@ -334,7 +337,7 @@ demoted_namespaces: let config = Arc::new(config); let mut batcher = ActivationBatcher::new( - ActivationBatcherConfig::from_config(&config), + ActivationBatcherConfig::from_topic(&config, config.consumable_topics().unwrap()[0].0), runtime_config, ); @@ -380,11 +383,11 @@ demoted_topic: taskworker-demoted"#; config.normalize_and_validate().unwrap(); let config = Arc::new(config); let mut batcher = ActivationBatcher::new( - ActivationBatcherConfig::from_config(&config), + ActivationBatcherConfig::from_topic(&config, config.consumable_topics().unwrap()[0].0), runtime_config, ); - let (_, topic_config) = config.consumable_topic().unwrap(); + let (_, topic_config) = config.consumable_topics().unwrap()[0]; let cluster_address = config .cluster(&topic_config.cluster) .unwrap() diff --git a/src/kafka/admin.rs b/src/kafka/admin.rs index 5cbfcb51..36f11e94 100644 --- a/src/kafka/admin.rs +++ b/src/kafka/admin.rs @@ -1,31 +1,56 @@ -use anyhow::Error; +use anyhow::{Error, anyhow}; use rdkafka::ClientConfig; use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}; +use rdkafka::types::RDKafkaErrorCode; use tracing::info; +/// Create the given topics on the cluster described by `admin_config` if they +/// don't already exist. `topics` pairs each topic name with its partition count. +/// +/// `admin_config` should carry only `bootstrap.servers` + auth (see +/// [`Config::kafka_admin_config`](crate::config::Config::kafka_admin_config)) so +/// the topics are created on the right cluster. A pre-existing topic +/// (`TopicAlreadyExists`) is treated as success; any other per-topic failure is +/// surfaced so a misconfigured or unreachable cluster fails loudly at startup. pub async fn create_missing_topics( - kafka_client_config: ClientConfig, - topic: &str, - default_topic_partitions: i32, + admin_config: ClientConfig, + topics: &[(&str, i32)], ) -> Result<(), Error> { - let admin_client: AdminClient<_> = kafka_client_config + if topics.is_empty() { + return Ok(()); + } + + let admin_client: AdminClient<_> = admin_config .create() - .expect("Unable to reate rdkafka admin client"); + .map_err(|e| anyhow!("Unable to create rdkafka admin client: {e}"))?; info!( - "Creating topic {:?} with {} partitions if it does not already exists", - topic, default_topic_partitions + "Creating topics {:?} if they do not already exist", + topics ); - admin_client - .create_topics( - &vec![NewTopic::new( - topic, - default_topic_partitions, - TopicReplication::Fixed(1), - )], - &AdminOptions::new(), - ) + let new_topics: Vec = topics + .iter() + .map(|(name, partitions)| NewTopic::new(name, *partitions, TopicReplication::Fixed(1))) + .collect(); + + let results = admin_client + .create_topics(&new_topics, &AdminOptions::new()) .await?; + // `create_topics` returns one result per topic; a request-level error is + // already propagated above by `?`. Tolerate topics that already exist, but + // surface every other per-topic failure (auth denied, invalid config, ...). + for result in results { + match result { + Ok(_) => {} + Err((topic, RDKafkaErrorCode::TopicAlreadyExists)) => { + info!("Topic {:?} already exists, skipping", topic); + } + Err((topic, code)) => { + return Err(anyhow!("Failed to create topic {:?}: {}", topic, code)); + } + } + } + Ok(()) } diff --git a/src/main.rs b/src/main.rs index dd3f9939..9f938ebe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::Duration; @@ -75,25 +75,18 @@ async fn main() -> Result<(), Error> { // If this is an environment where the topics might not exist, check and create them. if config.create_missing_topics { - let kafka_client_config = config.kafka_consumer_config(); - let (main_topic, _) = config - .consumable_topic() - .map_err(|e| anyhow!("invalid config: {}", e))?; - create_missing_topics( - kafka_client_config.clone(), - main_topic, - config.default_topic_partitions, - ) - .await?; - - // Create retry topic if configured - if let Some(ref retry_topic) = config.kafka_retry_topic { - create_missing_topics( - kafka_client_config, - retry_topic, - config.default_topic_partitions, - ) - .await?; + // Group every declared topic by its cluster so each is created on the + // right brokers (main, retry, deadletter and produce-only topics, which + // may live on different clusters). + let mut topics_by_cluster: BTreeMap<&str, Vec<(&str, i32)>> = BTreeMap::new(); + for (topic_name, topic_config) in &config.kafka_topics { + topics_by_cluster + .entry(topic_config.cluster.as_str()) + .or_default() + .push((topic_name.as_str(), config.default_topic_partitions)); + } + for (cluster, topics) in topics_by_cluster { + create_missing_topics(config.kafka_admin_config(cluster), &topics).await?; } } @@ -157,25 +150,30 @@ async fn main() -> Result<(), Error> { } }); - // Consumer from kafka - let consumer_task = taskbroker::tokio::spawn({ + // Consumer(s) from kafka. Each consumed topic gets its own consumer (own + // group.id and cluster), so we spawn one consumer task per consumable topic, + // all sharing the one activation store. + let consumer_topics: Vec = config + .consumable_topics() + .expect("invalid config: no consumable topic") + .into_iter() + .map(|(name, _)| name.to_owned()) + .collect(); + + let mut consumer_tasks: Vec<(String, JoinHandle>)> = Vec::new(); + for topic in consumer_topics { let consumer_store = store.clone(); let consumer_config = config.clone(); let runtime_config_manager = runtime_config_manager.clone(); + let task_topic = topic.clone(); - // Build list of topics to consume from - let (main_topic, _) = consumer_config - .consumable_topic() - .expect("invalid config: no consumable topic"); - let topics_to_consume = [main_topic.to_owned()]; - - async move { + let handle = taskbroker::tokio::spawn(async move { // The consumer has an internal thread that listens for cancellations, so it doesn't need // an outer select here like the other tasks. - let topic_refs: Vec<&str> = topics_to_consume.iter().map(|s| s.as_str()).collect(); + let topic_refs = [task_topic.as_str()]; start_consumer( &topic_refs, - &consumer_config.kafka_consumer_config(), + &consumer_config.kafka_consumer_config_for(&task_topic), consumer_store.clone(), processing_strategy!({ err: @@ -185,11 +183,11 @@ async fn main() -> Result<(), Error> { ), map: - deserialize::new(DeserializeConfig::from_config(&consumer_config)), + deserialize::new(DeserializeConfig::from_topic(&consumer_config, &task_topic)), reduce: ActivationBatcher::new( - ActivationBatcherConfig::from_config(&consumer_config), + ActivationBatcherConfig::from_topic(&consumer_config, &task_topic), runtime_config_manager.clone() ), ActivationWriter::new( @@ -200,8 +198,9 @@ async fn main() -> Result<(), Error> { }), ) .await - } - }); + }); + consumer_tasks.push((topic, handle)); + } // Status update flush task let (status_update_tx, status_update_task) = if config.batch_status_updates { @@ -337,10 +336,13 @@ async fn main() -> Result<(), Error> { .on_sigint() .on_signal(SignalKind::hangup()) .on_signal(SignalKind::quit()) - .on_completion(log_task_completion("consumer", consumer_task)) .on_completion(log_task_completion("upkeep_task", upkeep_task)) .on_completion(log_task_completion("maintenance_task", maintenance_task)); + for (topic, handle) in consumer_tasks { + departure = departure.on_completion(log_task_completion(format!("consumer:{topic}"), handle)); + } + if let Some(task) = grpc_server_task { departure = departure.on_completion(log_task_completion("grpc_server", task)); } diff --git a/src/test_utils.rs b/src/test_utils.rs index 31455a0f..0d13c4c8 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -354,12 +354,12 @@ pub fn create_producer(config: Arc) -> Arc { /// Reset a kafka topic by destroying it and recreating it. pub async fn reset_topic(config: Arc) { + let (main_topic, _) = config.consumable_topics().expect("no consumable topic")[0]; let admin_client: AdminClient<_> = config - .kafka_consumer_config() + .kafka_consumer_config_for(main_topic) .create() .expect("Could not create admin client"); - let (main_topic, _) = config.consumable_topic().expect("no consumable topic"); let options = AdminOptions::default(); admin_client .delete_topics(&[main_topic, &config.kafka_deadletter_topic], &options) @@ -385,7 +385,7 @@ pub async fn consume_topic( num_records: usize, ) -> Vec { let consumer: StreamConsumer = config - .kafka_consumer_config() + .kafka_consumer_config_for(topic) .create() .expect("could not create consumer"); consumer.subscribe(&[topic]).expect("could not subscribe"); diff --git a/src/upkeep.rs b/src/upkeep.rs index e407cd10..f4812f25 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -155,13 +155,7 @@ pub async fn do_upkeep( // 1. Handle retry tasks let handle_retries_start = Instant::now(); if let Ok(retries) = store.get_retry_activations().await { - // Use retry topic if configured, otherwise fall back to main topic - let (main_topic, _) = config.consumable_topic().expect("no consumable topic"); - let main_topic_owned = main_topic.to_owned(); - let retry_target_topic = config - .kafka_retry_topic - .as_ref() - .unwrap_or(&main_topic_owned); + let retry_target_topic = config.retry_topic().to_owned(); // 2. Append retries to kafka let deliveries = retries @@ -322,23 +316,39 @@ pub async fn do_upkeep( // 12. Forward tasks from demoted namespaces to `runtime_config.demoted_topic` let demoted_namespaces = runtime_config.demoted_namespaces.clone(); - let (main_topic, main_topic_config) = config.consumable_topic().expect("no consumable topic"); - let main_cluster = config - .cluster(&main_topic_config.cluster) - .expect("cluster not found") - .address - .clone(); + let consumable = config.consumable_topics().expect("no consumable topic"); + // Default the forward cluster to the consumed cluster when there's exactly + // one consumed topic (legacy behavior). With multiple consumed topics there + // is no single consumed cluster, so fall back to the producer (deadletter) + // cluster, whose credentials the forward producer reuses anyway. + let default_forward_cluster = if consumable.len() == 1 { + config + .cluster(&consumable[0].1.cluster) + .expect("cluster not found") + .address + .clone() + } else { + config.kafka_producer_cluster().address.clone() + }; let forward_cluster = runtime_config .demoted_topic_cluster .clone() - .unwrap_or(main_cluster.clone()); + .unwrap_or(default_forward_cluster); let forward_topic = runtime_config .demoted_topic .clone() .unwrap_or(config.kafka_long_topic.clone()); - let same_cluster = forward_cluster == main_cluster; - let same_topic = forward_topic == main_topic; - if !(demoted_namespaces.is_empty() || (same_cluster && same_topic)) { + // Forwarding to a topic+cluster this broker itself consumes would loop the + // tasks straight back in, so treat that as a no-op. For a single consumed + // topic this is exactly the old `same_topic && same_cluster` guard. + let forwards_to_consumed = consumable.iter().any(|(name, topic_config)| { + *name == forward_topic + && config + .cluster(&topic_config.cluster) + .map(|c| c.address == forward_cluster) + .unwrap_or(false) + }); + if !(demoted_namespaces.is_empty() || forwards_to_consumed) { let forward_demoted_start = Instant::now(); // The forwarding producer reuses the deadletter cluster's credentials // (see Config::kafka_producer_config) and only overrides @@ -765,7 +775,7 @@ mod tests { assert_eq!(store.count().await.unwrap(), 1); assert_eq!(result_context.retried, 1); - let (main_topic, _) = config.consumable_topic().unwrap(); + let (main_topic, _) = config.consumable_topics().unwrap()[0]; let messages = consume_topic(config.clone(), main_topic, 1).await; assert_eq!(messages.len(), 1); let activation = &messages[0]; From 5a9b77ca4bb8dec03714f99eb68defe9684d8c85 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 3 Jun 2026 12:33:22 +0200 Subject: [PATCH 3/7] add test --- Makefile | 5 + .../integration_tests/test_multi_topic.py | 134 ++++++++++++++++++ 2 files changed, 139 insertions(+) create mode 100644 integration_tests/integration_tests/test_multi_topic.py diff --git a/Makefile b/Makefile index ee63db0f..f912d4d3 100644 --- a/Makefile +++ b/Makefile @@ -69,6 +69,11 @@ test-upkeep-retry: build reset-kafka ## Run the upkeep retry integration test rm -r integration_tests/.tests_output/test_upkeep_retry .PHONY: test-upkeep-retry +test-multi-topic: build reset-kafka ## Run the multi-topic consumption integration test + python -m pytest integration_tests/integration_tests/test_multi_topic.py -s + rm -r integration_tests/.tests_output/test_multi_topic +.PHONY: test-multi-topic + test-upkeep-expiry: build reset-kafka ## Run the upkeep expiry integration test python -m pytest integration_tests/integration_tests/test_upkeep_expiry.py -s rm -r integration_tests/.tests_output/test_upkeep_expiry diff --git a/integration_tests/integration_tests/test_multi_topic.py b/integration_tests/integration_tests/test_multi_topic.py new file mode 100644 index 00000000..7b0e21e8 --- /dev/null +++ b/integration_tests/integration_tests/test_multi_topic.py @@ -0,0 +1,134 @@ +import signal +import subprocess +import time + +import pytest +import yaml + +from integration_tests.helpers import ( + TASKBROKER_BIN, + TESTS_OUTPUT_ROOT, + TaskbrokerConfig, + create_topic, + get_available_ports, + get_num_tasks_in_sqlite, + send_generic_messages_to_topic, +) + +TEST_OUTPUT_PATH = TESTS_OUTPUT_ROOT / "test_multi_topic" + + +def test_multi_topic_consumption() -> None: + """ + Verify that a single taskbroker configured (new `kafka_topics` format) with + two consumable topics consumes from BOTH into its one sqlite store. + + Each consumed topic gets its own rdkafka consumer (own group.id); both + pipelines write to the shared store. We produce N messages to each of two + topics and assert the broker ends up with 2*N tasks in sqlite. + """ + num_messages_per_topic = 1_000 + num_partitions = 4 + timeout = 60 + curr_time = int(time.time()) + + topic_a = f"multitopic-a-{curr_time}" + topic_b = f"multitopic-b-{curr_time}" + retry_topic = f"multitopic-retry-{curr_time}" + dlq_topic = f"multitopic-dlq-{curr_time}" + + # Pre-create the topics so the test exercises consumption, not topic + # creation. (create_missing_topics stays off.) + for topic in (topic_a, topic_b, retry_topic, dlq_topic): + create_topic(topic, num_partitions) + + TEST_OUTPUT_PATH.mkdir(parents=True, exist_ok=True) + db_name = f"db_multi_topic_{curr_time}" + db_path = str(TEST_OUTPUT_PATH / f"{db_name}.sqlite") + config_filename = f"config_multi_topic_{curr_time}.yml" + grpc_port = get_available_ports(1)[0] + + # New multi-topic config: two consumable topics + produce-only retry/dlq, + # all on a single cluster. + config_dict = { + "db_name": db_name, + "db_path": db_path, + "max_pending_count": 100_000, + "grpc_port": grpc_port, + "kafka_auto_offset_reset": "earliest", + "kafka_deadletter_topic": dlq_topic, + "kafka_retry_topic": retry_topic, + "kafka_clusters": { + "default": {"address": "127.0.0.1:9092"}, + }, + "kafka_topics": { + topic_a: {"cluster": "default", "consumer_group": f"{topic_a}-grp"}, + topic_b: {"cluster": "default", "consumer_group": f"{topic_b}-grp"}, + retry_topic: { + "cluster": "default", + "consumer_group": f"{retry_topic}-grp", + "produce_only": True, + }, + dlq_topic: { + "cluster": "default", + "consumer_group": f"{dlq_topic}-grp", + "produce_only": True, + }, + }, + } + + config_path = str(TEST_OUTPUT_PATH / config_filename) + with open(config_path, "w") as f: + yaml.safe_dump(config_dict, f) + + # A TaskbrokerConfig instance is only needed for the sqlite-counting helper, + # which reads db_name/db_path. + query_config = TaskbrokerConfig( + db_name=db_name, + db_path=db_path, + max_pending_count=100_000, + kafka_topic=topic_a, + kafka_deadletter_topic=dlq_topic, + kafka_consumer_group=f"{topic_a}-grp", + kafka_auto_offset_reset="earliest", + grpc_port=grpc_port, + ) + + log_path = str(TEST_OUTPUT_PATH / f"taskbroker_multi_topic_{curr_time}.log") + expected_total = num_messages_per_topic * 2 + + send_generic_messages_to_topic(topic_a, num_messages_per_topic) + send_generic_messages_to_topic(topic_b, num_messages_per_topic) + + process = None + try: + with open(log_path, "a") as log_file: + process = subprocess.Popen( + [str(TASKBROKER_BIN), "-c", config_path], + stderr=subprocess.STDOUT, + stdout=log_file, + ) + time.sleep(3) # give the broker time to start both consumers + + written = 0 + end = time.time() + timeout + while time.time() < end: + written = get_num_tasks_in_sqlite(query_config) + if written >= expected_total: + break + # the broker should still be alive while consuming + assert process.poll() is None, "taskbroker exited early" + time.sleep(1) + + assert written == expected_total, ( + f"expected {expected_total} tasks in sqlite " + f"({num_messages_per_topic} from each of two topics), got {written}" + ) + finally: + if process is not None: + process.send_signal(signal.SIGINT) + try: + assert process.wait(timeout=10) == 0 + except Exception: + process.kill() + raise From 8d8975e2109e3ececedd04dc87cb85e24c85aae2 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 3 Jun 2026 12:35:07 +0200 Subject: [PATCH 4/7] add test to ci --- .github/workflows/ci.yml | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a34650f8..ad8520bc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -463,3 +463,38 @@ jobs: - name: Run failed tasks integration test run: | make test-failed-tasks + + multi-topic-integration-test: + name: Multi-topic integration test + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2 + + - name: Install cmake + uses: lukka/get-cmake@28983e0d3955dba2bb0a6810caae0c6cf268ec0c # latest + + - uses: actions-rs/toolchain@16499b5e05bf2e26879000db0c1d13f7e13fa3af # pin@v1 + with: + toolchain: stable + profile: minimal + override: true + + - uses: swatinem/rust-cache@81d053bdb0871dcd3f10763c8cc60d0adc41762b # pin@v1 + with: + key: ${{ github.job }} + + - uses: astral-sh/setup-uv@5a7eac68fb9809dea845d802897dc5c723910fa3 # v7.1.3 + with: + version: '0.8.2' + # we just cache the venv-dir directly in action-setup-venv + enable-cache: false + + - uses: getsentry/action-setup-venv@5a80476d175edf56cb205b08bc58986fa99d1725 # v3.2.0 + with: + cache-dependency-path: uv.lock + install-cmd: uv sync --frozen --only-dev --active + + - name: Run multi-topic integration test + run: | + make test-multi-topic From bdb398bf5202261a481756d6edcb658b298c98a8 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 3 Jun 2026 12:41:16 +0200 Subject: [PATCH 5/7] add deprecation warning for raw mode too --- src/config.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/config.rs b/src/config.rs index aa26df2d..8969f3b0 100644 --- a/src/config.rs +++ b/src/config.rs @@ -710,6 +710,9 @@ impl Config { kafka_topics with a cluster reference instead" ); } + if self.raw_mode { + warn!("raw_mode is deprecated, use kafka_topics..raw instead"); + } let topic_name = self .kafka_topic From 58af65997f68bc8c5ee6456359a235317503cc88 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 3 Jun 2026 12:51:34 +0200 Subject: [PATCH 6/7] fix lints --- integration_tests/integration_tests/test_multi_topic.py | 1 - src/kafka/admin.rs | 5 +---- src/main.rs | 3 ++- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/integration_tests/integration_tests/test_multi_topic.py b/integration_tests/integration_tests/test_multi_topic.py index 7b0e21e8..1706b68b 100644 --- a/integration_tests/integration_tests/test_multi_topic.py +++ b/integration_tests/integration_tests/test_multi_topic.py @@ -2,7 +2,6 @@ import subprocess import time -import pytest import yaml from integration_tests.helpers import ( diff --git a/src/kafka/admin.rs b/src/kafka/admin.rs index 36f11e94..61cfee57 100644 --- a/src/kafka/admin.rs +++ b/src/kafka/admin.rs @@ -24,10 +24,7 @@ pub async fn create_missing_topics( .create() .map_err(|e| anyhow!("Unable to create rdkafka admin client: {e}"))?; - info!( - "Creating topics {:?} if they do not already exist", - topics - ); + info!("Creating topics {:?} if they do not already exist", topics); let new_topics: Vec = topics .iter() .map(|(name, partitions)| NewTopic::new(name, *partitions, TopicReplication::Fixed(1))) diff --git a/src/main.rs b/src/main.rs index 9f938ebe..ec56cb1f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -340,7 +340,8 @@ async fn main() -> Result<(), Error> { .on_completion(log_task_completion("maintenance_task", maintenance_task)); for (topic, handle) in consumer_tasks { - departure = departure.on_completion(log_task_completion(format!("consumer:{topic}"), handle)); + departure = + departure.on_completion(log_task_completion(format!("consumer:{topic}"), handle)); } if let Some(task) = grpc_server_task { From b0325ee96b5984dff9a9ba13db09254d61d3e868 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 3 Jun 2026 14:46:24 +0200 Subject: [PATCH 7/7] feat(taskbroker): Add drain mode Add a `drain: true` config flag that lets a broker start with zero consumable topics (and thus no consumers) and just flush whatever is already in its store out to workers, while upkeep keeps running. Used to move a topic off a broker by editing config: the old broker keeps draining its DB. ref STREAM-1042 Co-Authored-By: Claude Opus 4.8 --- src/config.rs | 96 +++++++++++++++++++++++++++++++++++++++++++++- src/main.rs | 10 ++++- src/store/tests.rs | 26 +++++++++++++ src/upkeep.rs | 4 +- 4 files changed, 131 insertions(+), 5 deletions(-) diff --git a/src/config.rs b/src/config.rs index 8969f3b0..fb7d4129 100644 --- a/src/config.rs +++ b/src/config.rs @@ -502,6 +502,13 @@ pub struct Config { /// After normalization, this always contains at least the "default" cluster. #[serde(default)] pub kafka_clusters: BTreeMap, + + /// Run in drain mode: start with zero consumable topics (no consumers) and + /// just flush whatever is already in the store out to workers, then run + /// upkeep (retries/deadletters are still produced to their declared topics). + /// Used to move a topic off a broker by editing config: the old broker + /// keeps draining its DB. Without this, zero consumable topics is an error. + pub drain: bool, } impl Default for Config { @@ -603,6 +610,7 @@ impl Default for Config { raw_processing_deadline_duration: 30, kafka_topics: BTreeMap::new(), kafka_clusters: BTreeMap::new(), + drain: false, } } } @@ -855,8 +863,21 @@ impl Config { })?; } - // Validate at least one consumable topic. - let consumable = self.consumable_topics()?; + // Consumable (non-produce_only) topics; may be empty in drain mode. + let consumable: Vec<(&str, &TopicConfig)> = self + .kafka_topics + .iter() + .filter(|(_, cfg)| !cfg.produce_only) + .map(|(name, cfg)| (name.as_str(), cfg)) + .collect(); + // A broker with no consumers is only meaningful in drain mode (it just + // flushes its existing store); otherwise it's a misconfiguration. + if consumable.is_empty() && !self.drain { + return Err(anyhow!( + "no consumable topic configured (all topics have produce_only: true); \ + set drain: true to run a broker with no consumers" + )); + } // Multi-topic consumption is only supported on the sqlite adapter for // now. The postgres adapter filters claims by a single shared partition @@ -913,6 +934,14 @@ impl Config { } match single_topic { Some(topic) => self.kafka_retry_topic = Some(topic), + None if count == 0 => { + // Drain mode (the only way to reach zero consumable topics): + // retries are still produced, so a retry topic is required. + return Err(anyhow!( + "kafka_retry_topic is required in drain mode (no consumable topic to \ + fall back to)" + )); + } None => { return Err(anyhow!( "kafka_retry_topic is required when consuming from multiple topics ({} \ @@ -1902,6 +1931,69 @@ kafka_clusters: }); } + /// drain mode lets the broker start with zero consumable topics (only + /// produce-only retry/dlq topics remain) so it can flush its store. + const DRAIN_CONFIG: &str = r#" +drain: true +kafka_deadletter_topic: tasks-dlq +kafka_retry_topic: tasks-retry + +kafka_topics: + tasks-retry: + cluster: my-cluster + consumer_group: tasks-retry + produce_only: true + tasks-dlq: + cluster: my-cluster + consumer_group: tasks-dlq + produce_only: true + +kafka_clusters: + my-cluster: + address: 10.0.0.1:9092 +"#; + + #[test] + fn test_drain_mode_allows_zero_consumable_topics() { + Jail::expect_with(|jail| { + jail.create_file("config.yaml", DRAIN_CONFIG)?; + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let config = Config::from_args(&args).unwrap(); + + assert!(config.drain); + // No consumable topics: consumable_topics() errors, but the config + // is valid because drain mode permits it. + assert!(config.consumable_topics().is_err()); + assert_eq!(config.retry_topic(), "tasks-retry"); + + Ok(()) + }); + } + + #[test] + fn test_zero_consumable_topics_rejected_without_drain() { + Jail::expect_with(|jail| { + // Same config as drain, but drain disabled -> rejected. + jail.create_file( + "config.yaml", + &DRAIN_CONFIG.replace("drain: true", "drain: false"), + )?; + let args = Args { + config: Some("config.yaml".to_owned()), + }; + let err = Config::from_args(&args).unwrap_err(); + assert!( + err.to_string().contains("no consumable topic configured"), + "unexpected error: {}", + err + ); + + Ok(()) + }); + } + #[test] fn test_multi_topic_rejects_zero_consumable_topics() { Jail::expect_with(|jail| { diff --git a/src/main.rs b/src/main.rs index ec56cb1f..ce8b09f4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -152,14 +152,20 @@ async fn main() -> Result<(), Error> { // Consumer(s) from kafka. Each consumed topic gets its own consumer (own // group.id and cluster), so we spawn one consumer task per consumable topic, - // all sharing the one activation store. + // all sharing the one activation store. In drain mode there are no consumable + // topics, so this is empty and we spawn no consumers — the broker just + // flushes its existing store via upkeep and the delivery path. let consumer_topics: Vec = config .consumable_topics() - .expect("invalid config: no consumable topic") + .unwrap_or_default() .into_iter() .map(|(name, _)| name.to_owned()) .collect(); + if consumer_topics.is_empty() { + info!("No consumable topics configured; running in drain mode (no consumers)"); + } + let mut consumer_tasks: Vec<(String, JoinHandle>)> = Vec::new(); for topic in consumer_topics { let consumer_store = store.clone(); diff --git a/src/store/tests.rs b/src/store/tests.rs index 4754ece6..31cca714 100644 --- a/src/store/tests.rs +++ b/src/store/tests.rs @@ -98,6 +98,32 @@ async fn test_store(#[case] adapter: &str) { store.remove_db().await.unwrap(); } +/// Drain mode (`drain: true`) runs with no consumers, so partitions are never +/// assigned and the assigned set stays empty. An empty assignment must mean "no +/// partition filter" so the broker can still flush everything already in its +/// store -- NOT "filter to nothing". This is what makes draining work, and it +/// matters most on postgres, the adapter that actually filters by partition. +#[tokio::test] +#[rstest] +#[case::sqlite("sqlite")] +#[case::postgres("postgres")] +async fn test_claim_drains_without_partition_assignment(#[case] adapter: &str) { + let store = create_test_store(adapter).await; + store.assign_partitions(vec![]).unwrap(); + + assert!(store.store(make_activations(3)).await.is_ok()); + + for _ in 0..3 { + let claimed = store.claim_activation_for_pull(None, None).await.unwrap(); + assert!( + claimed.is_some(), + "claim returned None with no partition assignment; a drain broker could \ + never flush its store" + ); + } + store.remove_db().await.unwrap(); +} + #[tokio::test] #[rstest] #[case::sqlite("sqlite")] diff --git a/src/upkeep.rs b/src/upkeep.rs index f4812f25..77be02e1 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -316,7 +316,9 @@ pub async fn do_upkeep( // 12. Forward tasks from demoted namespaces to `runtime_config.demoted_topic` let demoted_namespaces = runtime_config.demoted_namespaces.clone(); - let consumable = config.consumable_topics().expect("no consumable topic"); + // Empty in drain mode (no consumable topics); forwarding still drains + // demoted tasks out of the store to the configured forward topic. + let consumable = config.consumable_topics().unwrap_or_default(); // Default the forward cluster to the consumed cluster when there's exactly // one consumed topic (legacy behavior). With multiple consumed topics there // is no single consumed cluster, so fall back to the producer (deadletter)