From 1de6886003d238ee0ea00a2b7a01e2d74a2be503 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Thu, 4 Jun 2026 12:21:27 +0200 Subject: [PATCH 1/4] Remove unnecessary Arc --- src/kafka/consumer.rs | 26 +++++++++----------------- src/kafka/deserialize.rs | 6 ++---- src/kafka/deserialize_activation.rs | 20 +++++++------------- src/kafka/deserialize_raw.rs | 14 ++++++-------- 4 files changed, 24 insertions(+), 42 deletions(-) diff --git a/src/kafka/consumer.rs b/src/kafka/consumer.rs index 3d445a37..1548ef0c 100644 --- a/src/kafka/consumer.rs +++ b/src/kafka/consumer.rs @@ -454,7 +454,7 @@ impl MessageQueue for StreamPartitionQueue { #[instrument(skip_all)] pub async fn map( queue: impl MessageQueue, - transform: impl Fn(Arc) -> Result, + transform: impl Fn(&OwnedMessage) -> Result, ok: mpsc::Sender<(iter::Once, T)>, err: mpsc::Sender, shutdown: CancellationToken, @@ -475,16 +475,10 @@ pub async fn map( let Some(msg) = val else { break; }; - let msg = Arc::new(msg.detach()?); - match transform(msg.clone()) { + let msg = msg.detach()?; + match transform(&msg) { Ok(transformed) => { - if ok.send(( - iter::once( - Arc::try_unwrap(msg) - .expect("msg should only have a single strong ref"), - ), - transformed, - )).await.is_err() { + if ok.send((iter::once(msg), transformed)).await.is_err() { debug!("Receive half of ok channel is closed, shutting down..."); break; } @@ -497,11 +491,9 @@ pub async fn map( "Failed to map message: {:?}", e, ); - err.send( - Arc::try_unwrap(msg).expect("msg should only have a single strong ref"), - ) - .await - .expect("reduce_err is not available"); + err.send(msg) + .await + .expect("reduce_err is not available"); } } } @@ -1836,7 +1828,7 @@ mod tests { ), map: - |_: Arc| Ok(()), + |_: &OwnedMessage| Ok(()), reduce: NoopReducer::new(), NoopReducer::new(), @@ -1853,7 +1845,7 @@ mod tests { ), map: - |_: Arc| Ok(()), + |_: &OwnedMessage| Ok(()), reduce: NoopReducer::new(), }); diff --git a/src/kafka/deserialize.rs b/src/kafka/deserialize.rs index 61f484b1..3219dde0 100644 --- a/src/kafka/deserialize.rs +++ b/src/kafka/deserialize.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use anyhow::Error; use rdkafka::Message; use rdkafka::message::OwnedMessage; @@ -40,12 +38,12 @@ impl DeserializeConfig { /// In raw mode, raw Kafka bytes are wrapped into a TaskActivation. /// In normal mode, Kafka messages are expected to contain encoded TaskActivation protos. /// Messages from the retry topic are always deserialized as activations. -pub fn new(config: DeserializeConfig) -> impl Fn(Arc) -> Result { +pub fn new(config: DeserializeConfig) -> impl Fn(&OwnedMessage) -> Result { let raw_deserializer = config.raw_config.map(deserialize_raw::new); let activation_deserializer = deserialize_activation::new(config.activation_config); let retry_topic = config.retry_topic; - move |msg: Arc| { + move |msg: &OwnedMessage| { // Messages from the retry topic are always activations if let Some(ref retry_topic) = retry_topic && msg.topic() == retry_topic diff --git a/src/kafka/deserialize_activation.rs b/src/kafka/deserialize_activation.rs index 9ab1d11e..50655858 100644 --- a/src/kafka/deserialize_activation.rs +++ b/src/kafka/deserialize_activation.rs @@ -1,4 +1,3 @@ -use std::sync::Arc; use std::time::Duration; use anyhow::{Error, anyhow}; @@ -35,8 +34,8 @@ pub fn bucket_from_id(id: &str) -> i16 { pub fn new( config: DeserializeActivationConfig, -) -> impl Fn(Arc) -> Result { - move |msg: Arc| { +) -> impl Fn(&OwnedMessage) -> Result { + move |msg: &OwnedMessage| { let Some(payload) = msg.payload() else { return Err(anyhow!("Message has no payload")); }; @@ -169,8 +168,7 @@ mod tests { 0, None, ); - let arc_message = Arc::new(message); - let inflight_opt = deserializer(arc_message); + let inflight_opt = deserializer(&message); assert!(inflight_opt.is_ok()); let inflight = inflight_opt.unwrap(); @@ -215,8 +213,7 @@ mod tests { 0, None, ); - let arc_message = Arc::new(message); - let inflight_opt = deserializer(arc_message); + let inflight_opt = deserializer(&message); assert!(inflight_opt.is_ok()); let inflight = inflight_opt.unwrap(); @@ -262,8 +259,7 @@ mod tests { 0, None, ); - let arc_message = Arc::new(message); - let inflight_opt = deserializer(arc_message); + let inflight_opt = deserializer(&message); assert!(inflight_opt.is_ok()); let inflight = inflight_opt.unwrap(); @@ -309,8 +305,7 @@ mod tests { 0, None, ); - let arc_message = Arc::new(message); - let inflight_opt = deserializer(arc_message); + let inflight_opt = deserializer(&message); assert!(inflight_opt.is_ok()); let inflight = inflight_opt.unwrap(); @@ -357,8 +352,7 @@ mod tests { 0, None, ); - let arc_message = Arc::new(message); - let inflight_opt = deserializer(arc_message); + let inflight_opt = deserializer(&message); assert!(inflight_opt.is_ok()); let inflight = inflight_opt.unwrap(); diff --git a/src/kafka/deserialize_raw.rs b/src/kafka/deserialize_raw.rs index 40c53a02..e43d5411 100644 --- a/src/kafka/deserialize_raw.rs +++ b/src/kafka/deserialize_raw.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::sync::Arc; use anyhow::Error; use chrono::{DateTime, Utc}; @@ -102,8 +101,8 @@ fn encode_raw_params(raw_bytes: &[u8]) -> Vec { /// Create a deserializer closure for raw mode. /// Wraps raw Kafka message bytes into a TaskActivation with msgpack-encoded parameters_bytes. -pub fn new(config: RawConfig) -> impl Fn(Arc) -> Result { - move |msg: Arc| { +pub fn new(config: RawConfig) -> impl Fn(&OwnedMessage) -> Result { + move |msg: &OwnedMessage| { // Whether a message without payload is valid is technically not up to taskbroker, and we // can't DLQ messages here. It's easier to convert it to an empty bytestring and let the // task fail. Failed tasks can be DLQed in upkeep.rs @@ -130,7 +129,7 @@ pub fn new(config: RawConfig) -> impl Fn(Arc) -> Result impl Fn(Arc) -> Result Date: Thu, 4 Jun 2026 12:27:56 +0200 Subject: [PATCH 2/4] Remove some activation clones in push --- src/kafka/activation_batcher.rs | 2 +- src/kafka/activation_writer.rs | 3 +-- src/push/thread.rs | 9 ++++----- src/upkeep.rs | 12 +++++------- src/worker.rs | 16 ++++++++-------- 5 files changed, 19 insertions(+), 23 deletions(-) diff --git a/src/kafka/activation_batcher.rs b/src/kafka/activation_batcher.rs index 9e608c5f..86c905cd 100644 --- a/src/kafka/activation_batcher.rs +++ b/src/kafka/activation_batcher.rs @@ -134,7 +134,7 @@ impl Reducer for ActivationBatcher { "taskname" => task_name.clone(), ) .increment(1); - self.forward_batch.push(t.activation.clone()); + self.forward_batch.push(t.activation); return Ok(()); } } diff --git a/src/kafka/activation_writer.rs b/src/kafka/activation_writer.rs index 2f024910..31feedd1 100644 --- a/src/kafka/activation_writer.rs +++ b/src/kafka/activation_writer.rs @@ -149,7 +149,6 @@ impl Reducer for ActivationWriter { let insert_id = Utc::now().timestamp_millis(); debug!("Preparing insert {:?}", insert_id); - let batch = self.batch.clone().unwrap(); let write_to_store_start = Instant::now(); let res = self.store.store(batch.clone()).await; @@ -158,7 +157,7 @@ impl Reducer for ActivationWriter { match res { Ok(entries) => { - self.batch.take(); + let batch = self.batch.take().unwrap(); let lag = Utc::now() - batch .iter() diff --git a/src/push/thread.rs b/src/push/thread.rs index 2ea49d7a..297438a6 100644 --- a/src/push/thread.rs +++ b/src/push/thread.rs @@ -52,8 +52,7 @@ impl PushThread { } async fn push_task(&mut self, activation: Activation) { - // Store the ID for later since `push_task` claims ownership over `activation` - let id = activation.id.clone(); + let id = &activation.id; // First, determine the correct worker service let Some(worker) = self.workers.get_mut(&activation.application) else { @@ -70,7 +69,7 @@ impl PushThread { // Then, push the task to that service let result = timed!( - worker.push_task(activation.clone()), + worker.push_task(&activation), "worker.push_task.duration" ); @@ -86,7 +85,7 @@ impl PushThread { // Revert claimed task back to pending if let Err(e) = self .store - .set_status(&id, ActivationStatus::Pending, None, None) + .set_status(id, ActivationStatus::Pending, None, None) .await { metrics::counter!("push.undo_claim", "result" => "error").increment(1); @@ -124,7 +123,7 @@ impl PushThread { // Finally, mark the activation as processing let result = timed!( - self.store.mark_activation_processing(&id), + self.store.mark_activation_processing(id), "push.mark_activation_processing.duration" ); diff --git a/src/upkeep.rs b/src/upkeep.rs index d2a069f3..f94d1ead 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -167,7 +167,7 @@ pub async fn do_upkeep( async move { let activation = TaskActivation::decode(&inflight.activation as &[u8]).unwrap(); - let serialized = create_retry_activation(&activation).encode_to_vec(); + let serialized = create_retry_activation(activation).encode_to_vec(); let delivery = producer .send( FutureRecord::<(), Vec>::to(&target_topic).payload(&serialized), @@ -541,9 +541,7 @@ pub async fn do_upkeep( /// Create a new activation that is a 'retry' of the passed activation /// The retry_state.attempts is advanced as part of the retry state machine. #[instrument(skip_all)] -fn create_retry_activation(activation: &TaskActivation) -> TaskActivation { - let mut new_activation = activation.clone(); - +fn create_retry_activation(mut new_activation: TaskActivation) -> TaskActivation { let now = Utc::now(); new_activation.id = Uuid::new_v4().into(); new_activation.received_at = Some(Timestamp { @@ -634,7 +632,7 @@ mod tests { delay_on_retry: Some(60), }); - let retry = create_retry_activation(&activation); + let retry = create_retry_activation(activation); assert_eq!(retry.delay, Some(60)); assert_eq!( retry.retry_state, @@ -661,7 +659,7 @@ mod tests { delay_on_retry: Some(60), }); - let retry = create_retry_activation(&activation); + let retry = create_retry_activation(activation); assert_eq!(retry.delay, Some(60)); assert_eq!( retry.retry_state, @@ -688,7 +686,7 @@ mod tests { delay_on_retry: None, }); - let retry = create_retry_activation(&activation); + let retry = create_retry_activation(activation); assert_eq!(retry.delay, None); assert_eq!( retry.retry_state, diff --git a/src/worker.rs b/src/worker.rs index 2942ce41..d1437693 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -40,7 +40,7 @@ fn sentry_signature_hex(secret: &str, grpc_path: &str, message: &[u8]) -> String #[async_trait] pub trait WorkerClient: Send + Sync { /// Send a single activation to the worker service. - async fn push_task(&mut self, activation: Activation) -> Result<()>; + async fn push_task(&mut self, activation: &Activation) -> Result<()>; } /// Wrapper around worker connection that provides authentication and timeouts. @@ -78,7 +78,7 @@ impl Worker { #[async_trait] impl WorkerClient for Worker { #[framed] - async fn push_task(&mut self, activation: Activation) -> Result<()> { + async fn push_task(&mut self, activation: &Activation) -> Result<()> { metrics::counter!("worker.push_task.attempts").increment(1); // Try to decode activation @@ -134,9 +134,9 @@ impl MockWorkerClient { #[cfg(test)] #[async_trait] impl WorkerClient for MockWorkerClient { - async fn push_task(&mut self, activation: Activation) -> Result<()> { + async fn push_task(&mut self, activation: &Activation) -> Result<()> { TaskActivation::decode(&activation.activation as &[u8]).map_err(|e| anyhow!(e))?; - self.pushed.push(activation.id); + self.pushed.push(activation.id.clone()); if self.fail { return Err(anyhow!("mock send failure")); @@ -159,7 +159,7 @@ struct NotifyingWorkerClient { #[cfg(test)] #[async_trait] impl WorkerClient for NotifyingWorkerClient { - async fn push_task(&mut self, _activation: Activation) -> Result<()> { + async fn push_task(&mut self, _activation: &Activation) -> Result<()> { self.notify.notify_one(); if self.fail { @@ -195,7 +195,7 @@ mod tests { let activation = make_activations(1).remove(0); let mut worker = MockWorkerClient::new(false); - let result = worker.push_task(activation.clone()).await; + let result = worker.push_task(&activation).await; assert!(result.is_ok(), "push_task should succeed"); assert_eq!(worker.pushed, vec![activation.id]); } @@ -206,7 +206,7 @@ mod tests { activation.activation = vec![1, 2, 3, 4]; let mut worker = MockWorkerClient::new(false); - let result = worker.push_task(activation).await; + let result = worker.push_task(&activation).await; assert!(result.is_err(), "invalid payload should fail decoding"); assert!( @@ -220,7 +220,7 @@ mod tests { let activation = make_activations(1).remove(0); let mut worker = MockWorkerClient::new(true); - let result = worker.push_task(activation.clone()).await; + let result = worker.push_task(&activation).await; assert!(result.is_err(), "worker push errors should propagate"); assert_eq!(worker.pushed, vec![activation.id]); } From 8fc856a0ff4778455f69c0d97e3d67780ece811b Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Thu, 4 Jun 2026 13:31:13 +0200 Subject: [PATCH 3/4] make tablerow borrow from activation, activationstore borrows activation --- benches/store_bench.rs | 4 +- src/fetch/tests.rs | 2 +- src/grpc/server_tests.rs | 18 ++--- src/kafka/activation_writer.rs | 6 +- src/push/tests.rs | 2 +- src/push/thread.rs | 5 +- src/store/adapters/postgres.rs | 129 ++++++++++++++++++++++----------- src/store/adapters/sqlite.rs | 115 +++++++++++++++++++---------- src/store/tests.rs | 100 ++++++++++++------------- src/store/traits.rs | 2 +- src/upkeep.rs | 26 +++---- 11 files changed, 244 insertions(+), 165 deletions(-) diff --git a/benches/store_bench.rs b/benches/store_bench.rs index 448e6d02..e1619d73 100644 --- a/benches/store_bench.rs +++ b/benches/store_bench.rs @@ -41,7 +41,7 @@ async fn get_pending_activations(num_activations: u32, num_workers: u32) { let namespace = generate_unique_namespace(); for chunk in make_activations_with_namespace(namespace.clone(), num_activations).chunks(1024) { - store.store(chunk.to_vec()).await.unwrap(); + store.store(chunk).await.unwrap(); } assert_eq!( @@ -106,7 +106,7 @@ async fn set_status(num_activations: u32, num_workers: u32) { let namespace = generate_unique_namespace(); for chunk in make_activations_with_namespace(namespace, num_activations).chunks(1024) { - store.store(chunk.to_vec()).await.unwrap(); + store.store(chunk).await.unwrap(); } assert_eq!( diff --git a/src/fetch/tests.rs b/src/fetch/tests.rs index c04d3196..795790d5 100644 --- a/src/fetch/tests.rs +++ b/src/fetch/tests.rs @@ -68,7 +68,7 @@ impl ActivationStore for MockStore { unimplemented!() } - async fn store(&self, _batch: Vec) -> Result { + async fn store(&self, _batch: &[Activation]) -> Result { unimplemented!() } diff --git a/src/grpc/server_tests.rs b/src/grpc/server_tests.rs index bd571d31..ca6836e3 100644 --- a/src/grpc/server_tests.rs +++ b/src/grpc/server_tests.rs @@ -139,7 +139,7 @@ async fn test_get_task_success(#[case] adapter: &str) { let config = create_config(); let activations = make_activations(1); - store.store(activations).await.unwrap(); + store.store(&activations).await.unwrap(); let service = TaskbrokerServer { store: store.clone(), @@ -179,7 +179,7 @@ async fn test_get_task_with_application_success(#[case] adapter: &str) { activations[1].activation = payload.encode_to_vec(); activations[1].application = "hammers".into(); - store.store(activations).await.unwrap(); + store.store(&activations).await.unwrap(); let service = TaskbrokerServer { store, @@ -213,7 +213,7 @@ async fn test_get_task_with_namespace_requires_application(#[case] adapter: &str let activations = make_activations(2); let namespace = activations[0].namespace.clone(); - store.store(activations).await.unwrap(); + store.store(&activations).await.unwrap(); let service = TaskbrokerServer { store, @@ -243,7 +243,7 @@ async fn test_set_task_status_success(#[case] adapter: &str) { let config = create_config(); let activations = make_activations(2); - store.store(activations).await.unwrap(); + store.store(&activations).await.unwrap(); let service = TaskbrokerServer { store, @@ -297,7 +297,7 @@ async fn test_set_task_status_with_application(#[case] adapter: &str) { activations[1].activation = payload.encode_to_vec(); activations[1].application = "hammers".into(); - store.store(activations).await.unwrap(); + store.store(&activations).await.unwrap(); let service = TaskbrokerServer { store, @@ -344,7 +344,7 @@ async fn test_set_task_status_with_application_no_match(#[case] adapter: &str) { activations[1].activation = payload.encode_to_vec(); activations[1].application = "hammers".into(); - store.store(activations).await.unwrap(); + store.store(&activations).await.unwrap(); let service = TaskbrokerServer { store, @@ -381,7 +381,7 @@ async fn test_set_task_status_with_namespace_requires_application(#[case] adapte let activations = make_activations(2); let namespace = activations[0].namespace.clone(); - store.store(activations).await.unwrap(); + store.store(&activations).await.unwrap(); let service = TaskbrokerServer { store, @@ -420,7 +420,7 @@ async fn test_set_task_status_forwards_to_update_channel(#[case] adapter: &str) let (update_tx, mut update_rx) = mpsc::channel::(8); let activations = make_activations(2); - store.store(activations).await.unwrap(); + store.store(&activations).await.unwrap(); let service = TaskbrokerServer { store: store.clone(), @@ -477,7 +477,7 @@ async fn test_set_task_status_update_channel_closed_returns_internal() { drop(update_rx); let activations = make_activations(1); - store.store(activations).await.unwrap(); + store.store(&activations).await.unwrap(); let service = TaskbrokerServer { store, diff --git a/src/kafka/activation_writer.rs b/src/kafka/activation_writer.rs index 31feedd1..c9b64fe5 100644 --- a/src/kafka/activation_writer.rs +++ b/src/kafka/activation_writer.rs @@ -150,7 +150,7 @@ impl Reducer for ActivationWriter { debug!("Preparing insert {:?}", insert_id); let write_to_store_start = Instant::now(); - let res = self.store.store(batch.clone()).await; + let res = self.store.store(batch).await; // If every "preparing" has a matching "completed" we are good debug!("Completed insert {:?}", insert_id); @@ -477,7 +477,7 @@ mod tests { .status(ActivationStatus::Processing) .build(TaskActivationBuilder::new()); - store.store(vec![existing_activation]).await.unwrap(); + store.store(&[existing_activation]).await.unwrap(); let mut writer = ActivationWriter::new(store.clone(), writer_config); let batch = vec![ @@ -537,7 +537,7 @@ mod tests { write_failure_backoff_ms: 4000, }; let first_round = make_activations(200); - store.store(first_round).await.unwrap(); + store.store(&first_round).await.unwrap(); assert!(store.db_size().await.unwrap() > 50_000); // Make more activations that won't be stored. diff --git a/src/push/tests.rs b/src/push/tests.rs index 217bc710..d7948ca1 100644 --- a/src/push/tests.rs +++ b/src/push/tests.rs @@ -29,7 +29,7 @@ impl MockStore { #[async_trait] impl ActivationStore for MockStore { - async fn store(&self, _batch: Vec) -> Result { + async fn store(&self, _batch: &[Activation]) -> Result { Ok(0) } diff --git a/src/push/thread.rs b/src/push/thread.rs index 297438a6..bc89fea3 100644 --- a/src/push/thread.rs +++ b/src/push/thread.rs @@ -68,10 +68,7 @@ impl PushThread { }; // Then, push the task to that service - let result = timed!( - worker.push_task(&activation), - "worker.push_task.duration" - ); + let result = timed!(worker.push_task(&activation), "worker.push_task.duration"); if let Err(e) = result { metrics::counter!("worker.push_task", "result" => "error").increment(1); diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 4cfdac14..da125d05 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::collections::HashMap; use std::str::FromStr; use std::sync::RwLock; @@ -5,8 +6,8 @@ use std::time::Instant; use sqlx::ConnectOptions; use sqlx::pool::PoolConnection; -use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions}; -use sqlx::{FromRow, Pool, Postgres, QueryBuilder, Transaction}; +use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions, PgRow}; +use sqlx::{FromRow, Pool, Postgres, QueryBuilder, Row, Transaction}; use anyhow::{Error, anyhow}; use async_backtrace::framed; @@ -23,10 +24,17 @@ use crate::store::retry::{RetryConfig, retry_query}; use crate::store::traits::ActivationStore; use crate::store::types::{BucketRange, DepthCounts, FailedTasksForwarder}; -#[derive(Debug, FromRow)] -struct TableRow { - pub id: String, - pub activation: Vec, +/// Database representation of an [`Activation`], used for both reads and +/// writes. +/// +/// On the write path it is built with `TableRow::from(&Activation)` and +/// borrows the activation's strings and payload, so storing a batch does not +/// copy it. On the read path sqlx decodes a fully owned `TableRow<'static>`, +/// which is converted into an [`Activation`] without further copies. +#[derive(Debug)] +struct TableRow<'a> { + pub id: Cow<'a, str>, + pub activation: Cow<'a, [u8]>, pub partition: i32, pub offset: i64, pub added_at: DateTime, @@ -37,23 +45,20 @@ struct TableRow { pub processing_deadline_duration: i32, pub processing_deadline: Option>, pub claim_expires_at: Option>, - pub status: String, + pub status: Cow<'a, str>, pub at_most_once: bool, - pub application: String, - pub namespace: String, - pub taskname: String, - #[sqlx(try_from = "i32")] + pub application: Cow<'a, str>, + pub namespace: Cow<'a, str>, + pub taskname: Cow<'a, str>, pub on_attempts_exceeded: OnAttemptsExceeded, pub bucket: i16, } -impl TryFrom for TableRow { - type Error = anyhow::Error; - - fn try_from(value: Activation) -> Result { - Ok(Self { - id: value.id, - activation: value.activation, +impl<'a> From<&'a Activation> for TableRow<'a> { + fn from(value: &'a Activation) -> Self { + Self { + id: Cow::Borrowed(&value.id), + activation: Cow::Borrowed(&value.activation), partition: value.partition, offset: value.offset, added_at: value.added_at, @@ -64,22 +69,24 @@ impl TryFrom for TableRow { processing_deadline_duration: value.processing_deadline_duration, processing_deadline: value.processing_deadline, claim_expires_at: value.claim_expires_at, - status: value.status.to_string(), + status: Cow::Owned(value.status.to_string()), at_most_once: value.at_most_once, - application: value.application, - namespace: value.namespace, - taskname: value.taskname, + application: Cow::Borrowed(&value.application), + namespace: Cow::Borrowed(&value.namespace), + taskname: Cow::Borrowed(&value.taskname), on_attempts_exceeded: value.on_attempts_exceeded, bucket: value.bucket, - }) + } } } -impl From for Activation { - fn from(value: TableRow) -> Self { +impl From> for Activation { + fn from(value: TableRow<'_>) -> Self { + // On the read path we're using TableRow<'static>, which already has + // owned strings inside. Therefore into_owned() does nothing. Self { - id: value.id, - activation: value.activation, + id: value.id.into_owned(), + activation: value.activation.into_owned(), status: ActivationStatus::from_str(&value.status).unwrap(), partition: value.partition, offset: value.offset, @@ -92,15 +99,50 @@ impl From for Activation { processing_deadline: value.processing_deadline, claim_expires_at: value.claim_expires_at, at_most_once: value.at_most_once, - application: value.application, - namespace: value.namespace, - taskname: value.taskname, + application: value.application.into_owned(), + namespace: value.namespace.into_owned(), + taskname: value.taskname.into_owned(), on_attempts_exceeded: value.on_attempts_exceeded, bucket: value.bucket, } } } +/// Decode an owned row. Hand-written rather than derived because the derive +/// would bind the struct's lifetime parameter to the row's, which +/// `query_as`/`fetch_all` (which drop the row) cannot express. +impl FromRow<'_, PgRow> for TableRow<'static> { + fn from_row(row: &PgRow) -> Result { + Ok(Self { + id: Cow::Owned(row.try_get::("id")?), + activation: Cow::Owned(row.try_get::, _>("activation")?), + partition: row.try_get("partition")?, + offset: row.try_get("offset")?, + added_at: row.try_get("added_at")?, + received_at: row.try_get("received_at")?, + processing_attempts: row.try_get("processing_attempts")?, + expires_at: row.try_get("expires_at")?, + delay_until: row.try_get("delay_until")?, + processing_deadline_duration: row.try_get("processing_deadline_duration")?, + processing_deadline: row.try_get("processing_deadline")?, + claim_expires_at: row.try_get("claim_expires_at")?, + status: Cow::Owned(row.try_get::("status")?), + at_most_once: row.try_get("at_most_once")?, + application: Cow::Owned(row.try_get::("application")?), + namespace: Cow::Owned(row.try_get::("namespace")?), + taskname: Cow::Owned(row.try_get::("taskname")?), + on_attempts_exceeded: row + .try_get::("on_attempts_exceeded")? + .try_into() + .map_err(|err| sqlx::Error::ColumnDecode { + index: "on_attempts_exceeded".into(), + source: Box::new(err), + })?, + bucket: row.try_get("bucket")?, + }) + } +} + #[framed] pub async fn create_postgres_pool( connection: &PgConnectOptions, @@ -359,16 +401,11 @@ impl ActivationStore for PostgresStore { #[instrument(skip_all)] #[framed] - async fn store(&self, batch: Vec) -> Result { + async fn store(&self, batch: &[Activation]) -> Result { if batch.is_empty() { return Ok(0); } - let rows = batch - .into_iter() - .map(TableRow::try_from) - .collect::, _>>()?; - retry_query(&self.config.retry_config, "store", || async { let mut query_builder = QueryBuilder::::new( " @@ -397,9 +434,13 @@ impl ActivationStore for PostgresStore { ", ); let query = query_builder - .push_values(&rows, |mut b, row| { - b.push_bind(&row.id); - b.push_bind(&row.activation); + .push_values(batch.iter().map(TableRow::from), |mut b, row| { + b.push_bind(row.id); + // Cow<[u8]> has no Encode impl, so bind the variants directly. + match row.activation { + Cow::Borrowed(bytes) => b.push_bind(bytes), + Cow::Owned(bytes) => b.push_bind(bytes), + }; b.push_bind(row.partition); b.push_bind(row.offset); b.push_bind(row.added_at); @@ -418,11 +459,11 @@ impl ActivationStore for PostgresStore { } else { b.push("null"); } - b.push_bind(&row.status); + b.push_bind(row.status); b.push_bind(row.at_most_once); - b.push_bind(&row.application); - b.push_bind(&row.namespace); - b.push_bind(&row.taskname); + b.push_bind(row.application); + b.push_bind(row.namespace); + b.push_bind(row.taskname); b.push_bind(row.on_attempts_exceeded as i32); b.push_bind(row.bucket); }) @@ -799,7 +840,7 @@ impl ActivationStore for PostgresStore { .execute(&mut *tx) .await?; - row.activation = updated_activation; + row.activation = Cow::Owned(updated_activation); } tx.commit().await?; diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index b1ff1fb0..3d6bbcfb 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::str::FromStr; use std::time::Instant; @@ -30,10 +31,17 @@ use crate::store::activation::{Activation, ActivationStatus}; use crate::store::traits::ActivationStore; use crate::store::types::{BucketRange, FailedTasksForwarder}; -#[derive(Debug, FromRow)] -pub struct TableRow { - pub id: String, - pub activation: Vec, +/// Database representation of an [`Activation`], used for both reads and +/// writes. +/// +/// On the write path it is built with `TableRow::from(&Activation)` and +/// borrows the activation's strings and payload, so storing a batch does not +/// copy it. On the read path sqlx decodes a fully owned `TableRow<'static>`, +/// which is converted into an [`Activation`] without further copies. +#[derive(Debug)] +pub struct TableRow<'a> { + pub id: Cow<'a, str>, + pub activation: Cow<'a, [u8]>, pub partition: i32, pub offset: i64, pub added_at: DateTime, @@ -44,23 +52,20 @@ pub struct TableRow { pub processing_deadline_duration: i32, pub processing_deadline: Option>, pub claim_expires_at: Option>, - pub status: String, + pub status: Cow<'a, str>, pub at_most_once: bool, - pub application: String, - pub namespace: String, - pub taskname: String, - #[sqlx(try_from = "i32")] + pub application: Cow<'a, str>, + pub namespace: Cow<'a, str>, + pub taskname: Cow<'a, str>, pub on_attempts_exceeded: OnAttemptsExceeded, pub bucket: i16, } -impl TryFrom for TableRow { - type Error = anyhow::Error; - - fn try_from(value: Activation) -> Result { - Ok(Self { - id: value.id, - activation: value.activation, +impl<'a> From<&'a Activation> for TableRow<'a> { + fn from(value: &'a Activation) -> Self { + Self { + id: Cow::Borrowed(&value.id), + activation: Cow::Borrowed(&value.activation), partition: value.partition, offset: value.offset, added_at: value.added_at, @@ -71,22 +76,24 @@ impl TryFrom for TableRow { processing_deadline_duration: value.processing_deadline_duration, processing_deadline: value.processing_deadline, claim_expires_at: value.claim_expires_at, - status: value.status.to_string(), + status: Cow::Owned(value.status.to_string()), at_most_once: value.at_most_once, - application: value.application, - namespace: value.namespace, - taskname: value.taskname, + application: Cow::Borrowed(&value.application), + namespace: Cow::Borrowed(&value.namespace), + taskname: Cow::Borrowed(&value.taskname), on_attempts_exceeded: value.on_attempts_exceeded, bucket: value.bucket, - }) + } } } -impl From for Activation { - fn from(value: TableRow) -> Self { +impl From> for Activation { + fn from(value: TableRow<'_>) -> Self { + // On the read path we're using TableRow<'static>, which already has + // owned strings inside. Therefore into_owned() does nothing. Self { - id: value.id, - activation: value.activation, + id: value.id.into_owned(), + activation: value.activation.into_owned(), status: ActivationStatus::from_str(&value.status).unwrap(), partition: value.partition, offset: value.offset, @@ -99,15 +106,50 @@ impl From for Activation { processing_deadline: value.processing_deadline, claim_expires_at: value.claim_expires_at, at_most_once: value.at_most_once, - application: value.application, - namespace: value.namespace, - taskname: value.taskname, + application: value.application.into_owned(), + namespace: value.namespace.into_owned(), + taskname: value.taskname.into_owned(), on_attempts_exceeded: value.on_attempts_exceeded, bucket: value.bucket, } } } +/// Decode an owned row. Hand-written rather than derived because the derive +/// would bind the struct's lifetime parameter to the row's, which +/// `query_as`/`fetch_all` (which drop the row) cannot express. +impl FromRow<'_, SqliteRow> for TableRow<'static> { + fn from_row(row: &SqliteRow) -> Result { + Ok(Self { + id: Cow::Owned(row.try_get::("id")?), + activation: Cow::Owned(row.try_get::, _>("activation")?), + partition: row.try_get("partition")?, + offset: row.try_get("offset")?, + added_at: row.try_get("added_at")?, + received_at: row.try_get("received_at")?, + processing_attempts: row.try_get("processing_attempts")?, + expires_at: row.try_get("expires_at")?, + delay_until: row.try_get("delay_until")?, + processing_deadline_duration: row.try_get("processing_deadline_duration")?, + processing_deadline: row.try_get("processing_deadline")?, + claim_expires_at: row.try_get("claim_expires_at")?, + status: Cow::Owned(row.try_get::("status")?), + at_most_once: row.try_get("at_most_once")?, + application: Cow::Owned(row.try_get::("application")?), + namespace: Cow::Owned(row.try_get::("namespace")?), + taskname: Cow::Owned(row.try_get::("taskname")?), + on_attempts_exceeded: row + .try_get::("on_attempts_exceeded")? + .try_into() + .map_err(|err| sqlx::Error::ColumnDecode { + index: "on_attempts_exceeded".into(), + source: Box::new(err), + })?, + bucket: row.try_get("bucket")?, + }) + } +} + pub async fn create_sqlite_pool(url: &str) -> Result<(Pool, Pool), Error> { if !Sqlite::database_exists(url).await? { Sqlite::create_database(url).await? @@ -443,7 +485,7 @@ impl ActivationStore for SqliteStore { } #[instrument(skip_all)] - async fn store(&self, batch: Vec) -> Result { + async fn store(&self, batch: &[Activation]) -> Result { if batch.is_empty() { return Ok(0); } @@ -474,15 +516,14 @@ impl ActivationStore for SqliteStore { ) ", ); - let rows = batch - .into_iter() - .map(TableRow::try_from) - .collect::, _>>()?; - let query = query_builder - .push_values(rows, |mut b, row| { + .push_values(batch.iter().map(TableRow::from), |mut b, row| { b.push_bind(row.id); - b.push_bind(row.activation); + // Cow<[u8]> has no Encode impl, so bind the variants directly. + match row.activation { + Cow::Borrowed(bytes) => b.push_bind(bytes), + Cow::Owned(bytes) => b.push_bind(bytes), + }; b.push_bind(row.partition); b.push_bind(row.offset); b.push_bind(row.added_at.timestamp()); @@ -759,7 +800,7 @@ impl ActivationStore for SqliteStore { .execute(&mut *tx) .await?; - row.activation = updated_activation; + row.activation = Cow::Owned(updated_activation); } tx.commit().await?; diff --git a/src/store/tests.rs b/src/store/tests.rs index 4754ece6..2bbec4ea 100644 --- a/src/store/tests.rs +++ b/src/store/tests.rs @@ -91,7 +91,7 @@ async fn test_store(#[case] adapter: &str) { let store = create_test_store(adapter).await; let batch = make_activations(2); - assert!(store.store(batch).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let result = store.count().await; assert_eq!(result.unwrap(), 2); @@ -127,7 +127,7 @@ async fn test_count_depths(#[case] adapter: &str) { // Check counts for a store with four activations with varying statuses let batch = make_activations(4); - assert!(store.store(batch).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); store .set_status("id_0", ActivationStatus::Processing, None, None) @@ -198,7 +198,7 @@ async fn test_count_depths_per_partition_postgres() { make("p0_2", 0, 2), make("p1_0", 1, 3), ]; - assert!(store.store(batch).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); store .set_status("p0_0", ActivationStatus::Processing, None, None) @@ -247,7 +247,7 @@ async fn test_store_duplicate_id_in_batch(#[case] adapter: &str) { batch[0].id = "id_0".into(); batch[1].id = "id_0".into(); - let first_result = store.store(batch).await; + let first_result = store.store(&batch).await; assert!( first_result.is_ok(), "{}", @@ -267,7 +267,7 @@ async fn test_store_duplicate_id_between_batches(#[case] adapter: &str) { let store = create_test_store(adapter).await; let batch = make_activations(2); - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let first_count = store.count().await; assert_eq!(first_count.unwrap(), 2); @@ -275,7 +275,7 @@ async fn test_store_duplicate_id_between_batches(#[case] adapter: &str) { // Old batch and new should have conflicts assert_eq!(batch[0].id, new_batch[0].id); assert_eq!(batch[1].id, new_batch[1].id); - assert!(store.store(new_batch).await.is_ok()); + assert!(store.store(&new_batch).await.is_ok()); let second_count = store.count().await; assert_eq!(second_count.unwrap(), 2); @@ -290,7 +290,7 @@ async fn test_get_pending_activation(#[case] adapter: &str) { let store = create_test_store(adapter).await; let batch = make_activations(2); - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let result = store .claim_activation_for_pull(None, None) @@ -327,7 +327,7 @@ async fn test_get_pending_activation_bucket_filter(#[case] adapter: &str) { let mut batch = make_activations(2); batch[0].bucket = 10; batch[1].bucket = 20; - assert!(store.store(batch).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let mut first = store .claim_activations_for_push(Some(1), Some((15, 25))) @@ -371,7 +371,7 @@ async fn test_get_pending_activation_with_race(#[case] adapter: &str) { for chunk in make_activations_with_namespace(namespace.clone(), NUM_CONCURRENT_WRITES).chunks(1024) { - store.store(chunk.to_vec()).await.unwrap(); + store.store(chunk).await.unwrap(); } let (tx, _) = broadcast::channel::<()>(1); @@ -417,7 +417,7 @@ async fn test_get_pending_activation_with_namespace(#[case] adapter: &str) { let mut batch = make_activations(2); batch[1].namespace = "other_namespace".into(); - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let other_namespace = "other_namespace".to_string(); // Get activation from other namespace @@ -445,7 +445,7 @@ async fn test_get_pending_activation_from_multiple_namespaces(#[case] adapter: & batch[1].namespace = "ns2".into(); batch[2].namespace = "ns3".into(); batch[3].namespace = "ns4".into(); - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); // Get activation from multiple namespaces (should get oldest). // Use `claim_activations` so upkeep-style `None` application + namespaces is allowed (not `claim_activations_for_push`). @@ -474,7 +474,7 @@ async fn test_get_pending_activation_with_namespace_requires_application(#[case] let mut batch = make_activations(2); batch[1].namespace = "other_namespace".into(); - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); // This is an invalid query as we don't want to allow clients to fetch tasks from any application let other_namespace = "other_namespace".to_string(); @@ -516,7 +516,7 @@ async fn test_get_pending_activation_skip_expires(#[case] adapter: &str) { let mut batch = make_activations(1); batch[0].expires_at = Some(Utc::now() - Duration::from_secs(100)); - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let result = store.claim_activation_for_pull(None, None).await; assert!(result.is_ok()); @@ -543,7 +543,7 @@ async fn test_get_pending_activation_earliest(#[case] adapter: &str) { let mut batch = make_activations(2); batch[0].added_at = Utc.with_ymd_and_hms(2024, 6, 24, 0, 0, 0).unwrap(); batch[1].added_at = Utc.with_ymd_and_hms(1998, 6, 24, 0, 0, 0).unwrap(); - let ret = store.store(batch.clone()).await; + let ret = store.store(&batch).await; assert!(ret.is_ok(), "{}", ret.err().unwrap().to_string()); let result = store @@ -567,7 +567,7 @@ async fn test_get_pending_activation_fetches_application(#[case] adapter: &str) let mut batch = make_activations(1); batch[0].application = "hammers".into(); - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); // Getting an activation with no application filter should // include activations with application set. @@ -592,7 +592,7 @@ async fn test_get_pending_activation_with_application(#[case] adapter: &str) { let mut batch = make_activations(2); batch[1].application = "hammers".into(); - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); // Get activation from a named application let result = store @@ -634,7 +634,7 @@ async fn test_get_pending_activation_with_application_and_namespace(#[case] adap batch[2].application = "hammers".into(); batch[2].namespace = "not-target".into(); - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let target_ns = "target".to_string(); // Get activation from a named application @@ -669,7 +669,7 @@ async fn test_get_pending_activations_no_limit(#[case] adapter: &str) { const N: usize = 4; let batch = make_activations(N as u32); - assert!(store.store(batch).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let got = store.claim_activations_for_push(None, None).await.unwrap(); assert_eq!(got.len(), N); @@ -697,7 +697,7 @@ async fn test_get_pending_activations_limit_below_pending(#[case] adapter: &str) const X: i32 = 3; let batch = make_activations(N as u32); - assert!(store.store(batch).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let got = store .claim_activations_for_push(Some(X), None) @@ -731,7 +731,7 @@ async fn test_get_pending_activations_limit_above_pending(#[case] adapter: &str) const X: i32 = 10; let batch = make_activations(Y as u32); - assert!(store.store(batch).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let got = store .claim_activations_for_push(Some(X), None) @@ -761,7 +761,7 @@ async fn test_count_pending_activations(#[case] adapter: &str) { let mut batch = make_activations(3); batch[0].status = ActivationStatus::Processing; - assert!(store.store(batch).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); assert_eq!(store.count_pending_activations().await.unwrap(), 2); assert_counts( @@ -784,7 +784,7 @@ async fn test_set_activation_status(#[case] adapter: &str) { let store = create_test_store(adapter).await; let batch = make_activations(2); - assert!(store.store(batch).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); assert_counts( StatusCount { pending: 2, @@ -882,7 +882,7 @@ async fn test_set_activation_status_with_partitions(#[case] adapter: &str) { let mut batch = make_activations(2); batch[1].partition = 1; - assert!(store.store(batch).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); assert_counts( StatusCount { pending: 1, @@ -981,7 +981,7 @@ async fn test_set_processing_deadline(#[case] adapter: &str) { let store = create_test_store(adapter).await; let batch = make_activations(1); - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let deadline = Utc::now().round_subsecs(0); let result = store.set_processing_deadline("id_0", Some(deadline)).await; @@ -1003,7 +1003,7 @@ async fn test_delete_activation(#[case] adapter: &str) { let store = create_test_store(adapter).await; let batch = make_activations(2); - assert!(store.store(batch).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let result = store.count().await; assert_eq!(result.unwrap(), 2); @@ -1030,7 +1030,7 @@ async fn test_get_retry_activations(#[case] adapter: &str) { let store = create_test_store(adapter).await; let batch = make_activations(2); - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); assert_counts( StatusCount { pending: 2, @@ -1082,7 +1082,7 @@ async fn test_handle_processing_deadline(#[case] adapter: &str) { batch[1].status = ActivationStatus::Processing; batch[1].processing_deadline = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); assert_counts( StatusCount { pending: 1, @@ -1127,7 +1127,7 @@ async fn test_handle_processing_deadline_multiple_tasks(#[case] adapter: &str) { batch[0].processing_deadline = Some(Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap()); batch[1].status = ActivationStatus::Claimed; batch[1].processing_deadline = Some(Utc::now() + chrono::Duration::days(30)); - assert!(store.store(batch).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); assert_counts( StatusCount { processing: 1, @@ -1180,7 +1180,7 @@ async fn test_handle_processing_at_most_once(#[case] adapter: &str) { batch[1].at_most_once = true; batch[1].processing_deadline = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); assert_counts( StatusCount { processing: 2, @@ -1229,7 +1229,7 @@ async fn test_handle_processing_deadline_discard_after(#[case] adapter: &str) { }), ); - assert!(store.store(batch).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); assert_counts( StatusCount { pending: 1, @@ -1275,7 +1275,7 @@ async fn test_handle_processing_deadline_deadletter_after(#[case] adapter: &str) }), ); - assert!(store.store(batch).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); assert_counts( StatusCount { pending: 1, @@ -1320,7 +1320,7 @@ async fn test_handle_processing_deadline_no_retries_remaining(#[case] adapter: & delay_on_retry: None, }), ); - assert!(store.store(batch).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); assert_counts( StatusCount { processing: 1, @@ -1355,7 +1355,7 @@ async fn test_handle_claim_expiration_unsent_no_attempt_increment(#[case] adapte let mut batch = make_activations(1); batch[0].status = ActivationStatus::Claimed; batch[0].claim_expires_at = Some(Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap()); - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let count = store.handle_claim_expiration().await.unwrap(); assert_eq!(count, 1); let task = store.get_by_id(&batch[0].id).await.unwrap().unwrap(); @@ -1374,7 +1374,7 @@ async fn test_handle_claim_expiration_at_most_once_reverts_to_pending(#[case] ad batch[0].status = ActivationStatus::Claimed; batch[0].at_most_once = true; batch[0].claim_expires_at = Some(Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap()); - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let count = store.handle_claim_expiration().await.unwrap(); assert_eq!(count, 1); let task = store.get_by_id(&batch[0].id).await.unwrap().unwrap(); @@ -1403,7 +1403,7 @@ async fn test_processing_attempts_exceeded(#[case] adapter: &str) { batch[2].processing_deadline = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); batch[2].processing_attempts = config.max_processing_attempts as i32; - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); assert_counts( StatusCount { complete: 1, @@ -1443,7 +1443,7 @@ async fn test_remove_completed(#[case] adapter: &str) { records[2].status = ActivationStatus::Complete; records[2].added_at += Duration::from_secs(2); - assert!(store.store(records.clone()).await.is_ok()); + assert!(store.store(&records).await.is_ok()); assert_counts( StatusCount { complete: 2, @@ -1509,7 +1509,7 @@ async fn test_remove_completed_multiple_gaps(#[case] adapter: &str) { records[3].status = ActivationStatus::Processing; records[3].added_at += Duration::from_secs(3); - assert!(store.store(records.clone()).await.is_ok()); + assert!(store.store(&records).await.is_ok()); assert_counts( StatusCount { complete: 2, @@ -1613,7 +1613,7 @@ async fn test_handle_failed_tasks(#[case] adapter: &str) { delay_on_retry: None, }), ); - assert!(store.store(records.clone()).await.is_ok()); + assert!(store.store(&records).await.is_ok()); assert_counts( StatusCount { failure: 4, @@ -1663,7 +1663,7 @@ async fn test_mark_completed(#[case] adapter: &str) { let store = create_test_store(adapter).await; let records = make_activations(3); - assert!(store.store(records.clone()).await.is_ok()); + assert!(store.store(&records).await.is_ok()); assert_counts( StatusCount { pending: 3, @@ -1704,7 +1704,7 @@ async fn test_handle_expires_at(#[case] adapter: &str) { batch[1].expires_at = Some(Utc::now() + (Duration::from_secs(5 * 60))); batch[2].expires_at = Some(Utc::now() - (Duration::from_secs(5 * 60))); - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); assert_counts( StatusCount { pending: 3, @@ -1744,7 +1744,7 @@ async fn test_remove_killswitched(#[case] adapter: &str) { batch[2].taskname = "task_to_be_killswitched_two".to_string(); batch[4].taskname = "task_to_be_killswitched_three".to_string(); - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); assert_counts( StatusCount { pending: 6, @@ -1796,7 +1796,7 @@ async fn test_clear(#[case] adapter: &str) { .build(TaskActivationBuilder::new()), ]; - assert!(store.store(batch).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); assert_counts( StatusCount { pending: 1, @@ -1821,7 +1821,7 @@ async fn test_full_vacuum(#[case] adapter: &str) { let store = create_test_store(adapter).await; let batch = make_activations(2); - assert!(store.store(batch).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let result = store.full_vacuum_db().await; assert!(result.is_ok()); @@ -1836,7 +1836,7 @@ async fn test_vacuum_db_no_limit(#[case] adapter: &str) { let store = create_test_store(adapter).await; let batch = make_activations(2); - assert!(store.store(batch).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let result = store.vacuum_db().await; assert!(result.is_ok()); @@ -1857,7 +1857,7 @@ async fn test_vacuum_db_incremental() { .expect("could not create store"); let batch = make_activations(2); - assert!(store.store(batch).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let result = store.vacuum_db().await; assert!(result.is_ok()); @@ -1876,7 +1876,7 @@ async fn test_db_size(#[case] adapter: &str) { // Generate a large enough batch that we use another page. let batch = make_activations(50); - assert!(store.store(batch).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let second_size = store.db_size().await.unwrap(); assert!(second_size > first_size, "should have more bytes now"); @@ -1895,7 +1895,7 @@ async fn test_pending_activation_max_lag_no_pending(#[case] adapter: &str) { let mut processing = make_activations(1); processing[0].status = ActivationStatus::Processing; - assert!(store.store(processing).await.is_ok()); + assert!(store.store(&processing).await.is_ok()); // No pending activations, max lag is 0 assert_eq!(0.0, store.pending_activation_max_lag(&now).await); @@ -1913,7 +1913,7 @@ async fn test_pending_activation_max_lag_use_oldest(#[case] adapter: &str) { let mut pending = make_activations(2); pending[0].received_at = now - Duration::from_secs(10); pending[1].received_at = now - Duration::from_secs(500); - assert!(store.store(pending).await.is_ok()); + assert!(store.store(&pending).await.is_ok()); let result = store.pending_activation_max_lag(&now).await; assert!(11.0 < result, "Should not get the small record"); @@ -1933,7 +1933,7 @@ async fn test_pending_activation_max_lag_ignore_processing_attempts(#[case] adap pending[0].received_at = now - Duration::from_secs(10); pending[1].received_at = now - Duration::from_secs(500); pending[1].processing_attempts = 1; - assert!(store.store(pending).await.is_ok()); + assert!(store.store(&pending).await.is_ok()); let result = store.pending_activation_max_lag(&now).await; assert_eq!(result, 10.0, "max lag: {result:?}"); @@ -1953,7 +1953,7 @@ async fn test_pending_activation_max_lag_account_for_delayed(#[case] adapter: &s // the lag of a delayed task should begin *after* the delay has passed. pending[0].received_at = now - Duration::from_secs(520); pending[0].delay_until = Some(now - Duration::from_millis(22020)); - assert!(store.store(pending).await.is_ok()); + assert!(store.store(&pending).await.is_ok()); let result = store.pending_activation_max_lag(&now).await; assert!(22.00 < result, "result: {result}"); diff --git a/src/store/traits.rs b/src/store/traits.rs index 9040acb2..ad99922a 100644 --- a/src/store/traits.rs +++ b/src/store/traits.rs @@ -13,7 +13,7 @@ use crate::store::types::{BucketRange, DepthCounts, FailedTasksForwarder}; pub trait ActivationStore: Send + Sync { /// CONSUMER OPERATIONS /// Store a batch of activations - async fn store(&self, batch: Vec) -> Result; + async fn store(&self, batch: &[Activation]) -> Result; fn assign_partitions(&self, partitions: Vec) -> Result<(), Error>; diff --git a/src/upkeep.rs b/src/upkeep.rs index f94d1ead..78a7e0b7 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -749,7 +749,7 @@ mod tests { records[0].activation = activation.encode_to_vec(); records[1].added_at += Duration::from_secs(1); - assert!(store.store(records.clone()).await.is_ok()); + assert!(store.store(&records).await.is_ok()); let result_context = do_upkeep( config.clone(), @@ -811,7 +811,7 @@ mod tests { // Make a task with a future processing deadline batch[1].status = ActivationStatus::Processing; batch[1].processing_deadline = Some(Utc::now() + TimeDelta::minutes(5)); - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let _ = do_upkeep( config, @@ -849,7 +849,7 @@ mod tests { batch[1].status = ActivationStatus::Processing; batch[1].processing_deadline = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); // Should start off with one in processing assert_eq!( @@ -907,7 +907,7 @@ mod tests { batch[1].processing_deadline = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); batch[1].processing_attempts = 0; - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); // Should start off with one in processing assert_eq!( @@ -977,7 +977,7 @@ mod tests { batch[1].processing_deadline = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); batch[1].at_most_once = true; - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let result_context = do_upkeep( config, @@ -1025,7 +1025,7 @@ mod tests { batch[2].processing_attempts = config.max_processing_attempts as i32; batch[2].added_at += Duration::from_secs(2); - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let result_context = do_upkeep( config, store.clone(), @@ -1088,7 +1088,7 @@ mod tests { ); records[0].status = ActivationStatus::Failure; records[1].added_at += Duration::from_secs(1); - assert!(store.store(records.clone()).await.is_ok()); + assert!(store.store(&records).await.is_ok()); let result_context = do_upkeep( config.clone(), @@ -1135,7 +1135,7 @@ mod tests { let mut batch = make_activations(2); batch[0].status = ActivationStatus::Failure; batch[1].added_at += Duration::from_secs(1); - assert!(store.store(batch).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let result_context = do_upkeep( config, @@ -1187,7 +1187,7 @@ mod tests { batch[3].expires_at = Some(Utc::now() + Duration::from_secs(100)); batch[3].added_at += Duration::from_secs(1); - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let result_context = do_upkeep( config, store.clone(), @@ -1256,7 +1256,7 @@ mod tests { batch[1].status = ActivationStatus::Delay; batch[1].delay_until = Some(Utc::now() + Duration::from_secs(1)); - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); assert_eq!( store .count_by_status(ActivationStatus::Delay) @@ -1359,7 +1359,7 @@ demoted_namespaces: batch[1].namespace = "bad_namespace2".to_string(); batch[4].namespace = "bad_namespace1".to_string(); - assert!(store.store(batch).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let result_context = do_upkeep( config, @@ -1421,7 +1421,7 @@ demoted_namespaces: batch[2].taskname = "task_to_be_killswitched".to_string(); batch[4].taskname = "task_to_be_killswitched".to_string(); - assert!(store.store(batch).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let result_context = do_upkeep( config, @@ -1462,7 +1462,7 @@ demoted_namespaces: let mut last_vacuum = Instant::now() - Duration::from_secs(60); let batch = make_activations(2); - assert!(store.store(batch.clone()).await.is_ok()); + assert!(store.store(&batch).await.is_ok()); let _ = do_upkeep( config, From 1a08851377815dbba141ddf646de98c6a62b5d64 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 8 Jun 2026 11:25:14 +0200 Subject: [PATCH 4/4] push around a clone(), so that EagerUpdater doesnt need it --- src/push/thread.rs | 5 +---- src/push/updater.rs | 10 +++++----- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/push/thread.rs b/src/push/thread.rs index 6bf09463..e75b8318 100644 --- a/src/push/thread.rs +++ b/src/push/thread.rs @@ -123,10 +123,7 @@ impl PushThread { } // Finally, mark the activation as processing - let result = timed!( - self.updater.update(id.clone()), - "push.thread.update.duration" - ); + let result = timed!(self.updater.update(id), "push.thread.update.duration"); if let Err(e) = result { metrics::counter!("push.thread.update", "result" => "error").increment(1); diff --git a/src/push/updater.rs b/src/push/updater.rs index 731b94bc..52f48630 100644 --- a/src/push/updater.rs +++ b/src/push/updater.rs @@ -22,7 +22,7 @@ pub trait Updater: Send + Sync { } /// Update activation in some way given its ID. - async fn update(&self, id: String) -> Result<()>; + async fn update(&self, id: &str) -> Result<()>; /// Stop the updater. Useful for updaters that run a background task. fn stop(&self) {} @@ -192,7 +192,7 @@ impl Updater for LazyUpdater { Ok(()) } - async fn update(&self, id: String) -> Result<()> { + async fn update(&self, id: &str) -> Result<()> { // Lock the ID buffer let mut buffer = self.lock_buffer("update").await; @@ -217,7 +217,7 @@ impl Updater for LazyUpdater { } } - buffer.push(id); + buffer.push(id.to_string()); Ok(()) } @@ -240,8 +240,8 @@ impl EagerUpdater { #[async_trait] impl Updater for EagerUpdater { - async fn update(&self, id: String) -> Result<()> { - self.store.mark_activation_processing(&id).await + async fn update(&self, id: &str) -> Result<()> { + self.store.mark_activation_processing(id).await } }