Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ impl Default for Config {
maintenance_task_interval_ms: 6000,
max_delayed_task_allowed_sec: 3600,
max_message_size: 5000000,
grpc_max_message_size: 10 * 1024 * 1024, // 10MB
grpc_max_message_size: 12 * 1024 * 1024, // 12MB
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this on purpose?

vacuum_page_count: None,
full_vacuum_on_start: true,
full_vacuum_on_upkeep: true,
Expand Down
11 changes: 7 additions & 4 deletions src/store/adapters/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,8 +630,8 @@ impl ActivationStore for PostgresStore {
.await
}

/// Get the age of the oldest pending activation in seconds.
/// Only activations with status=pending and processing_attempts=0 are considered
/// Get the age of the oldest pending/claimed activation in seconds.
/// Only activations with status=pending/claimed and processing_attempts=0 are considered
/// as we are interested in latency to the *first* attempt.
/// Tasks with delay_until set, will have their age adjusted based on their
/// delay time. No tasks = 0 lag
Expand All @@ -640,9 +640,12 @@ impl ActivationStore for PostgresStore {
let mut query_builder = QueryBuilder::new(
"SELECT received_at, delay_until
FROM inflight_taskactivations
WHERE status = ",
WHERE status IN (",
);
query_builder.push_bind(ActivationStatus::Pending.to_string());
let mut separated = query_builder.separated(", ");
separated.push_bind(ActivationStatus::Pending.to_string());
separated.push_bind(ActivationStatus::Claimed.to_string());
query_builder.push(")");
query_builder.push(" AND processing_attempts = 0");

self.add_partition_condition(&mut query_builder, false);
Expand Down
7 changes: 4 additions & 3 deletions src/store/adapters/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,22 +675,23 @@ impl ActivationStore for SqliteStore {
Ok(result.rows_affected())
}

/// Get the age of the oldest pending activation in seconds.
/// Only activations with status=pending and processing_attempts=0 are considered
/// Get the age of the oldest pending/claimed activation in seconds.
/// Only activations with status=pending/claimed and processing_attempts=0 are considered
/// as we are interested in latency to the *first* attempt.
/// Tasks with delay_until set, will have their age adjusted based on their
/// delay time. No tasks = 0 lag
async fn pending_activation_max_lag(&self, now: &DateTime<Utc>) -> f64 {
let result = match sqlx::query(
"SELECT received_at, delay_until
FROM inflight_taskactivations
WHERE status = $1
WHERE (status = $1 or status = $2)
AND processing_attempts = 0
ORDER BY received_at ASC
LIMIT 1
",
)
.bind(ActivationStatus::Pending)
.bind(ActivationStatus::Claimed)
.fetch_optional(&self.read_pool)
.await
{
Expand Down
34 changes: 20 additions & 14 deletions src/store/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1897,7 +1897,7 @@ async fn test_pending_activation_max_lag_no_pending(#[case] adapter: &str) {
processing[0].status = ActivationStatus::Processing;
assert!(store.store(processing).await.is_ok());

// No pending activations, max lag is 0
// No pending or claimed activations, max lag is 0
assert_eq!(0.0, store.pending_activation_max_lag(&now).await);
store.remove_db().await.unwrap();
}
Expand All @@ -1910,10 +1910,12 @@ async fn test_pending_activation_max_lag_use_oldest(#[case] adapter: &str) {
let now = Utc::now();
let store = create_test_store(adapter).await;

let mut pending = make_activations(2);
pending[0].received_at = now - Duration::from_secs(10);
pending[1].received_at = now - Duration::from_secs(500);
assert!(store.store(pending).await.is_ok());
let mut activations = make_activations(3);
activations[0].received_at = now - Duration::from_secs(10);
activations[1].received_at = now - Duration::from_secs(500);
activations[2].status = ActivationStatus::Claimed;
activations[2].received_at = now - Duration::from_secs(50);
assert!(store.store(activations).await.is_ok());

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there also be a test for the claimed task having the highest latency?

let result = store.pending_activation_max_lag(&now).await;
assert!(11.0 < result, "Should not get the small record");
Expand All @@ -1929,11 +1931,14 @@ async fn test_pending_activation_max_lag_ignore_processing_attempts(#[case] adap
let now = Utc::now().round_subsecs(0);
let store = create_test_store(adapter).await;

let mut pending = make_activations(2);
pending[0].received_at = now - Duration::from_secs(10);
pending[1].received_at = now - Duration::from_secs(500);
pending[1].processing_attempts = 1;
assert!(store.store(pending).await.is_ok());
let mut activations = make_activations(3);
activations[0].received_at = now - Duration::from_secs(10);
activations[1].received_at = now - Duration::from_secs(500);
activations[1].processing_attempts = 1;
activations[2].status = ActivationStatus::Claimed;
activations[2].received_at = now - Duration::from_secs(500);
activations[2].processing_attempts = 1;
assert!(store.store(activations).await.is_ok());

let result = store.pending_activation_max_lag(&now).await;
assert_eq!(result, 10.0, "max lag: {result:?}");
Expand All @@ -1948,12 +1953,13 @@ async fn test_pending_activation_max_lag_account_for_delayed(#[case] adapter: &s
let now = Utc::now();
let store = create_test_store(adapter).await;

let mut pending = make_activations(2);
let mut activations = make_activations(3);
// delayed tasks are received well before they become pending
// the lag of a delayed task should begin *after* the delay has passed.
pending[0].received_at = now - Duration::from_secs(520);
pending[0].delay_until = Some(now - Duration::from_millis(22020));
assert!(store.store(pending).await.is_ok());
activations[0].received_at = now - Duration::from_secs(520);
activations[0].delay_until = Some(now - Duration::from_millis(22020));
activations[1].status = ActivationStatus::Claimed;
assert!(store.store(activations).await.is_ok());

let result = store.pending_activation_max_lag(&now).await;
assert!(22.00 < result, "result: {result}");
Expand Down
Loading