From 9a77bdcb2367240d9c5504c6b3363778d599ebd4 Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Fri, 5 Jun 2026 12:54:37 -0400 Subject: [PATCH 1/2] fix: Update latency check to use CLAIMED status There was a chance that the latency metric would be very spiky if a message was moving between the PENDING and CLAIMED status. Update the check to use both statuses so the latency stays consistent. --- src/store/adapters/postgres.rs | 11 +++++++---- src/store/adapters/sqlite.rs | 7 ++++--- src/store/tests.rs | 34 ++++++++++++++++++++-------------- 3 files changed, 31 insertions(+), 21 deletions(-) diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 67611b68..82495f18 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -630,8 +630,8 @@ impl ActivationStore for PostgresStore { .await } - /// Get the age of the oldest pending activation in seconds. - /// Only activations with status=pending and processing_attempts=0 are considered + /// Get the age of the oldest pending/claimed activation in seconds. + /// Only activations with status=pending/claimed and processing_attempts=0 are considered /// as we are interested in latency to the *first* attempt. /// Tasks with delay_until set, will have their age adjusted based on their /// delay time. No tasks = 0 lag @@ -640,9 +640,12 @@ impl ActivationStore for PostgresStore { let mut query_builder = QueryBuilder::new( "SELECT received_at, delay_until FROM inflight_taskactivations - WHERE status = ", + WHERE status IN (", ); - query_builder.push_bind(ActivationStatus::Pending.to_string()); + let mut separated = query_builder.separated(", "); + separated.push_bind(ActivationStatus::Pending.to_string()); + separated.push_bind(ActivationStatus::Claimed.to_string()); + query_builder.push(")"); query_builder.push(" AND processing_attempts = 0"); self.add_partition_condition(&mut query_builder, false); diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index 5691ba85..b6517b21 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -675,8 +675,8 @@ impl ActivationStore for SqliteStore { Ok(result.rows_affected()) } - /// Get the age of the oldest pending activation in seconds. - /// Only activations with status=pending and processing_attempts=0 are considered + /// Get the age of the oldest pending/claimed activation in seconds. + /// Only activations with status=pending/claimed and processing_attempts=0 are considered /// as we are interested in latency to the *first* attempt. /// Tasks with delay_until set, will have their age adjusted based on their /// delay time. No tasks = 0 lag @@ -684,13 +684,14 @@ impl ActivationStore for SqliteStore { let result = match sqlx::query( "SELECT received_at, delay_until FROM inflight_taskactivations - WHERE status = $1 + WHERE (status = $1 or status = $2) AND processing_attempts = 0 ORDER BY received_at ASC LIMIT 1 ", ) .bind(ActivationStatus::Pending) + .bind(ActivationStatus::Claimed) .fetch_optional(&self.read_pool) .await { diff --git a/src/store/tests.rs b/src/store/tests.rs index 4754ece6..ba7e707d 100644 --- a/src/store/tests.rs +++ b/src/store/tests.rs @@ -1897,7 +1897,7 @@ async fn test_pending_activation_max_lag_no_pending(#[case] adapter: &str) { processing[0].status = ActivationStatus::Processing; assert!(store.store(processing).await.is_ok()); - // No pending activations, max lag is 0 + // No pending or claimed activations, max lag is 0 assert_eq!(0.0, store.pending_activation_max_lag(&now).await); store.remove_db().await.unwrap(); } @@ -1910,10 +1910,12 @@ async fn test_pending_activation_max_lag_use_oldest(#[case] adapter: &str) { let now = Utc::now(); let store = create_test_store(adapter).await; - let mut pending = make_activations(2); - pending[0].received_at = now - Duration::from_secs(10); - pending[1].received_at = now - Duration::from_secs(500); - assert!(store.store(pending).await.is_ok()); + let mut activations = make_activations(3); + activations[0].received_at = now - Duration::from_secs(10); + activations[1].received_at = now - Duration::from_secs(500); + activations[2].status = ActivationStatus::Claimed; + activations[2].received_at = now - Duration::from_secs(50); + assert!(store.store(activations).await.is_ok()); let result = store.pending_activation_max_lag(&now).await; assert!(11.0 < result, "Should not get the small record"); @@ -1929,11 +1931,14 @@ async fn test_pending_activation_max_lag_ignore_processing_attempts(#[case] adap let now = Utc::now().round_subsecs(0); let store = create_test_store(adapter).await; - let mut pending = make_activations(2); - pending[0].received_at = now - Duration::from_secs(10); - pending[1].received_at = now - Duration::from_secs(500); - pending[1].processing_attempts = 1; - assert!(store.store(pending).await.is_ok()); + let mut activations = make_activations(3); + activations[0].received_at = now - Duration::from_secs(10); + activations[1].received_at = now - Duration::from_secs(500); + activations[1].processing_attempts = 1; + activations[2].status = ActivationStatus::Claimed; + activations[2].received_at = now - Duration::from_secs(500); + activations[2].processing_attempts = 1; + assert!(store.store(activations).await.is_ok()); let result = store.pending_activation_max_lag(&now).await; assert_eq!(result, 10.0, "max lag: {result:?}"); @@ -1948,12 +1953,13 @@ async fn test_pending_activation_max_lag_account_for_delayed(#[case] adapter: &s let now = Utc::now(); let store = create_test_store(adapter).await; - let mut pending = make_activations(2); + let mut activations = make_activations(3); // delayed tasks are received well before they become pending // the lag of a delayed task should begin *after* the delay has passed. - pending[0].received_at = now - Duration::from_secs(520); - pending[0].delay_until = Some(now - Duration::from_millis(22020)); - assert!(store.store(pending).await.is_ok()); + activations[0].received_at = now - Duration::from_secs(520); + activations[0].delay_until = Some(now - Duration::from_millis(22020)); + activations[1].status = ActivationStatus::Claimed; + assert!(store.store(activations).await.is_ok()); let result = store.pending_activation_max_lag(&now).await; assert!(22.00 < result, "result: {result}"); From d607ee05178538836086b790ba3a7ba9b92e171f Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Fri, 5 Jun 2026 13:34:33 -0400 Subject: [PATCH 2/2] update max size --- src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/config.rs b/src/config.rs index 5813a93e..71efaed9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -609,7 +609,7 @@ impl Default for Config { maintenance_task_interval_ms: 6000, max_delayed_task_allowed_sec: 3600, max_message_size: 5000000, - grpc_max_message_size: 10 * 1024 * 1024, // 10MB + grpc_max_message_size: 12 * 1024 * 1024, // 12MB vacuum_page_count: None, full_vacuum_on_start: true, full_vacuum_on_upkeep: true,