Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bindings/ldk_node.udl
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ interface Node {
[Throws=NodeError]
void splice_out([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id, [ByRef]Address address, u64 splice_amount_sats);
[Throws=NodeError]
void bump_channel_funding_fee([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id);
[Throws=NodeError]
void close_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id);
[Throws=NodeError]
void force_close_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id, string? reason);
Expand Down
2 changes: 2 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1624,6 +1624,8 @@ fn build_with_store_internal(
Arc::clone(&pending_payment_store),
));

tx_broadcaster.set_wallet(Arc::downgrade(&wallet));

// Initialize the KeysManager
let cur_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).map_err(|e| {
log_error!(logger, "Failed to get current time: {}", e);
Expand Down
8 changes: 5 additions & 3 deletions src/chain/bitcoind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,16 +568,18 @@ impl BitcoindChainSource {
Ok(())
}

pub(crate) async fn process_broadcast_package(&self, package: Vec<Transaction>) {
pub(crate) async fn process_broadcast_package(
&self, txs: impl IntoIterator<Item = Transaction>,
) {
// While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28
// features, we should eventually switch to use `submitpackage` via the
// `rust-bitcoind-json-rpc` crate rather than just broadcasting individual
// transactions.
for tx in &package {
for tx in txs {
let txid = tx.compute_txid();
let timeout_fut = tokio::time::timeout(
Duration::from_secs(DEFAULT_TX_BROADCAST_TIMEOUT_SECS),
self.api_client.broadcast_transaction(tx),
self.api_client.broadcast_transaction(&tx),
);
match timeout_fut.await {
Ok(res) => match res {
Expand Down
6 changes: 4 additions & 2 deletions src/chain/electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,9 @@ impl ElectrumChainSource {
Ok(())
}

pub(crate) async fn process_broadcast_package(&self, package: Vec<Transaction>) {
pub(crate) async fn process_broadcast_package(
&self, txs: impl IntoIterator<Item = Transaction>,
) {
let electrum_client: Arc<ElectrumRuntimeClient> = if let Some(client) =
self.electrum_runtime_status.read().expect("lock").client().as_ref()
{
Expand All @@ -285,7 +287,7 @@ impl ElectrumChainSource {
return;
};

for tx in package {
for tx in txs {
electrum_client.broadcast(tx).await;
}
}
Expand Down
8 changes: 5 additions & 3 deletions src/chain/esplora.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,12 +352,14 @@ impl EsploraChainSource {
Ok(())
}

pub(crate) async fn process_broadcast_package(&self, package: Vec<Transaction>) {
for tx in &package {
pub(crate) async fn process_broadcast_package(
&self, txs: impl IntoIterator<Item = Transaction>,
) {
for tx in txs {
let txid = tx.compute_txid();
let timeout_fut = tokio::time::timeout(
Duration::from_secs(self.sync_config.timeouts_config.tx_broadcast_timeout_secs),
self.esplora_client.broadcast(tx),
self.esplora_client.broadcast(&tx),
);
match timeout_fut.await {
Ok(res) => match res {
Expand Down
20 changes: 16 additions & 4 deletions src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::config::{
WALLET_SYNC_INTERVAL_MINIMUM_SECS,
};
use crate::fee_estimator::OnchainFeeEstimator;
use crate::logger::{log_debug, log_info, log_trace, LdkLogger, Logger};
use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger};
use crate::runtime::Runtime;
use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
use crate::{Error, NodeMetrics};
Expand Down Expand Up @@ -453,15 +453,27 @@ impl ChainSource {
return;
}
Some(next_package) = receiver.recv() => {
let package = match self.tx_broadcaster.classify_package(next_package).await {
Ok(p) => p,
Err(e) => {
log_error!(
tx_bcast_logger,
"Skipping broadcast: failed to persist payment records: {:?}",
e,
);
continue;
},
};
let txs = package.into_iter().map(|(tx, _)| tx);
match &self.kind {
ChainSourceKind::Esplora(esplora_chain_source) => {
esplora_chain_source.process_broadcast_package(next_package).await
esplora_chain_source.process_broadcast_package(txs).await
},
ChainSourceKind::Electrum(electrum_chain_source) => {
electrum_chain_source.process_broadcast_package(next_package).await
electrum_chain_source.process_broadcast_package(txs).await
},
ChainSourceKind::Bitcoind(bitcoind_chain_source) => {
bitcoind_chain_source.process_broadcast_package(next_package).await
bitcoind_chain_source.process_broadcast_package(txs).await
},
}
}
Expand Down
22 changes: 22 additions & 0 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1580,6 +1580,18 @@ where
);
}

if let Err(e) =
self.wallet.handle_channel_ready(channel_id, funding_txo.map(|txo| txo.txid))
{
log_error!(
self.logger,
"Failed to graduate funding payment on ChannelReady for channel {}: {:?}",
channel_id,
e,
);
return Err(ReplayEvent());
}

if let Some(liquidity_source) = self.liquidity_source.as_ref() {
liquidity_source
.handle_channel_ready(user_channel_id, &channel_id, &counterparty_node_id)
Expand Down Expand Up @@ -1609,6 +1621,16 @@ where
} => {
log_info!(self.logger, "Channel {} closed due to: {}", channel_id, reason);

if let Err(e) = self.wallet.handle_channel_closed(channel_id) {
log_error!(
self.logger,
"Failed to handle ChannelClosed for channel {}: {:?}",
channel_id,
e,
);
return Err(ReplayEvent());
}

let event = Event::ChannelClosed {
channel_id,
user_channel_id: UserChannelId(user_channel_id),
Expand Down
71 changes: 69 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1595,7 +1595,7 @@ impl Node {
if funding_template.prior_contribution().is_some() {
log_error!(
self.logger,
"Failed to splice channel: a prior splice contribution is pending"
"Failed to splice channel: a prior splice contribution is pending; use bump_channel_funding_fee instead"
);
return Err(Error::ChannelSplicingFailed);
}
Expand Down Expand Up @@ -1716,7 +1716,7 @@ impl Node {
if funding_template.prior_contribution().is_some() {
log_error!(
self.logger,
"Failed to splice channel: a prior splice contribution is pending"
"Failed to splice channel: a prior splice contribution is pending; use bump_channel_funding_fee instead"
);
return Err(Error::ChannelSplicingFailed);
}
Expand Down Expand Up @@ -1753,6 +1753,73 @@ impl Node {
}
}

/// Replace a pending splice's funding transaction with a higher-feerate version.
///
/// If a prior splice negotiation is pending, this bumps its feerate via RBF. The prior
/// contribution is reused when possible; otherwise, coin selection is re-run.
///
/// # Experimental API
///
/// This API is experimental and may change in the future.
pub fn bump_channel_funding_fee(
&self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey,
) -> Result<(), Error> {
let open_channels =
self.channel_manager.list_channels_with_counterparty(&counterparty_node_id);
if let Some(channel_details) =
open_channels.iter().find(|c| c.user_channel_id == user_channel_id.0)
{
let min_feerate =
self.fee_estimator.estimate_fee_rate(ConfirmationTarget::ChannelFunding);
let max_feerate = FeeRate::from_sat_per_kwu(min_feerate.to_sat_per_kwu() * 3 / 2);

let funding_template = self
.channel_manager
.splice_channel(&channel_details.channel_id, &counterparty_node_id)
.map_err(|e| {
log_error!(self.logger, "Failed to RBF channel: {:?}", e);
Error::ChannelSplicingFailed
})?;

if funding_template.min_rbf_feerate().is_none() {
log_error!(self.logger, "Failed to RBF channel: no pending splice to replace");
return Err(Error::ChannelSplicingFailed);
}

let contribution = self
.runtime
.block_on(funding_template.rbf_prior_contribution(
None,
max_feerate,
Arc::clone(&self.wallet),
))
.map_err(|e| {
log_error!(self.logger, "Failed to RBF channel: {}", e);
Error::ChannelSplicingFailed
})?;

self.channel_manager
.funding_contributed(
&channel_details.channel_id,
&counterparty_node_id,
contribution,
None,
)
.map_err(|e| {
log_error!(self.logger, "Failed to RBF channel: {:?}", e);
Error::ChannelSplicingFailed
})
} else {
log_error!(
self.logger,
"Channel not found for user_channel_id {} and counterparty {}",
user_channel_id,
counterparty_node_id
);
Err(Error::ChannelSplicingFailed)
}
}

/// Manually sync the LDK and BDK wallets with the current chain state and update the fee rate
/// cache.
///
Expand Down
45 changes: 43 additions & 2 deletions src/payment/pending_payment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,52 @@
// accordance with one or both of these licenses.

use bitcoin::Txid;
use lightning::chain::chaininterface::FundingCandidate;
use lightning::impl_writeable_tlv_based;
use lightning::ln::channelmanager::PaymentId;

use crate::data_store::{StorableObject, StorableObjectUpdate};
use crate::payment::store::PaymentDetailsUpdate;
use crate::payment::PaymentDetails;

/// Marks an on-chain payment as belonging to an interactive-funding negotiation. The
/// last entry in `candidates` is the currently-broadcast tx; earlier entries are RBF
/// predecessors that may still confirm if reorgs intervene.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FundingDetails {
/// Every negotiated candidate, oldest first.
pub candidates: Vec<FundingCandidate>,
}

impl_writeable_tlv_based!(FundingDetails, {
(0, candidates, optional_vec),
});

/// Represents a pending payment
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PendingPaymentDetails {
/// The full payment details
pub details: PaymentDetails,
/// Transaction IDs that have replaced or conflict with this payment.
pub conflicting_txids: Vec<Txid>,
/// Set when the payment's transaction is an interactive-funding broadcast (channel
/// open or splice). The record transitions to [`PaymentStatus::Succeeded`] on
/// `ChannelReady` instead of after [`ANTI_REORG_DELAY`] confirmations.
///
/// [`PaymentStatus::Succeeded`]: crate::payment::store::PaymentStatus::Succeeded
/// [`ANTI_REORG_DELAY`]: lightning::chain::channelmonitor::ANTI_REORG_DELAY
pub funding_details: Option<FundingDetails>,
}

impl PendingPaymentDetails {
pub(crate) fn new(details: PaymentDetails, conflicting_txids: Vec<Txid>) -> Self {
Self { details, conflicting_txids }
Self { details, conflicting_txids, funding_details: None }
}

pub(crate) fn with_funding_details(
details: PaymentDetails, conflicting_txids: Vec<Txid>, funding_details: FundingDetails,
) -> Self {
Self { details, conflicting_txids, funding_details: Some(funding_details) }
}

/// Convert to finalized payment for the main payment store
Expand All @@ -36,13 +63,15 @@ impl PendingPaymentDetails {
impl_writeable_tlv_based!(PendingPaymentDetails, {
(0, details, required),
(2, conflicting_txids, optional_vec),
(4, funding_details, option),
});

#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PendingPaymentDetailsUpdate {
pub id: PaymentId,
pub payment_update: Option<PaymentDetailsUpdate>,
pub conflicting_txids: Option<Vec<Txid>>,
pub funding_details: Option<Option<FundingDetails>>,
}

impl StorableObject for PendingPaymentDetails {
Expand All @@ -68,6 +97,13 @@ impl StorableObject for PendingPaymentDetails {
}
}

if let Some(new_funding_details) = update.funding_details {
if self.funding_details != new_funding_details {
self.funding_details = new_funding_details;
updated = true;
}
}

updated
}

Expand All @@ -89,6 +125,11 @@ impl From<&PendingPaymentDetails> for PendingPaymentDetailsUpdate {
} else {
Some(value.conflicting_txids.clone())
};
Self { id: value.id(), payment_update: Some(value.details.to_update()), conflicting_txids }
Self {
id: value.id(),
payment_update: Some(value.details.to_update()),
conflicting_txids,
funding_details: Some(value.funding_details.clone()),
}
}
}
Loading