Skip to content

Fix connection issues second approach #1411

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 26 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
7bd42ce
increase time waiting nat traversal and min connections by default
netsirius Feb 2, 2025
65a80e6
add some logs and github ci for corss compiling
netsirius Feb 2, 2025
f1a078b
some fixes
netsirius Feb 2, 2025
a664505
better management of reporting received packets
netsirius Feb 2, 2025
78701d9
increment packets channel buffer size
netsirius Feb 2, 2025
1b1969d
fmt
netsirius Feb 2, 2025
f32528d
add better traces for peer connection
netsirius Feb 2, 2025
bd47b2e
add better traces for peer connection
netsirius Feb 2, 2025
5a720ff
update cross-compile workflow to specify output path and enhance pack…
netsirius Feb 4, 2025
7112dc2
enhance packet sending debug traces
netsirius Feb 5, 2025
76d7282
enhance packet sending debug traces
netsirius Feb 6, 2025
6dedc09
enhance packet sending debug traces
netsirius Feb 6, 2025
a8ccccc
enhance logging for packet handling and cleanup processes
netsirius Feb 6, 2025
d00c9f0
refactor connection acceptance logic and add deployment script
netsirius Feb 7, 2025
8433e1e
refactor skip list handling to use HashSet for improved performance a…
netsirius Feb 8, 2025
173c3b1
docs: small clarifying comment
iduartgomez Feb 8, 2025
4c482bf
docs: fix incomplete path on freenet node setup guide (#1409)
alexisbatyk Feb 8, 2025
fc05127
refactor logging levels in packet handling for improved clarity and p…
netsirius Feb 8, 2025
33fa9cc
enhance NAT traversal with a logarithmic backoff strategy
netsirius Feb 9, 2025
a5f3959
chore: avoid unnecessary copy of skip list
iduartgomez Feb 9, 2025
89767d7
improve how we compute the backoffs
iduartgomez Feb 9, 2025
83ce592
refactor connection handling to use distinct skip lists for connectio…
netsirius Feb 9, 2025
5e2b8a5
wip
iduartgomez Feb 9, 2025
cdb1e41
wip
iduartgomez Feb 9, 2025
8c080a3
wip
iduartgomez Feb 9, 2025
09edf10
add connection established logging
netsirius Feb 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions .github/workflows/cross-compile.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
name: Build and Cross-Compile

on:
workflow_dispatch:
pull_request:

jobs:
build-x86_64:
name: Build for x86_64-unknown-linux-gnu

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4
with:
submodules: true

- name: Install Rust
uses: dtolnay/rust-toolchain@stable
with:
toolchain: stable

- uses: Swatinem/rust-cache@v2

- name: Install cross
run: cargo install cross

- name: Compile for x86_64-unknown-linux-gnu
run: cargo build --release

- name: Upload binaries
uses: actions/upload-artifact@v4
with:
name: binaries-x86_64
path: target/release/freenet

build-arm64:
name: Build for aarch64-unknown-linux-gnu

runs-on: ubuntu-24.04-arm

steps:
- uses: actions/checkout@v4
with:
submodules: true

- name: Install Rust
uses: dtolnay/rust-toolchain@stable
with:
toolchain: stable

- uses: Swatinem/rust-cache@v2

- name: Compile for aarch64-unknown-linux-gnu
run: cargo build --release

- name: Upload binaries
uses: actions/upload-artifact@v4
with:
name: binaries-arm64
path: target/release/freenet
5 changes: 4 additions & 1 deletion crates/core/src/client_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use freenet_stdlib::{
};
use futures::stream::FuturesUnordered;
use futures::{future::BoxFuture, FutureExt, StreamExt};
use std::collections::HashSet;
use std::fmt::Display;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
Expand Down Expand Up @@ -454,7 +455,9 @@ async fn process_open_request(
);
})?;

if let Err(err) = get::request_get(&op_manager, op, vec![]).await {
if let Err(err) =
get::request_get(&op_manager, op, HashSet::new()).await
{
tracing::error!("get::request_get error: {}", err);
}
}
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ impl ConfigArgs {
Either::Right(dir) => (dir.clone(), dir),
}
};
tracing::info!(
?config,
?data,
"Looking for configuration file in default directory"
);
self.config_paths.config_dir = Some(config.clone());
if self.config_paths.data_dir.is_none() {
self.config_paths.data_dir = Some(data);
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/contract/executor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Contract executor.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::future::Future;
use std::path::PathBuf;
Expand Down Expand Up @@ -371,7 +371,7 @@ impl ComposeNetworkMessage<operations::get::GetOp> for GetContract {
}

async fn resume_op(op: operations::get::GetOp, op_manager: &OpManager) -> Result<(), OpError> {
operations::get::request_get(op_manager, op, vec![]).await
operations::get::request_get(op_manager, op, HashSet::new()).await
}
}

Expand Down
8 changes: 4 additions & 4 deletions crates/core/src/contract/executor/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,10 +557,10 @@ impl Executor<Runtime> {
};
let new_state = WrappedState::new(new_state.into_bytes());

if new_state.as_ref() == current_state.as_ref() {
tracing::debug!("No changes in state for contract {key}, avoiding update");
return Ok(Either::Left(current_state.clone()));
}
// if new_state.as_ref() == current_state.as_ref() {
// tracing::debug!("No changes in state for contract {key}, avoiding update");
// return Ok(Either::Left(current_state.clone()));
// }

self.state_store
.update(key, new_state.clone())
Expand Down
16 changes: 8 additions & 8 deletions crates/core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@
//! - in-memory: a simplifying node used for emulation purposes mainly.
//! - inter-process: similar to in-memory, but can be rana cross multiple processes, closer to the real p2p impl

use anyhow::Context;
use either::Either;
use freenet_stdlib::{
client_api::{ClientRequest, ErrorKind},
prelude::ContractKey,
};
use std::collections::HashSet;
use std::{
borrow::Cow,
fmt::Display,
Expand All @@ -19,13 +26,6 @@ use std::{
time::Duration,
};

use anyhow::Context;
use either::Either;
use freenet_stdlib::{
client_api::{ClientRequest, ErrorKind},
prelude::ContractKey,
};

use rsa::pkcs8::DecodePublicKey;
use serde::{Deserialize, Serialize};
use tracing::Instrument;
Expand Down Expand Up @@ -649,7 +649,7 @@ pub async fn subscribe(
Err(OpError::ContractError(ContractError::ContractNotFound(key))) => {
tracing::info!(%key, "Trying to subscribe to a contract not present, requesting it first");
let get_op = get::start_op(key, true);
if let Err(error) = get::request_get(&op_manager, get_op, vec![]).await {
if let Err(error) = get::request_get(&op_manager, get_op, HashSet::new()).await {
tracing::error!(%key, %error, "Failed getting the contract while previously trying to subscribe; bailing");
return Err(error);
}
Expand Down
91 changes: 58 additions & 33 deletions crates/core/src/node/network_bridge/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,11 @@ impl HandshakeHandler {
outbound_conn = self.ongoing_outbound_connections.next(), if !self.ongoing_outbound_connections.is_empty() => {
let r = match outbound_conn {
Some(Ok(InternalEvent::OutboundConnEstablished(peer_id, connection))) => {
tracing::debug!(at=?connection.my_address(), from=%connection.remote_addr(), "Outbound connection successful");
tracing::info!(at=?connection.my_address(), from=%connection.remote_addr(), "Outbound connection successful");
Ok(Event::OutboundConnectionSuccessful { peer_id, connection })
}
Some(Ok(InternalEvent::OutboundGwConnEstablished(id, connection))) => {
tracing::info!(at=?connection.my_address(), from=%connection.remote_addr(), "Outbound gateway connection successful");
if let Some(addr) = connection.my_address() {
tracing::debug!(%addr, "Attempting setting own peer key");
self.connection_manager.try_set_peer_key(addr);
Expand Down Expand Up @@ -352,7 +353,7 @@ impl HandshakeHandler {
return Err(e.into());
}

let InboundGwJoinRequest { conn, id, joiner, hops_to_live, max_hops_to_live, skip_list } = req;
let InboundGwJoinRequest { conn, id, joiner, hops_to_live, max_hops_to_live, skip_connections, skip_forwards } = req;

let (ok, forward_info) = {
// TODO: refactor this so it happens in the background out of the main handler loop
Expand All @@ -367,19 +368,27 @@ impl HandshakeHandler {
location: Some(joiner_loc),
};

let mut skip_connections = skip_connections.clone();
let mut skip_forwards = skip_forwards.clone();
skip_connections.insert(my_peer_id.peer.clone());
skip_forwards.insert(my_peer_id.peer.clone());

let forward_info = ForwardParams {
left_htl: hops_to_live,
max_htl: max_hops_to_live,
accepted: true,
skip_connections,
skip_forwards,
req_peer: my_peer_id.clone(),
joiner: joiner_pk_loc.clone(),
};

let f = forward_conn(
id,
&self.connection_manager,
self.router.clone(),
&mut nw_bridge,
ForwardParams {
left_htl: hops_to_live,
max_htl: max_hops_to_live,
skip_list,
accepted: true,
req_peer: my_peer_id.clone(),
joiner: joiner_pk_loc.clone(),
}
forward_info
);

match f.await {
Expand Down Expand Up @@ -412,15 +421,16 @@ impl HandshakeHandler {
})

} else {
let InboundGwJoinRequest { mut conn, id, hops_to_live, max_hops_to_live, skip_list, .. } = req;
let InboundGwJoinRequest { mut conn, id, hops_to_live, max_hops_to_live, skip_connections, skip_forwards, .. } = req;
let remote = conn.remote_addr();
tracing::debug!(at=?conn.my_address(), from=%remote, "Transient connection");
let mut tx = TransientConnection {
tx: id,
joiner: req.joiner.clone(),
max_hops_to_live,
hops_to_live,
skip_list,
skip_connections,
skip_forwards,
};
match self.forward_transient_connection(&mut conn, &mut tx).await {
Ok(ForwardResult::Forward(forward_target, msg, info)) => {
Expand Down Expand Up @@ -503,22 +513,29 @@ impl HandshakeHandler {
location: Some(joiner_loc),
};
let my_peer_id = self.connection_manager.own_location();
transaction.skip_list.push(transaction.joiner.clone());
transaction.skip_list.push(my_peer_id.peer.clone());
transaction
.skip_connections
.insert(transaction.joiner.clone());
transaction.skip_forwards.insert(transaction.joiner.clone());
transaction.skip_connections.insert(my_peer_id.peer.clone());
transaction.skip_forwards.insert(my_peer_id.peer.clone());

let forward_info = ForwardParams {
left_htl: transaction.hops_to_live,
max_htl: transaction.max_hops_to_live,
accepted: true,
skip_connections: transaction.skip_connections.clone(),
skip_forwards: transaction.skip_forwards.clone(),
req_peer: my_peer_id.clone(),
joiner: joiner_pk_loc.clone(),
};

match forward_conn(
transaction.tx,
&self.connection_manager,
self.router.clone(),
&mut nw_bridge,
ForwardParams {
left_htl: transaction.hops_to_live,
max_htl: transaction.max_hops_to_live,
skip_list: transaction.skip_list.clone(),
accepted: false,
req_peer: my_peer_id.clone(),
joiner: joiner_pk_loc.clone(),
},
forward_info,
)
.await
{
Expand Down Expand Up @@ -731,7 +748,8 @@ struct InboundGwJoinRequest {
pub joiner: PeerId,
pub hops_to_live: usize,
pub max_hops_to_live: usize,
pub skip_list: Vec<PeerId>,
pub skip_connections: HashSet<PeerId>,
pub skip_forwards: HashSet<PeerId>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -782,7 +800,8 @@ async fn wait_for_gw_confirmation(
joiner_key: this_peer.pub_key.clone(),
hops_to_live: tracker.total_checks,
max_hops_to_live: tracker.total_checks,
skip_list: vec![this_peer],
skip_connections: HashSet::from([this_peer.clone()]),
skip_forwards: HashSet::from([this_peer.clone()]),
},
}));
tracing::debug!(
Expand Down Expand Up @@ -926,20 +945,24 @@ async fn gw_peer_connection_listener(
match net_message {
NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request {
id,
msg: ConnectRequest::StartJoinReq { joiner, joiner_key, hops_to_live, max_hops_to_live, skip_list },
msg: ConnectRequest::StartJoinReq { joiner, joiner_key, hops_to_live, max_hops_to_live, skip_connections, skip_forwards },
..
})) => {
let joiner = joiner.unwrap_or_else(|| {
tracing::debug!(%joiner_key, "Joiner not provided, using joiner key");
PeerId::new(conn.remote_addr(), joiner_key)
});
break Ok((
InternalEvent::InboundGwJoinRequest(
InboundGwJoinRequest {
conn, id, joiner, hops_to_live, max_hops_to_live, skip_list
}
),
outbound
InternalEvent::InboundGwJoinRequest(InboundGwJoinRequest {
conn,
id,
joiner,
hops_to_live,
max_hops_to_live,
skip_connections,
skip_forwards,
}),
outbound,
));
}
other => {
Expand Down Expand Up @@ -1050,7 +1073,8 @@ struct TransientConnection {
joiner: PeerId,
max_hops_to_live: usize,
hops_to_live: usize,
skip_list: Vec<PeerId>,
skip_connections: HashSet<PeerId>,
skip_forwards: HashSet<PeerId>,
}

impl TransientConnection {
Expand Down Expand Up @@ -1169,7 +1193,8 @@ mod tests {
joiner_key: pub_key,
hops_to_live,
max_hops_to_live: hops_to_live,
skip_list: vec![],
skip_connections: HashSet::new(),
skip_forwards: HashSet::new(),
},
};
self.inbound_msg(
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/node/network_bridge/p2p_protoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,7 @@ impl P2pConnManager {
} else {
tracing::warn!(%peer_id, "No callback for connection established");
}
tracing::info!(at=%peer_id, remote_addr=%connection.remote_addr(), "Connection established");
let (tx, rx) = mpsc::channel(10);
self.connections.insert(peer_id.clone(), tx);
let task = peer_connection_listener(rx, connection).boxed();
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/operations.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#[cfg(debug_assertions)]
use std::backtrace::Backtrace as StdTrace;
use std::{pin::Pin, time::Duration};
use std::{collections::HashSet, pin::Pin, time::Duration};

use freenet_stdlib::prelude::ContractKey;
use futures::Future;
Expand Down Expand Up @@ -310,7 +310,7 @@ async fn start_subscription_request(
op_manager: &OpManager,
key: ContractKey,
try_get: bool,
skip_list: Vec<PeerId>,
skip_list: HashSet<PeerId>,
) {
let sub_op = subscribe::start_op(key);
if let Err(error) = subscribe::request_subscribe(op_manager, sub_op).await {
Expand Down
Loading
Loading