-
Notifications
You must be signed in to change notification settings - Fork 461
LSPS2 service: Treat replayed HTLCIntercepted events idempotently
#4656
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -644,6 +644,26 @@ impl PeerState { | |
| }); | ||
| } | ||
|
|
||
| fn remove_terminal_channel_state(&mut self, channel_id: ChannelId) -> Option<u64> { | ||
| let intercept_scid = self.intercept_scid_by_channel_id.get(&channel_id).copied()?; | ||
| let should_remove = self | ||
| .outbound_channels_by_intercept_scid | ||
| .get(&intercept_scid) | ||
| .and_then(|entry| entry.get_channel_id()) | ||
| .is_some_and(|existing_channel_id| existing_channel_id == channel_id); | ||
|
|
||
| if !should_remove { | ||
| return None; | ||
| } | ||
|
|
||
| self.outbound_channels_by_intercept_scid.remove(&intercept_scid); | ||
| self.intercept_scid_by_channel_id.remove(&channel_id); | ||
| self.intercept_scid_by_user_channel_id.retain(|_, iscid| *iscid != intercept_scid); | ||
| self.needs_persist = true; | ||
|
|
||
| Some(intercept_scid) | ||
| } | ||
|
|
||
| fn pending_requests_and_channels(&self) -> usize { | ||
| let pending_requests = self.pending_requests.len(); | ||
| let pending_outbound_channels = self | ||
|
|
@@ -1252,6 +1272,43 @@ where | |
| Ok(()) | ||
| } | ||
|
|
||
| /// Forward [`Event::ChannelClosed`] event parameter into this function. | ||
| /// | ||
| /// Will prune terminal JIT channel state once the corresponding channel has closed. | ||
| /// | ||
| /// [`Event::ChannelClosed`]: lightning::events::Event::ChannelClosed | ||
| pub async fn channel_closed(&self, channel_id: ChannelId) -> Result<(), APIError> { | ||
| let counterparty_node_id = | ||
| self.peer_by_channel_id.read().unwrap().get(&channel_id).copied(); | ||
| let Some(counterparty_node_id) = counterparty_node_id else { | ||
| return Ok(()); | ||
| }; | ||
|
|
||
| let removed_intercept_scid = { | ||
| let outer_state_lock = self.per_peer_state.read().unwrap(); | ||
| match outer_state_lock.get(&counterparty_node_id) { | ||
| Some(inner_state_lock) => { | ||
| let mut peer_state = inner_state_lock.lock().unwrap(); | ||
| peer_state.remove_terminal_channel_state(channel_id) | ||
| }, | ||
| None => None, | ||
| } | ||
| }; | ||
|
|
||
| if let Some(intercept_scid) = removed_intercept_scid { | ||
| self.peer_by_intercept_scid.write().unwrap().remove(&intercept_scid); | ||
| self.peer_by_channel_id.write().unwrap().remove(&channel_id); | ||
| self.persist().await.map_err(|e| APIError::APIMisuseError { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have a question here. As I understand, |
||
| err: format!( | ||
| "Failed to persist peer state after channel {} closed: {}", | ||
| channel_id, e | ||
| ), | ||
| })?; | ||
|
Comment on lines
+1301
to
+1306
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see we use this error elsewhere when persistence fails. Do we expect the caller to retry? |
||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| /// Abandons a pending JIT‐open flow for `user_channel_id`, removing all local state. | ||
| /// | ||
| /// This removes the intercept SCID, any outbound channel state, and associated | ||
|
|
@@ -2270,6 +2327,25 @@ where | |
| } | ||
| } | ||
|
|
||
| /// Forward [`Event::ChannelClosed`] event parameter into this function. | ||
| /// | ||
| /// Wraps [`LSPS2ServiceHandler::channel_closed`]. | ||
| /// | ||
| /// [`Event::ChannelClosed`]: lightning::events::Event::ChannelClosed | ||
| pub fn channel_closed(&self, channel_id: ChannelId) -> Result<(), APIError> { | ||
| let mut fut = pin!(self.inner.channel_closed(channel_id)); | ||
|
|
||
| let mut waker = dummy_waker(); | ||
| let mut ctx = task::Context::from_waker(&mut waker); | ||
| match fut.as_mut().poll(&mut ctx) { | ||
| task::Poll::Ready(result) => result, | ||
| task::Poll::Pending => { | ||
| // In a sync context, we can't wait for the future to complete. | ||
| unreachable!("Should not be pending in a sync context"); | ||
| }, | ||
| } | ||
| } | ||
|
|
||
| /// Wraps [`LSPS2ServiceHandler::channel_needs_manual_broadcast`]. | ||
| pub fn channel_needs_manual_broadcast( | ||
| &self, user_channel_id: u128, counterparty_node_id: &PublicKey, | ||
|
|
@@ -2361,6 +2437,8 @@ mod tests { | |
|
|
||
| use bitcoin::{absolute::LockTime, transaction::Version}; | ||
| use core::str::FromStr; | ||
| use lightning::io::Cursor; | ||
| use lightning::util::ser::{Readable, Writeable}; | ||
|
|
||
| const MAX_VALUE_MSAT: u64 = 21_000_000_0000_0000_000; | ||
|
|
||
|
|
@@ -2764,6 +2842,118 @@ mod tests { | |
| } | ||
| } | ||
|
|
||
| #[test] | ||
| fn replayed_intercepted_htlc_after_persist_is_idempotent() { | ||
| let payment_size_msat = Some(500_000_000); | ||
| let opening_fee_params = LSPS2OpeningFeeParams { | ||
| min_fee_msat: 10_000_000, | ||
| proportional: 10_000, | ||
| valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(), | ||
| min_lifetime: 4032, | ||
| max_client_to_self_delay: 2016, | ||
| min_payment_size_msat: 10_000_000, | ||
| max_payment_size_msat: 1_000_000_000, | ||
| promise: "ignore".to_string(), | ||
| }; | ||
| let intercept_scid = 42; | ||
| let user_channel_id = 43; | ||
| let htlc = InterceptedHTLC { | ||
| intercept_id: InterceptId([1; 32]), | ||
| expected_outbound_amount_msat: 500_000_000, | ||
| payment_hash: PaymentHash([2; 32]), | ||
| }; | ||
|
|
||
| let mut jit_channel = | ||
| OutboundJITChannel::new(payment_size_msat, opening_fee_params, user_channel_id, false); | ||
| assert!(matches!( | ||
| jit_channel.htlc_intercepted(htlc).unwrap(), | ||
| Some(HTLCInterceptedAction::OpenChannel(_)) | ||
| )); | ||
|
|
||
| let mut peer_state = PeerState::new(); | ||
| peer_state.intercept_scid_by_user_channel_id.insert(user_channel_id, intercept_scid); | ||
| peer_state.insert_outbound_channel(intercept_scid, jit_channel); | ||
|
|
||
| let encoded_peer_state = peer_state.encode(); | ||
| let mut decoded_peer_state = PeerState::read(&mut Cursor::new(encoded_peer_state)).unwrap(); | ||
| let decoded_jit_channel = decoded_peer_state | ||
| .outbound_channels_by_intercept_scid | ||
| .get_mut(&intercept_scid) | ||
| .unwrap(); | ||
|
|
||
| assert!(decoded_jit_channel.htlc_intercepted(htlc).unwrap().is_none()); | ||
|
|
||
| let ForwardPaymentAction(_, fee_payment) = | ||
| decoded_jit_channel.channel_ready(ChannelId([3; 32])).unwrap(); | ||
| assert_eq!(fee_payment.htlcs, vec![htlc]); | ||
| } | ||
|
|
||
| #[test] | ||
| fn removes_terminal_state_for_closed_channel() { | ||
| let opening_fee_params = LSPS2OpeningFeeParams { | ||
| min_fee_msat: 10_000_000, | ||
| proportional: 10_000, | ||
| valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(), | ||
| min_lifetime: 4032, | ||
| max_client_to_self_delay: 2016, | ||
| min_payment_size_msat: 10_000_000, | ||
| max_payment_size_msat: 1_000_000_000, | ||
| promise: "ignore".to_string(), | ||
| }; | ||
| let stale_intercept_scid = 42; | ||
| let stale_user_channel_id = 43; | ||
| let stale_channel_id = ChannelId([44; 32]); | ||
| let live_intercept_scid = 45; | ||
| let live_user_channel_id = 46; | ||
| let live_channel_id = ChannelId([47; 32]); | ||
|
|
||
| let mut stale_jit_channel = | ||
| OutboundJITChannel::new(None, opening_fee_params.clone(), stale_user_channel_id, false); | ||
| stale_jit_channel.state = | ||
| OutboundJITChannelState::PaymentForwarded { channel_id: stale_channel_id }; | ||
| let mut live_jit_channel = | ||
| OutboundJITChannel::new(None, opening_fee_params, live_user_channel_id, false); | ||
| live_jit_channel.state = | ||
| OutboundJITChannelState::PaymentForwarded { channel_id: live_channel_id }; | ||
|
|
||
| let mut peer_state = PeerState::new(); | ||
| peer_state.insert_outbound_channel(stale_intercept_scid, stale_jit_channel); | ||
| peer_state.insert_outbound_channel(live_intercept_scid, live_jit_channel); | ||
| peer_state | ||
| .intercept_scid_by_user_channel_id | ||
| .insert(stale_user_channel_id, stale_intercept_scid); | ||
| peer_state | ||
| .intercept_scid_by_user_channel_id | ||
| .insert(live_user_channel_id, live_intercept_scid); | ||
| peer_state.intercept_scid_by_channel_id.insert(stale_channel_id, stale_intercept_scid); | ||
| peer_state.intercept_scid_by_channel_id.insert(live_channel_id, live_intercept_scid); | ||
| peer_state.needs_persist = false; | ||
|
|
||
| assert_eq!( | ||
| peer_state.remove_terminal_channel_state(stale_channel_id), | ||
| Some(stale_intercept_scid) | ||
| ); | ||
| assert!(!peer_state | ||
| .outbound_channels_by_intercept_scid | ||
| .contains_key(&stale_intercept_scid)); | ||
| assert!(peer_state.outbound_channels_by_intercept_scid.contains_key(&live_intercept_scid)); | ||
| assert!(!peer_state.intercept_scid_by_user_channel_id.contains_key(&stale_user_channel_id)); | ||
| assert_eq!( | ||
| peer_state.intercept_scid_by_user_channel_id.get(&live_user_channel_id), | ||
| Some(&live_intercept_scid) | ||
| ); | ||
| assert!(!peer_state.intercept_scid_by_channel_id.contains_key(&stale_channel_id)); | ||
| assert_eq!( | ||
| peer_state.intercept_scid_by_channel_id.get(&live_channel_id), | ||
| Some(&live_intercept_scid) | ||
| ); | ||
| assert!(peer_state.needs_persist); | ||
|
|
||
| peer_state.needs_persist = false; | ||
| assert_eq!(peer_state.remove_terminal_channel_state(stale_channel_id), None); | ||
| assert!(!peer_state.needs_persist); | ||
| } | ||
|
|
||
| #[test] | ||
| fn broadcast_not_allowed_after_non_paying_fee_payment_claimed() { | ||
| let min_fee_msat: u64 = 12345; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -256,13 +256,15 @@ where | |
| /// - [`Event::ChannelReady`] to [`LSPS2ServiceHandler::channel_ready`] | ||
| /// - [`Event::HTLCHandlingFailed`] to [`LSPS2ServiceHandler::htlc_handling_failed`] | ||
| /// - [`Event::PaymentForwarded`] to [`LSPS2ServiceHandler::payment_forwarded`] | ||
| /// - [`Event::ChannelClosed`] to [`LSPS2ServiceHandler::channel_closed`] | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add a pending changelog for this? |
||
| /// | ||
| /// [`PeerManager`]: lightning::ln::peer_handler::PeerManager | ||
| /// [`MessageHandler`]: lightning::ln::peer_handler::MessageHandler | ||
| /// [`Event::HTLCIntercepted`]: lightning::events::Event::HTLCIntercepted | ||
| /// [`Event::ChannelReady`]: lightning::events::Event::ChannelReady | ||
| /// [`Event::HTLCHandlingFailed`]: lightning::events::Event::HTLCHandlingFailed | ||
| /// [`Event::PaymentForwarded`]: lightning::events::Event::PaymentForwarded | ||
| /// [`Event::ChannelClosed`]: lightning::events::Event::ChannelClosed | ||
| pub struct LiquidityManager< | ||
| ES: EntropySource + Clone, | ||
| NS: NodeSigner + Clone, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Elsewhere we return
APIError::APIMisuseError. Should we do the same here?