diff --git a/lightning-persister/src/fs_store/common.rs b/lightning-persister/src/fs_store/common.rs index 6eaa0dbc455..885f806b344 100644 --- a/lightning-persister/src/fs_store/common.rs +++ b/lightning-persister/src/fs_store/common.rs @@ -470,6 +470,94 @@ impl FilesystemStoreInner { Ok(keys) } + + fn list_all_keys( + &self, use_empty_ns_dir: bool, + ) -> Result, lightning::io::Error> { + let prefixed_dest = &self.data_dir; + if !prefixed_dest.exists() { + return Ok(Vec::new()); + } + + let mut keys = Vec::new(); + + 'primary_loop: for primary_entry in fs::read_dir(prefixed_dest)? { + let primary_entry = primary_entry?; + let primary_path = primary_entry.path(); + if dir_entry_is_store_artifact(&primary_path) { + continue 'primary_loop; + } + + if dir_entry_is_key(&primary_entry)? { + let primary_namespace = String::new(); + let secondary_namespace = String::new(); + let key = get_key_from_dir_entry_path(&primary_path, prefixed_dest, false)?; + keys.push((primary_namespace, secondary_namespace, key)); + continue 'primary_loop; + } + + // The primary_entry is actually also a directory. + 'secondary_loop: for secondary_entry in fs::read_dir(&primary_path)? { + let secondary_entry = secondary_entry?; + let secondary_path = secondary_entry.path(); + if dir_entry_is_store_artifact(&secondary_path) { + continue 'secondary_loop; + } + + if dir_entry_is_key(&secondary_entry)? { + let primary_namespace = get_key_from_dir_entry_path( + &primary_path, + prefixed_dest, + use_empty_ns_dir, + )?; + let secondary_namespace = String::new(); + let key = get_key_from_dir_entry_path(&secondary_path, &primary_path, false)?; + keys.push((primary_namespace, secondary_namespace, key)); + continue 'secondary_loop; + } + + // The secondary_entry is actually also a directory. + for tertiary_entry in fs::read_dir(&secondary_path)? { + let tertiary_entry = tertiary_entry?; + let tertiary_path = tertiary_entry.path(); + if dir_entry_is_store_artifact(&tertiary_path) { + continue; + } + + if dir_entry_is_key(&tertiary_entry)? { + let primary_namespace = get_key_from_dir_entry_path( + &primary_path, + prefixed_dest, + use_empty_ns_dir, + )?; + let secondary_namespace = get_key_from_dir_entry_path( + &secondary_path, + &primary_path, + use_empty_ns_dir, + )?; + let key = + get_key_from_dir_entry_path(&tertiary_path, &secondary_path, false)?; + keys.push((primary_namespace, secondary_namespace, key)); + } else { + debug_assert!( + false, + "Failed to list keys of path {}: only two levels of namespaces are supported", + PrintableString(tertiary_path.to_str().unwrap_or_default()) + ); + let msg = format!( + "Failed to list keys of path {}: only two levels of namespaces are supported", + PrintableString(tertiary_path.to_str().unwrap_or_default()) + ); + return Err(lightning::io::Error::new( + lightning::io::ErrorKind::Other, + msg, + )); + } + } + } + } + Ok(keys) + } } impl FilesystemStoreState { @@ -640,92 +728,26 @@ impl FilesystemStoreState { } } - pub(crate) fn list_all_keys_impl( + #[cfg(feature = "tokio")] + pub(crate) fn list_all_keys_async( &self, use_empty_ns_dir: bool, - ) -> Result, lightning::io::Error> { - let prefixed_dest = &self.inner.data_dir; - if !prefixed_dest.exists() { - return Ok(Vec::new()); - } - - let mut keys = Vec::new(); - - 'primary_loop: for primary_entry in fs::read_dir(prefixed_dest)? { - let primary_entry = primary_entry?; - let primary_path = primary_entry.path(); - if dir_entry_is_store_artifact(&primary_path) { - continue 'primary_loop; - } - - if dir_entry_is_key(&primary_entry)? { - let primary_namespace = String::new(); - let secondary_namespace = String::new(); - let key = get_key_from_dir_entry_path(&primary_path, prefixed_dest, false)?; - keys.push((primary_namespace, secondary_namespace, key)); - continue 'primary_loop; - } - - // The primary_entry is actually also a directory. - 'secondary_loop: for secondary_entry in fs::read_dir(&primary_path)? { - let secondary_entry = secondary_entry?; - let secondary_path = secondary_entry.path(); - if dir_entry_is_store_artifact(&secondary_path) { - continue 'secondary_loop; - } - - if dir_entry_is_key(&secondary_entry)? { - let primary_namespace = get_key_from_dir_entry_path( - &primary_path, - prefixed_dest, - use_empty_ns_dir, - )?; - let secondary_namespace = String::new(); - let key = get_key_from_dir_entry_path(&secondary_path, &primary_path, false)?; - keys.push((primary_namespace, secondary_namespace, key)); - continue 'secondary_loop; - } - - // The secondary_entry is actually also a directory. - for tertiary_entry in fs::read_dir(&secondary_path)? { - let tertiary_entry = tertiary_entry?; - let tertiary_path = tertiary_entry.path(); - if dir_entry_is_store_artifact(&tertiary_path) { - continue; - } + ) -> impl Future, lightning::io::Error>> + 'static + Send + { + let this = Arc::clone(&self.inner); - if dir_entry_is_key(&tertiary_entry)? { - let primary_namespace = get_key_from_dir_entry_path( - &primary_path, - prefixed_dest, - use_empty_ns_dir, - )?; - let secondary_namespace = get_key_from_dir_entry_path( - &secondary_path, - &primary_path, - use_empty_ns_dir, - )?; - let key = - get_key_from_dir_entry_path(&tertiary_path, &secondary_path, false)?; - keys.push((primary_namespace, secondary_namespace, key)); - } else { - debug_assert!( - false, - "Failed to list keys of path {}: only two levels of namespaces are supported", - PrintableString(tertiary_path.to_str().unwrap_or_default()) - ); - let msg = format!( - "Failed to list keys of path {}: only two levels of namespaces are supported", - PrintableString(tertiary_path.to_str().unwrap_or_default()) - ); - return Err(lightning::io::Error::new( - lightning::io::ErrorKind::Other, - msg, - )); - } - } - } + async move { + tokio::task::spawn_blocking(move || this.list_all_keys(use_empty_ns_dir)) + .await + .unwrap_or_else(|e| { + Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)) + }) } - Ok(keys) + } + + pub(crate) fn list_all_keys_impl( + &self, use_empty_ns_dir: bool, + ) -> Result, lightning::io::Error> { + self.inner.list_all_keys(use_empty_ns_dir) } } diff --git a/lightning-persister/src/fs_store/v1.rs b/lightning-persister/src/fs_store/v1.rs index 7f47c59a362..4f24d8d961f 100644 --- a/lightning-persister/src/fs_store/v1.rs +++ b/lightning-persister/src/fs_store/v1.rs @@ -1,7 +1,7 @@ //! Objects related to [`FilesystemStore`] live here. use crate::fs_store::common::FilesystemStoreState; -use lightning::util::persist::{KVStoreSync, MigratableKVStore}; +use lightning::util::persist::{KVStoreSync, MigratableKVStoreSync}; use std::path::PathBuf; @@ -88,15 +88,27 @@ impl KVStore for FilesystemStore { } } -impl MigratableKVStore for FilesystemStore { +impl MigratableKVStoreSync for FilesystemStore { fn list_all_keys(&self) -> Result, lightning::io::Error> { self.state.list_all_keys_impl(false) } } +#[cfg(feature = "tokio")] +impl lightning::util::persist::MigratableKVStore for FilesystemStore { + fn list_all_keys( + &self, + ) -> impl Future, lightning::io::Error>> + 'static + Send + { + self.state.list_all_keys_async(false) + } +} + #[cfg(test)] mod tests { use super::*; + #[cfg(feature = "tokio")] + use crate::test_utils::do_test_data_migration_async; use crate::test_utils::{ do_read_write_remove_list_persist, do_test_data_migration, do_test_store, }; @@ -221,6 +233,20 @@ mod tests { do_test_data_migration(&mut source_store, &mut target_store); } + #[cfg(feature = "tokio")] + #[tokio::test] + async fn test_data_migration_async() { + let mut source_temp_path = std::env::temp_dir(); + source_temp_path.push("test_data_migration_source_async"); + let source_store = FilesystemStore::new(source_temp_path); + + let mut target_temp_path = std::env::temp_dir(); + target_temp_path.push("test_data_migration_target_async"); + let target_store = FilesystemStore::new(target_temp_path); + + do_test_data_migration_async(&source_store, &target_store).await; + } + #[test] fn test_if_monitors_is_not_dir() { let store = FilesystemStore::new("test_monitors_is_not_dir".into()); diff --git a/lightning-persister/src/fs_store/v2.rs b/lightning-persister/src/fs_store/v2.rs index 6154d22d35c..fe1fdf60c7a 100644 --- a/lightning-persister/src/fs_store/v2.rs +++ b/lightning-persister/src/fs_store/v2.rs @@ -4,7 +4,7 @@ use crate::fs_store::common::{ }; use lightning::util::persist::{ - KVStoreSync, MigratableKVStore, PageToken, PaginatedKVStoreSync, PaginatedListResponse, + KVStoreSync, MigratableKVStoreSync, PageToken, PaginatedKVStoreSync, PaginatedListResponse, }; use std::fs; @@ -315,12 +315,22 @@ impl PaginatedKVStore for FilesystemStoreV2 { } } -impl MigratableKVStore for FilesystemStoreV2 { +impl MigratableKVStoreSync for FilesystemStoreV2 { fn list_all_keys(&self) -> Result, lightning::io::Error> { self.inner.list_all_keys_impl(true) } } +#[cfg(feature = "tokio")] +impl lightning::util::persist::MigratableKVStore for FilesystemStoreV2 { + fn list_all_keys( + &self, + ) -> impl Future, lightning::io::Error>> + 'static + Send + { + self.inner.list_all_keys_async(true) + } +} + /// Formats a page token from mtime (millis since epoch) and key. pub(crate) fn format_page_token(mtime_millis: u64, key: &str) -> String { format!("{mtime_millis:016}:{key}") @@ -351,6 +361,8 @@ pub(crate) fn parse_page_token(token: &str) -> lightning::io::Result<(u64, Strin mod tests { use super::*; use crate::fs_store::common::EMPTY_NAMESPACE_DIR; + #[cfg(feature = "tokio")] + use crate::test_utils::do_test_data_migration_async; use crate::test_utils::{ do_read_write_remove_list_persist, do_test_data_migration, do_test_store, }; @@ -445,6 +457,20 @@ mod tests { do_test_data_migration(&mut source_store, &mut target_store); } + #[cfg(feature = "tokio")] + #[tokio::test] + async fn test_data_migration_async() { + let mut source_temp_path = std::env::temp_dir(); + source_temp_path.push("test_data_migration_source_async_v2"); + let source_store = FilesystemStoreV2::new(source_temp_path).unwrap(); + + let mut target_temp_path = std::env::temp_dir(); + target_temp_path.push("test_data_migration_target_async_v2"); + let target_store = FilesystemStoreV2::new(target_temp_path).unwrap(); + + do_test_data_migration_async(&source_store, &target_store).await; + } + #[test] fn test_filesystem_store_v2() { // Create the nodes, giving them FilesystemStoreV2s for data stores. diff --git a/lightning-persister/src/test_utils.rs b/lightning-persister/src/test_utils.rs index b8f3eb0bd99..34e0619b34a 100644 --- a/lightning-persister/src/test_utils.rs +++ b/lightning-persister/src/test_utils.rs @@ -1,7 +1,7 @@ use lightning::events::ClosureReason; use lightning::ln::functional_test_utils::*; use lightning::util::persist::{ - migrate_kv_store_data, read_channel_monitors, KVStoreSync, MigratableKVStore, + migrate_kv_store_data, read_channel_monitors, KVStoreSync, MigratableKVStoreSync, KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN, }; use lightning::util::test_utils; @@ -59,15 +59,11 @@ pub(crate) fn do_read_write_remove_list_persist( assert_eq!(listed_keys.len(), 0); } -pub(crate) fn do_test_data_migration( - source_store: &mut S, target_store: &mut T, -) { - // We fill the source with some bogus keys. - let dummy_data = vec![42u8; 32]; +fn data_migration_test_keys() -> Vec<(String, String, String)> { let num_primary_namespaces = 3; let num_secondary_namespaces = 3; let num_keys = 3; - let mut expected_keys = Vec::new(); + let mut keys = Vec::new(); for i in 0..num_primary_namespaces { let primary_namespace = if i == 0 { String::new() @@ -83,13 +79,25 @@ pub(crate) fn do_test_data_migration for k in 0..num_keys { let key = format!("testkey{}", KVSTORE_NAMESPACE_KEY_ALPHABET.chars().nth(k).unwrap()); - source_store - .write(&primary_namespace, &secondary_namespace, &key, dummy_data.clone()) - .unwrap(); - expected_keys.push((primary_namespace.clone(), secondary_namespace.clone(), key)); + keys.push((primary_namespace.clone(), secondary_namespace.clone(), key)); } } } + + keys +} + +pub(crate) fn do_test_data_migration( + source_store: &mut S, target_store: &mut T, +) { + // We fill the source with some bogus keys. + let dummy_data = vec![42u8; 32]; + let mut expected_keys = data_migration_test_keys(); + for (primary_namespace, secondary_namespace, key) in &expected_keys { + source_store + .write(primary_namespace, secondary_namespace, key, dummy_data.clone()) + .unwrap(); + } expected_keys.sort(); expected_keys.dedup(); @@ -108,6 +116,47 @@ pub(crate) fn do_test_data_migration } } +#[cfg(feature = "tokio")] +pub(crate) async fn do_test_data_migration_async< + S: lightning::util::persist::MigratableKVStore, + T: lightning::util::persist::MigratableKVStore, +>( + source_store: &S, target_store: &T, +) { + use lightning::util::persist::{migrate_kv_store_data_async, KVStore, MigratableKVStore}; + + // We fill the source with some bogus keys. + let dummy_data = vec![42u8; 32]; + let mut expected_keys = data_migration_test_keys(); + for (primary_namespace, secondary_namespace, key) in &expected_keys { + KVStore::write( + source_store, + primary_namespace, + secondary_namespace, + key, + dummy_data.clone(), + ) + .await + .unwrap(); + } + expected_keys.sort(); + expected_keys.dedup(); + + let mut source_list = MigratableKVStore::list_all_keys(source_store).await.unwrap(); + source_list.sort(); + assert_eq!(source_list, expected_keys); + + migrate_kv_store_data_async(source_store, target_store).await.unwrap(); + + let mut target_list = MigratableKVStore::list_all_keys(target_store).await.unwrap(); + target_list.sort(); + assert_eq!(target_list, expected_keys); + + for (p, s, k) in expected_keys.iter() { + assert_eq!(KVStore::read(target_store, p, s, k).await.unwrap(), dummy_data.clone()); + } +} + // Integration-test the given KVStore implementation. Test relaying a few payments and check that // the persisted data is updated the appropriate number of times. pub(crate) fn do_test_store(store_0: &K, store_1: &K) { diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 95d6032e130..bf8a0cf8342 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -554,9 +554,9 @@ pub trait PaginatedKVStore: KVStore { ) -> impl Future> + 'static + MaybeSend; } -/// Provides additional interface methods that are required for [`KVStore`]-to-[`KVStore`] +/// Provides additional interface methods that are required for [`KVStoreSync`]-to-[`KVStoreSync`] /// data migration. -pub trait MigratableKVStore: KVStoreSync { +pub trait MigratableKVStoreSync: KVStoreSync { /// Returns *all* known keys as a list of `primary_namespace`, `secondary_namespace`, `key` tuples. /// /// This is useful for migrating data from [`KVStoreSync`] implementation to [`KVStoreSync`] @@ -567,6 +567,128 @@ pub trait MigratableKVStore: KVStoreSync { fn list_all_keys(&self) -> Result, io::Error>; } +/// Provides additional interface methods that are required for [`KVStore`]-to-[`KVStore`] +/// data migration. +/// +/// This is not exported to bindings users as async is only supported in Rust. +pub trait MigratableKVStore: KVStore { + /// Returns *all* known keys as a list of `primary_namespace`, `secondary_namespace`, `key` tuples. + /// + /// This is useful for migrating data from [`KVStore`] implementation to [`KVStore`] + /// implementation. + /// + /// Must exhaustively return all entries known to the store to ensure no data is missed, but + /// may return the items in arbitrary order. + fn list_all_keys( + &self, + ) -> impl Future, io::Error>> + 'static + MaybeSend; +} + +impl MigratableKVStore for K +where + K: Deref, + K::Target: MigratableKVStore, +{ + fn list_all_keys( + &self, + ) -> impl Future, io::Error>> + 'static + MaybeSend + { + self.deref().list_all_keys() + } +} + +/// This is not exported to bindings users as async is only supported in Rust. +impl MigratableKVStore for KVStoreSyncWrapper +where + K::Target: MigratableKVStoreSync, +{ + fn list_all_keys( + &self, + ) -> impl Future, io::Error>> + 'static + MaybeSend + { + let res = self.0.list_all_keys(); + + async move { res } + } +} + +type MigrationKey = (String, String, String); + +trait MigrationKVStore { + fn list_all_keys( + &self, + ) -> impl Future, io::Error>> + MaybeSend; + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future, io::Error>> + MaybeSend; + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> impl Future> + MaybeSend; +} + +struct MigrationKVStoreSyncAdapter<'a, K: ?Sized>(&'a K); + +impl MigrationKVStore for MigrationKVStoreSyncAdapter<'_, K> { + fn list_all_keys( + &self, + ) -> impl Future, io::Error>> + MaybeSend { + let res = MigratableKVStoreSync::list_all_keys(self.0); + + async move { res } + } + + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future, io::Error>> + MaybeSend { + let res = KVStoreSync::read(self.0, primary_namespace, secondary_namespace, key); + + async move { res } + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> impl Future> + MaybeSend { + let res = KVStoreSync::write(self.0, primary_namespace, secondary_namespace, key, buf); + + async move { res } + } +} + +struct MigrationKVStoreAsyncAdapter<'a, K: ?Sized>(&'a K); + +impl MigrationKVStore for MigrationKVStoreAsyncAdapter<'_, K> { + fn list_all_keys( + &self, + ) -> impl Future, io::Error>> + MaybeSend { + MigratableKVStore::list_all_keys(self.0) + } + + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future, io::Error>> + MaybeSend { + KVStore::read(self.0, primary_namespace, secondary_namespace, key) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> impl Future> + MaybeSend { + KVStore::write(self.0, primary_namespace, secondary_namespace, key, buf) + } +} + +async fn migrate_kv_store_data_inner( + source_store: S, target_store: T, +) -> Result<(), io::Error> { + let keys_to_migrate = source_store.list_all_keys().await?; + + for (primary_namespace, secondary_namespace, key) in &keys_to_migrate { + let data = source_store.read(primary_namespace, secondary_namespace, key).await?; + target_store.write(primary_namespace, secondary_namespace, key, data).await?; + } + + Ok(()) +} + /// Migrates all data from one store to another. /// /// This operation assumes that `target_store` is empty, i.e., any data present under copied keys @@ -575,17 +697,31 @@ pub trait MigratableKVStore: KVStoreSync { /// /// Will abort and return an error if any IO operation fails. Note that in this case the /// `target_store` might get left in an intermediate state. -pub fn migrate_kv_store_data( +pub fn migrate_kv_store_data( source_store: &mut S, target_store: &mut T, ) -> Result<(), io::Error> { - let keys_to_migrate = source_store.list_all_keys()?; - - for (primary_namespace, secondary_namespace, key) in &keys_to_migrate { - let data = source_store.read(primary_namespace, secondary_namespace, key)?; - target_store.write(primary_namespace, secondary_namespace, key, data)?; - } + poll_sync_future(migrate_kv_store_data_inner( + MigrationKVStoreSyncAdapter(source_store), + MigrationKVStoreSyncAdapter(target_store), + )) +} - Ok(()) +/// Migrates all data from one asynchronous store to another. +/// +/// This operation assumes that `target_store` is empty, i.e., any data present under copied keys +/// might get overriden. User must ensure `source_store` is not modified during operation, +/// otherwise no consistency guarantees can be given. +/// +/// Will abort and return an error if any IO operation fails. Note that in this case the +/// `target_store` might get left in an intermediate state. +pub async fn migrate_kv_store_data_async( + source_store: &S, target_store: &T, +) -> Result<(), io::Error> { + migrate_kv_store_data_inner( + MigrationKVStoreAsyncAdapter(source_store), + MigrationKVStoreAsyncAdapter(target_store), + ) + .await } impl Persist for K {