From 587fa5252e7f819ae343c74f360496733172f8b0 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Thu, 23 Apr 2026 13:41:23 +0200 Subject: [PATCH 1/3] Propagate ScheduleResponse::Shutdown to the connection loop --- crates/hotfix/src/initiator.rs | 16 +++++++++++++--- crates/hotfix/src/session/session_ref.rs | 7 ++----- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/crates/hotfix/src/initiator.rs b/crates/hotfix/src/initiator.rs index 7e64adf..b86bdca 100644 --- a/crates/hotfix/src/initiator.rs +++ b/crates/hotfix/src/initiator.rs @@ -15,6 +15,7 @@ use crate::application::Application; use crate::config::SessionConfig; use crate::message::OutboundMessage; use crate::session::error::{SendError, SendOutcome, SessionCreationError}; +use crate::session::event::ScheduleResponse; use crate::session::{InternalSessionRef, SessionHandle}; use crate::store::MessageStore; use crate::transport::connect; @@ -107,9 +108,18 @@ async fn establish_connection( completion_tx: watch::Sender, ) { loop { - if session_ref.await_in_schedule().await.is_err() { - warn!("session task terminated when checking schedule"); - break; + match session_ref.await_in_schedule().await { + Ok(ScheduleResponse::InSchedule) => { + debug!("resuming connection as schedule is active"); + } + Ok(ScheduleResponse::Shutdown) => { + warn!("session indicated shutdown during schedule check"); + break; + } + Err(_) => { + warn!("session task terminated when checking schedule"); + break; + } } match connect(&config, session_ref.clone()).await { diff --git a/crates/hotfix/src/session/session_ref.rs b/crates/hotfix/src/session/session_ref.rs index 15aa395..77d8328 100644 --- a/crates/hotfix/src/session/session_ref.rs +++ b/crates/hotfix/src/session/session_ref.rs @@ -82,16 +82,13 @@ impl InternalSessionRef { Ok(receiver.await?) } - pub async fn await_in_schedule(&self) -> Result<(), SessionGone> { + pub async fn await_in_schedule(&self) -> Result { debug!("awaiting in-schedule time"); let (sender, receiver) = oneshot::channel::(); self.event_sender .send(SessionEvent::AwaitSchedule(sender)) .await?; - receiver.await?; - - debug!("resuming connection as schedule is active"); - Ok(()) + Ok(receiver.await?) } } From eb6841269f2fdc045f345468301342cba71d2161 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Thu, 23 Apr 2026 14:04:12 +0200 Subject: [PATCH 2/3] Add unit tests for await_in_schedule --- crates/hotfix/src/session/session_ref.rs | 73 ++++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/crates/hotfix/src/session/session_ref.rs b/crates/hotfix/src/session/session_ref.rs index 77d8328..9628cff 100644 --- a/crates/hotfix/src/session/session_ref.rs +++ b/crates/hotfix/src/session/session_ref.rs @@ -107,3 +107,76 @@ impl From for SessionGone { Self(err.to_string()) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::message::Message; + use crate::message::OutboundMessage; + use crate::session::admin_request::AdminRequest; + + #[derive(Clone, Debug, PartialEq)] + struct TestMessage; + + impl OutboundMessage for TestMessage { + fn write(&self, _msg: &mut Message) {} + fn message_type(&self) -> &str { + "TEST" + } + } + + fn create_test_session_ref() -> ( + InternalSessionRef, + mpsc::Receiver, + ) { + let (event_sender, event_receiver) = mpsc::channel::(100); + let (outbound_message_sender, _outbound_receiver) = + mpsc::channel::>(10); + let (admin_request_sender, _admin_receiver) = mpsc::channel::(10); + + let session_ref = InternalSessionRef { + event_sender, + outbound_message_sender, + admin_request_sender, + }; + + (session_ref, event_receiver) + } + + #[tokio::test] + async fn await_in_schedule_returns_in_schedule_when_session_responds_in_schedule() { + let (session_ref, mut event_receiver) = create_test_session_ref(); + + tokio::spawn(async move { + if let Some(SessionEvent::AwaitSchedule(responder)) = event_receiver.recv().await { + let _ = responder.send(ScheduleResponse::InSchedule); + } + }); + + let result = session_ref.await_in_schedule().await; + assert!(matches!(result, Ok(ScheduleResponse::InSchedule))); + } + + #[tokio::test] + async fn await_in_schedule_returns_shutdown_when_session_responds_shutdown() { + let (session_ref, mut event_receiver) = create_test_session_ref(); + + tokio::spawn(async move { + if let Some(SessionEvent::AwaitSchedule(responder)) = event_receiver.recv().await { + let _ = responder.send(ScheduleResponse::Shutdown); + } + }); + + let result = session_ref.await_in_schedule().await; + assert!(matches!(result, Ok(ScheduleResponse::Shutdown))); + } + + #[tokio::test] + async fn await_in_schedule_returns_err_when_event_channel_closed() { + let (session_ref, event_receiver) = create_test_session_ref(); + drop(event_receiver); + + let result = session_ref.await_in_schedule().await; + assert!(result.is_err()); + } +} From 7129b2f40de78dac0e6a874da71f4e6e73c19af4 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Thu, 23 Apr 2026 14:16:35 +0200 Subject: [PATCH 3/3] Cover await_in_schedule InSchedule/Shutdown/SessionGone paths --- crates/hotfix/src/session.rs | 2 +- crates/hotfix/src/session/session_ref.rs | 48 +++++-------------- crates/hotfix/src/session/test_utils.rs | 32 +++++++++++++ .../src/transport/socket/socket_reader.rs | 35 +------------- 4 files changed, 46 insertions(+), 71 deletions(-) diff --git a/crates/hotfix/src/session.rs b/crates/hotfix/src/session.rs index 0165324..fc28d81 100644 --- a/crates/hotfix/src/session.rs +++ b/crates/hotfix/src/session.rs @@ -9,7 +9,7 @@ mod session_handle; pub mod session_ref; mod state; #[cfg(test)] -mod test_utils; +pub(crate) mod test_utils; use chrono::Utc; use hotfix_message::dict::Dictionary; diff --git a/crates/hotfix/src/session/session_ref.rs b/crates/hotfix/src/session/session_ref.rs index 9628cff..a697146 100644 --- a/crates/hotfix/src/session/session_ref.rs +++ b/crates/hotfix/src/session/session_ref.rs @@ -111,45 +111,18 @@ impl From for SessionGone { #[cfg(test)] mod tests { use super::*; - use crate::message::Message; - use crate::message::OutboundMessage; - use crate::session::admin_request::AdminRequest; - - #[derive(Clone, Debug, PartialEq)] - struct TestMessage; - - impl OutboundMessage for TestMessage { - fn write(&self, _msg: &mut Message) {} - fn message_type(&self) -> &str { - "TEST" - } - } - - fn create_test_session_ref() -> ( - InternalSessionRef, - mpsc::Receiver, - ) { - let (event_sender, event_receiver) = mpsc::channel::(100); - let (outbound_message_sender, _outbound_receiver) = - mpsc::channel::>(10); - let (admin_request_sender, _admin_receiver) = mpsc::channel::(10); - - let session_ref = InternalSessionRef { - event_sender, - outbound_message_sender, - admin_request_sender, - }; - - (session_ref, event_receiver) - } + use crate::session::test_utils::create_test_session_ref; #[tokio::test] async fn await_in_schedule_returns_in_schedule_when_session_responds_in_schedule() { let (session_ref, mut event_receiver) = create_test_session_ref(); tokio::spawn(async move { - if let Some(SessionEvent::AwaitSchedule(responder)) = event_receiver.recv().await { - let _ = responder.send(ScheduleResponse::InSchedule); + match event_receiver.recv().await { + Some(SessionEvent::AwaitSchedule(responder)) => { + let _ = responder.send(ScheduleResponse::InSchedule); + } + other => panic!("unexpected event: {other:?}"), } }); @@ -162,8 +135,11 @@ mod tests { let (session_ref, mut event_receiver) = create_test_session_ref(); tokio::spawn(async move { - if let Some(SessionEvent::AwaitSchedule(responder)) = event_receiver.recv().await { - let _ = responder.send(ScheduleResponse::Shutdown); + match event_receiver.recv().await { + Some(SessionEvent::AwaitSchedule(responder)) => { + let _ = responder.send(ScheduleResponse::Shutdown); + } + other => panic!("unexpected event: {other:?}"), } }); @@ -177,6 +153,6 @@ mod tests { drop(event_receiver); let result = session_ref.await_in_schedule().await; - assert!(result.is_err()); + assert!(matches!(result, Err(SessionGone(_)))); } } diff --git a/crates/hotfix/src/session/test_utils.rs b/crates/hotfix/src/session/test_utils.rs index d71becc..a7e6cfa 100644 --- a/crates/hotfix/src/session/test_utils.rs +++ b/crates/hotfix/src/session/test_utils.rs @@ -1,5 +1,9 @@ use crate::config::SessionConfig; +use crate::message::{Message, OutboundMessage}; +use crate::session::admin_request::AdminRequest; use crate::session::ctx::SessionCtx; +use crate::session::event::SessionEvent; +use crate::session::session_ref::{InternalSessionRef, OutboundRequest}; use crate::store::{MessageStore, Result as StoreResult}; use crate::transport::writer::{WriterMessage, WriterRef}; use chrono::{DateTime, Utc}; @@ -117,3 +121,31 @@ pub(crate) fn extract_field(raw: &[u8], tag: u32) -> Option { } None } + +#[derive(Clone)] +pub(crate) struct TestMessage; + +impl OutboundMessage for TestMessage { + fn write(&self, _msg: &mut Message) {} + fn message_type(&self) -> &str { + "TEST" + } +} + +pub(crate) fn create_test_session_ref() -> ( + InternalSessionRef, + mpsc::Receiver, +) { + let (event_sender, event_receiver) = mpsc::channel::(100); + let (outbound_message_sender, _outbound_receiver) = + mpsc::channel::>(10); + let (admin_request_sender, _admin_receiver) = mpsc::channel::(10); + + let session_ref = InternalSessionRef { + event_sender, + outbound_message_sender, + admin_request_sender, + }; + + (session_ref, event_receiver) +} diff --git a/crates/hotfix/src/transport/socket/socket_reader.rs b/crates/hotfix/src/transport/socket/socket_reader.rs index 4e9ad0c..3cdbcaa 100644 --- a/crates/hotfix/src/transport/socket/socket_reader.rs +++ b/crates/hotfix/src/transport/socket/socket_reader.rs @@ -84,42 +84,9 @@ where #[cfg(test)] mod tests { use super::*; - use crate::message::Message; - use crate::session::admin_request::AdminRequest; use crate::session::event::SessionEvent; - use crate::session::session_ref::OutboundRequest; + use crate::session::test_utils::create_test_session_ref; use tokio::io::{AsyncWriteExt, duplex}; - use tokio::sync::mpsc; - - #[derive(Clone, Debug, PartialEq)] - struct TestMessage; - - impl OutboundMessage for TestMessage { - fn write(&self, _msg: &mut Message) {} - - fn message_type(&self) -> &str { - "TEST" - } - } - - /// Creates a test InternalSessionRef that captures events for verification - fn create_test_session_ref() -> ( - InternalSessionRef, - mpsc::Receiver, - ) { - let (event_sender, event_receiver) = mpsc::channel::(100); - let (outbound_message_sender, _outbound_receiver) = - mpsc::channel::>(10); - let (admin_request_sender, _admin_receiver) = mpsc::channel::(10); - - let session_ref = InternalSessionRef { - event_sender, - outbound_message_sender, - admin_request_sender, - }; - - (session_ref, event_receiver) - } /// Test that the reader correctly parses a valid FIX message and sends it to the session /// for processing.