diff --git a/agents/Aevatar.GAgents.Channel.Runtime/Conversation/ConversationGAgent.NyxRelayStreaming.cs b/agents/Aevatar.GAgents.Channel.Runtime/Conversation/ConversationGAgent.NyxRelayStreaming.cs index e873b97be..92633d86f 100644 --- a/agents/Aevatar.GAgents.Channel.Runtime/Conversation/ConversationGAgent.NyxRelayStreaming.cs +++ b/agents/Aevatar.GAgents.Channel.Runtime/Conversation/ConversationGAgent.NyxRelayStreaming.cs @@ -39,6 +39,11 @@ private enum NyxRelayStreamingGuardSource Finalize, } + private sealed record NyxRelayTextOperationInFlight( + NyxRelayTextOperationKind Operation, + long Sequence, + long Generation); + /// /// Actor-scoped streaming state for one conversation turn, backed by /// ConversationGAgentState.ActiveReplyLifecycles. @@ -48,10 +53,27 @@ private sealed record NyxRelayStreamingState( string? PlatformMessageId, string LastFlushedText, int EditCount, - string? TerminalReason) + string? TerminalReason, + NyxRelayTextOperationInFlight? InFlight, + long OperationGeneration, + string? PendingAccumulatedText, + string? PendingFinalizeText, + string? PendingFinalizeCommandId, + LlmReplyTerminalState PendingTerminalState) { public static NyxRelayStreamingState Initial { get; } = - new(NyxRelayStreamingPhase.Idle, null, string.Empty, 0, null); + new( + NyxRelayStreamingPhase.Idle, + PlatformMessageId: null, + LastFlushedText: string.Empty, + EditCount: 0, + TerminalReason: null, + InFlight: null, + OperationGeneration: 0, + PendingAccumulatedText: null, + PendingFinalizeText: null, + PendingFinalizeCommandId: null, + PendingTerminalState: LlmReplyTerminalState.Unspecified); public bool AllowsInterimEdit => Phase is NyxRelayStreamingPhase.Idle @@ -76,9 +98,11 @@ or NyxRelayStreamingPhase.TerminalSucceeded private static bool IsLegalNyxRelayStreamingTransition(NyxRelayStreamingPhase from, NyxRelayStreamingPhase to) => (from, to) switch { + (NyxRelayStreamingPhase.Idle, NyxRelayStreamingPhase.Idle) => true, (NyxRelayStreamingPhase.Idle, NyxRelayStreamingPhase.PlaceholderSent) => true, (NyxRelayStreamingPhase.Idle, NyxRelayStreamingPhase.DisabledPreSend) => true, + (NyxRelayStreamingPhase.PlaceholderSent, NyxRelayStreamingPhase.PlaceholderSent) => true, (NyxRelayStreamingPhase.PlaceholderSent, NyxRelayStreamingPhase.Streaming) => true, (NyxRelayStreamingPhase.PlaceholderSent, NyxRelayStreamingPhase.SuppressingInterim) => true, (NyxRelayStreamingPhase.PlaceholderSent, NyxRelayStreamingPhase.TerminalSucceeded) => true, @@ -89,6 +113,7 @@ private static bool IsLegalNyxRelayStreamingTransition(NyxRelayStreamingPhase fr (NyxRelayStreamingPhase.Streaming, NyxRelayStreamingPhase.TerminalSucceeded) => true, (NyxRelayStreamingPhase.Streaming, NyxRelayStreamingPhase.TerminalPartial) => true, + (NyxRelayStreamingPhase.SuppressingInterim, NyxRelayStreamingPhase.SuppressingInterim) => true, (NyxRelayStreamingPhase.SuppressingInterim, NyxRelayStreamingPhase.TerminalSucceeded) => true, (NyxRelayStreamingPhase.SuppressingInterim, NyxRelayStreamingPhase.TerminalPartial) => true, @@ -109,7 +134,18 @@ private NyxRelayStreamingState GetOrInitNyxRelayStreamingState(string correlatio NormalizeOptional(lifecycle.PlatformMessageId), lifecycle.LastFlushedText ?? string.Empty, lifecycle.EditCount, - NormalizeOptional(lifecycle.TerminalReason)); + NormalizeOptional(lifecycle.TerminalReason), + lifecycle.NyxRelayInFlightOperation == NyxRelayTextOperationKind.Unspecified + ? null + : new NyxRelayTextOperationInFlight( + lifecycle.NyxRelayInFlightOperation, + lifecycle.NyxRelayInFlightSequence, + lifecycle.NyxRelayOperationGeneration), + lifecycle.NyxRelayOperationGeneration, + NormalizeOptional(lifecycle.PendingAccumulatedText), + NormalizeOptional(lifecycle.PendingFinalizeText), + NormalizeOptional(lifecycle.PendingFinalizeCommandId), + lifecycle.PendingNyxRelayTerminalState); } /// @@ -163,6 +199,13 @@ private async Task TransitionNyxRelayStreamingPhaseAsync var updated = carried with { Phase = next, + InFlight = IsTerminalNyxRelayStreamingPhase(next) ? null : carried.InFlight, + PendingAccumulatedText = IsTerminalNyxRelayStreamingPhase(next) ? null : carried.PendingAccumulatedText, + PendingFinalizeText = IsTerminalNyxRelayStreamingPhase(next) ? null : carried.PendingFinalizeText, + PendingFinalizeCommandId = IsTerminalNyxRelayStreamingPhase(next) ? null : carried.PendingFinalizeCommandId, + PendingTerminalState = IsTerminalNyxRelayStreamingPhase(next) + ? LlmReplyTerminalState.Unspecified + : carried.PendingTerminalState, TerminalReason = IsTerminalNyxRelayStreamingPhase(next) ? (terminalReason ?? carried.TerminalReason) : carried.TerminalReason, @@ -225,5 +268,41 @@ private static ConversationReplyLifecycleState ToLifecycleState( EditCount = state.EditCount, TerminalReason = state.TerminalReason ?? string.Empty, UpdatedAtUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + NyxRelayInFlightOperation = state.InFlight?.Operation ?? NyxRelayTextOperationKind.Unspecified, + NyxRelayInFlightSequence = state.InFlight?.Sequence ?? 0, + NyxRelayOperationGeneration = state.OperationGeneration, + PendingAccumulatedText = state.PendingAccumulatedText ?? string.Empty, + PendingFinalizeText = state.PendingFinalizeText ?? string.Empty, + PendingFinalizeCommandId = state.PendingFinalizeCommandId ?? string.Empty, + PendingNyxRelayTerminalState = state.PendingTerminalState, }; + + private long NextNyxRelayTextOperationGeneration(NyxRelayStreamingState state) => + Math.Max(state.OperationGeneration, state.InFlight?.Generation ?? 0) + 1; + + private static string BuildNyxRelayTextOperationTimeoutCallbackId( + string correlationId, + NyxRelayTextOperationKind operation, + long generation) => + $"conversation-nyx-relay-text:{correlationId}:{operation}:{generation}"; + + private static string BuildNyxRelayTextOperationId( + string correlationId, + NyxRelayTextOperationKind operation, + long sequence, + long generation) => + $"{correlationId}:{operation}:{sequence}:{generation}"; + + private static bool MatchesNyxRelayTextInFlight( + NyxRelayStreamingState state, + NyxRelayTextOperationKind operation, + long sequence, + long generation) + { + if (state.InFlight is not { } inFlight) + return false; + return inFlight.Operation == operation && + inFlight.Sequence == sequence && + inFlight.Generation == generation; + } } diff --git a/agents/Aevatar.GAgents.Channel.Runtime/Conversation/ConversationGAgent.cs b/agents/Aevatar.GAgents.Channel.Runtime/Conversation/ConversationGAgent.cs index b28385f0f..f5bd2e8dd 100644 --- a/agents/Aevatar.GAgents.Channel.Runtime/Conversation/ConversationGAgent.cs +++ b/agents/Aevatar.GAgents.Channel.Runtime/Conversation/ConversationGAgent.cs @@ -1,10 +1,12 @@ using Aevatar.ChatRouting.Abstractions; using Aevatar.ChatRouting.Core; +using Aevatar.Foundation.Abstractions; using Aevatar.Foundation.Abstractions.Attributes; using Aevatar.Foundation.Core; using Aevatar.Foundation.Core.EventSourcing; using Aevatar.GAgents.Channel.Abstractions; using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -815,6 +817,12 @@ private async Task HandleNyxRelayStreamingChunkCoreAsync(LlmReplyStreamChunkEven if (ShouldSkipNyxRelayStreamingForUnavailable(state, NyxRelayStreamingGuardSource.AcceptInterimChunk)) return; + if (state.InFlight is not null) + { + await PersistNyxRelayTextCoalescedStateAsync(correlationId, state, evt.AccumulatedText); + return; + } + var runtimeContext = BuildNyxRelayRuntimeContext( evt.CorrelationId, evt.Activity, @@ -833,67 +841,45 @@ await TransitionNyxRelayStreamingPhaseAsync( return; } - var runner = ResolveRunner(); - // Bound the upstream edit so a stuck relay/network can't pin the actor turn forever - // (PR #562 review). 10s matches the failure-path timeout below; the edit is best-effort, - // so timing out cleanly into the !result.Success branch preserves correctness. - using var streamChunkCts = new CancellationTokenSource(StreamingFailureUpdateTimeout); - var result = await runner.RunStreamChunkAsync( - evt, - state.PlatformMessageId, - runtimeContext, - streamChunkCts.Token); - if (!result.Success) - { - if (state.AllowsFinalEdit) - { - // First chunk already consumed the reply token. Skip further interim edits but - // preserve PlatformMessageId so the final edit on LlmReplyReady can still try - // to reconcile the user-visible message. Falling back to /reply would reuse a - // dead token. - Logger.LogInformation( - "Streaming interim edit failed after token consumed; suppressing interim edits, final edit will still be attempted. correlation={CorrelationId}, code={Code}, editUnsupported={EditUnsupported}", - evt.CorrelationId, - result.ErrorCode, - result.EditUnsupported); - await TransitionNyxRelayStreamingPhaseAsync( - correlationId, - state, - NyxRelayStreamingPhase.SuppressingInterim, - terminalReason: $"interim_edit_failed:{result.ErrorCode}"); - } - else - { - // First send itself failed, so the reply token is still usable. Let - // LlmReplyReady fall back to a single-shot /reply via RunLlmReplyAsync. - Logger.LogInformation( - "Streaming initial send failed before token consumed; disabling streaming and allowing /reply fallback. correlation={CorrelationId}, code={Code}, editUnsupported={EditUnsupported}", - evt.CorrelationId, - result.ErrorCode, - result.EditUnsupported); - await TransitionNyxRelayStreamingPhaseAsync( - correlationId, - state, - NyxRelayStreamingPhase.DisabledPreSend, - terminalReason: $"first_send_failed:{result.ErrorCode}"); - } - return; - } - - var isFirstChunk = state.Phase == NyxRelayStreamingPhase.Idle; - var newPlatformMessageId = string.IsNullOrWhiteSpace(result.PlatformMessageId) - ? state.PlatformMessageId - : result.PlatformMessageId; + var sequence = state.EditCount + 1L; + var generation = NextNyxRelayTextOperationGeneration(state); await TransitionNyxRelayStreamingPhaseAsync( correlationId, state, - isFirstChunk ? NyxRelayStreamingPhase.PlaceholderSent : NyxRelayStreamingPhase.Streaming, + state.Phase, fieldUpdate: s => s with { - PlatformMessageId = newPlatformMessageId, - LastFlushedText = evt.AccumulatedText, - EditCount = isFirstChunk ? 0 : s.EditCount + 1, + InFlight = new NyxRelayTextOperationInFlight( + NyxRelayTextOperationKind.Interim, + sequence, + generation), + OperationGeneration = generation, + PendingAccumulatedText = evt.AccumulatedText, }); + await ScheduleNyxRelayTextOperationTimeoutAsync( + correlationId, + NyxRelayTextOperationKind.Interim, + sequence, + generation, + evt, + state.PlatformMessageId, + commandId: string.Empty, + finalText: string.Empty, + lastFlushedText: state.LastFlushedText, + editCount: state.EditCount, + CancellationToken.None); + StartNyxRelayTextOperation( + NyxRelayTextOperationKind.Interim, + evt, + correlationId, + state.PlatformMessageId, + commandId: string.Empty, + finalText: string.Empty, + lastFlushedText: state.LastFlushedText, + editCount: state.EditCount, + sequence, + generation, + runtimeContext); } private async Task TryCompleteStreamedReplyAsync( @@ -920,6 +906,34 @@ private async Task TryCompleteStreamedReplyAsync( var platformMessageId = state.PlatformMessageId!; + if (state.InFlight is not null) + { + if (evt.TerminalState == LlmReplyTerminalState.Failed) + { + var failureText = NormalizeOptional(evt.Outbound?.Text) + ?? NormalizeOptional(evt.ErrorSummary) + ?? "Sorry, the reply failed. Please try again."; + await PersistNyxRelayTextCoalescedStateAsync( + correlationId, + state, + finalizeText: failureText, + finalizeCommandId: commandId, + terminalState: LlmReplyTerminalState.Failed); + return true; + } + + if (evt.TerminalState == LlmReplyTerminalState.Completed) + { + await PersistNyxRelayTextCoalescedStateAsync( + correlationId, + state, + finalizeText: evt.Outbound?.Text ?? string.Empty, + finalizeCommandId: commandId, + terminalState: LlmReplyTerminalState.Completed); + return true; + } + } + // Streaming-start already consumed the reply token. On Failed, falling through to // RunLlmReplyAsync would issue a fresh /reply against the dead token and surface // as `401 Reply token already used` to NyxID — leaving the user staring at the @@ -931,7 +945,6 @@ private async Task TryCompleteStreamedReplyAsync( var failureText = NormalizeOptional(evt.Outbound?.Text) ?? NormalizeOptional(evt.ErrorSummary) ?? "Sorry, the reply failed. Please try again."; - var runner = ResolveRunner(); var failureChunk = new LlmReplyStreamChunkEvent { CorrelationId = evt.CorrelationId, @@ -940,43 +953,44 @@ private async Task TryCompleteStreamedReplyAsync( AccumulatedText = failureText, ChunkAtUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), }; - using var failureUpdateCts = new CancellationTokenSource(StreamingFailureUpdateTimeout); - var failureResult = await runner.RunStreamChunkAsync( - failureChunk, - platformMessageId, - runtimeContext, - failureUpdateCts.Token); - if (failureResult.Success) - { - Logger.LogWarning( - "LLM reply failed after streaming-start; updated placeholder with failure text. correlation={CorrelationId}, errorCode={ErrorCode}, platformMessageId={PlatformMessageId}", - evt.CorrelationId, - evt.ErrorCode, - platformMessageId); - await TransitionNyxRelayStreamingPhaseAsync( - correlationId, - state, - NyxRelayStreamingPhase.TerminalSucceeded, - terminalReason: $"failed_self_heal:{evt.ErrorCode}"); - await PersistStreamedCompletionAsync(evt, commandId, referenceActivity, platformMessageId, failureText, state.EditCount + 1); - return true; - } - - // Edit failed too (rare — Lark may reject a message edit for unrelated reasons). - // Falling back to /reply would still hit the dead token, so persist the last - // flushed partial as terminal. The user sees the partial (potentially empty) - // but we don't spin on a guaranteed 401. - Logger.LogWarning( - "Streaming LLM failure-update could not edit placeholder; persisting last flushed partial as terminal. correlation={CorrelationId}, code={Code}, platformMessageId={PlatformMessageId}", - evt.CorrelationId, - failureResult.ErrorCode, - platformMessageId); + var sequence = state.EditCount + 1L; + var generation = NextNyxRelayTextOperationGeneration(state); await TransitionNyxRelayStreamingPhaseAsync( correlationId, state, - NyxRelayStreamingPhase.TerminalPartial, - terminalReason: $"failed_self_heal_edit_failed:{failureResult.ErrorCode}"); - await PersistStreamedCompletionAsync(evt, commandId, referenceActivity, platformMessageId, state.LastFlushedText, state.EditCount); + state.Phase, + fieldUpdate: s => s with + { + InFlight = new NyxRelayTextOperationInFlight( + NyxRelayTextOperationKind.FailureSelfHeal, + sequence, + generation), + OperationGeneration = generation, + }); + await ScheduleNyxRelayTextOperationTimeoutAsync( + correlationId, + NyxRelayTextOperationKind.FailureSelfHeal, + sequence, + generation, + failureChunk, + platformMessageId, + commandId, + finalText: failureText, + lastFlushedText: state.LastFlushedText, + editCount: state.EditCount, + CancellationToken.None); + StartNyxRelayTextOperation( + NyxRelayTextOperationKind.FailureSelfHeal, + failureChunk, + correlationId, + platformMessageId, + commandId, + finalText: failureText, + lastFlushedText: state.LastFlushedText, + editCount: state.EditCount, + sequence, + generation, + runtimeContext); return true; } @@ -1005,7 +1019,6 @@ await TransitionNyxRelayStreamingPhaseAsync( var edits = state.EditCount; if (!string.Equals(finalText, state.LastFlushedText, StringComparison.Ordinal)) { - var runner = ResolveRunner(); var finalChunk = new LlmReplyStreamChunkEvent { CorrelationId = evt.CorrelationId, @@ -1014,44 +1027,603 @@ await TransitionNyxRelayStreamingPhaseAsync( AccumulatedText = finalText, ChunkAtUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), }; - using var finalChunkCts = new CancellationTokenSource(StreamingFailureUpdateTimeout); - var finalResult = await runner.RunStreamChunkAsync( + var sequence = state.EditCount + 1L; + var generation = NextNyxRelayTextOperationGeneration(state); + await TransitionNyxRelayStreamingPhaseAsync( + correlationId, + state, + state.Phase, + fieldUpdate: s => s with + { + InFlight = new NyxRelayTextOperationInFlight( + NyxRelayTextOperationKind.Final, + sequence, + generation), + OperationGeneration = generation, + }); + await ScheduleNyxRelayTextOperationTimeoutAsync( + correlationId, + NyxRelayTextOperationKind.Final, + sequence, + generation, finalChunk, platformMessageId, - runtimeContext, - finalChunkCts.Token); - if (!finalResult.Success) + commandId, + finalText, + state.LastFlushedText, + state.EditCount, + CancellationToken.None); + StartNyxRelayTextOperation( + NyxRelayTextOperationKind.Final, + finalChunk, + correlationId, + platformMessageId, + commandId, + finalText, + state.LastFlushedText, + state.EditCount, + sequence, + generation, + runtimeContext); + return true; + } + + await TransitionNyxRelayStreamingPhaseAsync( + correlationId, + state, + NyxRelayStreamingPhase.TerminalSucceeded, + terminalReason: "completed"); + await PersistStreamedCompletionAsync(evt, commandId, referenceActivity, platformMessageId, finalText, edits); + return true; + } + + private Task PersistNyxRelayTextCoalescedStateAsync( + string correlationId, + NyxRelayStreamingState state, + string? accumulatedText = null, + string? finalizeText = null, + string? finalizeCommandId = null, + LlmReplyTerminalState terminalState = LlmReplyTerminalState.Unspecified) => + TransitionNyxRelayStreamingPhaseAsync( + correlationId, + state, + state.Phase, + fieldUpdate: s => s with + { + PendingAccumulatedText = NormalizeOptional(accumulatedText) ?? s.PendingAccumulatedText, + PendingFinalizeText = NormalizeOptional(finalizeText) ?? s.PendingFinalizeText, + PendingFinalizeCommandId = NormalizeOptional(finalizeCommandId) ?? s.PendingFinalizeCommandId, + PendingTerminalState = terminalState == LlmReplyTerminalState.Unspecified + ? s.PendingTerminalState + : terminalState, + }); + + private async Task ScheduleNyxRelayTextOperationTimeoutAsync( + string correlationId, + NyxRelayTextOperationKind operation, + long sequence, + long generation, + LlmReplyStreamChunkEvent chunk, + string? currentPlatformMessageId, + string? commandId, + string? finalText, + string? lastFlushedText, + int editCount, + CancellationToken ct) + { + await ScheduleSelfDurableTimeoutAsync( + BuildNyxRelayTextOperationTimeoutCallbackId(correlationId, operation, generation), + StreamingFailureUpdateTimeout, + new NyxRelayTextOperationTimeoutFiredEvent { - // The reply token was already consumed by the first chunk, so falling back to - // a fresh /reply via RunLlmReplyAsync would reuse a dead JTI and surface as 401 - // to the user. Persist the last flushed partial as the terminal state instead — - // the user sees the stale partial, but we don't spin on a guaranteed-failing - // send. Retries cannot help here. - Logger.LogWarning( - "Streaming final flush failed after token consumed; persisting last flushed partial as terminal. correlation={CorrelationId}, code={Code}, platformMessageId={PlatformMessageId}", + CorrelationId = correlationId, + Operation = operation, + Sequence = sequence, + OperationGeneration = generation, + Chunk = CloneNyxRelayTextTimeoutChunkForDurableState(chunk), + CurrentPlatformMessageId = currentPlatformMessageId ?? string.Empty, + CommandId = commandId ?? string.Empty, + FinalText = finalText ?? string.Empty, + LastFlushedText = lastFlushedText ?? string.Empty, + EditCount = editCount, + FiredAtUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + }, + ct: ct); + } + + private static LlmReplyStreamChunkEvent CloneNyxRelayTextTimeoutChunkForDurableState( + LlmReplyStreamChunkEvent chunk) => + new() + { + CorrelationId = chunk.CorrelationId ?? string.Empty, + RegistrationId = chunk.RegistrationId ?? string.Empty, + Activity = CloneForDurableState(chunk.Activity) ?? new ChatActivity(), + AccumulatedText = chunk.AccumulatedText ?? string.Empty, + ChunkAtUnixMs = chunk.ChunkAtUnixMs, + }; + + private void StartNyxRelayTextOperation( + NyxRelayTextOperationKind operation, + LlmReplyStreamChunkEvent chunk, + string correlationId, + string? currentPlatformMessageId, + string? commandId, + string? finalText, + string? lastFlushedText, + int editCount, + long sequence, + long generation, + ConversationTurnRuntimeContext runtimeContext) + { + var runner = ResolveRunner(); + _ = Task.Run(() => ExecuteNyxRelayTextOperationAsync( + runner, + operation, + chunk.Clone(), + correlationId, + currentPlatformMessageId, + commandId, + finalText, + lastFlushedText, + editCount, + sequence, + generation, + runtimeContext)); + } + + private async Task ExecuteNyxRelayTextOperationAsync( + IConversationTurnRunner runner, + NyxRelayTextOperationKind operation, + LlmReplyStreamChunkEvent chunk, + string correlationId, + string? currentPlatformMessageId, + string? commandId, + string? finalText, + string? lastFlushedText, + int editCount, + long sequence, + long generation, + ConversationTurnRuntimeContext runtimeContext) + { + NyxRelayTextOperationCompletedEvent signal; + try + { + using var cts = new CancellationTokenSource(StreamingFailureUpdateTimeout); + var result = await runner.RunStreamChunkAsync( + chunk, + currentPlatformMessageId, + runtimeContext, + cts.Token) + .ConfigureAwait(false); + signal = new NyxRelayTextOperationCompletedEvent + { + OperationId = BuildNyxRelayTextOperationId(correlationId, operation, sequence, generation), + CorrelationId = correlationId, + Operation = operation, + Sequence = sequence, + OperationGeneration = generation, + State = result.Success + ? NyxRelayTextOperationResultState.Succeeded + : NyxRelayTextOperationResultState.Failed, + RawResult = ToRawResult(result), + Chunk = chunk, + CurrentPlatformMessageId = currentPlatformMessageId ?? string.Empty, + CommandId = commandId ?? string.Empty, + FinalText = finalText ?? string.Empty, + LastFlushedText = lastFlushedText ?? string.Empty, + EditCount = editCount, + }; + } + catch (Exception ex) + { + Logger.LogWarning(ex, "Nyx relay text operation executor threw. correlation={CorrelationId}, operation={Operation}", correlationId, operation); + signal = new NyxRelayTextOperationCompletedEvent + { + OperationId = BuildNyxRelayTextOperationId(correlationId, operation, sequence, generation), + CorrelationId = correlationId, + Operation = operation, + Sequence = sequence, + OperationGeneration = generation, + State = NyxRelayTextOperationResultState.Faulted, + RawResult = ToNyxRelayTextRawFault(ex), + Chunk = chunk, + CurrentPlatformMessageId = currentPlatformMessageId ?? string.Empty, + CommandId = commandId ?? string.Empty, + FinalText = finalText ?? string.Empty, + LastFlushedText = lastFlushedText ?? string.Empty, + EditCount = editCount, + }; + } + + await DispatchNyxRelayTextOperationCompletedSignalAsync(signal, correlationId, CancellationToken.None) + .ConfigureAwait(false); + } + + private async Task DispatchNyxRelayTextOperationCompletedSignalAsync( + NyxRelayTextOperationCompletedEvent evt, + string correlationId, + CancellationToken ct) + { + var dispatchPort = Services.GetService(); + if (dispatchPort is null) + { + Logger.LogWarning( + "IActorDispatchPort unavailable; cannot dispatch Nyx relay text operation signal. correlation={CorrelationId}", + correlationId); + return; + } + + await dispatchPort.DispatchAsync( + Id, + new EventEnvelope + { + Id = Guid.NewGuid().ToString("N"), + Timestamp = Timestamp.FromDateTime(DateTime.UtcNow), + Payload = Any.Pack(evt), + Route = EnvelopeRouteSemantics.CreateDirect(Id, Id), + Propagation = new EnvelopePropagation { CorrelationId = correlationId }, + }, + ct) + .ConfigureAwait(false); + } + + private static NyxRelayTextOperationRawResult ToRawResult(ConversationStreamChunkResult result) => + new() + { + PlatformMessageId = result.PlatformMessageId ?? string.Empty, + EditUnsupported = result.EditUnsupported, + RawErrorCode = result.ErrorCode ?? string.Empty, + RawErrorSummary = result.ErrorSummary ?? string.Empty, + }; + + private static NyxRelayTextOperationRawResult ToNyxRelayTextRawFault(Exception ex) => + new() + { + ExceptionType = ex.GetType().Name, + ExceptionMessage = ex.Message, + }; + + private static ConversationStreamChunkResult ToStreamChunkResult(NyxRelayTextOperationCompletedEvent evt) + { + var raw = evt.RawResult ?? new NyxRelayTextOperationRawResult(); + if (evt.State == NyxRelayTextOperationResultState.Succeeded) + return ConversationStreamChunkResult.Succeeded(raw.PlatformMessageId); + + return ConversationStreamChunkResult.Failed( + evt.State == NyxRelayTextOperationResultState.Faulted + ? BuildNyxRelayTextFaultErrorCode(raw) + : raw.RawErrorCode, + evt.State == NyxRelayTextOperationResultState.Faulted + ? raw.ExceptionMessage + : raw.RawErrorSummary, + raw.EditUnsupported); + } + + private static string BuildNyxRelayTextFaultErrorCode(NyxRelayTextOperationRawResult raw) + { + var exceptionType = string.IsNullOrWhiteSpace(raw.ExceptionType) + ? "Exception" + : raw.ExceptionType; + return $"relay_text_threw:{exceptionType}"; + } + + [EventHandler] + public async Task HandleNyxRelayTextOperationCompletedAsync(NyxRelayTextOperationCompletedEvent evt) + { + ArgumentNullException.ThrowIfNull(evt); + var correlationId = NormalizeOptional(evt.CorrelationId); + if (correlationId is null) + return; + + var state = GetOrInitNyxRelayStreamingState(correlationId); + if (!MatchesNyxRelayTextInFlight(state, evt.Operation, evt.Sequence, evt.OperationGeneration)) + return; + + switch (evt.Operation) + { + case NyxRelayTextOperationKind.Interim: + await HandleNyxRelayTextInterimCompletionAsync(correlationId, state, evt); + return; + case NyxRelayTextOperationKind.FailureSelfHeal: + await HandleNyxRelayTextFailureSelfHealCompletionAsync(correlationId, state, evt); + return; + case NyxRelayTextOperationKind.Final: + await HandleNyxRelayTextFinalCompletionAsync(correlationId, state, evt); + return; + default: + return; + } + } + + private async Task HandleNyxRelayTextInterimCompletionAsync( + string correlationId, + NyxRelayStreamingState state, + NyxRelayTextOperationCompletedEvent evt) + { + var result = ToStreamChunkResult(evt); + if (!result.Success) + { + if (state.AllowsFinalEdit) + { + Logger.LogInformation( + "Streaming interim edit failed after token consumed; suppressing interim edits, final edit will still be attempted. correlation={CorrelationId}, code={Code}, editUnsupported={EditUnsupported}", evt.CorrelationId, - finalResult.ErrorCode, - platformMessageId); + result.ErrorCode, + result.EditUnsupported); await TransitionNyxRelayStreamingPhaseAsync( correlationId, state, - NyxRelayStreamingPhase.TerminalPartial, - terminalReason: $"final_edit_failed:{finalResult.ErrorCode}"); - await PersistStreamedCompletionAsync(evt, commandId, referenceActivity, platformMessageId, state.LastFlushedText, state.EditCount); - return true; + NyxRelayStreamingPhase.SuppressingInterim, + terminalReason: $"interim_edit_failed:{result.ErrorCode}", + fieldUpdate: s => s with { InFlight = null }); } - edits += 1; + else + { + Logger.LogInformation( + "Streaming initial send failed before token consumed; disabling streaming and allowing /reply fallback. correlation={CorrelationId}, code={Code}, editUnsupported={EditUnsupported}", + evt.CorrelationId, + result.ErrorCode, + result.EditUnsupported); + await TransitionNyxRelayStreamingPhaseAsync( + correlationId, + state, + NyxRelayStreamingPhase.DisabledPreSend, + terminalReason: $"first_send_failed:{result.ErrorCode}", + fieldUpdate: s => s with { InFlight = null }); + } + return; + } + + var isFirstChunk = state.Phase == NyxRelayStreamingPhase.Idle; + var newPlatformMessageId = string.IsNullOrWhiteSpace(result.PlatformMessageId) + ? state.PlatformMessageId + : result.PlatformMessageId; + var ackedText = evt.Chunk?.AccumulatedText ?? state.PendingAccumulatedText ?? state.LastFlushedText; + var pendingText = string.Equals(state.PendingAccumulatedText, ackedText, StringComparison.Ordinal) + ? null + : state.PendingAccumulatedText; + var updated = await TransitionNyxRelayStreamingPhaseAsync( + correlationId, + state, + isFirstChunk ? NyxRelayStreamingPhase.PlaceholderSent : NyxRelayStreamingPhase.Streaming, + fieldUpdate: s => s with + { + PlatformMessageId = newPlatformMessageId, + LastFlushedText = ackedText, + EditCount = isFirstChunk ? 0 : s.EditCount + 1, + InFlight = null, + PendingAccumulatedText = pendingText, + }); + await ContinueNyxRelayTextCoalescedWorkAsync(correlationId, updated, evt.Chunk); + } + + private async Task HandleNyxRelayTextFailureSelfHealCompletionAsync( + string correlationId, + NyxRelayStreamingState state, + NyxRelayTextOperationCompletedEvent evt) + { + var result = ToStreamChunkResult(evt); + var platformMessageId = NormalizeOptional(evt.CurrentPlatformMessageId) ?? state.PlatformMessageId ?? string.Empty; + var commandId = NormalizeOptional(evt.CommandId) ?? state.PendingFinalizeCommandId ?? BuildLlmReplyCommandId(correlationId); + var failureText = state.PendingFinalizeText ?? evt.FinalText ?? evt.Chunk?.AccumulatedText ?? string.Empty; + if (result.Success) + { + Logger.LogWarning( + "LLM reply failed after streaming-start; updated placeholder with failure text. correlation={CorrelationId}, platformMessageId={PlatformMessageId}", + evt.CorrelationId, + platformMessageId); + await TransitionNyxRelayStreamingPhaseAsync( + correlationId, + state, + NyxRelayStreamingPhase.TerminalSucceeded, + terminalReason: "failed_self_heal", + fieldUpdate: s => s with { InFlight = null }); + await PersistStreamedCompletionAsync(evt, commandId, platformMessageId, failureText, state.EditCount + 1); + return; + } + + Logger.LogWarning( + "Streaming LLM failure-update could not edit placeholder; persisting last flushed partial as terminal. correlation={CorrelationId}, code={Code}, platformMessageId={PlatformMessageId}", + evt.CorrelationId, + result.ErrorCode, + platformMessageId); + await TransitionNyxRelayStreamingPhaseAsync( + correlationId, + state, + NyxRelayStreamingPhase.TerminalPartial, + terminalReason: $"failed_self_heal_edit_failed:{result.ErrorCode}", + fieldUpdate: s => s with { InFlight = null }); + await PersistStreamedCompletionAsync(evt, commandId, platformMessageId, state.LastFlushedText, state.EditCount); + } + + private async Task HandleNyxRelayTextFinalCompletionAsync( + string correlationId, + NyxRelayStreamingState state, + NyxRelayTextOperationCompletedEvent evt) + { + var result = ToStreamChunkResult(evt); + var platformMessageId = NormalizeOptional(evt.CurrentPlatformMessageId) ?? state.PlatformMessageId ?? string.Empty; + var commandId = NormalizeOptional(evt.CommandId) ?? state.PendingFinalizeCommandId ?? BuildLlmReplyCommandId(correlationId); + var finalText = state.PendingFinalizeText ?? evt.FinalText ?? evt.Chunk?.AccumulatedText ?? string.Empty; + if (!result.Success) + { + Logger.LogWarning( + "Streaming final flush failed after token consumed; persisting last flushed partial as terminal. correlation={CorrelationId}, code={Code}, platformMessageId={PlatformMessageId}", + evt.CorrelationId, + result.ErrorCode, + platformMessageId); + await TransitionNyxRelayStreamingPhaseAsync( + correlationId, + state, + NyxRelayStreamingPhase.TerminalPartial, + terminalReason: $"final_edit_failed:{result.ErrorCode}", + fieldUpdate: s => s with { InFlight = null }); + await PersistStreamedCompletionAsync(evt, commandId, platformMessageId, state.LastFlushedText, state.EditCount); + return; } await TransitionNyxRelayStreamingPhaseAsync( correlationId, state, NyxRelayStreamingPhase.TerminalSucceeded, - terminalReason: "completed"); - await PersistStreamedCompletionAsync(evt, commandId, referenceActivity, platformMessageId, finalText, edits); - return true; + terminalReason: "completed", + fieldUpdate: s => s with + { + LastFlushedText = finalText, + EditCount = state.EditCount + 1, + InFlight = null, + }); + await PersistStreamedCompletionAsync(evt, commandId, platformMessageId, finalText, state.EditCount + 1); } + [EventHandler] + public async Task HandleNyxRelayTextOperationTimeoutFiredAsync(NyxRelayTextOperationTimeoutFiredEvent evt) + { + ArgumentNullException.ThrowIfNull(evt); + var correlationId = NormalizeOptional(evt.CorrelationId); + if (correlationId is null) + return; + + var state = GetOrInitNyxRelayStreamingState(correlationId); + if (!MatchesNyxRelayTextInFlight(state, evt.Operation, evt.Sequence, evt.OperationGeneration)) + return; + + switch (evt.Operation) + { + case NyxRelayTextOperationKind.Interim: + if (state.AllowsFinalEdit) + { + await TransitionNyxRelayStreamingPhaseAsync( + correlationId, + state, + NyxRelayStreamingPhase.SuppressingInterim, + terminalReason: "interim_edit_timeout", + fieldUpdate: s => s with { InFlight = null }); + } + else + { + await TransitionNyxRelayStreamingPhaseAsync( + correlationId, + state, + NyxRelayStreamingPhase.DisabledPreSend, + terminalReason: "first_send_timeout", + fieldUpdate: s => s with { InFlight = null }); + } + return; + case NyxRelayTextOperationKind.FailureSelfHeal: + await TransitionNyxRelayStreamingPhaseAsync( + correlationId, + state, + NyxRelayStreamingPhase.TerminalPartial, + terminalReason: "failed_self_heal_timeout", + fieldUpdate: s => s with { InFlight = null }); + await PersistStreamedCompletionAsync( + evt, + NormalizeOptional(evt.CommandId) ?? state.PendingFinalizeCommandId ?? BuildLlmReplyCommandId(correlationId), + NormalizeOptional(evt.CurrentPlatformMessageId) ?? state.PlatformMessageId ?? string.Empty, + state.LastFlushedText, + state.EditCount); + return; + case NyxRelayTextOperationKind.Final: + await TransitionNyxRelayStreamingPhaseAsync( + correlationId, + state, + NyxRelayStreamingPhase.TerminalPartial, + terminalReason: "final_edit_timeout", + fieldUpdate: s => s with { InFlight = null }); + await PersistStreamedCompletionAsync( + evt, + NormalizeOptional(evt.CommandId) ?? state.PendingFinalizeCommandId ?? BuildLlmReplyCommandId(correlationId), + NormalizeOptional(evt.CurrentPlatformMessageId) ?? state.PlatformMessageId ?? string.Empty, + state.LastFlushedText, + state.EditCount); + return; + } + } + + private async Task ContinueNyxRelayTextCoalescedWorkAsync( + string correlationId, + NyxRelayStreamingState state, + LlmReplyStreamChunkEvent? sourceChunk) + { + if (state.InFlight is not null || IsTerminalNyxRelayStreamingPhase(state.Phase)) + return; + + if (state.PendingFinalizeText is not null) + { + var commandId = state.PendingFinalizeCommandId ?? BuildLlmReplyCommandId(correlationId); + var ready = new LlmReplyReadyEvent + { + CorrelationId = correlationId, + RegistrationId = sourceChunk?.RegistrationId ?? string.Empty, + Activity = sourceChunk?.Activity?.Clone() ?? new ChatActivity(), + Outbound = new MessageContent { Text = state.PendingFinalizeText }, + TerminalState = state.PendingTerminalState == LlmReplyTerminalState.Unspecified + ? LlmReplyTerminalState.Completed + : state.PendingTerminalState, + ReadyAtUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + }; + var runtimeContext = BuildNyxRelayRuntimeContext( + correlationId, + ready.Activity, + sourceChunk?.ReplyToken, + sourceChunk?.ReplyTokenExpiresAtUnixMs ?? 0); + await TryCompleteStreamedReplyAsync(ready, commandId, ready.Activity, runtimeContext); + return; + } + + if (state.PendingAccumulatedText is null || sourceChunk is null) + return; + + var chunk = sourceChunk.Clone(); + chunk.AccumulatedText = state.PendingAccumulatedText; + await HandleNyxRelayStreamingChunkCoreAsync(chunk); + } + + private async Task PersistStreamedCompletionAsync( + NyxRelayTextOperationCompletedEvent evt, + string commandId, + string platformMessageId, + string outboundText, + int edits) => + await PersistStreamedCompletionAsync( + new LlmReplyReadyEvent + { + CorrelationId = evt.CorrelationId, + RegistrationId = evt.Chunk?.RegistrationId ?? string.Empty, + Activity = evt.Chunk?.Activity?.Clone() ?? new ChatActivity(), + Outbound = new MessageContent { Text = outboundText }, + TerminalState = LlmReplyTerminalState.Completed, + ReadyAtUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + }, + commandId, + evt.Chunk?.Activity, + platformMessageId, + outboundText, + edits); + + private async Task PersistStreamedCompletionAsync( + NyxRelayTextOperationTimeoutFiredEvent evt, + string commandId, + string platformMessageId, + string outboundText, + int edits) => + await PersistStreamedCompletionAsync( + new LlmReplyReadyEvent + { + CorrelationId = evt.CorrelationId, + RegistrationId = evt.Chunk?.RegistrationId ?? string.Empty, + Activity = evt.Chunk?.Activity?.Clone() ?? new ChatActivity(), + Outbound = new MessageContent { Text = outboundText }, + TerminalState = LlmReplyTerminalState.Completed, + ReadyAtUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + }, + commandId, + evt.Chunk?.Activity, + platformMessageId, + outboundText, + edits); + private async Task PersistStreamedCompletionAsync( LlmReplyReadyEvent evt, string commandId, diff --git a/agents/Aevatar.GAgents.Channel.Runtime/protos/conversation_events.proto b/agents/Aevatar.GAgents.Channel.Runtime/protos/conversation_events.proto index e5f468968..e2a28ff2b 100644 --- a/agents/Aevatar.GAgents.Channel.Runtime/protos/conversation_events.proto +++ b/agents/Aevatar.GAgents.Channel.Runtime/protos/conversation_events.proto @@ -149,6 +149,13 @@ enum LarkCardOperationPhase { LARK_CARD_OPERATION_PHASE_FINALIZE = 3; } +enum NyxRelayTextOperationKind { + NYX_RELAY_TEXT_OPERATION_KIND_UNSPECIFIED = 0; + NYX_RELAY_TEXT_OPERATION_KIND_INTERIM = 1; + NYX_RELAY_TEXT_OPERATION_KIND_FAILURE_SELF_HEAL = 2; + NYX_RELAY_TEXT_OPERATION_KIND_FINAL = 3; +} + // Refactor (iter20/cluster-004): // Old pattern: ConversationGAgent 持有 actor token registry + 可见回复状态部分仅在内存 // New principle: 删 actor token registry,credentials runtime-only,可见回复 lifecycle 持久到 ConversationGAgent state @@ -172,6 +179,10 @@ message ConversationReplyLifecycleState { string pending_accumulated_text = 17; string pending_finalize_text = 18; string pending_finalize_command_id = 19; + NyxRelayTextOperationKind nyx_relay_in_flight_operation = 20; + int64 nyx_relay_in_flight_sequence = 21; + int64 nyx_relay_operation_generation = 22; + LlmReplyTerminalState pending_nyx_relay_terminal_state = 23; } // Refactor (iter20/cluster-004): @@ -303,6 +314,52 @@ message LarkCardOperationTimeoutFiredEvent { int64 fired_at_unix_ms = 12; } +enum NyxRelayTextOperationResultState { + NYX_RELAY_TEXT_OPERATION_RESULT_STATE_UNSPECIFIED = 0; + NYX_RELAY_TEXT_OPERATION_RESULT_STATE_SUCCEEDED = 1; + NYX_RELAY_TEXT_OPERATION_RESULT_STATE_FAILED = 2; + NYX_RELAY_TEXT_OPERATION_RESULT_STATE_FAULTED = 3; +} + +message NyxRelayTextOperationRawResult { + string platform_message_id = 1; + bool edit_unsupported = 2; + string raw_error_code = 3; + string raw_error_summary = 4; + string exception_type = 5; + string exception_message = 6; +} + +message NyxRelayTextOperationCompletedEvent { + string operation_id = 1; + string correlation_id = 2; + NyxRelayTextOperationKind operation = 3; + int64 sequence = 4; + int64 operation_generation = 5; + NyxRelayTextOperationResultState state = 6; + NyxRelayTextOperationRawResult raw_result = 7; + LlmReplyStreamChunkEvent chunk = 8; + string current_platform_message_id = 9; + string command_id = 10; + string final_text = 11; + string last_flushed_text = 12; + int32 edit_count = 13; +} + +message NyxRelayTextOperationTimeoutFiredEvent { + string correlation_id = 1; + NyxRelayTextOperationKind operation = 2; + int64 sequence = 3; + int64 operation_generation = 4; + LlmReplyStreamChunkEvent chunk = 5; + string current_platform_message_id = 6; + string command_id = 7; + string final_text = 8; + string last_flushed_text = 9; + int32 edit_count = 10; + int64 fired_at_unix_ms = 11; +} + message DeferredLlmReplyDispatchRequestedEvent { string correlation_id = 1; int64 requested_at_unix_ms = 2; diff --git a/test/Aevatar.GAgents.Channel.Protocol.Tests/ConversationGAgentDedupTests.cs b/test/Aevatar.GAgents.Channel.Protocol.Tests/ConversationGAgentDedupTests.cs index 45407e357..ab39246bd 100644 --- a/test/Aevatar.GAgents.Channel.Protocol.Tests/ConversationGAgentDedupTests.cs +++ b/test/Aevatar.GAgents.Channel.Protocol.Tests/ConversationGAgentDedupTests.cs @@ -1485,10 +1485,12 @@ public async Task HandleLlmReplyStreamChunkAsync_FirstChunk_CallsRunStreamChunkW StreamChunkResultFactory = (_, currentPmid) => ConversationStreamChunkResult.Succeeded(currentPmid ?? "om_first"), }; - var (agent, _) = CreateAgent(runner, "conv-stream-first"); + var dispatch = new RecordingActorDispatchPort(); + var (agent, _) = CreateAgent(runner, "conv-stream-first", dispatchPort: dispatch); await agent.HandleLlmReplyStreamChunkAsync( CreateStreamChunk("act-stream", "relay-msg-1", "hello")); + await CompleteNextNyxRelayTextOperationAsync(agent, dispatch); runner.StreamChunkCount.ShouldBe(1); runner.LastStreamChunkCurrentPlatformMessageId.ShouldBeNull(); @@ -1502,12 +1504,15 @@ public async Task HandleLlmReplyStreamChunkAsync_SubsequentChunk_PassesStoredPla StreamChunkResultFactory = (_, currentPmid) => ConversationStreamChunkResult.Succeeded(currentPmid ?? "om_first"), }; - var (agent, _) = CreateAgent(runner, "conv-stream-2"); + var dispatch = new RecordingActorDispatchPort(); + var (agent, _) = CreateAgent(runner, "conv-stream-2", dispatchPort: dispatch); await agent.HandleLlmReplyStreamChunkAsync( CreateStreamChunk("act-stream-2", "relay-msg-1", "first chunk")); + await CompleteNextNyxRelayTextOperationAsync(agent, dispatch); await agent.HandleLlmReplyStreamChunkAsync( CreateStreamChunk("act-stream-2", "relay-msg-1", "first chunk plus more")); + await CompleteNextNyxRelayTextOperationAsync(agent, dispatch); runner.StreamChunkCount.ShouldBe(2); runner.LastStreamChunkCurrentPlatformMessageId.ShouldBe("om_first"); @@ -1521,10 +1526,12 @@ public async Task HandleLlmReplyStreamChunkAsync_WhenRunnerFails_MarksDisabledAn StreamChunkResultFactory = (_, _) => ConversationStreamChunkResult.Failed("relay_reply_edit_unsupported", "nope", editUnsupported: true), }; - var (agent, _) = CreateAgent(runner, "conv-stream-fail"); + var dispatch = new RecordingActorDispatchPort(); + var (agent, _) = CreateAgent(runner, "conv-stream-fail", dispatchPort: dispatch); await agent.HandleLlmReplyStreamChunkAsync( CreateStreamChunk("act-stream-fail", "relay-msg-1", "first")); + await CompleteNextNyxRelayTextOperationAsync(agent, dispatch); await agent.HandleLlmReplyStreamChunkAsync( CreateStreamChunk("act-stream-fail", "relay-msg-1", "first plus second")); await agent.HandleLlmReplyStreamChunkAsync( @@ -1553,10 +1560,12 @@ public async Task HandleLlmReplyReadyAsync_WhenStreamingSucceeded_PersistsComple StreamChunkResultFactory = (_, currentPmid) => ConversationStreamChunkResult.Succeeded(currentPmid ?? "om_stream"), }; - var (agent, store) = CreateAgent(runner, "conv-stream-short-circuit"); + var dispatch = new RecordingActorDispatchPort(); + var (agent, store) = CreateAgent(runner, "conv-stream-short-circuit", dispatchPort: dispatch); await agent.HandleLlmReplyStreamChunkAsync( CreateStreamChunk("act-stream-sc", "relay-msg-1", "final text")); + await CompleteNextNyxRelayTextOperationAsync(agent, dispatch); var ready = new LlmReplyReadyEvent { @@ -1599,6 +1608,16 @@ public async Task HandleLlmReplyReadyAsync_TextStreamingLifecycleSurvivesReactiv await firstAgent.HandleLlmReplyStreamChunkAsync( CreateStreamChunk("act-stream-reactivate", "relay-msg-1", "first partial")); + await firstAgent.HandleNyxRelayTextOperationCompletedAsync(new NyxRelayTextOperationCompletedEvent + { + CorrelationId = "act-stream-reactivate", + Operation = NyxRelayTextOperationKind.Interim, + Sequence = 1, + OperationGeneration = firstAgent.State.ActiveReplyLifecycles.Single().NyxRelayOperationGeneration, + State = NyxRelayTextOperationResultState.Succeeded, + RawResult = new NyxRelayTextOperationRawResult { PlatformMessageId = "om_reactivated" }, + Chunk = CreateStreamChunk("act-stream-reactivate", "relay-msg-1", "first partial"), + }); var lifecycle = firstAgent.State.ActiveReplyLifecycles.Single(); lifecycle.Mode.ShouldBe(ConversationReplyLifecycleMode.NyxRelayText); @@ -1611,7 +1630,8 @@ await firstAgent.HandleLlmReplyStreamChunkAsync( StreamChunkResultFactory = (_, currentPmid) => ConversationStreamChunkResult.Succeeded(currentPmid ?? "om_reactivated"), }; - var (secondAgent, _) = CreateAgent(secondRunner, "conv-stream-reactivate", store: store); + var secondDispatch = new RecordingActorDispatchPort(); + var (secondAgent, _) = CreateAgent(secondRunner, "conv-stream-reactivate", store: store, dispatchPort: secondDispatch); await secondAgent.HandleLlmReplyReadyAsync(new LlmReplyReadyEvent { @@ -1625,6 +1645,7 @@ await secondAgent.HandleLlmReplyReadyAsync(new LlmReplyReadyEvent ReplyToken = "runtime-ready-token", ReplyTokenExpiresAtUnixMs = DateTimeOffset.UtcNow.AddMinutes(5).ToUnixTimeMilliseconds(), }); + await CompleteNextNyxRelayTextOperationAsync(secondAgent, secondDispatch); secondRunner.LlmReplyCount.ShouldBe(0); secondRunner.StreamChunkCount.ShouldBe(1); @@ -1644,10 +1665,12 @@ public async Task HandleLlmReplyReadyAsync_WhenStreamingDisabled_FallsBackToRunL StreamChunkResultFactory = (_, _) => ConversationStreamChunkResult.Failed("relay_reply_edit_unsupported", "nope", editUnsupported: true), }; - var (agent, store) = CreateAgent(runner, "conv-stream-fallback"); + var dispatch = new RecordingActorDispatchPort(); + var (agent, store) = CreateAgent(runner, "conv-stream-fallback", dispatchPort: dispatch); await agent.HandleLlmReplyStreamChunkAsync( CreateStreamChunk("act-stream-fb", "relay-msg-1", "partial")); + await CompleteNextNyxRelayTextOperationAsync(agent, dispatch); var ready = new LlmReplyReadyEvent { @@ -1692,14 +1715,17 @@ public async Task HandleLlmReplyStreamChunkAsync_InterimEditFailureAfterTokenCon return ConversationStreamChunkResult.Failed("transient_edit_error", "boom"); }, }; - var (agent, _) = CreateAgent(runner, "conv-stream-suppress"); + var dispatch = new RecordingActorDispatchPort(); + var (agent, _) = CreateAgent(runner, "conv-stream-suppress", dispatchPort: dispatch); // First chunk consumes the reply token. await agent.HandleLlmReplyStreamChunkAsync( CreateStreamChunk("act-stream-suppress", "relay-msg-1", "hello")); + await CompleteNextNyxRelayTextOperationAsync(agent, dispatch); // Interim edit fails after token consumed. await agent.HandleLlmReplyStreamChunkAsync( CreateStreamChunk("act-stream-suppress", "relay-msg-1", "hello world")); + await CompleteNextNyxRelayTextOperationAsync(agent, dispatch); // Later interim chunk must be dropped (not dispatched to runner). await agent.HandleLlmReplyStreamChunkAsync( CreateStreamChunk("act-stream-suppress", "relay-msg-1", "hello world again")); @@ -1727,12 +1753,15 @@ public async Task HandleLlmReplyReadyAsync_WhenTokenAlreadyConsumedAndInterimEdi return ConversationStreamChunkResult.Succeeded(pmid ?? "om_first_consumed"); }, }; - var (agent, store) = CreateAgent(runner, "conv-stream-final-retry"); + var dispatch = new RecordingActorDispatchPort(); + var (agent, store) = CreateAgent(runner, "conv-stream-final-retry", dispatchPort: dispatch); await agent.HandleLlmReplyStreamChunkAsync( CreateStreamChunk("act-stream-final-retry", "relay-msg-1", "hello")); + await CompleteNextNyxRelayTextOperationAsync(agent, dispatch); await agent.HandleLlmReplyStreamChunkAsync( CreateStreamChunk("act-stream-final-retry", "relay-msg-1", "hello world")); + await CompleteNextNyxRelayTextOperationAsync(agent, dispatch); var ready = new LlmReplyReadyEvent { @@ -1745,6 +1774,7 @@ await agent.HandleLlmReplyStreamChunkAsync( ReadyAtUnixMs = 100, }; await agent.HandleLlmReplyReadyAsync(ready); + await CompleteNextNyxRelayTextOperationAsync(agent, dispatch); // Must not fall back to RunLlmReplyAsync — the token is already consumed. runner.LlmReplyCount.ShouldBe(0); @@ -1775,12 +1805,15 @@ public async Task HandleLlmReplyReadyAsync_WhenTokenConsumedAndFinalEditAlsoFail return ConversationStreamChunkResult.Failed("transient_edit_error", "boom"); }, }; - var (agent, store) = CreateAgent(runner, "conv-stream-final-degraded"); + var dispatch = new RecordingActorDispatchPort(); + var (agent, store) = CreateAgent(runner, "conv-stream-final-degraded", dispatchPort: dispatch); await agent.HandleLlmReplyStreamChunkAsync( CreateStreamChunk("act-stream-final-degraded", "relay-msg-1", "hello partial")); + await CompleteNextNyxRelayTextOperationAsync(agent, dispatch); await agent.HandleLlmReplyStreamChunkAsync( CreateStreamChunk("act-stream-final-degraded", "relay-msg-1", "hello partial more")); + await CompleteNextNyxRelayTextOperationAsync(agent, dispatch); var ready = new LlmReplyReadyEvent { @@ -1793,6 +1826,7 @@ await agent.HandleLlmReplyStreamChunkAsync( ReadyAtUnixMs = 100, }; await agent.HandleLlmReplyReadyAsync(ready); + await CompleteNextNyxRelayTextOperationAsync(agent, dispatch); runner.LlmReplyCount.ShouldBe(0); var events = await store.GetEventsAsync(agent.Id); @@ -1830,11 +1864,13 @@ public async Task HandleLlmReplyReadyAsync_WhenStreamingStartedThenLlmFailed_Edi return ConversationStreamChunkResult.Succeeded(pmid ?? "om_placeholder_consumed"); }, }; - var (agent, store) = CreateAgent(runner, "conv-stream-failed-edit"); + var dispatch = new RecordingActorDispatchPort(); + var (agent, store) = CreateAgent(runner, "conv-stream-failed-edit", dispatchPort: dispatch); // First chunk lands the placeholder + consumes the reply token. await agent.HandleLlmReplyStreamChunkAsync( CreateStreamChunk("act-stream-failed", "relay-msg-1", "...")); + await CompleteNextNyxRelayTextOperationAsync(agent, dispatch); var ready = new LlmReplyReadyEvent { @@ -1851,6 +1887,7 @@ await agent.HandleLlmReplyStreamChunkAsync( ReadyAtUnixMs = 100, }; await agent.HandleLlmReplyReadyAsync(ready); + await CompleteNextNyxRelayTextOperationAsync(agent, dispatch); // Must NOT fall through to RunLlmReplyAsync (would 401 on the dead token). runner.LlmReplyCount.ShouldBe(0); @@ -1885,10 +1922,12 @@ public async Task HandleLlmReplyReadyAsync_WhenStreamingStartedAndFailedEditAlso return ConversationStreamChunkResult.Failed("relay_reply_edit_unsupported", "lark refused", editUnsupported: true); }, }; - var (agent, store) = CreateAgent(runner, "conv-stream-failed-edit-deny"); + var dispatch = new RecordingActorDispatchPort(); + var (agent, store) = CreateAgent(runner, "conv-stream-failed-edit-deny", dispatchPort: dispatch); await agent.HandleLlmReplyStreamChunkAsync( CreateStreamChunk("act-stream-failed-deny", "relay-msg-1", "first partial")); + await CompleteNextNyxRelayTextOperationAsync(agent, dispatch); var ready = new LlmReplyReadyEvent { @@ -1903,6 +1942,7 @@ await agent.HandleLlmReplyStreamChunkAsync( ReadyAtUnixMs = 100, }; await agent.HandleLlmReplyReadyAsync(ready); + await CompleteNextNyxRelayTextOperationAsync(agent, dispatch); runner.LlmReplyCount.ShouldBe(0); var events = await store.GetEventsAsync(agent.Id); @@ -2014,12 +2054,13 @@ private static (ConversationGAgent agent, IEventStore store) CreateAgent( IChatRoutePolicyQueryPort? queryPort = null, ChatRouteResolver? chatRouteResolver = null, IEventStore? store = null, - IEventPublisher? eventPublisher = null) + IEventPublisher? eventPublisher = null, + RecordingActorDispatchPort? dispatchPort = null) { store ??= new InMemoryEventStore(); var services = new ServiceCollection(); services.AddSingleton(store); - services.AddSingleton(new RecordingActorDispatchPort()); + services.AddSingleton(dispatchPort ?? new RecordingActorDispatchPort()); services.AddSingleton(); services.AddSingleton(); services.AddSingleton(runner); @@ -2123,6 +2164,15 @@ private static string GetRepositoryRoot() DispatchedAtUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), }; + private static async Task CompleteNextNyxRelayTextOperationAsync( + ConversationGAgent agent, + RecordingActorDispatchPort dispatchPort) + { + var completed = await dispatchPort.WaitForPayloadAsync(); + await agent.HandleNyxRelayTextOperationCompletedAsync(completed); + return completed; + } + private sealed class RecordingTurnRunner : IConversationTurnRunner { public int InboundCount; @@ -2275,12 +2325,43 @@ public Task SendToAsync( private sealed class RecordingActorDispatchPort : IActorDispatchPort { public List<(string ActorId, EventEnvelope Envelope)> Dispatches { get; } = []; + private readonly Queue _pending = new(); + private readonly SemaphoreSlim _available = new(0); public Task DispatchAsync(string actorId, EventEnvelope envelope, CancellationToken ct = default) { - Dispatches.Add((actorId, envelope.Clone())); + var clone = envelope.Clone(); + Dispatches.Add((actorId, clone.Clone())); + lock (_pending) + { + _pending.Enqueue(clone); + } + _available.Release(); return Task.CompletedTask; } + + public async Task WaitForPayloadAsync() + where T : IMessage, new() + { + var deadline = DateTimeOffset.UtcNow.AddSeconds(5); + while (DateTimeOffset.UtcNow < deadline) + { + var remaining = deadline - DateTimeOffset.UtcNow; + if (remaining <= TimeSpan.Zero || !await _available.WaitAsync(remaining)) + break; + + EventEnvelope envelope; + lock (_pending) + { + envelope = _pending.Dequeue(); + } + + if (envelope.Payload.Is(new T().Descriptor)) + return envelope.Payload.Unpack(); + } + + throw new TimeoutException($"Timed out waiting for dispatched {typeof(T).Name}."); + } } private sealed class RecordingCallbackScheduler : IActorRuntimeCallbackScheduler @@ -2349,7 +2430,8 @@ public async Task HandleLarkCardOperationTimeoutFiredAsync_CreateTimeout_FallsBa StreamChunkResultFactory = (_, currentPmid) => ConversationStreamChunkResult.Succeeded(currentPmid ?? "om_text_first"), }; - var (agent, _) = CreateAgent(text, "conv-card-timeout"); + var dispatch = new RecordingActorDispatchPort(); + var (agent, _) = CreateAgent(text, "conv-card-timeout", dispatchPort: dispatch); await agent.HandleLlmReplyCardStreamChunkAsync( CreateCardStreamChunk("act-card-timeout", "relay-msg-1", "hello")); @@ -2365,6 +2447,7 @@ await agent.HandleLarkCardOperationTimeoutFiredAsync(new LarkCardOperationTimeou CommandId = "llm-reply:act-card-timeout", FiredAtUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), }); + await CompleteNextNyxRelayTextOperationAsync(agent, dispatch); text.StreamChunkCount.ShouldBe(1); agent.State.ActiveReplyLifecycles.ShouldContain(x => @@ -2628,7 +2711,8 @@ public async Task HandleLlmReplyReadyAsync_CardCreationFailed_DefersToTextEditFa StreamChunkResultFactory = (_, currentPmid) => ConversationStreamChunkResult.Succeeded(currentPmid ?? "om_text_first"), }; - var (agent, store) = CreateAgent(text, "conv-card-fb-final"); + var dispatch = new RecordingActorDispatchPort(); + var (agent, store) = CreateAgent(text, "conv-card-fb-final", dispatchPort: dispatch); await agent.HandleLlmReplyCardStreamChunkAsync( CreateCardStreamChunk("act-card-fb-final", "relay-msg-1", "complete answer")); @@ -2642,6 +2726,7 @@ await agent.HandleLarkCardOperationCompletedAsync(CreateCardCreateCompletion( isRateLimited: true, errorCode: "card_create_failed", errorSummary: "down")); + await CompleteNextNyxRelayTextOperationAsync(agent, dispatch); var ready = new LlmReplyReadyEvent { @@ -2655,7 +2740,8 @@ await agent.HandleLarkCardOperationCompletedAsync(CreateCardCreateCompletion( }; await agent.HandleLlmReplyReadyAsync(ready); - // Text-edit finalize lands the ConversationTurnCompletedEvent with the legacy prefix. + // Text-edit fallback lands the ConversationTurnCompletedEvent with the legacy prefix + // after the typed text operation completion reconciles inside the actor. var events = await store.GetEventsAsync(agent.Id); events.Last().EventType.ShouldContain(nameof(ConversationTurnCompletedEvent)); var completed = ConversationTurnCompletedEvent.Parser.ParseFrom(events.Last().EventData.Value); diff --git a/test/Aevatar.GAgents.ChannelRuntime.Tests/NyxRelayTextOperationTimeoutPayloadTests.cs b/test/Aevatar.GAgents.ChannelRuntime.Tests/NyxRelayTextOperationTimeoutPayloadTests.cs new file mode 100644 index 000000000..bcf053a10 --- /dev/null +++ b/test/Aevatar.GAgents.ChannelRuntime.Tests/NyxRelayTextOperationTimeoutPayloadTests.cs @@ -0,0 +1,287 @@ +using System.Reflection; +using System.Text; +using Aevatar.Foundation.Abstractions; +using Aevatar.Foundation.Abstractions.Persistence; +using Aevatar.Foundation.Abstractions.Runtime.Callbacks; +using Aevatar.Foundation.Core.EventSourcing; +using Aevatar.GAgents.Channel.Abstractions; +using Aevatar.GAgents.Channel.Runtime; +using FluentAssertions; +using Google.Protobuf; +using Microsoft.Extensions.DependencyInjection; +using Xunit; + +namespace Aevatar.GAgents.ChannelRuntime.Tests; + +public sealed class NyxRelayTextOperationTimeoutPayloadTests +{ + [Fact] + public async Task HandleLlmReplyStreamChunkAsync_ScheduledTimeoutPayload_StripsRuntimeRelayCredentials() + { + var scheduler = new RecordingCallbackScheduler(); + var agent = CreateAgent("conv-nyx-timeout-sanitize", scheduler); + + await agent.HandleLlmReplyStreamChunkAsync(CreateStreamChunk()); + + var scheduled = scheduler.Timeouts.Should().ContainSingle().Subject; + var timeout = scheduled.TriggerEnvelope.Payload.Unpack(); + timeout.Chunk.Should().NotBeNull(); + timeout.Chunk.ReplyToken.Should().BeEmpty(); + timeout.Chunk.ReplyTokenExpiresAtUnixMs.Should().Be(0); + timeout.Chunk.Activity.TransportExtras.NyxUserAccessToken.Should().BeEmpty(); + timeout.Chunk.CorrelationId.Should().Be("corr-timeout-token"); + timeout.Chunk.RegistrationId.Should().Be("reg-1"); + timeout.Chunk.AccumulatedText.Should().Be("hello"); + timeout.Chunk.Activity.OutboundDelivery.ReplyMessageId.Should().Be("relay-msg-1"); + + var persistedBytes = scheduled.TriggerEnvelope.ToByteArray(); + Encoding.UTF8.GetString(persistedBytes).Should().NotContain("runtime-reply-token-secret"); + Encoding.UTF8.GetString(persistedBytes).Should().NotContain("runtime-user-access-token-secret"); + } + + private static ConversationGAgent CreateAgent( + string id, + IActorRuntimeCallbackScheduler scheduler) + { + var services = new ServiceCollection() + .AddSingleton() + .AddSingleton() + .AddSingleton(scheduler) + .AddSingleton() + .AddSingleton() + .AddTransient(typeof(IEventSourcingBehaviorFactory<>), typeof(DefaultEventSourcingBehaviorFactory<>)) + .BuildServiceProvider(); + + var agent = new ConversationGAgent + { + Services = services, + EventPublisher = new NoopEventPublisher(), + EventSourcingBehaviorFactory = + services.GetRequiredService>(), + }; + SetId(agent, id); + agent.ActivateAsync().GetAwaiter().GetResult(); + return agent; + } + + private static LlmReplyStreamChunkEvent CreateStreamChunk() => + new() + { + CorrelationId = "corr-timeout-token", + RegistrationId = "reg-1", + Activity = new ChatActivity + { + Id = "corr-timeout-token", + Type = ActivityType.Message, + ChannelId = new ChannelId { Value = "lark" }, + Bot = new BotInstanceId { Value = "lark-bot" }, + Conversation = new ConversationReference + { + Channel = new ChannelId { Value = "lark" }, + Bot = new BotInstanceId { Value = "lark-bot" }, + Scope = ConversationScope.Group, + CanonicalKey = "conv:lark:grp", + }, + Content = new MessageContent { Text = "user question" }, + OutboundDelivery = new OutboundDeliveryContext + { + ReplyMessageId = "relay-msg-1", + CorrelationId = "corr-timeout-token", + }, + TransportExtras = new TransportExtras + { + NyxUserAccessToken = "runtime-user-access-token-secret", + NyxPlatform = "lark", + NyxConversationId = "oc_group_chat_1", + }, + }, + AccumulatedText = "hello", + ChunkAtUnixMs = 42, + ReplyToken = "runtime-reply-token-secret", + ReplyTokenExpiresAtUnixMs = DateTimeOffset.UtcNow.AddMinutes(5).ToUnixTimeMilliseconds(), + }; + + private static void SetId(object agent, string id) + { + var current = agent.GetType(); + while (current is not null) + { + var setIdMethod = current.GetMethod( + "SetId", + BindingFlags.Instance | BindingFlags.NonPublic); + if (setIdMethod is not null) + { + setIdMethod.Invoke(agent, [id]); + return; + } + + current = current.BaseType; + } + + throw new InvalidOperationException("Unable to set agent id via reflection."); + } + + private sealed class SucceedingTurnRunner : IConversationTurnRunner + { + public Task RunInboundAsync( + ChatActivity activity, + ConversationTurnRuntimeContext runtimeContext, + CancellationToken ct) => + Task.FromResult(ConversationTurnResult.Ignored("not-used", activity.Id)); + + public Task RunLlmReplyAsync( + LlmReplyReadyEvent reply, + ConversationTurnRuntimeContext runtimeContext, + CancellationToken ct) => + Task.FromResult(ConversationTurnResult.Sent( + "sent", + reply.Outbound?.Clone() ?? new MessageContent(), + "bot")); + + public Task RunContinueAsync( + ConversationContinueRequestedEvent command, + CancellationToken ct) => + Task.FromResult(ConversationTurnResult.Ignored("not-used", command.CommandId)); + + public Task RunStreamChunkAsync( + LlmReplyStreamChunkEvent chunk, + string? currentPlatformMessageId, + ConversationTurnRuntimeContext runtimeContext, + CancellationToken ct) => + Task.FromResult(ConversationStreamChunkResult.Succeeded(currentPlatformMessageId ?? "om_first")); + } + + private sealed class RecordingCallbackScheduler : IActorRuntimeCallbackScheduler + { + public List Timeouts { get; } = []; + + public Task ScheduleTimeoutAsync( + RuntimeCallbackTimeoutRequest request, + CancellationToken ct = default) + { + ct.ThrowIfCancellationRequested(); + Timeouts.Add(new RuntimeCallbackTimeoutRequest + { + ActorId = request.ActorId, + CallbackId = request.CallbackId, + TriggerEnvelope = request.TriggerEnvelope.Clone(), + DueTime = request.DueTime, + DeliveryMode = request.DeliveryMode, + }); + return Task.FromResult(new RuntimeCallbackLease( + request.ActorId, + request.CallbackId, + 1, + RuntimeCallbackBackend.InMemory)); + } + + public Task ScheduleTimerAsync( + RuntimeCallbackTimerRequest request, + CancellationToken ct = default) => + Task.FromResult(new RuntimeCallbackLease( + request.ActorId, + request.CallbackId, + 1, + RuntimeCallbackBackend.InMemory)); + + public Task CancelAsync(RuntimeCallbackLease lease, CancellationToken ct = default) => Task.CompletedTask; + + public Task PurgeActorAsync(string actorId, CancellationToken ct = default) => Task.CompletedTask; + } + + private sealed class NoopActorDispatchPort : IActorDispatchPort + { + public Task DispatchAsync(string actorId, EventEnvelope envelope, CancellationToken ct = default) => + Task.CompletedTask; + } + + private sealed class NoopEventPublisher : IEventPublisher + { + public Task PublishAsync( + TEvent evt, + TopologyAudience audience = TopologyAudience.Children, + CancellationToken ct = default, + EventEnvelope? sourceEnvelope = null, + EventEnvelopePublishOptions? options = null) + where TEvent : IMessage => + Task.CompletedTask; + + public Task SendToAsync( + string targetActorId, + TEvent evt, + CancellationToken ct = default, + EventEnvelope? sourceEnvelope = null, + EventEnvelopePublishOptions? options = null) + where TEvent : IMessage => + Task.CompletedTask; + } + + private sealed class InMemoryEventStore : IEventStore + { + private readonly Dictionary> _events = new(StringComparer.Ordinal); + + public Task AppendAsync( + string agentId, + IEnumerable events, + long expectedVersion, + CancellationToken ct = default) + { + ct.ThrowIfCancellationRequested(); + if (!_events.TryGetValue(agentId, out var stream)) + { + stream = []; + _events[agentId] = stream; + } + + var currentVersion = stream.Count == 0 ? 0 : stream[^1].Version; + if (currentVersion != expectedVersion) + throw new EventStoreOptimisticConcurrencyException( + agentId, + expectedVersion, + currentVersion); + + var appended = events.Select(x => x.Clone()).ToList(); + stream.AddRange(appended); + return Task.FromResult(new EventStoreCommitResult + { + AgentId = agentId, + LatestVersion = stream.Count == 0 ? 0 : stream[^1].Version, + CommittedEvents = { appended.Select(x => x.Clone()) }, + }); + } + + public Task> GetEventsAsync( + string agentId, + long? fromVersion = null, + CancellationToken ct = default) + { + ct.ThrowIfCancellationRequested(); + if (!_events.TryGetValue(agentId, out var stream)) + return Task.FromResult>([]); + + IReadOnlyList result = fromVersion.HasValue + ? stream.Where(x => x.Version > fromVersion.Value).Select(x => x.Clone()).ToList() + : stream.Select(x => x.Clone()).ToList(); + return Task.FromResult(result); + } + + public Task GetVersionAsync(string agentId, CancellationToken ct = default) + { + ct.ThrowIfCancellationRequested(); + if (!_events.TryGetValue(agentId, out var stream) || stream.Count == 0) + return Task.FromResult(0L); + return Task.FromResult(stream[^1].Version); + } + + public Task DeleteEventsUpToAsync(string agentId, long toVersion, CancellationToken ct = default) + { + ct.ThrowIfCancellationRequested(); + if (toVersion <= 0 || !_events.TryGetValue(agentId, out var stream)) + return Task.FromResult(0L); + + var before = stream.Count; + stream.RemoveAll(x => x.Version <= toVersion); + return Task.FromResult((long)(before - stream.Count)); + } + } +} diff --git a/tools/ci/architecture_guards.sh b/tools/ci/architecture_guards.sh index f4821ad0a..ef55f923c 100755 --- a/tools/ci/architecture_guards.sh +++ b/tools/ci/architecture_guards.sh @@ -1324,6 +1324,66 @@ if [ -f "${lark_card_streaming_file}" ]; then fi fi +# Refactor (iter57/cluster-068-lark-extend-signal): +# Old pattern: ConversationGAgent Nyx relay text streaming awaited external relay writes +# inside the actor turn and interpreted their result inline. +# New principle: text helpers only dispatch NyxRelayTextOperationCompletedEvent with +# minimal raw result; actor handlers own lifecycle, timestamps, terminal reasons, and +# completion persistence. +conversation_gagent_file="agents/Aevatar.GAgents.Channel.Runtime/Conversation/ConversationGAgent.cs" +if [ -f "${conversation_gagent_file}" ]; then + nyx_relay_text_helper_report="$( + awk ' + /private async Task ExecuteNyxRelayTextOperationAsync/ { + in_helper = 1 + body_started = 0 + brace_depth = 0 + completed_signal_pending = 0 + completed_signal_depth = -1 + } + in_helper { + line = $0 + hard_forbidden = "TransitionNyxRelayStreamingPhaseAsync|PersistStreamedCompletionAsync|DateTimeOffset\\.UtcNow\\.ToUnixTimeMilliseconds\\(\\)|TerminalSucceeded|TerminalPartial|DisabledPreSend|SuppressingInterim|terminalReason|failed_self_heal|final_edit_failed|interim_edit_failed|first_send_failed" + inside_completed_signal = completed_signal_depth >= 0 || line ~ /new[[:space:]]+NyxRelayTextOperationCompletedEvent/ + + if (body_started && !inside_completed_signal && line ~ hard_forbidden) { + print FILENAME ":" FNR ":" $0 + } + + if (line ~ /new[[:space:]]+NyxRelayTextOperationCompletedEvent/) { + completed_signal_pending = 1 + } + + opens = gsub(/\{/, "{", line) + closes = gsub(/\}/, "}", line) + if (!body_started && opens > 0) { + body_started = 1 + } + brace_depth += opens - closes + + if (completed_signal_pending && opens > 0) { + completed_signal_depth = brace_depth + completed_signal_pending = 0 + } + if (completed_signal_depth >= 0 && brace_depth < completed_signal_depth) { + completed_signal_depth = -1 + } + if (body_started && brace_depth == 0) { + in_helper = 0 + body_started = 0 + completed_signal_pending = 0 + completed_signal_depth = -1 + } + } + ' "${conversation_gagent_file}" + )" + if [ -n "${nyx_relay_text_helper_report}" ]; then + echo "${nyx_relay_text_helper_report}" + echo "ConversationGAgent Nyx relay text helper must stay signal-only: no lifecycle transitions, terminal reasons, timestamps, or persistence in ExecuteNyxRelayTextOperationAsync." + exit 1 + fi +fi + check_orchestration_class_guard() { local file_path="$1" local max_non_empty_lines="$2"