From 4c0b480e24886556a967822063557d6fdd3b1038 Mon Sep 17 00:00:00 2001 From: benthecarman Date: Wed, 1 Apr 2026 17:45:18 -0500 Subject: [PATCH] Paginate by sort_order instead of key order Clients often need to fetch recent entries without iterating over the entire keyspace. Ordering by a monotonic insertion counter lets callers retrieve the newest records first and stop early, which is not possible with lexicographic key ordering. A BIGSERIAL sort_order column is added to vss_db. New rows get a monotonically increasing value from its sequence, and list queries order by sort_order DESC. Because sort_order is UNIQUE, the page token collapses to a single integer with no tiebreaker needed. A composite index on (user_token, store_id, sort_order DESC) INCLUDE (key, version) keeps list queries as index-only scans. Pre-existing rows receive sequence values in heap-scan order during the column rewrite, so their list ordering will not reflect creation time; new rows onward do. Co-Authored-By: Claude Opus 4.7 (1M context) --- README.md | 3 +- proto/vss.proto | 4 +- rust/api/src/kv_store_tests.rs | 98 ++++- rust/api/src/types.rs | 4 +- rust/impls/src/migrations.rs | 2 + .../src/postgres/sql/v0_create_vss_db.sql | 1 + rust/impls/src/postgres_store.rs | 352 +++++++++++++++++- 7 files changed, 424 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index 0685e1b5..b4519b91 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,8 @@ integration with different cloud providers. As long as the API contract is imple effortlessly switch between different instances of VSS. VSS ships with a PostgreSQL implementation by default and can be hosted in your favorite infrastructure/cloud provider -(AWS/GCP) and its backend storage can be switched with some other implementation for KeyValueStore if needed. +(AWS/GCP). The backend storage can be switched with another implementation, but it must support ordering by creation +time; a simple key-value store is not sufficient. 
### Project Status diff --git a/proto/vss.proto b/proto/vss.proto index c0533e04..c0bacdde 100644 --- a/proto/vss.proto +++ b/proto/vss.proto @@ -210,7 +210,7 @@ message ListKeyVersionsRequest { // Server response for `ListKeyVersions` API. message ListKeyVersionsResponse { - // Fetched keys and versions. + // Fetched keys and versions, ordered by creation time (newest first). // Even though this API reuses the `KeyValue` struct, the `value` sub-field will not be set by the server. repeated KeyValue key_versions = 1; @@ -218,6 +218,8 @@ message ListKeyVersionsResponse { // Use this value to query for next-page of paginated `ListKeyVersions` operation, by specifying // this value as the `page_token` in the next request. // + // Following AIP-158 (https://google.aip.dev/158): + // // If `next_page_token` is empty (""), then the "last page" of results has been processed and // there is no more data to be retrieved. // diff --git a/rust/api/src/kv_store_tests.rs b/rust/api/src/kv_store_tests.rs index 3e3ad1d6..0a829f03 100644 --- a/rust/api/src/kv_store_tests.rs +++ b/rust/api/src/kv_store_tests.rs @@ -53,6 +53,8 @@ macro_rules! 
define_kv_store_tests { create_test!(list_should_honour_page_size_and_key_prefix_if_provided); create_test!(list_should_return_zero_global_version_when_global_versioning_not_enabled); create_test!(list_should_limit_max_page_size); + create_test!(list_should_return_results_ordered_by_creation_time); + create_test!(list_should_paginate_by_creation_time_with_prefix); }; } @@ -393,12 +395,11 @@ pub trait KvStoreTestSuite { }, }; - if current_page.key_versions.is_empty() { - break; - } - all_key_versions.extend(current_page.key_versions); - next_page_token = current_page.next_page_token; + match current_page.next_page_token { + Some(token) if !token.is_empty() => next_page_token = Some(token), + _ => break, + } } if let Some(k1_response) = all_key_versions.iter().find(|kv| kv.key == "k1") { @@ -444,13 +445,12 @@ pub trait KvStoreTestSuite { }, }; - if current_page.key_versions.is_empty() { - break; - } - assert!(current_page.key_versions.len() <= page_size as usize); all_key_versions.extend(current_page.key_versions); - next_page_token = current_page.next_page_token; + match current_page.next_page_token { + Some(token) if !token.is_empty() => next_page_token = Some(token), + _ => break, + } } let unique_keys: std::collections::HashSet<String> = @@ -490,12 +490,11 @@ pub trait KvStoreTestSuite { Some(next_page_token) => ctx.list(Some(next_page_token), None, None).await?, }; - if current_page.key_versions.is_empty() { - break; - } - all_key_versions.extend(current_page.key_versions); - next_page_token = current_page.next_page_token; + match current_page.next_page_token { + Some(token) if !token.is_empty() => next_page_token = Some(token), + _ => break, + } } let unique_keys: std::collections::HashSet<String> = @@ -506,6 +505,66 @@ pub trait KvStoreTestSuite { + async fn list_should_return_results_ordered_by_creation_time() -> Result<(), VssError> { + let kv_store = Self::create_store().await; + let ctx = TestContext::new(&kv_store); + + ctx.put_objects(Some(0),
vec![kv("z_first", "v1", 0)]).await?; + ctx.put_objects(Some(1), vec![kv("a_third", "v1", 0)]).await?; + ctx.put_objects(Some(2), vec![kv("m_second", "v1", 0)]).await?; + + let page = ctx.list(None, None, None).await?; + assert_eq!(page.global_version, Some(3)); + let keys: Vec<&str> = page.key_versions.iter().map(|kv| kv.key.as_str()).collect(); + + // Results should be in reverse creation order (newest first), not alphabetical. + assert_eq!(keys, vec!["m_second", "a_third", "z_first"]); + + Ok(()) + } + + async fn list_should_paginate_by_creation_time_with_prefix() -> Result<(), VssError> { + let kv_store = Self::create_store().await; + let ctx = TestContext::new(&kv_store); + + // Insert prefixed keys in reverse-alphabetical order with a page_size of 1 + // to force multiple pages and verify cross-page ordering. + ctx.put_objects(Some(0), vec![kv("pfx_z", "v1", 0)]).await?; + ctx.put_objects(Some(1), vec![kv("pfx_a", "v1", 0)]).await?; + ctx.put_objects(Some(2), vec![kv("other", "v1", 0)]).await?; + ctx.put_objects(Some(3), vec![kv("pfx_m", "v1", 0)]).await?; + + let mut next_page_token: Option<String> = None; + let mut all_keys: Vec<String> = Vec::new(); + + loop { + let current_page = match next_page_token.take() { + None => { + let page = ctx.list(None, Some(1), Some("pfx_".to_string())).await?; + assert_eq!(page.global_version, Some(4)); + page + }, + Some(token) => { + let page = ctx.list(Some(token), Some(1), Some("pfx_".to_string())).await?; + assert!(page.global_version.is_none()); + page + }, + }; + + assert!(current_page.key_versions.len() <= 1); + all_keys.extend(current_page.key_versions.into_iter().map(|kv| kv.key)); + match current_page.next_page_token { + Some(token) if !token.is_empty() => next_page_token = Some(token), + _ => break, + } + } + + // Should get prefixed keys in reverse creation order (newest first), excluding "other".
+ assert_eq!(all_keys, vec!["pfx_m", "pfx_a", "pfx_z"]); + + Ok(()) + } + async fn list_should_limit_max_page_size() -> Result<(), VssError> { let kv_store = Self::create_store().await; let ctx = TestContext::new(&kv_store); @@ -524,16 +583,15 @@ pub trait KvStoreTestSuite { None => ctx.list(None, None, None).await?, Some(next_page_token) => ctx.list(Some(next_page_token), None, None).await?, }; - if current_page.key_versions.is_empty() { - break; - } - assert!( current_page.key_versions.len() < vss_arbitrary_page_size_max as usize, "Page size exceeds the maximum allowed size" ); all_key_versions.extend(current_page.key_versions); - next_page_token = current_page.next_page_token; + match current_page.next_page_token { + Some(token) if !token.is_empty() => next_page_token = Some(token), + _ => break, + } } assert_eq!(all_key_versions.len(), total_kv_objects as usize); diff --git a/rust/api/src/types.rs b/rust/api/src/types.rs index c4b79c39..954afe94 100644 --- a/rust/api/src/types.rs +++ b/rust/api/src/types.rs @@ -212,7 +212,7 @@ pub struct ListKeyVersionsRequest { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ListKeyVersionsResponse { - /// Fetched keys and versions. + /// Fetched keys and versions, ordered by creation time (newest first). /// Even though this API reuses the `KeyValue` struct, the `value` sub-field will not be set by the server. #[prost(message, repeated, tag = "1")] pub key_versions: ::prost::alloc::vec::Vec<KeyValue>, @@ -220,6 +220,8 @@ pub struct ListKeyVersionsResponse { /// Use this value to query for next-page of paginated `ListKeyVersions` operation, by specifying /// this value as the `page_token` in the next request. /// + /// Following AIP-158 (<https://google.aip.dev/158>): + /// /// If `next_page_token` is empty (""), then the "last page" of results has been processed and /// there is no more data to be retrieved.
/// diff --git a/rust/impls/src/migrations.rs b/rust/impls/src/migrations.rs index a39981ba..0a8fa16d 100644 --- a/rust/impls/src/migrations.rs +++ b/rust/impls/src/migrations.rs @@ -35,6 +35,8 @@ pub(crate) const MIGRATIONS: &[&str] = &[ PRIMARY KEY (user_token, store_id, key) );", "ALTER TABLE vss_db DROP CONSTRAINT IF EXISTS vss_db_store_id_check;", + "ALTER TABLE vss_db ADD COLUMN IF NOT EXISTS sort_order BIGSERIAL NOT NULL UNIQUE;", + "CREATE INDEX IF NOT EXISTS idx_vss_db_sort_order ON vss_db (user_token, store_id, sort_order DESC) INCLUDE (key, version);", ]; #[cfg(test)] pub(crate) const DUMMY_MIGRATION: &str = "SELECT 1 WHERE FALSE;"; diff --git a/rust/impls/src/postgres/sql/v0_create_vss_db.sql b/rust/impls/src/postgres/sql/v0_create_vss_db.sql index 5842dc41..a105e7cf 100644 --- a/rust/impls/src/postgres/sql/v0_create_vss_db.sql +++ b/rust/impls/src/postgres/sql/v0_create_vss_db.sql @@ -6,5 +6,6 @@ CREATE TABLE vss_db ( version bigint NOT NULL, created_at TIMESTAMP WITH TIME ZONE, last_updated_at TIMESTAMP WITH TIME ZONE, + sort_order BIGSERIAL NOT NULL UNIQUE, PRIMARY KEY (user_token, store_id, key) ); diff --git a/rust/impls/src/postgres_store.rs b/rust/impls/src/postgres_store.rs index daaf437f..1e2ef68a 100644 --- a/rust/impls/src/postgres_store.rs +++ b/rust/impls/src/postgres_store.rs @@ -33,6 +33,30 @@ pub(crate) struct VssDbRecord { const KEY_COLUMN: &str = "key"; const VALUE_COLUMN: &str = "value"; const VERSION_COLUMN: &str = "version"; +const SORT_ORDER_COLUMN: &str = "sort_order"; + +const CURRENT_PAGE_TOKEN_VERSION: char = '0'; + +/// Page token is encoded as a version byte and the `sort_order` value, separated by `:`. +/// Example: `0:12345`. 
+fn encode_page_token(sort_order: i64) -> String { + format!("{}:{}", CURRENT_PAGE_TOKEN_VERSION, sort_order) +} + +fn decode_page_token(token: &str) -> Result<i64, VssError> { + let invalid = || VssError::InvalidRequestError("Invalid page token".to_string()); + let mut parts = token.splitn(2, ':'); + let version = parts.next().ok_or_else(invalid)?; + let sort_order_str = parts.next().ok_or_else(invalid)?; + if version.len() != 1 || version.chars().next() != Some(CURRENT_PAGE_TOKEN_VERSION) { + return Err(invalid()); + } + let sort_order = sort_order_str.parse::<i64>().map_err(|_| invalid())?; + if sort_order < 0 { + return Err(invalid()); + } + Ok(sort_order) +} /// The maximum number of key versions that can be returned in a single page. /// @@ -659,25 +683,62 @@ where global_version = Some(get_response.value.unwrap().version); } + // When page_size is 0, we can decide how many items to return. + // Honor the 0 and just give the `global_version`. + if page_size == 0 { + return Ok(ListKeyVersionsResponse { + key_versions: vec![], + next_page_token: Some(String::new()), + global_version, + }); + } + let limit = min(page_size, LIST_KEY_VERSIONS_MAX_PAGE_SIZE) as i64; + // Fetch one extra to determine if there are more pages. + let fetch_limit = limit + 1; let conn = self.pool.get().await?; - let stmt = "SELECT key, version FROM vss_db WHERE user_token = $1 AND store_id = $2 AND key > $3 AND key LIKE $4 ORDER BY key LIMIT $5"; - let key_like = format!("{}%", key_prefix.as_deref().unwrap_or_default()); - let page_token_param = page_token.as_deref().unwrap_or_default(); - let params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = - vec![&user_token, &store_id, &page_token_param, &key_like, &limit]; - let rows = conn - .query(stmt, &params) - .await - .map_err(|e| Error::new(ErrorKind::Other, format!("Query error: {}", e)))?; + let key_like = format!("{}%", key_prefix.as_deref().unwrap_or_default()); + let rows = if let Some(ref token) = page_token { + let page_sort_order = decode_page_token(token)?; + let stmt = "SELECT key, version, sort_order FROM vss_db WHERE user_token = $1 AND store_id = $2 AND sort_order < $3 AND key LIKE $4 AND key != $5 ORDER BY sort_order DESC LIMIT $6"; + let params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = vec![ + &user_token, + &store_id, + &page_sort_order, + &key_like, + &GLOBAL_VERSION_KEY, + &fetch_limit, + ]; + conn.query(stmt, &params) + .await + .map_err(|e| Error::new(ErrorKind::Other, format!("Query error: {}", e)))? + } else { + let stmt = "SELECT key, version, sort_order FROM vss_db WHERE user_token = $1 AND store_id = $2 AND key LIKE $3 AND key != $4 ORDER BY sort_order DESC LIMIT $5"; + let params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = + vec![&user_token, &store_id, &key_like, &GLOBAL_VERSION_KEY, &fetch_limit]; + conn.query(stmt, &params) + .await + .map_err(|e| Error::new(ErrorKind::Other, format!("Query error: {}", e)))?
+ }; + + let limit_usize = limit as usize; + let has_more = rows.len() > limit_usize; + + let next_page_token = if has_more { + let last = &rows[limit_usize - 1]; + let last_sort_order: i64 = last.get(SORT_ORDER_COLUMN); + Some(encode_page_token(last_sort_order)) + } else { + Some(String::new()) + }; let key_versions: Vec<_> = rows .iter() - .filter(|&row| row.get::<&str, &str>(KEY_COLUMN) != GLOBAL_VERSION_KEY) + .take(limit_usize) .map(|row| KeyValue { key: row.get(KEY_COLUMN), value: Bytes::new(), @@ -685,22 +745,22 @@ where }) .collect(); - let mut next_page_token = Some("".to_string()); - if !key_versions.is_empty() { - next_page_token = key_versions.get(key_versions.len() - 1).map(|kv| kv.key.to_string()); - } - Ok(ListKeyVersionsResponse { key_versions, next_page_token, global_version }) } } #[cfg(test)] mod tests { - use super::{drop_database, DUMMY_MIGRATION, MIGRATIONS}; + use super::{ + decode_page_token, drop_database, encode_page_token, CURRENT_PAGE_TOKEN_VERSION, + DUMMY_MIGRATION, MIGRATIONS, + }; use crate::postgres_store::PostgresPlaintextBackend; use api::define_kv_store_tests; use api::kv_store::KvStore; - use api::types::{DeleteObjectRequest, GetObjectRequest, KeyValue, PutObjectRequest}; + use api::types::{ + DeleteObjectRequest, GetObjectRequest, KeyValue, ListKeyVersionsRequest, PutObjectRequest, + }; use bytes::Bytes; use tokio::sync::OnceCell; @@ -713,6 +773,28 @@ mod tests { static START: OnceCell<()> = OnceCell::const_new(); + async fn put_test_key( + store: &PostgresPlaintextBackend, user_token: &str, store_id: &str, key: &str, + global_version: Option<i64>, + ) { + store + .put( + user_token.to_string(), + PutObjectRequest { + store_id: store_id.to_string(), + global_version, + transaction_items: vec![KeyValue { + key: key.to_string(), + value: Bytes::from_static(b"v1"), + version: 0, + }], + delete_items: vec![], + }, + ) + .await + .unwrap(); + } + define_kv_store_tests!(PostgresKvStoreTest, PostgresPlaintextBackend, { let vss_db =
"postgres_kv_store_tests"; START @@ -884,4 +966,240 @@ mod tests { drop_database(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db, NoTls).await.unwrap(); } + + #[tokio::test] + async fn list_orders_by_sort_order_desc() { + let vss_db = "list_orders_by_sort_order_desc"; + let _ = drop_database(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db, NoTls).await; + + { + let store = + PostgresPlaintextBackend::new(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db).await.unwrap(); + let (start, end) = store.migrate_vss_database(MIGRATIONS).await.unwrap(); + assert_eq!(start, MIGRATIONS_START); + assert_eq!(end, MIGRATIONS_END); + + for key in ["c_key", "a_key", "b_key"] { + put_test_key(&store, "token", "store", key, None).await; + } + + let resp = store + .list_key_versions( + "token".to_string(), + ListKeyVersionsRequest { + store_id: "store".to_string(), + page_token: None, + page_size: None, + key_prefix: None, + }, + ) + .await + .unwrap(); + + let keys: Vec<&str> = resp.key_versions.iter().map(|kv| kv.key.as_str()).collect(); + assert_eq!(keys, vec!["b_key", "a_key", "c_key"]); + + let mut all_keys = Vec::new(); + let mut token = None; + loop { + let page = store + .list_key_versions( + "token".to_string(), + ListKeyVersionsRequest { + store_id: "store".to_string(), + page_token: token, + page_size: Some(1), + key_prefix: None, + }, + ) + .await + .unwrap(); + for kv in &page.key_versions { + all_keys.push(kv.key.clone()); + } + match page.next_page_token { + Some(ref t) if !t.is_empty() => token = page.next_page_token, + _ => break, + } + } + assert_eq!(all_keys, vec!["b_key", "a_key", "c_key"]); + } + + drop_database(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db, NoTls).await.unwrap(); + } + + #[tokio::test] + async fn list_zero_page_size_should_return_only_global_version() { + let vss_db = "list_zero_page_size_should_return_only_global_version"; + let _ = drop_database(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db, NoTls).await; + + { + let store = + PostgresPlaintextBackend::new(POSTGRES_ENDPOINT, DEFAULT_DB, 
vss_db).await.unwrap(); + let (start, end) = store.migrate_vss_database(MIGRATIONS).await.unwrap(); + assert_eq!(start, MIGRATIONS_START); + assert_eq!(end, MIGRATIONS_END); + + for i in 0..3 { + put_test_key(&store, "token", "store", &format!("k{}", i), Some(i)).await; + } + + let resp = store + .list_key_versions( + "token".to_string(), + ListKeyVersionsRequest { + store_id: "store".to_string(), + page_token: None, + page_size: Some(0), + key_prefix: None, + }, + ) + .await + .unwrap(); + + assert!(resp.key_versions.is_empty()); + assert_eq!(resp.global_version, Some(3)); + assert_eq!(resp.next_page_token.filter(|t| !t.is_empty()), None); + } + + drop_database(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db, NoTls).await.unwrap(); + } + + #[tokio::test] + async fn list_should_return_empty_page_token_when_exact_fit() { + let vss_db = "list_should_return_empty_page_token_when_exact_fit"; + let _ = drop_database(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db, NoTls).await; + + { + let store = + PostgresPlaintextBackend::new(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db).await.unwrap(); + let (start, end) = store.migrate_vss_database(MIGRATIONS).await.unwrap(); + assert_eq!(start, MIGRATIONS_START); + assert_eq!(end, MIGRATIONS_END); + + for i in 0..5 { + put_test_key(&store, "token", "store", &format!("k{}", i), Some(i)).await; + } + + let resp = store + .list_key_versions( + "token".to_string(), + ListKeyVersionsRequest { + store_id: "store".to_string(), + page_token: None, + page_size: Some(5), + key_prefix: None, + }, + ) + .await + .unwrap(); + + assert_eq!(resp.key_versions.len(), 5); + assert_eq!(resp.global_version, Some(5)); + // can be empty string or None to signify end of items + assert_eq!(resp.next_page_token.filter(|t| !t.is_empty()), None); + } + + drop_database(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db, NoTls).await.unwrap(); + } + + #[tokio::test] + async fn list_should_return_empty_page_token_on_last_non_empty_page() { + let vss_db = 
"list_should_return_empty_page_token_on_last_non_empty_page"; + let _ = drop_database(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db, NoTls).await; + + { + let store = + PostgresPlaintextBackend::new(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db).await.unwrap(); + let (start, end) = store.migrate_vss_database(MIGRATIONS).await.unwrap(); + assert_eq!(start, MIGRATIONS_START); + assert_eq!(end, MIGRATIONS_END); + + for i in 0..6 { + put_test_key(&store, "token", "store", &format!("k{}", i), Some(i)).await; + } + + let first_page = store + .list_key_versions( + "token".to_string(), + ListKeyVersionsRequest { + store_id: "store".to_string(), + page_token: None, + page_size: Some(5), + key_prefix: None, + }, + ) + .await + .unwrap(); + + assert_eq!(first_page.key_versions.len(), 5); + assert_eq!(first_page.global_version, Some(6)); + assert!(first_page.next_page_token.as_ref().is_some_and(|token| !token.is_empty())); + + let second_page = store + .list_key_versions( + "token".to_string(), + ListKeyVersionsRequest { + store_id: "store".to_string(), + page_token: first_page.next_page_token, + page_size: Some(5), + key_prefix: None, + }, + ) + .await + .unwrap(); + + assert_eq!(second_page.key_versions.len(), 1); + assert_eq!(second_page.next_page_token.filter(|t| !t.is_empty()), None); + assert!(second_page.global_version.is_none()); + } + + drop_database(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db, NoTls).await.unwrap(); + } + + #[test] + fn page_token_roundtrips() { + let token = encode_page_token(12345); + assert_eq!(decode_page_token(&token).unwrap(), 12345); + } + + #[test] + fn page_token_has_expected_format() { + let token = encode_page_token(42); + assert_eq!(token, format!("{}:42", CURRENT_PAGE_TOKEN_VERSION)); + } + + #[test] + fn page_token_rejects_empty_input() { + assert!(decode_page_token("").is_err()); + } + + #[test] + fn page_token_rejects_unknown_version() { + for bad_version in ['1', '9', 'a'] { + assert_ne!(bad_version, CURRENT_PAGE_TOKEN_VERSION); + let token = 
format!("{}:12345", bad_version); + assert!(decode_page_token(&token).is_err()); + } + } + + #[test] + fn page_token_rejects_missing_version_separator() { + let token = format!("{}12345", CURRENT_PAGE_TOKEN_VERSION); + assert!(decode_page_token(&token).is_err()); + } + + #[test] + fn page_token_rejects_non_numeric_sort_order() { + let token = format!("{}:abc", CURRENT_PAGE_TOKEN_VERSION); + assert!(decode_page_token(&token).is_err()); + } + + #[test] + fn page_token_rejects_negative_sort_order() { + for bad in ["-1", "-67", &i64::MIN.to_string()] { + let token = format!("{}:{}", CURRENT_PAGE_TOKEN_VERSION, bad); + assert!(decode_page_token(&token).is_err()); + } + } }