From 557d9065c6a063388d5344fb9f5fd7468df7e5fc Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Wed, 18 Mar 2026 23:11:20 -0500 Subject: [PATCH 1/7] Add rbf_channel API for fee-bumping pending splices When a splice is already pending, the user needs a way to replace its funding transaction at a higher feerate. This adds rbf_channel() to handle that case and guards splice_in/splice_out against being called while a pending splice exists, directing users to rbf_channel instead. Also fixes signing for RBF replacements, which requires accessing outputs spent by unconfirmed transactions. Co-Authored-By: Claude Opus 4.6 (1M context) --- bindings/ldk_node.udl | 2 + src/lib.rs | 71 ++++++++++++++++++- src/wallet/mod.rs | 8 ++- tests/integration_tests_rust.rs | 118 +++++++++++++++++++++++++++++++- 4 files changed, 194 insertions(+), 5 deletions(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 7e9e61f5d..8a92b4f0c 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -125,6 +125,8 @@ interface Node { [Throws=NodeError] void splice_out([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id, [ByRef]Address address, u64 splice_amount_sats); [Throws=NodeError] + void rbf_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id); + [Throws=NodeError] void close_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id); [Throws=NodeError] void force_close_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id, string? reason); diff --git a/src/lib.rs b/src/lib.rs index 614be098b..de18021ae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1595,7 +1595,7 @@ impl Node { if funding_template.prior_contribution().is_some() { log_error!( self.logger, - "Failed to splice channel: a prior splice contribution is pending" + "Failed to splice channel: a prior splice contribution is pending; use rbf_channel instead" ); return Err(Error::ChannelSplicingFailed); } @@ -1716,7 +1716,7 @@ impl Node { if funding_template.prior_contribution().is_some() { log_error!( self.logger, - "Failed to splice channel: a prior splice contribution is pending" + "Failed to splice channel: a prior splice contribution is pending; use rbf_channel instead" ); return Err(Error::ChannelSplicingFailed); } @@ -1753,6 +1753,73 @@ impl Node { } } + /// Replace a pending splice's funding transaction with a higher-feerate version. + /// + /// If a prior splice negotiation is pending, this bumps its feerate via RBF. The prior + /// contribution is reused when possible; otherwise, coin selection is re-run. + /// + /// # Experimental API + /// + /// This API is experimental and may change in the future. + pub fn rbf_channel( + &self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey, + ) -> Result<(), Error> { + let open_channels = + self.channel_manager.list_channels_with_counterparty(&counterparty_node_id); + if let Some(channel_details) = + open_channels.iter().find(|c| c.user_channel_id == user_channel_id.0) + { + let min_feerate = + self.fee_estimator.estimate_fee_rate(ConfirmationTarget::ChannelFunding); + let max_feerate = FeeRate::from_sat_per_kwu(min_feerate.to_sat_per_kwu() * 3 / 2); + + let funding_template = self + .channel_manager + .splice_channel(&channel_details.channel_id, &counterparty_node_id) + .map_err(|e| { + log_error!(self.logger, "Failed to RBF channel: {:?}", e); + Error::ChannelSplicingFailed + })?; + + if funding_template.min_rbf_feerate().is_none() { + log_error!(self.logger, "Failed to RBF channel: no pending splice to replace"); + return Err(Error::ChannelSplicingFailed); + } + + let contribution = self + .runtime + .block_on(funding_template.rbf_prior_contribution( + None, + max_feerate, + Arc::clone(&self.wallet), + )) + .map_err(|e| { + log_error!(self.logger, "Failed to RBF channel: {}", e); + Error::ChannelSplicingFailed + })?; + + self.channel_manager + .funding_contributed( + &channel_details.channel_id, + &counterparty_node_id, + contribution, + None, + ) + .map_err(|e| { + log_error!(self.logger, "Failed to RBF channel: {:?}", e); + Error::ChannelSplicingFailed + }) + } else { + log_error!( + self.logger, + "Channel not found for user_channel_id {} and counterparty {}", + user_channel_id, + counterparty_node_id + ); + Err(Error::ChannelSplicingFailed) + } + } + /// Manually sync the LDK and BDK wallets with the current chain state and update the fee rate /// cache. /// diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 13b1f384f..db212cd24 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -5,6 +5,7 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. +use std::collections::HashMap; use std::future::Future; use std::ops::Deref; use std::str::FromStr; @@ -1082,9 +1083,12 @@ impl Wallet { let mut psbt = Psbt::from_unsigned_tx(unsigned_tx).map_err(|e| { log_error!(self.logger, "Failed to construct PSBT: {}", e); })?; + // Use list_output rather than get_utxo to include outputs spent by unconfirmed + // transactions (e.g., a prior splice being replaced via RBF). + let mut wallet_outputs: HashMap = + locked_wallet.list_output().map(|o| (o.outpoint, o)).collect(); for (i, txin) in psbt.unsigned_tx.input.iter().enumerate() { - if let Some(utxo) = locked_wallet.get_utxo(txin.previous_output) { - debug_assert!(!utxo.is_spent); + if let Some(utxo) = wallet_outputs.remove(&txin.previous_output) { psbt.inputs[i] = locked_wallet.get_psbt_input(utxo, None, true).map_err(|e| { log_error!(self.logger, "Failed to construct PSBT input: {}", e); })?; diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 1ea6c4584..a3388f595 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -26,7 +26,7 @@ use common::{ setup_bitcoind_and_electrsd, setup_builder, setup_node, setup_two_nodes, splice_in_with_all, wait_for_tx, TestChainSource, TestConfig, TestStoreType, TestSyncStore, }; -use electrsd::corepc_node::Node as BitcoinD; +use electrsd::corepc_node::{self, Node as BitcoinD}; use electrsd::ElectrsD; use ldk_node::config::{AsyncPaymentsRole, EsploraSyncConfig}; use ldk_node::entropy::NodeEntropy; @@ -1142,6 +1142,122 @@ async fn splice_channel() { ); } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn rbf_splice_channel() { + // Use a custom bitcoind config with a lower incrementalrelayfee so that the +25 sat/kwu + // (0.1 sat/vB) RBF feerate bump satisfies BIP125's absolute fee increase requirement. + let bitcoind_exe = std::env::var("BITCOIND_EXE") + .ok() + .or_else(|| corepc_node::downloaded_exe_path().ok()) + .expect( + "you need to provide an env var BITCOIND_EXE or specify a bitcoind version feature", + ); + let mut bitcoind_conf = corepc_node::Conf::default(); + bitcoind_conf.network = "regtest"; + bitcoind_conf.args.push("-rest"); + bitcoind_conf.args.push("-incrementalrelayfee=0.00000100"); + let bitcoind = BitcoinD::with_conf(bitcoind_exe, &bitcoind_conf).unwrap(); + + let electrs_exe = std::env::var("ELECTRS_EXE") + .ok() + .or_else(electrsd::downloaded_exe_path) + .expect("you need to provide env var ELECTRS_EXE or specify an electrsd version feature"); + let mut electrsd_conf = electrsd::Conf::default(); + electrsd_conf.http_enabled = true; + electrsd_conf.network = "regtest"; + let electrsd = ElectrsD::with_conf(electrs_exe, &bitcoind, &electrsd_conf).unwrap(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); + + let address_a = node_a.onchain_payment().new_address().unwrap(); + let address_b = node_b.onchain_payment().new_address().unwrap(); + let premine_amount_sat = 5_000_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![address_a, address_b], + Amount::from_sat(premine_amount_sat), + ) + .await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 4_000_000, false, &electrsd).await; + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + let user_channel_id_a = expect_channel_ready_event!(node_a, node_b.node_id()); + let user_channel_id_b = expect_channel_ready_event!(node_b, node_a.node_id()); + + // rbf_channel should fail when there's no pending splice + assert_eq!( + node_b.rbf_channel(&user_channel_id_b, node_a.node_id()), + Err(NodeError::ChannelSplicingFailed), + ); + + // Initiate a splice-in to create a pending splice + node_b.splice_in(&user_channel_id_b, node_a.node_id(), 1_000_000).unwrap(); + + let original_txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); + expect_splice_negotiated_event!(node_b, node_a.node_id()); + + // splice_in should fail when there's a pending splice (RBF guard) + assert_eq!( + node_b.splice_in(&user_channel_id_b, node_a.node_id(), 1_000_000), + Err(NodeError::ChannelSplicingFailed), + ); + + // splice_out should fail when there's a pending splice (RBF guard) + let address = node_a.onchain_payment().new_address().unwrap(); + assert_eq!( + node_a.splice_out(&user_channel_id_a, node_b.node_id(), &address, 100_000), + Err(NodeError::ChannelSplicingFailed), + ); + + // rbf_channel should succeed when there's a pending splice + node_b.rbf_channel(&user_channel_id_b, node_a.node_id()).unwrap(); + + let rbf_txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); + expect_splice_negotiated_event!(node_b, node_a.node_id()); + + assert_ne!(original_txo, rbf_txo, "RBF should produce a different funding txo"); + + // Wait for the RBF transaction to replace the original in the mempool + wait_for_tx(&electrsd.client, rbf_txo.txid).await; + + // Mine blocks and confirm the RBF splice + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + // Verify the RBF transaction is the one that locked, not the original + match node_a.next_event_async().await { + Event::ChannelReady { funding_txo, counterparty_node_id, .. } => { + assert_eq!(counterparty_node_id, Some(node_b.node_id())); + assert_eq!(funding_txo, Some(rbf_txo)); + node_a.event_handled().unwrap(); + }, + ref e => panic!("node_a got unexpected event: {:?}", e), + } + match node_b.next_event_async().await { + Event::ChannelReady { funding_txo, counterparty_node_id, .. } => { + assert_eq!(counterparty_node_id, Some(node_a.node_id())); + assert_eq!(funding_txo, Some(rbf_txo)); + node_b.event_handled().unwrap(); + }, + ref e => panic!("node_b got unexpected event: {:?}", e), + } + + node_a.stop().unwrap(); + node_b.stop().unwrap(); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn simple_bolt12_send_receive() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); From 9cbdb063e372c5038e2f4f4dee3d39cd100e9b0b Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Fri, 5 Jun 2026 14:28:08 -0500 Subject: [PATCH 2/7] f - Rename rbf_channel to bump_channel_funding_fee Co-Authored-By: Claude --- bindings/ldk_node.udl | 2 +- src/lib.rs | 6 +++--- tests/integration_tests_rust.rs | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 8a92b4f0c..8198cfb34 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -125,7 +125,7 @@ interface Node { [Throws=NodeError] void splice_out([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id, [ByRef]Address address, u64 splice_amount_sats); [Throws=NodeError] - void rbf_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id); + void bump_channel_funding_fee([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id); [Throws=NodeError] void close_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id); [Throws=NodeError] diff --git a/src/lib.rs b/src/lib.rs index de18021ae..f9edc32ef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1595,7 +1595,7 @@ impl Node { if funding_template.prior_contribution().is_some() { log_error!( self.logger, - "Failed to splice channel: a prior splice contribution is pending; use rbf_channel instead" + "Failed to splice channel: a prior splice contribution is pending; use bump_channel_funding_fee instead" ); return Err(Error::ChannelSplicingFailed); } @@ -1716,7 +1716,7 @@ impl Node { if funding_template.prior_contribution().is_some() { log_error!( self.logger, - "Failed to splice channel: a prior splice contribution is pending; use rbf_channel instead" + "Failed to splice channel: a prior splice contribution is pending; use bump_channel_funding_fee instead" ); return Err(Error::ChannelSplicingFailed); } @@ -1761,7 +1761,7 @@ impl Node { /// # Experimental API /// /// This API is experimental and may change in the future. - pub fn rbf_channel( + pub fn bump_channel_funding_fee( &self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey, ) -> Result<(), Error> { let open_channels = diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index a3388f595..c2f19115e 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -1194,9 +1194,9 @@ async fn rbf_splice_channel() { let user_channel_id_a = expect_channel_ready_event!(node_a, node_b.node_id()); let user_channel_id_b = expect_channel_ready_event!(node_b, node_a.node_id()); - // rbf_channel should fail when there's no pending splice + // bump_channel_funding_fee should fail when there's no pending splice assert_eq!( - node_b.rbf_channel(&user_channel_id_b, node_a.node_id()), + node_b.bump_channel_funding_fee(&user_channel_id_b, node_a.node_id()), Err(NodeError::ChannelSplicingFailed), ); @@ -1219,8 +1219,8 @@ async fn rbf_splice_channel() { Err(NodeError::ChannelSplicingFailed), ); - // rbf_channel should succeed when there's a pending splice - node_b.rbf_channel(&user_channel_id_b, node_a.node_id()).unwrap(); + // bump_channel_funding_fee should succeed when there's a pending splice + node_b.bump_channel_funding_fee(&user_channel_id_b, node_a.node_id()).unwrap(); let rbf_txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); expect_splice_negotiated_event!(node_b, node_a.node_id()); From b91de2386f1525e9bf01ba6c33c4a1f892478177 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Mon, 20 Apr 2026 19:15:37 -0500 Subject: [PATCH 3/7] Tie funding payment status transitions to Lightning lifecycle events MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Channel-opening and splice transactions transition to Succeeded when ChannelReady fires, not after ANTI_REORG_DELAY confirmations. This matches the point at which the Lightning layer considers the channel usable: a zero-conf channel graduates as soon as its counterparty signals, and a high-conf channel waits however many confirmations the peer requires, rather than always stopping at six. For splice RBF, the payment records whichever candidate actually confirmed, with that candidate's amount and this node's share of the fee — not the fee-estimate used for weight at coin-selection time, and not the whole-tx fee for a multi-contributor splice. A channel closure whose funding or splice never confirmed discards its payment record instead of leaving it pending forever. Generated with assistance from Claude Code. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/builder.rs | 2 + src/event.rs | 22 ++ src/payment/pending_payment_store.rs | 45 ++- src/tx_broadcaster.rs | 35 +- src/wallet/mod.rs | 571 +++++++++++++++++++++++++-- tests/integration_tests_rust.rs | 49 +++ 6 files changed, 693 insertions(+), 31 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index 03ded494f..1d32a8a12 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1624,6 +1624,8 @@ fn build_with_store_internal( Arc::clone(&pending_payment_store), )); + tx_broadcaster.set_wallet(Arc::downgrade(&wallet)); + // Initialize the KeysManager let cur_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).map_err(|e| { log_error!(logger, "Failed to get current time: {}", e); diff --git a/src/event.rs b/src/event.rs index 7d23be99a..304737269 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1580,6 +1580,18 @@ where ); } + if let Err(e) = + self.wallet.handle_channel_ready(channel_id, funding_txo.map(|txo| txo.txid)) + { + log_error!( + self.logger, + "Failed to graduate funding payment on ChannelReady for channel {}: {:?}", + channel_id, + e, + ); + return Err(ReplayEvent()); + } + if let Some(liquidity_source) = self.liquidity_source.as_ref() { liquidity_source .handle_channel_ready(user_channel_id, &channel_id, &counterparty_node_id) @@ -1609,6 +1621,16 @@ where } => { log_info!(self.logger, "Channel {} closed due to: {}", channel_id, reason); + if let Err(e) = self.wallet.handle_channel_closed(channel_id) { + log_error!( + self.logger, + "Failed to handle ChannelClosed for channel {}: {:?}", + channel_id, + e, + ); + return Err(ReplayEvent()); + } + let event = Event::ChannelClosed { channel_id, user_channel_id: UserChannelId(user_channel_id), diff --git a/src/payment/pending_payment_store.rs b/src/payment/pending_payment_store.rs index eb72f89ec..16837d70c 100644 --- a/src/payment/pending_payment_store.rs +++ b/src/payment/pending_payment_store.rs @@ -6,6 +6,7 @@ // accordance with one or both of these licenses. use bitcoin::Txid; +use lightning::chain::chaininterface::FundingCandidate; use lightning::impl_writeable_tlv_based; use lightning::ln::channelmanager::PaymentId; @@ -13,6 +14,19 @@ use crate::data_store::{StorableObject, StorableObjectUpdate}; use crate::payment::store::PaymentDetailsUpdate; use crate::payment::PaymentDetails; +/// Marks an on-chain payment as belonging to an interactive-funding negotiation. The +/// last entry in `candidates` is the currently-broadcast tx; earlier entries are RBF +/// predecessors that may still confirm if reorgs intervene. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct FundingDetails { + /// Every negotiated candidate, oldest first. + pub candidates: Vec, +} + +impl_writeable_tlv_based!(FundingDetails, { + (0, candidates, optional_vec), +}); + /// Represents a pending payment #[derive(Clone, Debug, PartialEq, Eq)] pub struct PendingPaymentDetails { @@ -20,11 +34,24 @@ pub struct PendingPaymentDetails { pub details: PaymentDetails, /// Transaction IDs that have replaced or conflict with this payment. pub conflicting_txids: Vec, + /// Set when the payment's transaction is an interactive-funding broadcast (channel + /// open or splice). The record transitions to [`PaymentStatus::Succeeded`] on + /// `ChannelReady` instead of after [`ANTI_REORG_DELAY`] confirmations. + /// + /// [`PaymentStatus::Succeeded`]: crate::payment::store::PaymentStatus::Succeeded + /// [`ANTI_REORG_DELAY`]: lightning::chain::channelmonitor::ANTI_REORG_DELAY + pub funding_details: Option, } impl PendingPaymentDetails { pub(crate) fn new(details: PaymentDetails, conflicting_txids: Vec) -> Self { - Self { details, conflicting_txids } + Self { details, conflicting_txids, funding_details: None } + } + + pub(crate) fn with_funding_details( + details: PaymentDetails, conflicting_txids: Vec, funding_details: FundingDetails, + ) -> Self { + Self { details, conflicting_txids, funding_details: Some(funding_details) } } /// Convert to finalized payment for the main payment store @@ -36,6 +63,7 @@ impl PendingPaymentDetails { impl_writeable_tlv_based!(PendingPaymentDetails, { (0, details, required), (2, conflicting_txids, optional_vec), + (4, funding_details, option), }); #[derive(Clone, Debug, PartialEq, Eq)] @@ -43,6 +71,7 @@ pub(crate) struct PendingPaymentDetailsUpdate { pub id: PaymentId, pub payment_update: Option, pub conflicting_txids: Option>, + pub funding_details: Option>, } impl StorableObject for PendingPaymentDetails { @@ -68,6 +97,13 @@ impl StorableObject for PendingPaymentDetails { } } + if let Some(new_funding_details) = update.funding_details { + if self.funding_details != new_funding_details { + self.funding_details = new_funding_details; + updated = true; + } + } + updated } @@ -89,6 +125,11 @@ impl From<&PendingPaymentDetails> for PendingPaymentDetailsUpdate { } else { Some(value.conflicting_txids.clone()) }; - Self { id: value.id(), payment_update: Some(value.details.to_update()), conflicting_txids } + Self { + id: value.id(), + payment_update: Some(value.details.to_update()), + conflicting_txids, + funding_details: Some(value.funding_details.clone()), + } } } diff --git a/src/tx_broadcaster.rs b/src/tx_broadcaster.rs index 7084135b0..24abf8f11 100644 --- a/src/tx_broadcaster.rs +++ b/src/tx_broadcaster.rs @@ -6,12 +6,14 @@ // accordance with one or both of these licenses. use std::ops::Deref; +use std::sync::{Mutex as StdMutex, Weak}; use bitcoin::Transaction; use lightning::chain::chaininterface::{BroadcasterInterface, TransactionType}; use tokio::sync::{mpsc, Mutex, MutexGuard}; use crate::logger::{log_error, LdkLogger}; +use crate::types::Wallet; const BCAST_PACKAGE_QUEUE_SIZE: usize = 50; @@ -21,6 +23,12 @@ where { queue_sender: mpsc::Sender>, queue_receiver: Mutex>>, + /// Weak handle to the [`Wallet`] that performs classification of funding broadcasts + /// (channel opens and splices) into payment records. Remains `None` while the + /// builder is wiring the node up, during which broadcasts are still forwarded to + /// the queue but no payment record is written. [`Self::set_wallet`] installs the + /// handle once the [`Wallet`] exists. + wallet: StdMutex>>, logger: L, } @@ -30,7 +38,19 @@ where { pub(crate) fn new(logger: L) -> Self { let (queue_sender, queue_receiver) = mpsc::channel(BCAST_PACKAGE_QUEUE_SIZE); - Self { queue_sender, queue_receiver: Mutex::new(queue_receiver), logger } + Self { + queue_sender, + queue_receiver: Mutex::new(queue_receiver), + wallet: StdMutex::new(None), + logger, + } + } + + /// Installs the [`Wallet`] handle used to classify funding broadcasts (channel + /// opens and splices) into payment records. Called once the builder has constructed + /// both the broadcaster and the wallet. + pub(crate) fn set_wallet(&self, wallet: Weak) { + *self.wallet.lock().expect("lock") = Some(wallet); } pub(crate) async fn get_broadcast_queue( @@ -45,6 +65,19 @@ where L::Target: LdkLogger, { fn broadcast_transactions(&self, txs: &[(&Transaction, TransactionType)]) { + let wallet = self.wallet.lock().expect("lock").as_ref().and_then(Weak::upgrade); + if let Some(wallet) = wallet { + for (tx, tx_type) in txs { + if let Err(e) = wallet.classify_broadcast(tx, tx_type) { + log_error!( + self.logger, + "Failed to classify broadcast tx {}: {:?}", + tx.compute_txid(), + e, + ); + } + } + } let package = txs.iter().map(|(t, _)| (*t).clone()).collect::>(); self.queue_sender.try_send(package).unwrap_or_else(|e| { log_error!(self.logger, "Failed to broadcast transactions: {}", e); diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index db212cd24..3e4ac41b3 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -33,14 +33,16 @@ use bitcoin::{ WitnessProgram, WitnessVersion, }; use lightning::chain::chaininterface::{ - BroadcasterInterface, INCREMENTAL_RELAY_FEE_SAT_PER_1000_WEIGHT, + BroadcasterInterface, TransactionType, INCREMENTAL_RELAY_FEE_SAT_PER_1000_WEIGHT, }; use lightning::chain::channelmonitor::ANTI_REORG_DELAY; use lightning::chain::{BlockLocator, ClaimId, Listen}; use lightning::ln::channelmanager::PaymentId; +use lightning::ln::funding::FundingContribution; use lightning::ln::inbound_payment::ExpandedKey; use lightning::ln::msgs::UnsignedGossipMessage; use lightning::ln::script::ShutdownScript; +use lightning::ln::types::ChannelId as LnChannelId; use lightning::sign::{ ChangeDestinationSource, EntropySource, InMemorySigner, KeysManager, NodeSigner, OutputSpender, PeerStorageKey, Recipient, SignerProvider, SpendableOutputDescriptor, @@ -55,6 +57,9 @@ use persist::KVStoreWalletPersister; use crate::config::Config; use crate::fee_estimator::{ConfirmationTarget, FeeEstimator, OnchainFeeEstimator}; use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; +use lightning::chain::chaininterface::{ChannelFunding, FundingCandidate, FundingPurpose}; + +use crate::payment::pending_payment_store::FundingDetails; use crate::payment::store::ConfirmationStatus; use crate::payment::{ PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, PendingPaymentDetails, @@ -251,18 +256,9 @@ impl Wallet { for event in events { match event { WalletEvent::TxConfirmed { txid, tx, block_time, .. } => { - let cur_height = locked_wallet.latest_checkpoint().height(); - let confirmation_height = block_time.block_id.height; - let payment_status = if cur_height >= confirmation_height + ANTI_REORG_DELAY - 1 - { - PaymentStatus::Succeeded - } else { - PaymentStatus::Pending - }; - let confirmation_status = ConfirmationStatus::Confirmed { block_hash: block_time.block_id.hash, - height: confirmation_height, + height: block_time.block_id.height, timestamp: block_time.confirmation_time, }; @@ -270,6 +266,23 @@ impl Wallet { .find_payment_by_txid(txid) .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + if self.apply_funding_details_status_update( + payment_id, + txid, + confirmation_status, + )? { + continue; + } + + let cur_height = locked_wallet.latest_checkpoint().height(); + let confirmation_height = block_time.block_id.height; + let payment_status = if cur_height >= confirmation_height + ANTI_REORG_DELAY - 1 + { + PaymentStatus::Succeeded + } else { + PaymentStatus::Pending + }; + let payment = self.create_payment_from_tx( locked_wallet, txid, @@ -279,13 +292,12 @@ impl Wallet { confirmation_status, ); - self.payment_store.insert_or_update(payment.clone())?; - if payment_status == PaymentStatus::Pending { let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); - - self.pending_payment_store.insert_or_update(pending_payment)?; + self.persist_pending(pending_payment)?; + } else { + self.payment_store.insert_or_update(payment)?; } }, WalletEvent::ChainTipChanged { new_tip, .. } => { @@ -296,8 +308,11 @@ impl Wallet { "Non-pending payment {:?} found in pending store", p.details.id, ); + // Funding records complete on `ChannelReady`, not after + // `ANTI_REORG_DELAY` confirmations. p.details.status == PaymentStatus::Pending && matches!(p.details.kind, PaymentKind::Onchain { .. }) + && p.funding_details.is_none() }); let mut unconfirmed_outbound_txids: Vec = Vec::new(); @@ -358,6 +373,14 @@ impl Wallet { .find_payment_by_txid(txid) .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + if self.apply_funding_details_status_update( + payment_id, + txid, + ConfirmationStatus::Unconfirmed, + )? { + continue; + } + let payment = self.create_payment_from_tx( locked_wallet, txid, @@ -366,10 +389,8 @@ impl Wallet { PaymentStatus::Pending, ConfirmationStatus::Unconfirmed, ); - let pending_payment = - self.create_pending_payment_from_tx(payment.clone(), Vec::new()); - self.payment_store.insert_or_update(payment)?; - self.pending_payment_store.insert_or_update(pending_payment)?; + let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); + self.persist_pending(pending_payment)?; }, WalletEvent::TxReplaced { txid, conflicts, .. } => { let Some(payment_id) = self.find_payment_by_txid(txid) else { @@ -405,6 +426,15 @@ impl Wallet { let payment_id = self .find_payment_by_txid(txid) .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + + if self.apply_funding_details_status_update( + payment_id, + txid, + ConfirmationStatus::Unconfirmed, + )? { + continue; + } + let payment = self.create_payment_from_tx( locked_wallet, txid, @@ -413,10 +443,8 @@ impl Wallet { PaymentStatus::Pending, ConfirmationStatus::Unconfirmed, ); - let pending_payment = - self.create_pending_payment_from_tx(payment.clone(), Vec::new()); - self.payment_store.insert_or_update(payment)?; - self.pending_payment_store.insert_or_update(pending_payment)?; + let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); + self.persist_pending(pending_payment)?; }, _ => { continue; @@ -1146,6 +1174,41 @@ impl Wallet { Ok(tx) } + /// Computes the amount, fee, and direction of an on-chain payment from the + /// wallet's view of the transaction. Used by [`TransactionBroadcaster`] to + /// describe a single-funded channel-open, for which no [`FundingContribution`] + /// is available. + /// + /// [`TransactionBroadcaster`]: crate::tx_broadcaster::TransactionBroadcaster + /// [`FundingContribution`]: lightning::ln::funding::FundingContribution + pub(crate) fn onchain_payment_fields( + &self, tx: &Transaction, + ) -> (Option, Option, PaymentDirection) { + let locked_wallet = self.inner.lock().expect("lock"); + let fee = locked_wallet.calculate_fee(tx).unwrap_or(Amount::ZERO); + let (sent, received) = locked_wallet.sent_and_received(tx); + let fee_sat = fee.to_sat(); + + let (direction, amount_msat) = if sent > received { + ( + PaymentDirection::Outbound, + Some( + (sent.to_sat().saturating_sub(fee_sat).saturating_sub(received.to_sat())) + * 1000, + ), + ) + } else { + ( + PaymentDirection::Inbound, + Some( + received.to_sat().saturating_sub(sent.to_sat().saturating_sub(fee_sat)) * 1000, + ), + ) + }; + + (amount_msat, Some(fee_sat * 1000), direction) + } + fn create_payment_from_tx( &self, locked_wallet: &PersistedWallet, txid: Txid, payment_id: PaymentId, tx: &Transaction, payment_status: PaymentStatus, @@ -1202,6 +1265,223 @@ impl Wallet { PendingPaymentDetails::new(payment, conflicting_txids) } + /// Writes a [`PendingPaymentDetails`] and its inner [`PaymentDetails`] to their + /// respective stores in a fixed order. Callers that need to keep the two stores in + /// sync should always go through this. + fn persist_pending(&self, pending: PendingPaymentDetails) -> Result<(), Error> { + self.payment_store.insert_or_update(pending.details.clone())?; + self.pending_payment_store.insert_or_update(pending)?; + Ok(()) + } + + /// Called on `ChannelReady` to mark a funding payment (channel open or splice) as + /// succeeded. + /// + /// If `funding_txo_txid` matches a candidate other than the currently-active one, + /// that candidate is promoted to active first and the outer [`PaymentDetails`] is + /// updated from its contribution. If no candidate matches (the confirmed funding + /// txid belongs to a broadcast this node didn't contribute to), the pending record + /// is left in place for later handling. + pub(crate) fn handle_channel_ready( + &self, channel_id: LnChannelId, funding_txo_txid: Option, + ) -> Result<(), Error> { + let funding_txo_txid = match funding_txo_txid { + Some(t) => t, + None => return Ok(()), + }; + + let mut pending = match self + .pending_payment_store + .list_filter(|p| { + p.funding_details + .as_ref() + .map(|fd| record_includes_channel(fd, channel_id)) + .unwrap_or(false) + }) + .into_iter() + .next() + { + Some(p) => p, + None => return Ok(()), + }; + let funding_details = match pending.funding_details.clone() { + Some(fd) => fd, + None => return Ok(()), + }; + + let candidate = match funding_details.candidates.iter().find(|c| c.txid == funding_txo_txid) + { + Some(c) => c.clone(), + None => { + // Confirmed `funding_txo` wasn't produced by any of our broadcasts. The + // record is left alone; some higher-level flow decides what to do. + log_debug!( + self.logger, + "ChannelReady for channel {}: confirmed funding_txo {} is not one of our candidates", + channel_id, + funding_txo_txid, + ); + return Ok(()); + }, + }; + + let old_txid = match pending.details.kind { + PaymentKind::Onchain { txid, .. } => txid, + _ => { + debug_assert!(false, "funding record must use PaymentKind::Onchain"); + return Ok(()); + }, + }; + + if old_txid != funding_txo_txid { + if !pending.conflicting_txids.contains(&old_txid) { + pending.conflicting_txids.push(old_txid); + } + pending.conflicting_txids.retain(|t| *t != funding_txo_txid); + + let aggregate = aggregate_local_stakes(&candidate); + pending.details.amount_msat = aggregate.amount_msat; + pending.details.fee_paid_msat = aggregate.fee_paid_msat; + } + + // Preserve the confirmation status already on the record (set by wallet sync if + // it's seen the tx confirm). `ChannelReady` alone doesn't carry block details. + let existing_status = match pending.details.kind { + PaymentKind::Onchain { status, .. } => status, + _ => ConfirmationStatus::Unconfirmed, + }; + pending.details.kind = + PaymentKind::Onchain { txid: funding_txo_txid, status: existing_status }; + + pending.details.status = PaymentStatus::Succeeded; + let payment_id = pending.details.id; + self.payment_store.insert_or_update(pending.details)?; + self.pending_payment_store.remove(&payment_id)?; + + Ok(()) + } + + /// Called on `ChannelClosed`. Removes any funding record (channel open or splice) + /// for `channel_id` whose candidates never reached confirmed — e.g. a funding + /// transaction that never made it on-chain. A record that does reflect a confirmed + /// transaction is left alone and will transition to `Succeeded` normally. + pub(crate) fn handle_channel_closed(&self, channel_id: LnChannelId) -> Result<(), Error> { + let pending = match self + .pending_payment_store + .list_filter(|p| { + p.funding_details + .as_ref() + .map(|fd| record_includes_channel(fd, channel_id)) + .unwrap_or(false) + }) + .into_iter() + .next() + { + Some(p) => p, + None => return Ok(()), + }; + + let is_confirmed = matches!( + pending.details.kind, + PaymentKind::Onchain { status: ConfirmationStatus::Confirmed { .. }, .. } + ); + if is_confirmed { + return Ok(()); + } + + let payment_id = pending.details.id; + self.pending_payment_store.remove(&payment_id)?; + self.payment_store.remove(&payment_id)?; + Ok(()) + } + + /// Updates a funding record's `kind` in response to a wallet-sync event, swapping + /// the active candidate when `event_txid` differs from the current one. + /// + /// Amount, fee, and direction are not recomputed from the wallet's view: they were + /// set at broadcast time from the `FundingContribution` and must persist until + /// `ChannelReady`. + /// + /// Returns `true` when a funding record was updated (so the caller skips the + /// default Onchain create/update path), `false` otherwise. + fn apply_funding_details_status_update( + &self, payment_id: PaymentId, event_txid: Txid, confirmation_status: ConfirmationStatus, + ) -> Result { + // `ChannelReady` may move the payment to the main store before wallet sync + // sees the tx confirm. In that case, update `kind` directly; recomputing from + // the wallet's view would overwrite the per-node fee set at broadcast time. + if let Some(mut existing) = self.payment_store.get(&payment_id) { + if existing.status == PaymentStatus::Succeeded + && matches!(existing.kind, PaymentKind::Onchain { .. }) + && self.pending_payment_store.get(&payment_id).is_none() + { + let needs_update = match existing.kind { + PaymentKind::Onchain { txid, status } => { + txid != event_txid || status != confirmation_status + }, + _ => false, + }; + if needs_update { + existing.kind = + PaymentKind::Onchain { txid: event_txid, status: confirmation_status }; + self.payment_store.insert_or_update(existing)?; + } + return Ok(true); + } + } + + let mut pending = match self.pending_payment_store.get(&payment_id) { + Some(p) => p, + None => return Ok(false), + }; + let funding_details = match pending.funding_details.as_ref() { + Some(fd) => fd, + None => return Ok(false), + }; + + let candidate = match funding_details.candidates.iter().find(|c| c.txid == event_txid) { + Some(c) => c.clone(), + None => { + log_debug!( + self.logger, + "Event txid {} resolved to funding_details payment {} but is not in candidates", + event_txid, + payment_id, + ); + return Ok(false); + }, + }; + + let old_txid = match pending.details.kind { + PaymentKind::Onchain { txid, .. } => txid, + _ => { + debug_assert!(false, "funding_details record must use PaymentKind::Onchain"); + return Ok(false); + }, + }; + + if old_txid != event_txid { + // A different candidate confirmed. Move the previous active txid onto + // `conflicting_txids` and re-derive amount/fee from the new candidate's + // contributions. + if !pending.conflicting_txids.contains(&old_txid) { + pending.conflicting_txids.push(old_txid); + } + pending.conflicting_txids.retain(|t| *t != event_txid); + + let aggregate = aggregate_local_stakes(&candidate); + pending.details.amount_msat = aggregate.amount_msat; + pending.details.fee_paid_msat = aggregate.fee_paid_msat; + } + + pending.details.kind = + PaymentKind::Onchain { txid: event_txid, status: confirmation_status }; + + self.persist_pending(pending)?; + + Ok(true) + } + fn find_payment_by_txid(&self, target_txid: Txid) -> Option { let direct_payment_id = PaymentId(target_txid.to_byte_array()); if self.pending_payment_store.contains_key(&direct_payment_id) { @@ -1213,12 +1493,28 @@ impl Wallet { .list_filter(|p| { matches!(p.details.kind, PaymentKind::Onchain { txid, .. } if txid == target_txid) || p.conflicting_txids.contains(&target_txid) + || p.funding_details + .as_ref() + .map(|fd| fd.candidates.iter().any(|c| c.txid == target_txid)) + .unwrap_or(false) }) .first() { return Some(replaced_details.details.id); } + // Once moved to the main store, a funding payment is still matched by its + // confirmed txid so late wallet events resolve correctly. + if let Some(p) = self + .payment_store + .list_filter( + |p| matches!(p.kind, PaymentKind::Onchain { txid, .. } if txid == target_txid), + ) + .first() + { + return Some(p.id); + } + None } @@ -1417,16 +1713,235 @@ impl Wallet { ConfirmationStatus::Unconfirmed, ); - let pending_payment_store = - self.create_pending_payment_from_tx(new_payment.clone(), Vec::new()); - - self.pending_payment_store.insert_or_update(pending_payment_store)?; - self.payment_store.insert_or_update(new_payment)?; + let pending_payment = self.create_pending_payment_from_tx(new_payment, Vec::new()); + self.persist_pending(pending_payment)?; log_info!(self.logger, "RBF successful: replaced {} with {}", txid, new_txid); Ok(new_txid) } + + pub(crate) fn classify_broadcast( + &self, tx: &Transaction, tx_type: &TransactionType, + ) -> Result<(), Error> { + match tx_type { + TransactionType::Funding { channels } => self.classify_funding(tx, channels), + TransactionType::InteractiveFunding { candidates } => { + self.classify_interactive_funding(tx, candidates) + }, + _ => Ok(()), + } + } + + fn classify_funding( + &self, tx: &Transaction, channels: &[(PublicKey, LnChannelId)], + ) -> Result<(), Error> { + // Batch funding (one transaction funding multiple channels) isn't supported; let + // wallet sync record the payment normally so graduation still runs through + // ANTI_REORG_DELAY. + if channels.len() != 1 { + if channels.len() > 1 { + log_trace!( + self.logger, + "Skipping funding classification for batched broadcast ({} channels)", + channels.len() + ); + } + return Ok(()); + } + + let (counterparty_node_id, channel_id) = channels[0]; + let txid = tx.compute_txid(); + let (amount_msat, fee_paid_msat, direction) = self.onchain_payment_fields(tx); + + let candidate = FundingCandidate { + txid, + channels: vec![ChannelFunding { + counterparty_node_id, + channel_id, + purpose: FundingPurpose::Establishment, + contribution: None, + }], + }; + + let details = PaymentDetails::new( + PaymentId(txid.to_byte_array()), + PaymentKind::Onchain { txid, status: ConfirmationStatus::Unconfirmed }, + amount_msat, + fee_paid_msat, + direction, + PaymentStatus::Pending, + ); + + let funding_details = FundingDetails { candidates: vec![candidate] }; + + let pending = + PendingPaymentDetails::with_funding_details(details, Vec::new(), funding_details); + + self.persist_pending(pending)?; + log_debug!( + self.logger, + "Recorded channel-funding broadcast {} for channel {}", + txid, + channel_id, + ); + Ok(()) + } + + fn classify_interactive_funding( + &self, tx: &Transaction, candidates: &[FundingCandidate], + ) -> Result<(), Error> { + // `InteractiveFunding` carries the full negotiated history. The currently-broadcast + // candidate is the last entry; earlier entries are RBF predecessors. + let active = match candidates.last() { + Some(c) => c, + None => return Ok(()), + }; + let first = match candidates.first() { + Some(c) => c, + None => return Ok(()), + }; + + let txid = tx.compute_txid(); + debug_assert_eq!(active.txid, txid, "broadcast tx must match the active candidate"); + + // Aggregate amount/fee/direction across this candidate's channels by summing the + // local-stake contributions. If we didn't contribute on this candidate, leave the + // record to wallet sync — there's nothing for us to track here, and any wallet- + // visible activity (e.g. a counterparty's splice-out paid to our address) is + // better surfaced as a plain on-chain receive. + let aggregate = aggregate_local_stakes(active); + let amount_msat = match aggregate.amount_msat { + Some(amt) => Some(amt), + None => { + log_trace!( + self.logger, + "Skipping interactive-funding broadcast {}: no local contribution", + txid, + ); + return Ok(()); + }, + }; + let fee_paid_msat = aggregate.fee_paid_msat; + let direction = aggregate.direction; + + // Skip broadcasts that don't move funds in or out of our on-chain wallet — e.g. + // a splice-out we initiated toward an external address. + let (wallet_amount_msat, _wallet_fee_msat, _wallet_direction) = + self.onchain_payment_fields(tx); + if wallet_amount_msat == Some(0) { + log_trace!( + self.logger, + "Skipping interactive-funding broadcast {}: no wallet-level activity", + txid, + ); + return Ok(()); + } + + // Anchor the PaymentId to the first negotiated candidate so the record stays + // stable across RBF replacements. + let payment_id = PaymentId(first.txid.to_byte_array()); + let candidate_count = candidates.len(); + let active_channel_count = active.channels.len(); + + let details = PaymentDetails::new( + payment_id, + PaymentKind::Onchain { txid, status: ConfirmationStatus::Unconfirmed }, + amount_msat, + fee_paid_msat, + direction, + PaymentStatus::Pending, + ); + + // Funding records carry their own RBF history in `candidates`; lookup by txid + // (find_payment_by_txid) already searches that, so no separate + // `conflicting_txids` Vec is needed. + let funding_details = FundingDetails { candidates: candidates.to_vec() }; + + let pending = + PendingPaymentDetails::with_funding_details(details, Vec::new(), funding_details); + + self.persist_pending(pending)?; + log_debug!( + self.logger, + "Recorded interactive-funding broadcast {} ({} candidates, {} channels)", + txid, + candidate_count, + active_channel_count, + ); + Ok(()) + } +} + +/// Returns this node's share of the on-chain fee for a funding transaction (channel +/// open or splice), in millisatoshis. Sourced from the contribution's +/// [`FundingContribution::estimated_fee`], which upstream computes per-contributor. +fn our_actual_fee_msat(contribution: &FundingContribution) -> u64 { + contribution.estimated_fee().to_sat() * 1000 +} + +fn record_includes_channel(details: &FundingDetails, channel_id: LnChannelId) -> bool { + details.candidates.iter().any(|c| c.channels.iter().any(|ch| ch.channel_id == channel_id)) +} + +struct LocalStakeAggregate { + amount_msat: Option, + fee_paid_msat: Option, + direction: PaymentDirection, +} + +/// Aggregates local-stake amount/fee/direction across the channels of a single +/// [`FundingCandidate`]. Each channel's contribution (when present) is treated as +/// local-stake-only, so contributions across channels are summed without +/// double-counting. +fn aggregate_local_stakes(candidate: &FundingCandidate) -> LocalStakeAggregate { + let mut amount_outbound: u64 = 0; + let mut amount_inbound: u64 = 0; + let mut fee: u64 = 0; + let mut have_contribution = false; + for channel in &candidate.channels { + if let Some(c) = channel.contribution.as_ref() { + have_contribution = true; + fee = fee.saturating_add(our_actual_fee_msat(c)); + match contribution_direction(c) { + Some((PaymentDirection::Outbound, amt)) => { + amount_outbound = amount_outbound.saturating_add(amt); + }, + Some((PaymentDirection::Inbound, amt)) => { + amount_inbound = amount_inbound.saturating_add(amt); + }, + None => {}, + } + } + } + if !have_contribution { + return LocalStakeAggregate { + amount_msat: None, + fee_paid_msat: None, + direction: PaymentDirection::Outbound, + }; + } + let (direction, amount_msat) = if amount_outbound >= amount_inbound { + (PaymentDirection::Outbound, amount_outbound.saturating_sub(amount_inbound)) + } else { + (PaymentDirection::Inbound, amount_inbound.saturating_sub(amount_outbound)) + }; + LocalStakeAggregate { amount_msat: Some(amount_msat), fee_paid_msat: Some(fee), direction } +} + +/// Returns this contribution's direction and magnitude in msat, or `None` if it can't +/// be classified as a single inbound or outbound payment. +fn contribution_direction(contribution: &FundingContribution) -> Option<(PaymentDirection, u64)> { + let value_added = contribution.value_added(); + let outputs_total: Amount = contribution.outputs().iter().map(|o| o.value).sum(); + + if value_added > Amount::ZERO && outputs_total == Amount::ZERO { + Some((PaymentDirection::Outbound, value_added.to_sat() * 1000)) + } else if value_added == Amount::ZERO && outputs_total > Amount::ZERO { + Some((PaymentDirection::Inbound, outputs_total.to_sat() * 1000)) + } else { + None + } } impl Listen for Wallet { diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index c2f19115e..925f2e4c3 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -1227,6 +1227,35 @@ async fn rbf_splice_channel() { assert_ne!(original_txo, rbf_txo, "RBF should produce a different funding txo"); + // After RBF but before confirmation, node_b (the initiator) should have a single + // on-chain payment covering both candidates: id anchored to the first broadcast, + // `kind.txid` pointing at the latest (RBF) candidate, and the original candidate + // recorded as a replaced one on the pending record. + { + let payment_id = PaymentId(original_txo.txid.to_byte_array()); + let payment = node_b.payment(&payment_id).expect("splice payment exists"); + match payment.kind { + PaymentKind::Onchain { txid, status: ConfirmationStatus::Unconfirmed } => { + assert_eq!(txid, rbf_txo.txid); + }, + ref other => panic!("expected Onchain Unconfirmed, got {:?}", other), + } + assert_eq!(payment.status, PaymentStatus::Pending); + // Only one Onchain Pending payment for this splice attempt (not one per candidate). + let splice_payments = node_b.list_payments_with_filter(|p| { + p.direction == PaymentDirection::Outbound + && matches!(p.kind, PaymentKind::Onchain { .. }) + && p.status == PaymentStatus::Pending + }); + assert_eq!( + splice_payments.len(), + 1, + "expected exactly one pending Onchain payment for the splice, got {}: {:#?}", + splice_payments.len(), + splice_payments, + ); + } + // Wait for the RBF transaction to replace the original in the mempool wait_for_tx(&electrsd.client, rbf_txo.txid).await; @@ -1254,6 +1283,26 @@ async fn rbf_splice_channel() { ref e => panic!("node_b got unexpected event: {:?}", e), } + // After `ChannelReady` we should have graduated to `Succeeded` — even though + // `ANTI_REORG_DELAY` may not have elapsed yet — and the `kind.txid` should + // reflect the winning RBF candidate, with `fee_paid_msat` matching our + // per-node `FundingContribution::estimated_fee` for that candidate. + { + let payment_id = PaymentId(original_txo.txid.to_byte_array()); + let payment = node_b.payment(&payment_id).expect("splice payment graduated"); + assert_eq!(payment.status, PaymentStatus::Succeeded); + match payment.kind { + PaymentKind::Onchain { txid, status: ConfirmationStatus::Confirmed { .. } } => { + assert_eq!(txid, rbf_txo.txid); + }, + ref other => panic!("expected Onchain Confirmed, got {:?}", other), + } + assert!( + payment.fee_paid_msat.is_some(), + "splice payment should carry a fee from its FundingContribution", + ); + } + node_a.stop().unwrap(); node_b.stop().unwrap(); } From 248ad8f3986519c52ba9910693d173620f0947c7 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Wed, 3 Jun 2026 15:56:16 -0500 Subject: [PATCH 4/7] f - Preserve funding details when a splice candidate is replaced The TxReplaced wallet event rebuilt the payment record from scratch, dropping its funding details. When a wallet sync fell between a splice broadcast and its RBF, the replacement of the original candidate cleared those details, so the payment no longer graduated to Succeeded on ChannelReady. Funding records are managed by the classify path and the Lightning lifecycle handlers, so leave them untouched on replacement. Co-Authored-By: Claude --- src/wallet/mod.rs | 12 ++++++++++++ tests/integration_tests_rust.rs | 18 ++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 3e4ac41b3..958325077 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -402,6 +402,18 @@ impl Wallet { continue; }; + // Funding records (channel opens and splices) track their active candidate and + // status through `classify_*` and the Lightning lifecycle handlers. A replaced + // candidate is expected during splice RBF and must not reset the record or drop + // its funding details, so leave such records untouched here. + if self + .pending_payment_store + .get(&payment_id) + .map_or(false, |p| p.funding_details.is_some()) + { + continue; + } + // Collect all conflict txids let mut conflict_txids: Vec = conflicts.iter().map(|(_, conflict_txid)| *conflict_txid).collect(); diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 925f2e4c3..a1b25e4d6 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -1206,6 +1206,24 @@ async fn rbf_splice_channel() { let original_txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); expect_splice_negotiated_event!(node_b, node_a.node_id()); + // Sync so the original splice candidate is recorded as a canonical wallet transaction before + // the RBF below replaces it. This makes the post-RBF sync observe the original candidate being + // replaced (a `WalletEvent::TxReplaced`), which must not drop the payment's funding details. + // + // This is a best-effort regression guard rather than a deterministic one: with the + // funding-details-preservation fix in place the splice still graduates correctly, but without + // it the resulting inconsistency only surfaces intermittently (via a timing-dependent + // `debug_assert` in the chain-tip handler), so a reverted fix is caught probabilistically. + // + // TODO: Make this deterministic. If funding payments carried a durable classification in the + // main payment store (e.g. a `tx_type` on `PaymentKind::Onchain`, as in + // lightningdevkit/ldk-node#791), a dropped funding-details record would be a detectable + // contradiction on `ChannelReady` rather than a timing-dependent assert, letting this test + // fail reliably whenever the fix is reverted. + wait_for_tx(&electrsd.client, original_txo.txid).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + // splice_in should fail when there's a pending splice (RBF guard) assert_eq!( node_b.splice_in(&user_channel_id_b, node_a.node_id(), 1_000_000), From 7b7647bcca02cab13597d13f36cfd1ae5785924d Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Wed, 3 Jun 2026 16:01:15 -0500 Subject: [PATCH 5/7] f - Reject on-chain RBF of funding and splice payments bump_fee_rbf accepted channel-funding and splice payments because they are recorded as outbound, unconfirmed on-chain payments. Replacing such a transaction via wallet RBF would broadcast one LDK isn't tracking, and for splices the shared input can't be wallet-signed. Reject these and leave splice fee-bumping to rbf_channel. Co-Authored-By: Claude --- src/wallet/mod.rs | 17 ++++++++++ tests/integration_tests_rust.rs | 57 +++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 958325077..e5ac26bf1 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -1539,6 +1539,23 @@ impl Wallet { Error::InvalidPaymentId })?; + // Funding transactions (channel opens and splices) are driven by LDK's funding/splice + // lifecycle, not the on-chain wallet. Replacing one via on-chain RBF would broadcast a + // transaction LDK isn't tracking (and, for splices, can't sign). Fee-bumping a pending + // splice goes through `bump_channel_funding_fee` instead. + if self + .pending_payment_store + .get(&payment_id) + .map_or(false, |p| p.funding_details.is_some()) + { + log_error!( + self.logger, + "Cannot RBF funding payment {} via bump_fee_rbf; use bump_channel_funding_fee instead", + payment_id, + ); + return Err(Error::InvalidPaymentId); + } + if let PaymentKind::Onchain { status, .. } = &payment.kind { match status { ConfirmationStatus::Confirmed { .. } => { diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index a1b25e4d6..522e396d5 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -1325,6 +1325,63 @@ async fn rbf_splice_channel() { node_b.stop().unwrap(); } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn bump_fee_rbf_rejects_funding_payment() { + // A channel-funding or splice transaction is driven by LDK's funding/splice lifecycle, not the + // on-chain wallet. `bump_fee_rbf` must reject such payments — replacing the funding transaction + // via plain wallet RBF would broadcast a transaction LDK isn't tracking (and, for splices, + // can't even sign). Fee-bumping a pending splice goes through `bump_channel_funding_fee` instead. + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); + + let address_a = node_a.onchain_payment().new_address().unwrap(); + let address_b = node_b.onchain_payment().new_address().unwrap(); + let premine_amount_sat = 5_000_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![address_a, address_b], + Amount::from_sat(premine_amount_sat), + ) + .await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 4_000_000, false, &electrsd).await; + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + let _user_channel_id_a = expect_channel_ready_event!(node_a, node_b.node_id()); + let user_channel_id_b = expect_channel_ready_event!(node_b, node_a.node_id()); + + // Splice-in to create a pending splice payment. + node_b.splice_in(&user_channel_id_b, node_a.node_id(), 1_000_000).unwrap(); + + let txo = expect_splice_negotiated_event!(node_a, node_b.node_id()); + expect_splice_negotiated_event!(node_b, node_a.node_id()); + + // Make node_b's wallet aware of the splice transaction so `bump_fee_rbf` reaches its funding + // guard rather than failing earlier for a transaction it can't find. + wait_for_tx(&electrsd.client, txo.txid).await; + node_b.sync_wallets().unwrap(); + + // The splice payment is an on-chain, outbound, unconfirmed record, so it passes + // `bump_fee_rbf`'s other guards; it must nonetheless be rejected as a funding payment. + let splice_payment_id = PaymentId(txo.txid.to_byte_array()); + assert_eq!( + node_b.onchain_payment().bump_fee_rbf(splice_payment_id, None), + Err(NodeError::InvalidPaymentId), + ); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn simple_bolt12_send_receive() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); From 511d194e066388058fb2c7c0a84aef34b9b7761a Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Fri, 5 Jun 2026 14:16:08 -0500 Subject: [PATCH 6/7] f - Rename persist_pending to persist_pending_payment Co-Authored-By: Claude --- src/wallet/mod.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index e5ac26bf1..cd8c6ec2d 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -295,7 +295,7 @@ impl Wallet { if payment_status == PaymentStatus::Pending { let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); - self.persist_pending(pending_payment)?; + self.persist_pending_payment(pending_payment)?; } else { self.payment_store.insert_or_update(payment)?; } @@ -390,7 +390,7 @@ impl Wallet { ConfirmationStatus::Unconfirmed, ); let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); - self.persist_pending(pending_payment)?; + self.persist_pending_payment(pending_payment)?; }, WalletEvent::TxReplaced { txid, conflicts, .. } => { let Some(payment_id) = self.find_payment_by_txid(txid) else { @@ -456,7 +456,7 @@ impl Wallet { ConfirmationStatus::Unconfirmed, ); let pending_payment = self.create_pending_payment_from_tx(payment, Vec::new()); - self.persist_pending(pending_payment)?; + self.persist_pending_payment(pending_payment)?; }, _ => { continue; @@ -1280,7 +1280,7 @@ impl Wallet { /// Writes a [`PendingPaymentDetails`] and its inner [`PaymentDetails`] to their /// respective stores in a fixed order. Callers that need to keep the two stores in /// sync should always go through this. - fn persist_pending(&self, pending: PendingPaymentDetails) -> Result<(), Error> { + fn persist_pending_payment(&self, pending: PendingPaymentDetails) -> Result<(), Error> { self.payment_store.insert_or_update(pending.details.clone())?; self.pending_payment_store.insert_or_update(pending)?; Ok(()) @@ -1489,7 +1489,7 @@ impl Wallet { pending.details.kind = PaymentKind::Onchain { txid: event_txid, status: confirmation_status }; - self.persist_pending(pending)?; + self.persist_pending_payment(pending)?; Ok(true) } @@ -1743,7 +1743,7 @@ impl Wallet { ); let pending_payment = self.create_pending_payment_from_tx(new_payment, Vec::new()); - self.persist_pending(pending_payment)?; + self.persist_pending_payment(pending_payment)?; log_info!(self.logger, "RBF successful: replaced {} with {}", txid, new_txid); @@ -1807,7 +1807,7 @@ impl Wallet { let pending = PendingPaymentDetails::with_funding_details(details, Vec::new(), funding_details); - self.persist_pending(pending)?; + self.persist_pending_payment(pending)?; log_debug!( self.logger, "Recorded channel-funding broadcast {} for channel {}", @@ -1890,7 +1890,7 @@ impl Wallet { let pending = PendingPaymentDetails::with_funding_details(details, Vec::new(), funding_details); - self.persist_pending(pending)?; + self.persist_pending_payment(pending)?; log_debug!( self.logger, "Recorded interactive-funding broadcast {} ({} candidates, {} channels)", From bec7723f663a7890c74f5e4c372bfbdc78922a13 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Thu, 23 Apr 2026 14:58:20 -0500 Subject: [PATCH 7/7] Persist payment transaction data without blocking LDK Previously the BroadcasterInterface implementation wrote the payment record synchronously when LDK invoked it. With a remote KV store this could block LDK's message handling for hundreds of milliseconds per call, noticeably during force-close bursts or splice broadcasts. Persistence now happens asynchronously and must complete before the transaction is sent to the chain client. If persistence fails, the broadcast is dropped: a payment record must exist for every on-chain tx we emit, otherwise a crash could leave the tx confirmed with no matching record. Generated with assistance from Claude Code. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/chain/bitcoind.rs | 8 ++++--- src/chain/electrum.rs | 6 +++-- src/chain/esplora.rs | 8 ++++--- src/chain/mod.rs | 20 ++++++++++++---- src/tx_broadcaster.rs | 56 +++++++++++++++++++++++++++---------------- 5 files changed, 66 insertions(+), 32 deletions(-) diff --git a/src/chain/bitcoind.rs b/src/chain/bitcoind.rs index 2582f32f6..eeed41a8d 100644 --- a/src/chain/bitcoind.rs +++ b/src/chain/bitcoind.rs @@ -568,16 +568,18 @@ impl BitcoindChainSource { Ok(()) } - pub(crate) async fn process_broadcast_package(&self, package: Vec) { + pub(crate) async fn process_broadcast_package( + &self, txs: impl IntoIterator, + ) { // While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28 // features, we should eventually switch to use `submitpackage` via the // `rust-bitcoind-json-rpc` crate rather than just broadcasting individual // transactions. - for tx in &package { + for tx in txs { let txid = tx.compute_txid(); let timeout_fut = tokio::time::timeout( Duration::from_secs(DEFAULT_TX_BROADCAST_TIMEOUT_SECS), - self.api_client.broadcast_transaction(tx), + self.api_client.broadcast_transaction(&tx), ); match timeout_fut.await { Ok(res) => match res { diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs index 54e7fff0c..28825b191 100644 --- a/src/chain/electrum.rs +++ b/src/chain/electrum.rs @@ -275,7 +275,9 @@ impl ElectrumChainSource { Ok(()) } - pub(crate) async fn process_broadcast_package(&self, package: Vec) { + pub(crate) async fn process_broadcast_package( + &self, txs: impl IntoIterator, + ) { let electrum_client: Arc = if let Some(client) = self.electrum_runtime_status.read().expect("lock").client().as_ref() { @@ -285,7 +287,7 @@ impl ElectrumChainSource { return; }; - for tx in package { + for tx in txs { electrum_client.broadcast(tx).await; } } diff --git a/src/chain/esplora.rs b/src/chain/esplora.rs index 5825a0984..5f88ad76e 100644 --- a/src/chain/esplora.rs +++ b/src/chain/esplora.rs @@ -352,12 +352,14 @@ impl EsploraChainSource { Ok(()) } - pub(crate) async fn process_broadcast_package(&self, package: Vec) { - for tx in &package { + pub(crate) async fn process_broadcast_package( + &self, txs: impl IntoIterator, + ) { + for tx in txs { let txid = tx.compute_txid(); let timeout_fut = tokio::time::timeout( Duration::from_secs(self.sync_config.timeouts_config.tx_broadcast_timeout_secs), - self.esplora_client.broadcast(tx), + self.esplora_client.broadcast(&tx), ); match timeout_fut.await { Ok(res) => match res { diff --git a/src/chain/mod.rs b/src/chain/mod.rs index cb8541be6..cf0c946ba 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -24,7 +24,7 @@ use crate::config::{ WALLET_SYNC_INTERVAL_MINIMUM_SECS, }; use crate::fee_estimator::OnchainFeeEstimator; -use crate::logger::{log_debug, log_info, log_trace, LdkLogger, Logger}; +use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use crate::runtime::Runtime; use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; use crate::{Error, NodeMetrics}; @@ -453,15 +453,27 @@ impl ChainSource { return; } Some(next_package) = receiver.recv() => { + let package = match self.tx_broadcaster.classify_package(next_package).await { + Ok(p) => p, + Err(e) => { + log_error!( + tx_bcast_logger, + "Skipping broadcast: failed to persist payment records: {:?}", + e, + ); + continue; + }, + }; + let txs = package.into_iter().map(|(tx, _)| tx); match &self.kind { ChainSourceKind::Esplora(esplora_chain_source) => { - esplora_chain_source.process_broadcast_package(next_package).await + esplora_chain_source.process_broadcast_package(txs).await }, ChainSourceKind::Electrum(electrum_chain_source) => { - electrum_chain_source.process_broadcast_package(next_package).await + electrum_chain_source.process_broadcast_package(txs).await }, ChainSourceKind::Bitcoind(bitcoind_chain_source) => { - bitcoind_chain_source.process_broadcast_package(next_package).await + bitcoind_chain_source.process_broadcast_package(txs).await }, } } diff --git a/src/tx_broadcaster.rs b/src/tx_broadcaster.rs index 24abf8f11..1e65fb1aa 100644 --- a/src/tx_broadcaster.rs +++ b/src/tx_broadcaster.rs @@ -14,20 +14,26 @@ use tokio::sync::{mpsc, Mutex, MutexGuard}; use crate::logger::{log_error, LdkLogger}; use crate::types::Wallet; +use crate::Error; const BCAST_PACKAGE_QUEUE_SIZE: usize = 50; +/// A package of transactions that LDK handed to the broadcaster in one +/// `broadcast_transactions` call, along with each transaction's type. Queued until the +/// background task classifies and broadcasts it. +pub(crate) type BroadcastPackage = Vec<(Transaction, TransactionType)>; + pub(crate) struct TransactionBroadcaster where L::Target: LdkLogger, { - queue_sender: mpsc::Sender>, - queue_receiver: Mutex>>, + queue_sender: mpsc::Sender, + queue_receiver: Mutex>, /// Weak handle to the [`Wallet`] that performs classification of funding broadcasts /// (channel opens and splices) into payment records. Remains `None` while the - /// builder is wiring the node up, during which broadcasts are still forwarded to - /// the queue but no payment record is written. [`Self::set_wallet`] installs the - /// handle once the [`Wallet`] exists. + /// builder is wiring the node up, during which broadcasts are forwarded to the + /// queue but no payment record is written. [`Self::set_wallet`] installs the handle + /// once the [`Wallet`] exists. wallet: StdMutex>>, logger: L, } @@ -55,9 +61,31 @@ where pub(crate) async fn get_broadcast_queue( &self, - ) -> MutexGuard<'_, mpsc::Receiver>> { + ) -> MutexGuard<'_, mpsc::Receiver> { self.queue_receiver.lock().await } + + /// Classifies a queued package into payment records and returns the package ready + /// for the chain client. Returns `Err` if any classification fails; callers must + /// not broadcast the package in that case, since a crash would leave the tx + /// on-chain without a record. + pub(crate) async fn classify_package( + &self, package: BroadcastPackage, + ) -> Result { + let wallet_opt = self.wallet.lock().expect("lock").as_ref().and_then(Weak::upgrade); + if let Some(wallet) = wallet_opt { + tokio::task::spawn_blocking(move || { + for (tx, tx_type) in &package { + wallet.classify_broadcast(tx, tx_type)?; + } + Ok::<_, Error>(package) + }) + .await + .map_err(|_| Error::PersistenceFailed)? + } else { + Ok(package) + } + } } impl BroadcasterInterface for TransactionBroadcaster @@ -65,20 +93,8 @@ where L::Target: LdkLogger, { fn broadcast_transactions(&self, txs: &[(&Transaction, TransactionType)]) { - let wallet = self.wallet.lock().expect("lock").as_ref().and_then(Weak::upgrade); - if let Some(wallet) = wallet { - for (tx, tx_type) in txs { - if let Err(e) = wallet.classify_broadcast(tx, tx_type) { - log_error!( - self.logger, - "Failed to classify broadcast tx {}: {:?}", - tx.compute_txid(), - e, - ); - } - } - } - let package = txs.iter().map(|(t, _)| (*t).clone()).collect::>(); + let package: BroadcastPackage = + txs.iter().map(|(tx, tx_type)| ((*tx).clone(), tx_type.clone())).collect(); self.queue_sender.try_send(package).unwrap_or_else(|e| { log_error!(self.logger, "Failed to broadcast transactions: {}", e); });