Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1230,12 +1230,20 @@ where
}

if let Some(liquidity_manager) = liquidity_manager.as_ref() {
log_trace!(logger, "Persisting LiquidityManager...");
let fut = async {
liquidity_manager.get_lm().persist().await.map_err(|e| {
log_error!(logger, "Persisting LiquidityManager failed: {}", e);
e
})
liquidity_manager
.get_lm()
.persist()
.await
.map(|did_persist| {
if did_persist {
log_trace!(logger, "Persisted LiquidityManager.");
}
})
.map_err(|e| {
log_error!(logger, "Persisting LiquidityManager failed: {}", e);
e
})
};
futures.set_e(Box::pin(fut));
}
Expand Down
6 changes: 3 additions & 3 deletions lightning-liquidity/src/events/event_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ where
EventQueueNotifierGuard(self)
}

pub async fn persist(&self) -> Result<(), lightning::io::Error> {
pub async fn persist(&self) -> Result<bool, lightning::io::Error> {
let fut = {
let mut state_lock = self.state.lock().unwrap();

if !state_lock.needs_persist {
return Ok(());
return Ok(false);
}

state_lock.needs_persist = false;
Expand All @@ -153,7 +153,7 @@ where
e
})?;

Ok(())
Ok(true)
}
}

Expand Down
9 changes: 6 additions & 3 deletions lightning-liquidity/src/lsps2/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1782,15 +1782,16 @@ where
})
}

pub(crate) async fn persist(&self) -> Result<(), lightning::io::Error> {
pub(crate) async fn persist(&self) -> Result<bool, lightning::io::Error> {
// TODO: We should eventually persist in parallel, however, when we do, we probably want to
// introduce some batching to upper-bound the number of requests inflight at any given
// time.
let mut did_persist = false;

if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
// If we're not the first event processor to get here, just return early, the increment
// we just did will be treated as "go around again" at the end.
return Ok(());
return Ok(did_persist);
}

loop {
Expand All @@ -1816,6 +1817,7 @@ where
for counterparty_node_id in need_persist.into_iter() {
debug_assert!(!need_remove.contains(&counterparty_node_id));
self.persist_peer_state(counterparty_node_id).await?;
did_persist = true;
}

for counterparty_node_id in need_remove {
Expand Down Expand Up @@ -1850,6 +1852,7 @@ where
}
if let Some(future) = future_opt {
future.await?;
did_persist = true;
} else {
self.persist_peer_state(counterparty_node_id).await?;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: did_persist is not set to true here. When a peer was initially flagged for removal but is no longer prunable (it got new state while we were persisting), needs_persist is forced to true on line 1845, and persist_peer_state here will actually write to the KV store (it checks needs_persist, finds it true, clears it, and persists). So real persistence happens, but the return value won't reflect it.

Low severity since it only affects logging, but the return value is inaccurate.

Suggested change
self.persist_peer_state(counterparty_node_id).await?;
self.persist_peer_state(counterparty_node_id).await?;
did_persist = true;

}
Expand All @@ -1864,7 +1867,7 @@ where
break;
}

Ok(())
Ok(did_persist)
}

pub(crate) fn peer_disconnected(&self, counterparty_node_id: PublicKey) {
Expand Down
10 changes: 7 additions & 3 deletions lightning-liquidity/src/lsps5/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,17 @@ where
})
}

pub(crate) async fn persist(&self) -> Result<(), lightning::io::Error> {
pub(crate) async fn persist(&self) -> Result<bool, lightning::io::Error> {
// TODO: We should eventually persist in parallel, however, when we do, we probably want to
// introduce some batching to upper-bound the number of requests inflight at any given
// time.

let mut did_persist = false;

if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
// If we're not the first event processor to get here, just return early, the increment
// we just did will be treated as "go around again" at the end.
return Ok(());
return Ok(did_persist);
}

loop {
Expand All @@ -277,6 +279,7 @@ where
for client_id in need_persist.into_iter() {
debug_assert!(!need_remove.contains(&client_id));
self.persist_peer_state(client_id).await?;
did_persist = true;
}

for client_id in need_remove {
Expand Down Expand Up @@ -311,6 +314,7 @@ where
}
if let Some(future) = future_opt {
future.await?;
did_persist = true;
} else {
self.persist_peer_state(client_id).await?;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same bug as in lsps2: did_persist is not set to true when persist_peer_state actually writes data in the need_remove fallback path. When a peer was initially flagged for removal but is no longer prunable, needs_persist is forced to true on line 307, and persist_peer_state here will actually persist. The return value will be inaccurate.

Suggested change
self.persist_peer_state(client_id).await?;
self.persist_peer_state(client_id).await?;
did_persist = true;

}
Expand All @@ -325,7 +329,7 @@ where
break;
}

Ok(())
Ok(did_persist)
}

fn check_prune_stale_webhooks<'a>(
Expand Down
20 changes: 13 additions & 7 deletions lightning-liquidity/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,23 +670,27 @@ where
self.pending_events.get_and_clear_pending_events()
}

/// Persists the state of the service handlers towards the given [`KVStore`] implementation.
/// Persists the state of the service handlers towards the given [`KVStore`] implementation if
/// needed.
///
/// Returns `true` if it persisted sevice handler data.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: "sevice" → "service"

///
/// This will be regularly called by LDK's background processor if necessary and only needs to
/// be called manually if it's not utilized.
pub async fn persist(&self) -> Result<(), lightning::io::Error> {
pub async fn persist(&self) -> Result<bool, lightning::io::Error> {
// TODO: We should eventually persist in parallel.
self.pending_events.persist().await?;
let mut did_persist = false;
did_persist |= self.pending_events.persist().await?;

if let Some(lsps2_service_handler) = self.lsps2_service_handler.as_ref() {
lsps2_service_handler.persist().await?;
did_persist |= lsps2_service_handler.persist().await?;
}

if let Some(lsps5_service_handler) = self.lsps5_service_handler.as_ref() {
lsps5_service_handler.persist().await?;
did_persist |= lsps5_service_handler.persist().await?;
}

Ok(())
Ok(did_persist)
}

fn handle_lsps_message(
Expand Down Expand Up @@ -1285,8 +1289,10 @@ where

/// Persists the state of the service handlers towards the given [`KVStoreSync`] implementation.
///
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same typo: "sevice" → "service"

/// Returns `true` if it persisted sevice handler data.
///
/// Wraps [`LiquidityManager::persist`].
pub fn persist(&self) -> Result<(), lightning::io::Error> {
pub fn persist(&self) -> Result<bool, lightning::io::Error> {
let mut waker = dummy_waker();
let mut ctx = task::Context::from_waker(&mut waker);
match Box::pin(self.inner.persist()).as_mut().poll(&mut ctx) {
Expand Down