diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 31e519b9f57..cdf1b2e5aa3 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -1230,12 +1230,20 @@ where } if let Some(liquidity_manager) = liquidity_manager.as_ref() { - log_trace!(logger, "Persisting LiquidityManager..."); let fut = async { - liquidity_manager.get_lm().persist().await.map_err(|e| { - log_error!(logger, "Persisting LiquidityManager failed: {}", e); - e - }) + liquidity_manager + .get_lm() + .persist() + .await + .map(|did_persist| { + if did_persist { + log_trace!(logger, "Persisted LiquidityManager."); + } + }) + .map_err(|e| { + log_error!(logger, "Persisting LiquidityManager failed: {}", e); + e + }) }; futures.set_e(Box::pin(fut)); } diff --git a/lightning-liquidity/src/events/event_queue.rs b/lightning-liquidity/src/events/event_queue.rs index cd1162cee31..0d6e3a0ec54 100644 --- a/lightning-liquidity/src/events/event_queue.rs +++ b/lightning-liquidity/src/events/event_queue.rs @@ -129,12 +129,12 @@ where EventQueueNotifierGuard(self) } - pub async fn persist(&self) -> Result<(), lightning::io::Error> { + pub async fn persist(&self) -> Result { let fut = { let mut state_lock = self.state.lock().unwrap(); if !state_lock.needs_persist { - return Ok(()); + return Ok(false); } state_lock.needs_persist = false; @@ -153,7 +153,7 @@ where e })?; - Ok(()) + Ok(true) } } diff --git a/lightning-liquidity/src/lsps2/service.rs b/lightning-liquidity/src/lsps2/service.rs index dda9922686d..82037653780 100644 --- a/lightning-liquidity/src/lsps2/service.rs +++ b/lightning-liquidity/src/lsps2/service.rs @@ -1782,15 +1782,16 @@ where }) } - pub(crate) async fn persist(&self) -> Result<(), lightning::io::Error> { + pub(crate) async fn persist(&self) -> Result { // TODO: We should eventually persist in parallel, however, when we do, we probably want to // introduce some batching to upper-bound the number of requests inflight at any given // time. + let mut did_persist = false; if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 { // If we're not the first event processor to get here, just return early, the increment // we just did will be treated as "go around again" at the end. - return Ok(()); + return Ok(did_persist); } loop { @@ -1816,6 +1817,7 @@ where for counterparty_node_id in need_persist.into_iter() { debug_assert!(!need_remove.contains(&counterparty_node_id)); self.persist_peer_state(counterparty_node_id).await?; + did_persist = true; } for counterparty_node_id in need_remove { @@ -1850,6 +1852,7 @@ where } if let Some(future) = future_opt { future.await?; + did_persist = true; } else { self.persist_peer_state(counterparty_node_id).await?; } @@ -1864,7 +1867,7 @@ where break; } - Ok(()) + Ok(did_persist) } pub(crate) fn peer_disconnected(&self, counterparty_node_id: PublicKey) { diff --git a/lightning-liquidity/src/lsps5/service.rs b/lightning-liquidity/src/lsps5/service.rs index 8b1f0ec70cb..adf3da2baef 100644 --- a/lightning-liquidity/src/lsps5/service.rs +++ b/lightning-liquidity/src/lsps5/service.rs @@ -244,15 +244,17 @@ where }) } - pub(crate) async fn persist(&self) -> Result<(), lightning::io::Error> { + pub(crate) async fn persist(&self) -> Result { // TODO: We should eventually persist in parallel, however, when we do, we probably want to // introduce some batching to upper-bound the number of requests inflight at any given // time. + let mut did_persist = false; + if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 { // If we're not the first event processor to get here, just return early, the increment // we just did will be treated as "go around again" at the end. - return Ok(()); + return Ok(did_persist); } loop { @@ -277,6 +279,7 @@ where for client_id in need_persist.into_iter() { debug_assert!(!need_remove.contains(&client_id)); self.persist_peer_state(client_id).await?; + did_persist = true; } for client_id in need_remove { @@ -311,6 +314,7 @@ where } if let Some(future) = future_opt { future.await?; + did_persist = true; } else { self.persist_peer_state(client_id).await?; } @@ -325,7 +329,7 @@ where break; } - Ok(()) + Ok(did_persist) } fn check_prune_stale_webhooks<'a>( diff --git a/lightning-liquidity/src/manager.rs b/lightning-liquidity/src/manager.rs index 5d95d32d540..de84ee20897 100644 --- a/lightning-liquidity/src/manager.rs +++ b/lightning-liquidity/src/manager.rs @@ -670,23 +670,27 @@ where self.pending_events.get_and_clear_pending_events() } - /// Persists the state of the service handlers towards the given [`KVStore`] implementation. + /// Persists the state of the service handlers towards the given [`KVStore`] implementation if + /// needed. + /// + /// Returns `true` if it persisted sevice handler data. /// /// This will be regularly called by LDK's background processor if necessary and only needs to /// be called manually if it's not utilized. - pub async fn persist(&self) -> Result<(), lightning::io::Error> { + pub async fn persist(&self) -> Result { // TODO: We should eventually persist in parallel. - self.pending_events.persist().await?; + let mut did_persist = false; + did_persist |= self.pending_events.persist().await?; if let Some(lsps2_service_handler) = self.lsps2_service_handler.as_ref() { - lsps2_service_handler.persist().await?; + did_persist |= lsps2_service_handler.persist().await?; } if let Some(lsps5_service_handler) = self.lsps5_service_handler.as_ref() { - lsps5_service_handler.persist().await?; + did_persist |= lsps5_service_handler.persist().await?; } - Ok(()) + Ok(did_persist) } fn handle_lsps_message( @@ -1285,8 +1289,10 @@ where /// Persists the state of the service handlers towards the given [`KVStoreSync`] implementation. /// + /// Returns `true` if it persisted sevice handler data. + /// /// Wraps [`LiquidityManager::persist`]. - pub fn persist(&self) -> Result<(), lightning::io::Error> { + pub fn persist(&self) -> Result { let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); match Box::pin(self.inner.persist()).as_mut().poll(&mut ctx) {