Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion crates/hotfix/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::session::admin_request::AdminRequest;
use crate::session::ctx::{PreProcessDecision, SessionCtx, TransitionResult, VerificationResult};
use crate::session::error::SessionCreationError;
use crate::session::error::{InternalSendError, InternalSendResultExt, SessionOperationError};
pub use crate::session::error::{SendError, SendOutcome};
pub use crate::session::error::{SendError, SendOutcome, SetNextTargetSeqNumError};
pub use crate::session::info::{SessionInfo, Status};
pub use crate::session::session_handle::SessionHandle;
#[cfg(not(feature = "test-utils"))]
Expand Down Expand Up @@ -568,6 +568,22 @@ where
warn!("resetting sequence numbers on next logon");
self.reset_on_next_logon = true;
}
AdminRequest::SetNextTargetSeqNum { seq_num, responder } => {
let response = self
.state
.try_set_next_target_seq_num(&mut self.ctx, seq_num)
.await;
if let Err(ref err) = response {
warn!(
?err,
seq_num = seq_num.get(),
"SetNextTargetSeqNum rejected"
);
}
if responder.send(response).is_err() {
error!("failed to respond to SetNextTargetSeqNum request");
}
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions crates/hotfix/src/session/admin_request.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::session::SessionInfo;
use crate::session::error::SetNextTargetSeqNumError;
use std::num::NonZeroU64;
use tokio::sync::oneshot;

/// Administrative actions exposed to users of the engine to control the session.
Expand All @@ -13,4 +15,10 @@ pub enum AdminRequest {
/// which can be used to re-synchronise our state with the counterparty in
/// unfortunate scenarios where such drastic recover is required.
ResetSequenceNumbersOnNextLogon,
/// Set the next expected target sequence number. Permitted only while the
/// session is `Disconnected` — see `SetNextTargetSeqNumError::InvalidState`.
SetNextTargetSeqNum {
seq_num: NonZeroU64,
responder: oneshot::Sender<Result<(), SetNextTargetSeqNumError>>,
},
}
17 changes: 17 additions & 0 deletions crates/hotfix/src/session/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,23 @@ impl<T> InternalSendResultExt<T> for Result<T, InternalSendError> {
}
}

/// Error returned when setting the next expected target sequence number via admin.
#[derive(Debug, Error)]
pub enum SetNextTargetSeqNumError {
/// The session was not in a state where the target sequence number can be
/// safely adjusted. Only permitted while `Disconnected`.
#[error("operation rejected in state {current:?}; only permitted while disconnected")]
InvalidState { current: crate::session::Status },

/// Channel-level failure — the session task is gone or the responder was dropped.
#[error(transparent)]
Send(#[from] SendError),

/// Underlying store write failed.
#[error(transparent)]
Store(#[from] StoreError),
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
23 changes: 22 additions & 1 deletion crates/hotfix/src/session/session_handle.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::session::admin_request::AdminRequest;
use crate::session::error::{SendError, SendOutcome};
use crate::session::error::{SendError, SendOutcome, SetNextTargetSeqNumError};
use crate::session::session_ref::OutboundRequest;
use crate::session::{InternalSessionRef, SessionInfo};
use std::num::NonZeroU64;
use tokio::sync::{mpsc, oneshot};

/// A public handle to the session that can be used to interact with the session.
Expand Down Expand Up @@ -66,6 +67,26 @@ impl<Outbound> SessionHandle<Outbound> {
.await?;
Ok(())
}

/// Sets the next expected target sequence number.
///
/// Permitted only while the session is `Disconnected`. Use this to realign
/// after a counterparty-initiated sequence reset without forcing a bilateral
/// reset — the peer's subsequent `ResendRequest` is handled by the existing
/// resend/gap-fill logic.
pub async fn set_next_target_seq_num(
&self,
seq_num: NonZeroU64,
) -> Result<(), SetNextTargetSeqNumError> {
let (responder, receiver) = oneshot::channel();
self.admin_request_sender
.send(AdminRequest::SetNextTargetSeqNum { seq_num, responder })
.await
.map_err(|_| SetNextTargetSeqNumError::Send(SendError::SessionGone))?;
receiver
.await
.map_err(|_| SetNextTargetSeqNumError::Send(SendError::SessionGone))?
}
}

impl<M> From<InternalSessionRef<M>> for SessionHandle<M> {
Expand Down
33 changes: 32 additions & 1 deletion crates/hotfix/src/session/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ use crate::message::logon::Logon;
use crate::message::logout::Logout;
use crate::message::verification::VerificationFlags;
use crate::session::ctx::{PreProcessDecision, SessionCtx, TransitionResult, VerificationResult};
use crate::session::error::{InternalSendError, InternalSendResultExt, SessionOperationError};
use crate::session::error::{
InternalSendError, InternalSendResultExt, SessionOperationError, SetNextTargetSeqNumError,
};
use crate::session::event::ScheduleResponse;
use crate::session::info::Status as SessionInfoStatus;
use crate::transport::writer::WriterRef;
use hotfix_message::message::Message;
use hotfix_store::MessageStore;
use std::num::NonZeroU64;
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::time::Instant;
Expand Down Expand Up @@ -247,6 +250,34 @@ impl SessionState {
}
}

/// Set the next expected target sequence number.
pub(crate) async fn try_set_next_target_seq_num<A, S>(
&self,
ctx: &mut SessionCtx<A, S>,
seq_num: NonZeroU64,
) -> Result<(), SetNextTargetSeqNumError>
where
A: Application,
S: MessageStore,
{
// Only permitted while `Disconnected` — any other state returns `InvalidState`.
match self {
SessionState::Disconnected(_) => {
// The store stores "last seen" (see `inbound::on_sequence_reset` passing `end - 1`),
// so we subtract 1 to make `next_target_seq_number()` return `seq_num`.
let target_seq_num = seq_num.get() - 1;

ctx.store
.set_target_seq_number(target_seq_num)
.await
.map_err(SetNextTargetSeqNumError::from)
}
_ => Err(SetNextTargetSeqNumError::InvalidState {
current: self.as_status(),
}),
}
}

/// Sends a logout message and puts the session state into an [`AwaitingLogout`] state.
///
/// The session waits for a configurable timeout period for the counterparty to
Expand Down
Loading
Loading