Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions crates/hotfix/src/initiator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::application::Application;
use crate::config::SessionConfig;
use crate::message::OutboundMessage;
use crate::session::error::{SendError, SendOutcome, SessionCreationError};
use crate::session::event::ScheduleResponse;
use crate::session::{InternalSessionRef, SessionHandle};
use crate::store::MessageStore;
use crate::transport::connect;
Expand Down Expand Up @@ -107,9 +108,18 @@ async fn establish_connection<Outbound: OutboundMessage>(
completion_tx: watch::Sender<bool>,
) {
loop {
if session_ref.await_in_schedule().await.is_err() {
warn!("session task terminated when checking schedule");
break;
match session_ref.await_in_schedule().await {
Ok(ScheduleResponse::InSchedule) => {
debug!("resuming connection as schedule is active");
}
Ok(ScheduleResponse::Shutdown) => {
warn!("session indicated shutdown during schedule check");
break;
}
Err(_) => {
warn!("session task terminated when checking schedule");
break;
}
}

match connect(&config, session_ref.clone()).await {
Expand Down
2 changes: 1 addition & 1 deletion crates/hotfix/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod session_handle;
pub mod session_ref;
mod state;
#[cfg(test)]
mod test_utils;
pub(crate) mod test_utils;

use chrono::Utc;
use hotfix_message::dict::Dictionary;
Expand Down
56 changes: 51 additions & 5 deletions crates/hotfix/src/session/session_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,13 @@ impl<Outbound: OutboundMessage> InternalSessionRef<Outbound> {
Ok(receiver.await?)
}

pub async fn await_in_schedule(&self) -> Result<(), SessionGone> {
pub async fn await_in_schedule(&self) -> Result<ScheduleResponse, SessionGone> {
debug!("awaiting in-schedule time");
let (sender, receiver) = oneshot::channel::<ScheduleResponse>();
self.event_sender
.send(SessionEvent::AwaitSchedule(sender))
.await?;
receiver.await?;

debug!("resuming connection as schedule is active");
Ok(())
Ok(receiver.await?)
}
}

Expand All @@ -110,3 +107,52 @@ impl From<oneshot::error::RecvError> for SessionGone {
Self(err.to_string())
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::session::test_utils::create_test_session_ref;

#[tokio::test]
async fn await_in_schedule_returns_in_schedule_when_session_responds_in_schedule() {
let (session_ref, mut event_receiver) = create_test_session_ref();

tokio::spawn(async move {
match event_receiver.recv().await {
Some(SessionEvent::AwaitSchedule(responder)) => {
let _ = responder.send(ScheduleResponse::InSchedule);
}
other => panic!("unexpected event: {other:?}"),
}
});

let result = session_ref.await_in_schedule().await;
assert!(matches!(result, Ok(ScheduleResponse::InSchedule)));
}

#[tokio::test]
async fn await_in_schedule_returns_shutdown_when_session_responds_shutdown() {
let (session_ref, mut event_receiver) = create_test_session_ref();

tokio::spawn(async move {
match event_receiver.recv().await {
Some(SessionEvent::AwaitSchedule(responder)) => {
let _ = responder.send(ScheduleResponse::Shutdown);
}
other => panic!("unexpected event: {other:?}"),
}
});

let result = session_ref.await_in_schedule().await;
assert!(matches!(result, Ok(ScheduleResponse::Shutdown)));
}

#[tokio::test]
async fn await_in_schedule_returns_err_when_event_channel_closed() {
let (session_ref, event_receiver) = create_test_session_ref();
drop(event_receiver);

let result = session_ref.await_in_schedule().await;
assert!(matches!(result, Err(SessionGone(_))));
}
}
32 changes: 32 additions & 0 deletions crates/hotfix/src/session/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use crate::config::SessionConfig;
use crate::message::{Message, OutboundMessage};
use crate::session::admin_request::AdminRequest;
use crate::session::ctx::SessionCtx;
use crate::session::event::SessionEvent;
use crate::session::session_ref::{InternalSessionRef, OutboundRequest};
use crate::store::{MessageStore, Result as StoreResult};
use crate::transport::writer::{WriterMessage, WriterRef};
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -117,3 +121,31 @@ pub(crate) fn extract_field(raw: &[u8], tag: u32) -> Option<String> {
}
None
}

#[derive(Clone)]
pub(crate) struct TestMessage;

impl OutboundMessage for TestMessage {
fn write(&self, _msg: &mut Message) {}
fn message_type(&self) -> &str {
"TEST"
}
}

pub(crate) fn create_test_session_ref() -> (
InternalSessionRef<TestMessage>,
mpsc::Receiver<SessionEvent>,
) {
let (event_sender, event_receiver) = mpsc::channel::<SessionEvent>(100);
let (outbound_message_sender, _outbound_receiver) =
mpsc::channel::<OutboundRequest<TestMessage>>(10);
let (admin_request_sender, _admin_receiver) = mpsc::channel::<AdminRequest>(10);

let session_ref = InternalSessionRef {
event_sender,
outbound_message_sender,
admin_request_sender,
};

(session_ref, event_receiver)
}
35 changes: 1 addition & 34 deletions crates/hotfix/src/transport/socket/socket_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,42 +84,9 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::message::Message;
use crate::session::admin_request::AdminRequest;
use crate::session::event::SessionEvent;
use crate::session::session_ref::OutboundRequest;
use crate::session::test_utils::create_test_session_ref;
use tokio::io::{AsyncWriteExt, duplex};
use tokio::sync::mpsc;

#[derive(Clone, Debug, PartialEq)]
struct TestMessage;

impl OutboundMessage for TestMessage {
fn write(&self, _msg: &mut Message) {}

fn message_type(&self) -> &str {
"TEST"
}
}

/// Creates a test InternalSessionRef that captures events for verification
fn create_test_session_ref() -> (
InternalSessionRef<TestMessage>,
mpsc::Receiver<SessionEvent>,
) {
let (event_sender, event_receiver) = mpsc::channel::<SessionEvent>(100);
let (outbound_message_sender, _outbound_receiver) =
mpsc::channel::<OutboundRequest<TestMessage>>(10);
let (admin_request_sender, _admin_receiver) = mpsc::channel::<AdminRequest>(10);

let session_ref = InternalSessionRef {
event_sender,
outbound_message_sender,
admin_request_sender,
};

(session_ref, event_receiver)
}

/// Test that the reader correctly parses a valid FIX message and sends it to the session
/// for processing.
Expand Down
Loading