diff --git a/crates/hotfix/src/session.rs b/crates/hotfix/src/session.rs index 21fcb3f..0165324 100644 --- a/crates/hotfix/src/session.rs +++ b/crates/hotfix/src/session.rs @@ -39,7 +39,7 @@ use crate::session::admin_request::AdminRequest; use crate::session::ctx::{PreProcessDecision, SessionCtx, TransitionResult, VerificationResult}; use crate::session::error::SessionCreationError; use crate::session::error::{InternalSendError, InternalSendResultExt, SessionOperationError}; -pub use crate::session::error::{SendError, SendOutcome}; +pub use crate::session::error::{SendError, SendOutcome, SetNextTargetSeqNumError}; pub use crate::session::info::{SessionInfo, Status}; pub use crate::session::session_handle::SessionHandle; #[cfg(not(feature = "test-utils"))] @@ -568,6 +568,22 @@ where warn!("resetting sequence numbers on next logon"); self.reset_on_next_logon = true; } + AdminRequest::SetNextTargetSeqNum { seq_num, responder } => { + let response = self + .state + .try_set_next_target_seq_num(&mut self.ctx, seq_num) + .await; + if let Err(ref err) = response { + warn!( + ?err, + seq_num = seq_num.get(), + "SetNextTargetSeqNum rejected" + ); + } + if responder.send(response).is_err() { + error!("failed to respond to SetNextTargetSeqNum request"); + } + } } } diff --git a/crates/hotfix/src/session/admin_request.rs b/crates/hotfix/src/session/admin_request.rs index 89c1228..1acd4d6 100644 --- a/crates/hotfix/src/session/admin_request.rs +++ b/crates/hotfix/src/session/admin_request.rs @@ -1,4 +1,6 @@ use crate::session::SessionInfo; +use crate::session::error::SetNextTargetSeqNumError; +use std::num::NonZeroU64; use tokio::sync::oneshot; /// Administrative actions exposed to users of the engine to control the session. @@ -13,4 +15,10 @@ pub enum AdminRequest { /// which can be used to re-synchronise our state with the counterparty in /// unfortunate scenarios where such drastic recover is required. ResetSequenceNumbersOnNextLogon, + /// Set the next expected target sequence number. Permitted only while the + /// session is `Disconnected` — see `SetNextTargetSeqNumError::InvalidState`. + SetNextTargetSeqNum { + seq_num: NonZeroU64, + responder: oneshot::Sender>, + }, } diff --git a/crates/hotfix/src/session/error.rs b/crates/hotfix/src/session/error.rs index 50d3b29..5f98379 100644 --- a/crates/hotfix/src/session/error.rs +++ b/crates/hotfix/src/session/error.rs @@ -128,6 +128,23 @@ impl InternalSendResultExt for Result { } } +/// Error returned when setting the next expected target sequence number via admin. +#[derive(Debug, Error)] +pub enum SetNextTargetSeqNumError { + /// The session was not in a state where the target sequence number can be + /// safely adjusted. Only permitted while `Disconnected`. + #[error("operation rejected in state {current:?}; only permitted while disconnected")] + InvalidState { current: crate::session::Status }, + + /// Channel-level failure — the session task is gone or the responder was dropped. + #[error(transparent)] + Send(#[from] SendError), + + /// Underlying store write failed. + #[error(transparent)] + Store(#[from] StoreError), +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/hotfix/src/session/session_handle.rs b/crates/hotfix/src/session/session_handle.rs index 6a5e710..05f0f70 100644 --- a/crates/hotfix/src/session/session_handle.rs +++ b/crates/hotfix/src/session/session_handle.rs @@ -1,7 +1,8 @@ use crate::session::admin_request::AdminRequest; -use crate::session::error::{SendError, SendOutcome}; +use crate::session::error::{SendError, SendOutcome, SetNextTargetSeqNumError}; use crate::session::session_ref::OutboundRequest; use crate::session::{InternalSessionRef, SessionInfo}; +use std::num::NonZeroU64; use tokio::sync::{mpsc, oneshot}; /// A public handle to the session that can be used to interact with the session. @@ -66,6 +67,26 @@ impl SessionHandle { .await?; Ok(()) } + + /// Sets the next expected target sequence number. + /// + /// Permitted only while the session is `Disconnected`. Use this to realign + /// after a counterparty-initiated sequence reset without forcing a bilateral + /// reset — the peer's subsequent `ResendRequest` is handled by the existing + /// resend/gap-fill logic. + pub async fn set_next_target_seq_num( + &self, + seq_num: NonZeroU64, + ) -> Result<(), SetNextTargetSeqNumError> { + let (responder, receiver) = oneshot::channel(); + self.admin_request_sender + .send(AdminRequest::SetNextTargetSeqNum { seq_num, responder }) + .await + .map_err(|_| SetNextTargetSeqNumError::Send(SendError::SessionGone))?; + receiver + .await + .map_err(|_| SetNextTargetSeqNumError::Send(SendError::SessionGone))? + } } impl From> for SessionHandle { diff --git a/crates/hotfix/src/session/state.rs b/crates/hotfix/src/session/state.rs index ce315da..de9c6d2 100644 --- a/crates/hotfix/src/session/state.rs +++ b/crates/hotfix/src/session/state.rs @@ -16,12 +16,15 @@ use crate::message::logon::Logon; use crate::message::logout::Logout; use crate::message::verification::VerificationFlags; use crate::session::ctx::{PreProcessDecision, SessionCtx, TransitionResult, VerificationResult}; -use crate::session::error::{InternalSendError, InternalSendResultExt, SessionOperationError}; +use crate::session::error::{ + InternalSendError, InternalSendResultExt, SessionOperationError, SetNextTargetSeqNumError, +}; use crate::session::event::ScheduleResponse; use crate::session::info::Status as SessionInfoStatus; use crate::transport::writer::WriterRef; use hotfix_message::message::Message; use hotfix_store::MessageStore; +use std::num::NonZeroU64; use std::time::Duration; use tokio::sync::oneshot; use tokio::time::Instant; @@ -247,6 +250,34 @@ impl SessionState { } } + /// Set the next expected target sequence number. + pub(crate) async fn try_set_next_target_seq_num( + &self, + ctx: &mut SessionCtx, + seq_num: NonZeroU64, + ) -> Result<(), SetNextTargetSeqNumError> + where + A: Application, + S: MessageStore, + { + // Only permitted while `Disconnected` — any other state returns `InvalidState`. + match self { + SessionState::Disconnected(_) => { + // The store stores "last seen" (see `inbound::on_sequence_reset` passing `end - 1`), + // so we subtract 1 to make `next_target_seq_number()` return `seq_num`. + let target_seq_num = seq_num.get() - 1; + + ctx.store + .set_target_seq_number(target_seq_num) + .await + .map_err(SetNextTargetSeqNumError::from) + } + _ => Err(SetNextTargetSeqNumError::InvalidState { + current: self.as_status(), + }), + } + } + /// Sends a logout message and puts the session state into an [`AwaitingLogout`] state. /// /// The session waits for a configurable timeout period for the counterparty to diff --git a/crates/hotfix/tests/session_test_cases/admin_request_tests.rs b/crates/hotfix/tests/session_test_cases/admin_request_tests.rs index d8e709d..57583b6 100644 --- a/crates/hotfix/tests/session_test_cases/admin_request_tests.rs +++ b/crates/hotfix/tests/session_test_cases/admin_request_tests.rs @@ -2,9 +2,10 @@ use crate::common::actions::when; use crate::common::assertions::{assert_msg_type, then}; use crate::common::cleanup::finally; use crate::common::setup::given_an_active_session; -use hotfix::session::Status; +use hotfix::session::{SetNextTargetSeqNumError, Status}; use hotfix_message::Part; use hotfix_message::fix44::{MsgType, RESET_SEQ_NUM_FLAG}; +use std::num::NonZeroU64; /// Tests that we can request the session to reset sequence numbers once. /// @@ -72,3 +73,241 @@ async fn test_reset_sequence_numbers_once() { finally(&session, &mut counterparty).disconnect().await; } + +/// Happy path: while `Disconnected`, setting the next expected target sequence +/// number succeeds and the new value is visible via session info. +#[tokio::test] +async fn test_set_next_target_seq_num_while_disconnected() { + let session = crate::common::setup::given_a_disconnected_session(); + + let new_target = NonZeroU64::new(42).expect("42 is non-zero"); + session + .session_handle() + .set_next_target_seq_num(new_target) + .await + .expect("set_next_target_seq_num to succeed"); + + let info = session + .session_handle() + .get_session_info() + .await + .expect("session info"); + assert_eq!(info.next_target_seq_number, 42); +} + +/// Rejection: while `Active`, SetNextTargetSeqNum is refused and the store is +/// untouched. +#[tokio::test] +async fn test_set_next_target_seq_num_rejected_while_active() { + let (session, mut counterparty) = crate::common::setup::given_an_active_session().await; + + let info_before = session + .session_handle() + .get_session_info() + .await + .expect("session info"); + + let result = session + .session_handle() + .set_next_target_seq_num(NonZeroU64::new(42).expect("42 is non-zero")) + .await; + + assert!( + matches!( + result, + Err(SetNextTargetSeqNumError::InvalidState { + current: Status::Active + }) + ), + "expected InvalidState{{ current: Active }}, got: {result:?}" + ); + + let info_after = session + .session_handle() + .get_session_info() + .await + .expect("session info"); + assert_eq!( + info_after.next_target_seq_number, info_before.next_target_seq_number, + "target sequence number should not change on rejection" + ); + + crate::common::cleanup::finally(&session, &mut counterparty) + .disconnect() + .await; +} + +/// Rejection: while `AwaitingLogon` (we've sent our Logon, peer hasn't responded), +/// SetNextTargetSeqNum is refused and the store is untouched. +#[tokio::test] +async fn test_set_next_target_seq_num_rejected_while_awaiting_logon() { + let (session, mut counterparty) = crate::common::setup::given_a_connected_session().await; + + // wait for our outbound Logon so we're deterministically in AwaitingLogon + crate::common::assertions::then(&mut counterparty) + .receives(|msg| { + crate::common::assertions::assert_msg_type(msg, hotfix_message::fix44::MsgType::Logon) + }) + .await; + + let info_before = session + .session_handle() + .get_session_info() + .await + .expect("session info"); + + let result = session + .session_handle() + .set_next_target_seq_num(NonZeroU64::new(42).expect("42 is non-zero")) + .await; + + assert!( + matches!( + result, + Err(SetNextTargetSeqNumError::InvalidState { + current: Status::AwaitingLogon + }) + ), + "expected InvalidState{{ current: AwaitingLogon }}, got: {result:?}" + ); + + let info_after = session + .session_handle() + .get_session_info() + .await + .expect("session info"); + assert_eq!( + info_after.next_target_seq_number, + info_before.next_target_seq_number + ); + + crate::common::cleanup::finally(&session, &mut counterparty) + .disconnect() + .await; +} + +/// Rejection: while `AwaitingLogout` (we've sent our Logout and are waiting +/// for the peer's reply), SetNextTargetSeqNum is refused and the store is +/// untouched. +#[tokio::test] +async fn test_set_next_target_seq_num_rejected_while_awaiting_logout() { + use crate::common::actions::when; + use crate::common::assertions::{assert_msg_type, then}; + use hotfix::message::logout::Logout; + use hotfix_message::fix44::MsgType; + + let (session, mut counterparty) = crate::common::setup::given_an_active_session().await; + + // initiate logout from our side — we stay in AwaitingLogout until the peer replies + when(&session).requests_disconnect().await; + then(&mut counterparty) + .receives(|msg| assert_msg_type(msg, MsgType::Logout)) + .await; + + let info_before = session + .session_handle() + .get_session_info() + .await + .expect("session info"); + + let result = session + .session_handle() + .set_next_target_seq_num(NonZeroU64::new(42).expect("42 is non-zero")) + .await; + + assert!( + matches!( + result, + Err(SetNextTargetSeqNumError::InvalidState { + current: Status::AwaitingLogout + }) + ), + "expected InvalidState{{ current: AwaitingLogout }}, got: {result:?}" + ); + + let info_after = session + .session_handle() + .get_session_info() + .await + .expect("session info"); + assert_eq!( + info_after.next_target_seq_number, + info_before.next_target_seq_number + ); + + // let the peer reply so the session cleans up (do NOT call finally().disconnect() + // — we're already in logout). + when(&mut counterparty) + .sends_message(Logout::default()) + .await; + then(&mut counterparty).gets_disconnected().await; +} + +/// Rejection: while `AwaitingResend` (we detected a gap and asked the peer to +/// resend), SetNextTargetSeqNum is refused and the store is untouched. +#[tokio::test] +async fn test_set_next_target_seq_num_rejected_while_awaiting_resend() { + use crate::common::actions::when; + use crate::common::test_messages::TestMessage; + + let (mut session, mut counterparty) = crate::common::setup::given_an_active_session().await; + + // create a gap so the session transitions to AwaitingResend + when(&mut counterparty) + .has_previously_sent(TestMessage::dummy_execution_report()) + .await; + when(&mut counterparty) + .sends_message(TestMessage::dummy_execution_report()) + .await; + + crate::common::assertions::then(&mut session) + .status_changes_to(Status::AwaitingResend { + begin: 2, + end: 3, + attempts: 1, + }) + .await; + crate::common::assertions::then(&mut counterparty) + .receives(|msg| { + crate::common::assertions::assert_msg_type( + msg, + hotfix_message::fix44::MsgType::ResendRequest, + ) + }) + .await; + + let info_before = session + .session_handle() + .get_session_info() + .await + .expect("session info"); + + let result = session + .session_handle() + .set_next_target_seq_num(NonZeroU64::new(42).expect("42 is non-zero")) + .await; + + assert!( + matches!( + result, + Err(SetNextTargetSeqNumError::InvalidState { + current: Status::AwaitingResend { .. } + }) + ), + "expected InvalidState{{ current: AwaitingResend{{..}} }}, got: {result:?}" + ); + + let info_after = session + .session_handle() + .get_session_info() + .await + .expect("session info"); + assert_eq!( + info_after.next_target_seq_number, + info_before.next_target_seq_number + ); + + crate::common::cleanup::finally(&session, &mut counterparty) + .disconnect() + .await; +}