diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/AgentResponseEvent.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/AgentResponseEvent.cs index e57204ea4e..5d59366a20 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/AgentResponseEvent.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/AgentResponseEvent.cs @@ -1,5 +1,6 @@ // Copyright (c) Microsoft. All rights reserved. +using System.Collections.Generic; using Microsoft.Shared.Diagnostics; namespace Microsoft.Agents.AI.Workflows; @@ -19,6 +20,28 @@ public AgentResponseEvent(string executorId, AgentResponse response) : base(resp this.Response = Throw.IfNull(response); } + /// + /// Initializes a new instance of the class with the given output tag. + /// + /// The identifier of the executor that generated this event. + /// The agent response. + /// The output tag to associate with this event. + public AgentResponseEvent(string executorId, AgentResponse response, OutputTag tag) : base(response, executorId, tag) + { + this.Response = Throw.IfNull(response); + } + + /// + /// Initializes a new instance of the class with the given output tags. + /// + /// The identifier of the executor that generated this event. + /// The agent response. + /// The output tags to associate with this event. May be or empty. + public AgentResponseEvent(string executorId, AgentResponse response, IEnumerable? tags) : base(response, executorId, tags) + { + this.Response = Throw.IfNull(response); + } + /// /// Gets the agent response. /// diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/AgentResponseUpdateEvent.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/AgentResponseUpdateEvent.cs index 017dce1763..f3d5215ccd 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/AgentResponseUpdateEvent.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/AgentResponseUpdateEvent.cs @@ -20,6 +20,28 @@ public AgentResponseUpdateEvent(string executorId, AgentResponseUpdate update) : this.Update = Throw.IfNull(update); } + /// + /// Initializes a new instance of the class with the given output tag. + /// + /// The identifier of the executor that generated this event. + /// The agent run response update. + /// The output tag to associate with this event. + public AgentResponseUpdateEvent(string executorId, AgentResponseUpdate update, OutputTag tag) : base(update, executorId, tag) + { + this.Update = Throw.IfNull(update); + } + + /// + /// Initializes a new instance of the class with the given output tags. + /// + /// The identifier of the executor that generated this event. + /// The agent run response update. + /// The output tags to associate with this event. May be or empty. + public AgentResponseUpdateEvent(string executorId, AgentResponseUpdate update, IEnumerable? tags) : base(update, executorId, tags) + { + this.Update = Throw.IfNull(update); + } + /// /// Gets the agent run response update. /// diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/AgentWorkflowBuilder.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/AgentWorkflowBuilder.cs index 9d7aa5b8c7..007920f6bc 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/AgentWorkflowBuilder.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/AgentWorkflowBuilder.cs @@ -3,9 +3,6 @@ using System; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; -using System.Linq; -using System.Threading.Tasks; -using Microsoft.Agents.AI.Workflows.Specialized; using Microsoft.Extensions.AI; using Microsoft.Shared.Diagnostics; @@ -37,31 +34,10 @@ private static Workflow BuildSequentialCore(string? workflowName, params IEnumer { Throw.IfNullOrEmpty(agents); - // Create a builder that chains the agents together in sequence. The workflow simply begins - // with the first agent in the sequence. - - AIAgentHostOptions options = new() - { - ReassignOtherAgentsAsUsers = true, - ForwardIncomingMessages = true, - }; - - List agentExecutors = agents.Select(agent => agent.BindAsExecutor(options)).ToList(); - - ExecutorBinding previous = agentExecutors[0]; - WorkflowBuilder builder = new(previous); - - foreach (ExecutorBinding next in agentExecutors.Skip(1)) - { - builder.AddEdge(previous, next); - previous = next; - } - - OutputMessagesExecutor end = new(); - builder = builder.AddEdge(previous, end).WithOutputFrom(end); + SequentialWorkflowBuilder builder = new(agents); if (workflowName is not null) { - builder = builder.WithName(workflowName); + builder.WithName(workflowName); } return builder.Build(); } @@ -107,41 +83,14 @@ private static Workflow BuildConcurrentCore( { Throw.IfNull(agents); - // A workflow needs a starting executor, so we create one that forwards everything to each agent. - ChatForwardingExecutor start = new("Start"); - WorkflowBuilder builder = new(start); - - // For each agent, we create an executor to host it and an accumulator to batch up its output messages, - // so that the final accumulator receives a single list of messages from each agent. Otherwise, the - // accumulator would not be able to determine what came from what agent, as there's currently no - // provenance tracking exposed in the workflow context passed to a handler. - - ExecutorBinding[] agentExecutors = (from agent in agents - select agent.BindAsExecutor(new AIAgentHostOptions() { ReassignOtherAgentsAsUsers = true })).ToArray(); - ExecutorBinding[] accumulators = [.. from agent in agentExecutors select (ExecutorBinding)new AggregateTurnMessagesExecutor($"Batcher/{agent.Id}")]; - builder.AddFanOutEdge(start, agentExecutors); - - for (int i = 0; i < agentExecutors.Length; i++) + ConcurrentWorkflowBuilder builder = new(agents); + if (workflowName is not null) { - builder.AddEdge(agentExecutors[i], accumulators[i]); + builder.WithName(workflowName); } - - // Create the accumulating executor that will gather the results from each agent, and connect - // each agent's accumulator to it. If no aggregation function was provided, we default to returning - // the last message from each agent - aggregator ??= static lists => (from list in lists where list.Count > 0 select list.Last()).ToList(); - - Func> endFactory = - (_, __) => new(new ConcurrentEndExecutor(agentExecutors.Length, aggregator)); - - ExecutorBinding end = endFactory.BindExecutor(ConcurrentEndExecutor.ExecutorId); - - builder.AddFanInBarrierEdge(accumulators, end); - - builder = builder.WithOutputFrom(end); - if (workflowName is not null) + if (aggregator is not null) { - builder = builder.WithName(workflowName); + builder.WithAggregator(aggregator); } return builder.Build(); } @@ -179,4 +128,32 @@ public static GroupChatWorkflowBuilder CreateGroupChatBuilderWith(FuncCreates a new with the given pipeline of . + /// The sequence of agents to compose into a sequential workflow. + /// The builder for creating a sequential workflow. + public static SequentialWorkflowBuilder CreateSequentialBuilderWith(params IEnumerable agents) + { + Throw.IfNull(agents); + return new SequentialWorkflowBuilder(agents); + } + + /// Creates a new with the given participating . + /// The set of agents to compose into a concurrent workflow. + /// The builder for creating a concurrent workflow. + public static ConcurrentWorkflowBuilder CreateConcurrentBuilderWith(params IEnumerable agents) + { + Throw.IfNull(agents); + return new ConcurrentWorkflowBuilder(agents); + } + + /// Creates a new with the given . + /// The LLM-powered manager agent that coordinates the team. + /// The builder for creating a Magentic workflow. + [Experimental(DiagnosticConstants.ExperimentalFeatureDiagnostic)] + public static MagenticWorkflowBuilder CreateMagenticBuilderWith(AIAgent managerAgent) + { + Throw.IfNull(managerAgent); + return new MagenticWorkflowBuilder(managerAgent); + } } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Checkpointing/WorkflowInfo.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Checkpointing/WorkflowInfo.cs index f40882265a..aac14fee35 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Checkpointing/WorkflowInfo.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Checkpointing/WorkflowInfo.cs @@ -1,5 +1,6 @@ // Copyright (c) Microsoft. All rights reserved. +using System; using System.Collections.Generic; using System.Linq; using System.Text.Json.Serialization; @@ -15,14 +16,14 @@ internal WorkflowInfo( Dictionary> edges, HashSet requestPorts, string startExecutorId, - HashSet? outputExecutorIds) + Dictionary>? outputExecutorIds) { this.Executors = Throw.IfNull(executors); this.Edges = Throw.IfNull(edges); this.RequestPorts = Throw.IfNull(requestPorts); this.StartExecutorId = Throw.IfNullOrEmpty(startExecutorId); - this.OutputExecutorIds = outputExecutorIds ?? []; + this.OutputExecutorIds = outputExecutorIds ?? new Dictionary>(StringComparer.Ordinal); } public Dictionary Executors { get; } @@ -32,7 +33,15 @@ internal WorkflowInfo( public TypeId? InputType { get; } public string StartExecutorId { get; } - public HashSet OutputExecutorIds { get; } + /// + /// Map of executor id to the set of s under which the executor is registered. + /// An empty set means the executor is registered as a regular (untagged) output source. + /// JSON shape: { "executorId": ["intermediate"], ... }. Legacy payloads using the + /// older string[] shape are read by and + /// each id is treated as registered with an empty tag set. + /// + [JsonConverter(typeof(WorkflowInfoOutputExecutorsConverter))] + public Dictionary> OutputExecutorIds { get; } public bool IsMatch(Workflow workflow) { @@ -80,9 +89,12 @@ public bool IsMatch(Workflow workflow) return false; } - // Validate the outputs + // Validate the outputs (key set + tag set per id must match) if (workflow.OutputExecutors.Count != this.OutputExecutorIds.Count || - this.OutputExecutorIds.Any(id => !workflow.OutputExecutors.Contains(id))) + this.OutputExecutorIds.Any(kvp => + !workflow.OutputExecutors.TryGetValue(kvp.Key, out HashSet? tags) || + tags.Count != kvp.Value.Count || + !tags.SetEquals(kvp.Value))) { return false; } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Checkpointing/WorkflowInfoOutputExecutorsConverter.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Checkpointing/WorkflowInfoOutputExecutorsConverter.cs new file mode 100644 index 0000000000..8ee8d39590 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Checkpointing/WorkflowInfoOutputExecutorsConverter.cs @@ -0,0 +1,122 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace Microsoft.Agents.AI.Workflows.Checkpointing; + +/// +/// JSON converter for that supports both the new +/// map shape ({ "id": ["intermediate"] }) and the legacy array shape +/// (["id1", "id2"]). Legacy-shaped payloads are read as if every id had been registered +/// as a regular (untagged) output source; output is always written in the new map shape. +/// +internal sealed class WorkflowInfoOutputExecutorsConverter : JsonConverter>> +{ + public override Dictionary> Read( + ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + Dictionary> result = new(StringComparer.Ordinal); + + if (reader.TokenType == JsonTokenType.Null) + { + return result; + } + + if (reader.TokenType == JsonTokenType.StartArray) + { + // Legacy shape: a flat array of executor ids. Treat each as a registered + // (untagged) output executor. + while (reader.Read()) + { + if (reader.TokenType == JsonTokenType.EndArray) + { + return result; + } + + if (reader.TokenType != JsonTokenType.String) + { + throw new JsonException($"Expected a string in legacy outputExecutorIds array, got {reader.TokenType}."); + } + + string id = reader.GetString()!; + result[id] = []; + } + + throw new JsonException("Unexpected end of legacy outputExecutorIds array."); + } + + if (reader.TokenType != JsonTokenType.StartObject) + { + throw new JsonException($"Expected object or array for outputExecutorIds, got {reader.TokenType}."); + } + + while (reader.Read()) + { + if (reader.TokenType == JsonTokenType.EndObject) + { + return result; + } + + if (reader.TokenType != JsonTokenType.PropertyName) + { + throw new JsonException($"Expected property name in outputExecutorIds object, got {reader.TokenType}."); + } + + string id = reader.GetString()!; + reader.Read(); + + HashSet tags = []; + if (reader.TokenType == JsonTokenType.StartArray) + { + while (reader.Read() && reader.TokenType != JsonTokenType.EndArray) + { + if (reader.TokenType != JsonTokenType.String) + { + throw new JsonException($"Expected a string tag, got {reader.TokenType}."); + } + + tags.Add(ReadTag(reader.GetString()!)); + } + } + else + { + throw new JsonException($"Expected array of tags for outputExecutorIds[{id}], got {reader.TokenType}."); + } + + result[id] = tags; + } + + throw new JsonException("Unexpected end of outputExecutorIds object."); + } + + private static OutputTag ReadTag(string value) + { + if (string.Equals(value, OutputTag.Intermediate.Value, StringComparison.Ordinal)) + { + return OutputTag.Intermediate; + } + return new OutputTag(value); + } + + public override void Write( + Utf8JsonWriter writer, + Dictionary> value, + JsonSerializerOptions options) + { + writer.WriteStartObject(); + foreach (KeyValuePair> kvp in value) + { + writer.WritePropertyName(kvp.Key); + writer.WriteStartArray(); + foreach (OutputTag tag in kvp.Value) + { + writer.WriteStringValue(tag.Value); + } + writer.WriteEndArray(); + } + writer.WriteEndObject(); + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/ConcurrentWorkflowBuilder.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/ConcurrentWorkflowBuilder.cs new file mode 100644 index 0000000000..feb31ddd9f --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/ConcurrentWorkflowBuilder.cs @@ -0,0 +1,104 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows.Specialized; +using Microsoft.Extensions.AI; +using Microsoft.Shared.Diagnostics; + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Fluent builder for concurrent agent workflows: a fan-out start that broadcasts the +/// incoming messages to every participating agent, a per-agent accumulator that batches +/// each agent's outgoing messages, and a fan-in aggregator that reduces them into a +/// single output list. +/// +/// +/// When no explicit output designations are made, the default is the Python-aligned +/// shape: the terminal aggregator is the workflow output, and every participating agent +/// (plus its per-agent accumulator) is designated as an intermediate output source. +/// Calling +/// or +/// at all suppresses these defaults. +/// +public sealed class ConcurrentWorkflowBuilder : OrchestrationBuilderBase +{ + private readonly List _agents = []; + private Func>, List>? _aggregator; + + /// + /// Initializes a new with the given participating + /// . + /// + public ConcurrentWorkflowBuilder(params IEnumerable agents) + { + Throw.IfNull(agents); + foreach (AIAgent agent in agents) + { + Throw.IfNull(agent, nameof(agents)); + this._agents.Add(agent); + } + } + + /// + /// Sets the aggregator function. If not called, defaults to returning the last message + /// from each agent that produced at least one message. + /// + public ConcurrentWorkflowBuilder WithAggregator(Func>, List> aggregator) + { + this._aggregator = Throw.IfNull(aggregator); + return this; + } + + /// Builds the configured concurrent workflow. + public Workflow Build() + { + if (this._agents.Count == 0) + { + throw new ArgumentException("At least one agent must be provided to the ConcurrentWorkflowBuilder.", "agents"); + } + + ChatForwardingExecutor start = new("Start"); + WorkflowBuilder builder = new(start); + + Dictionary agentMap = new(AIAgentIDEqualityComparer.Instance); + ExecutorBinding[] agentExecutors = new ExecutorBinding[this._agents.Count]; + ExecutorBinding[] accumulators = new ExecutorBinding[this._agents.Count]; + AIAgentHostOptions options = new() { ReassignOtherAgentsAsUsers = true }; + for (int i = 0; i < this._agents.Count; i++) + { + AIAgent agent = this._agents[i]; + ExecutorBinding binding = agent.BindAsExecutor(options); + agentExecutors[i] = binding; + agentMap[agent] = binding; + accumulators[i] = new AggregateTurnMessagesExecutor($"Batcher/{binding.Id}"); + } + + builder.AddFanOutEdge(start, agentExecutors); + for (int i = 0; i < agentExecutors.Length; i++) + { + builder.AddEdge(agentExecutors[i], accumulators[i]); + } + + Func>, List> aggregator = + this._aggregator ?? (static lists => (from list in lists where list.Count > 0 select list.Last()).ToList()); + + Func> endFactory = + (_, __) => new(new ConcurrentEndExecutor(agentExecutors.Length, aggregator)); + + ExecutorBinding end = endFactory.BindExecutor(ConcurrentEndExecutor.ExecutorId); + builder.AddFanInBarrierEdge(accumulators, end); + + this.ApplyMetadata(builder); + this.ApplyOutputDesignations(builder, agentMap, "concurrent", () => + { + builder.WithOutputFrom(end); + builder.WithIntermediateOutputFrom([.. agentExecutors, .. accumulators]); + }); + + return builder.Build(); + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/OutputFilter.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/OutputFilter.cs index cecf1da9f8..5ef5b8713b 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/OutputFilter.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/OutputFilter.cs @@ -1,11 +1,17 @@ // Copyright (c) Microsoft. All rights reserved. +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; + namespace Microsoft.Agents.AI.Workflows.Execution; internal sealed class OutputFilter(Workflow workflow) { public bool CanOutput(string sourceExecutorId, object output) { - return workflow.OutputExecutors.Contains(sourceExecutorId); + return workflow.OutputExecutors.ContainsKey(sourceExecutorId); } + + public bool TryGetTags(string sourceExecutorId, [NotNullWhen(true)] out HashSet? tags) + => workflow.OutputExecutors.TryGetValue(sourceExecutorId, out tags); } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Futures.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Futures.cs new file mode 100644 index 0000000000..b2c83f112a --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Futures.cs @@ -0,0 +1,40 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Process-wide opt-in switches for in-development behavior changes that will become +/// the default in a future major release. Each flag defaults to +/// and should be toggled once at application startup. +/// +public static class Futures +{ + /// + /// When , and + /// payloads yielded by an executor participate + /// in the normal output-filter pipeline (i.e. they must be designated via + /// or + /// + /// to surface), and the resulting s carry + /// reflecting that designation. + /// + /// + /// + /// When (the current default), the runner emits + /// and unconditionally, + /// bypassing the output filter (historical behavior). Lifecycle: opt-in today, marked + /// [Obsolete] in v2.0.0 when the new behavior becomes default, and removed in v3.0.0. + /// + /// + /// Interaction with . When this flag + /// is , joins + /// in being forwarded out of the agent surface + /// unconditionally — neither honors the host's includeWorkflowOutputsInResponse + /// switch. That switch only governs the generic path for + /// non-AIAgent payloads. When this flag is , the legacy asymmetry + /// is preserved: is always forwarded but + /// stays gated by includeWorkflowOutputsInResponse. + /// + /// + public static bool EnableAgentResponseOutputTaggingAndFiltering { get; set; } +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/GroupChatWorkflowBuilder.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/GroupChatWorkflowBuilder.cs index 66e4429e35..61330a9c42 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/GroupChatWorkflowBuilder.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/GroupChatWorkflowBuilder.cs @@ -12,12 +12,10 @@ namespace Microsoft.Agents.AI.Workflows; /// /// Provides a builder for specifying group chat relationships between agents and building the resulting workflow. /// -public sealed class GroupChatWorkflowBuilder +public sealed class GroupChatWorkflowBuilder : OrchestrationBuilderBase { private readonly Func, GroupChatManager> _managerFactory; private readonly HashSet _participants = new(AIAgentIDEqualityComparer.Instance); - private string _name = string.Empty; - private string _description = string.Empty; internal GroupChatWorkflowBuilder(Func, GroupChatManager> managerFactory) => this._managerFactory = managerFactory; @@ -44,28 +42,6 @@ public GroupChatWorkflowBuilder AddParticipants(params IEnumerable agen return this; } - /// - /// Sets the human-readable name for the workflow. - /// - /// The name of the workflow. - /// This instance of the . - public GroupChatWorkflowBuilder WithName(string name) - { - this._name = name; - return this; - } - - /// - /// Sets the description for the workflow. - /// - /// The description of what the workflow does. - /// This instance of the . - public GroupChatWorkflowBuilder WithDescription(string description) - { - this._description = description; - return this; - } - /// /// Builds a composed of agents that operate via group chat, with the next /// agent to process messages selected by the group chat manager. @@ -89,15 +65,7 @@ public Workflow Build() ExecutorBinding host = groupChatHostFactory.BindExecutor(nameof(GroupChatHost)); WorkflowBuilder builder = new(host); - if (!string.IsNullOrEmpty(this._name)) - { - builder = builder.WithName(this._name); - } - - if (!string.IsNullOrEmpty(this._description)) - { - builder = builder.WithDescription(this._description); - } + this.ApplyMetadata(builder); foreach (var participant in agentMap.Values) { @@ -106,6 +74,15 @@ public Workflow Build() .AddEdge(participant, host); } - return builder.WithOutputFrom(host).Build(); + this.ApplyOutputDesignations(builder, agentMap, "group chat", () => + { + builder.WithOutputFrom(host); + if (agentMap.Count > 0) + { + builder.WithIntermediateOutputFrom([.. agentMap.Values]); + } + }); + + return builder.Build(); } } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/HandoffWorkflowBuilder.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/HandoffWorkflowBuilder.cs index 7142faad0b..0d4b56aec7 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/HandoffWorkflowBuilder.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/HandoffWorkflowBuilder.cs @@ -38,7 +38,8 @@ public sealed class HandoffWorkflowBuilder(AIAgent initialAgent) : HandoffWorkfl /// Provides a builder for specifying the handoff relationships between agents and building the resulting workflow. /// [Experimental(DiagnosticConstants.ExperimentalFeatureDiagnostic)] -public class HandoffWorkflowBuilderCore where TBuilder : HandoffWorkflowBuilderCore +public class HandoffWorkflowBuilderCore : OrchestrationBuilderBase + where TBuilder : HandoffWorkflowBuilderCore { /// /// The prefix for function calls that trigger handoffs to other agents; the full name is then `{FunctionPrefix}<agent_id>`, @@ -54,8 +55,6 @@ public class HandoffWorkflowBuilderCore where TBuilder : HandoffWorkfl private bool _emitAgentResponseUpdateEvents; private HandoffToolCallFilteringBehavior _toolCallFilteringBehavior = HandoffToolCallFilteringBehavior.HandoffOnly; private bool _returnToPrevious; - private string? _name; - private string? _description; /// /// Initializes a new instance of the class with no handoff relationships. @@ -99,20 +98,6 @@ public TBuilder WithHandoffInstructions(string? instructions) return (TBuilder)this; } - /// - public TBuilder WithName(string name) - { - this._name = name; - return (TBuilder)this; - } - - /// - public TBuilder WithDescription(string description) - { - this._description = description; - return (TBuilder)this; - } - /// /// Sets a value indicating whether agent streaming update events should be emitted during execution. /// If , the value will be taken from the @@ -346,16 +331,32 @@ public Workflow Build() builder.AddEdge(start, executors[this._initialAgent.Id]); } - if (!string.IsNullOrWhiteSpace(this._name)) + this.ApplyMetadata(builder); + + // Ensure the end executor is bound regardless of whether it ends up as an output + // designation source — the user may take full control of output designations. + builder.BindExecutor(end); + + // Build the AIAgent -> ExecutorBinding map the base helper expects. + Dictionary agentMap = new(AIAgentIDEqualityComparer.Instance); + foreach (AIAgent agent in this._allAgents) { - builder.WithName(this._name); + agentMap[agent] = executors[agent.Id]; } - if (!string.IsNullOrWhiteSpace(this._description)) + this.ApplyOutputDesignations(builder, agentMap, "handoff", () => { - builder.WithDescription(this._description); - } + // Defaults (matches Python's Handoff orchestration): + // end -> terminal output + // every handoff agent -> intermediate output + builder.WithOutputFrom(end); + List agentBindings = [.. executors.Values]; + if (agentBindings.Count > 0) + { + builder.WithIntermediateOutputFrom(agentBindings); + } + }); - return builder.WithOutputFrom(end).Build(); + return builder.Build(); } } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs index d6c7d301e3..dec2b31bb8 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs @@ -241,30 +241,47 @@ private async ValueTask YieldOutputAsync(string sourceId, object output, Cancell this.CheckEnded(); Throw.IfNull(output); - // Special-case AgentResponse and AgentResponseUpdate to create their specific event types - // and bypass the output filter (for backwards compatibility - these events were previously - // emitted directly via AddEventAsync without filtering) - if (output is AgentResponseUpdate update) - { - await this.AddEventAsync(new AgentResponseUpdateEvent(sourceId, update), cancellationToken).ConfigureAwait(false); - return; - } - else if (output is AgentResponse response) + bool isAgentResponseShaped = output is AgentResponse or AgentResponseUpdate; + + if (isAgentResponseShaped && !Futures.EnableAgentResponseOutputTaggingAndFiltering) { - await this.AddEventAsync(new AgentResponseEvent(sourceId, response), cancellationToken).ConfigureAwait(false); + // Legacy bypass: AgentResponse/AgentResponseUpdate skip the output filter and are + // emitted as their typed event subclasses with no tags. Preserved verbatim for + // back-compat; once Futures.EnableAgentResponseOutputTaggingAndFiltering becomes the + // default in v2.0.0, this branch goes away. + WorkflowEvent typedEvent = output switch + { + AgentResponseUpdate u => new AgentResponseUpdateEvent(sourceId, u), + AgentResponse r => new AgentResponseEvent(sourceId, r), + _ => throw new InvalidOperationException("Unexpected AIAgent-shaped payload type."), + }; + await this.AddEventAsync(typedEvent, cancellationToken).ConfigureAwait(false); return; } Executor sourceExecutor = await this.EnsureExecutorAsync(sourceId, tracer: null, cancellationToken).ConfigureAwait(false); - if (!sourceExecutor.CanOutput(output.GetType())) + if (!isAgentResponseShaped && !sourceExecutor.CanOutput(output.GetType())) { + // AIAgent-shaped payloads bypass the per-executor declared-yield check (matching the + // legacy bypass branch above). The AIAgent host executor relays the agent's output + // without declaring AgentResponse(Update) in its Yields set, so a CanOutput probe + // here would always reject — but those payloads are always a valid output shape. throw new InvalidOperationException($"Cannot output object of type {output.GetType().Name}. Expecting one of [{string.Join(", ", sourceExecutor.OutputTypes)}]."); } - if (this._outputFilter.CanOutput(sourceId, output)) + if (!this._outputFilter.TryGetTags(sourceId, out HashSet? tags)) { - await this.AddEventAsync(new WorkflowOutputEvent(output, sourceId), cancellationToken).ConfigureAwait(false); + // Not designated as an output source — drop silently (matches Python semantics). + return; } + + WorkflowOutputEvent evt = output switch + { + AgentResponseUpdate u => new AgentResponseUpdateEvent(sourceId, u, tags), + AgentResponse r => new AgentResponseEvent(sourceId, r, tags), + _ => new WorkflowOutputEvent(output, sourceId, tags), + }; + await this.AddEventAsync(evt, cancellationToken).ConfigureAwait(false); } public IExternalRequestContext BindExternalRequestContext(string executorId) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/MagenticWorkflowBuilder.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/MagenticWorkflowBuilder.cs index 4470c4ee9a..c75a3d045b 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/MagenticWorkflowBuilder.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/MagenticWorkflowBuilder.cs @@ -28,11 +28,9 @@ namespace Microsoft.Agents.AI.Workflows; /// /// [Experimental(DiagnosticConstants.ExperimentalFeatureDiagnostic)] -public class MagenticWorkflowBuilder(AIAgent managerAgent) +public class MagenticWorkflowBuilder(AIAgent managerAgent) : OrchestrationBuilderBase { private readonly List _team = new(); - private string? _name; - private string? _description; private int _maxStalls = TaskLimits.DefaultMaxStallCount; private int? _maxRounds; private int? _maxResets; @@ -45,20 +43,6 @@ public MagenticWorkflowBuilder AddParticipants(params IEnumerable agent return this; } - /// - public MagenticWorkflowBuilder WithName(string name) - { - this._name = name; - return this; - } - - /// - public MagenticWorkflowBuilder WithDescription(string description) - { - this._description = description; - return this; - } - /// /// Set the maximum number of coordination rounds. means unlimited. /// @@ -115,28 +99,29 @@ private WorkflowBuilder ReduceToWorkflowBuilder() ForwardIncomingMessages = false }; + Dictionary teamMap = new(AIAgentIDEqualityComparer.Instance); List teamBindings = []; foreach (AIAgent agent in team) { ExecutorBinding binding = agent.BindAsExecutor(options); teamBindings.Add(binding); + teamMap[agent] = binding; result.AddEdge(binding, orchestrator); } - result.AddFanOutEdge(orchestrator, teamBindings) - .WithOutputFrom(orchestrator); - - if (!string.IsNullOrWhiteSpace(this._name)) - { - result.WithName(this._name); - } + result.AddFanOutEdge(orchestrator, teamBindings); - if (!string.IsNullOrWhiteSpace(this._description)) + this.ApplyOutputDesignations(result, teamMap, "Magentic", () => { - result.WithDescription(this._description); - } - + result.WithOutputFrom(orchestrator); + if (teamMap.Count > 0) + { + result.WithIntermediateOutputFrom([.. teamMap.Values]); + } + }); + + this.ApplyMetadata(result); return result; } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/OrchestrationBuilderBase.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/OrchestrationBuilderBase.cs new file mode 100644 index 0000000000..6d44c03509 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/OrchestrationBuilderBase.cs @@ -0,0 +1,154 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using Microsoft.Shared.Diagnostics; + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Common fluent surface shared by every orchestration-style workflow builder: +/// human-readable name + description, and the +/// / output-designation +/// pair with memoized defaults-suppression semantics. +/// +/// The concrete builder type, for fluent self-return. +public abstract class OrchestrationBuilderBase + where TBuilder : OrchestrationBuilderBase +{ + /// Optional workflow name; applied to the inner at Build(). + protected string? Name { get; private set; } + + /// Optional workflow description; applied to the inner at Build(). + protected string? Description { get; private set; } + + /// + /// Memoized output designations. means the user has not made any + /// explicit designation, and the orchestration-specific defaults will be applied at + /// Build() time. A non- (possibly empty) map means the user took + /// control and only these designations will be replayed onto the inner + /// . An entry's value is the set of tags requested for the + /// agent — an empty set encodes a terminal-only designation. + /// + protected Dictionary>? OutputDesignations { get; private set; } + + /// Sets the human-readable name for the workflow. + public TBuilder WithName(string name) + { + this.Name = name; + return (TBuilder)this; + } + + /// Sets the description for the workflow. + public TBuilder WithDescription(string description) + { + this.Description = description; + return (TBuilder)this; + } + + /// + /// Designates the given as sources of terminal workflow output. + /// Calling any output-designation method (this or ) + /// suppresses the orchestration-specific defaults: only the user-specified designations + /// reach the inner . + /// + public TBuilder WithOutputFrom(params IEnumerable agents) + { + Throw.IfNull(agents); + this.OutputDesignations ??= new(AIAgentIDEqualityComparer.Instance); + foreach (AIAgent agent in agents) + { + Throw.IfNull(agent, nameof(agents)); + if (!this.OutputDesignations.ContainsKey(agent)) + { + this.OutputDesignations[agent] = []; + } + } + return (TBuilder)this; + } + + /// + /// Designates the given as sources of intermediate workflow + /// output. See for the defaults-suppression semantics. + /// + public TBuilder WithIntermediateOutputFrom(IEnumerable agents) + { + Throw.IfNull(agents); + this.OutputDesignations ??= new(AIAgentIDEqualityComparer.Instance); + foreach (AIAgent agent in agents) + { + Throw.IfNull(agent, nameof(agents)); + if (!this.OutputDesignations.TryGetValue(agent, out HashSet? tags)) + { + tags = []; + this.OutputDesignations[agent] = tags; + } + tags.Add(OutputTag.Intermediate); + } + return (TBuilder)this; + } + + /// + /// Applies the optional and to . + /// Subclasses should call this from their Build() implementation. + /// + protected void ApplyMetadata(WorkflowBuilder builder) + { + Throw.IfNull(builder); + if (!string.IsNullOrWhiteSpace(this.Name)) + { + builder.WithName(this.Name!); + } + if (!string.IsNullOrWhiteSpace(this.Description)) + { + builder.WithDescription(this.Description!); + } + } + + /// + /// Applies the user's memoized output designations to , or invokes + /// if the user made no explicit designation. + /// + /// The inner . + /// Map from participating to its bound executor. + /// Used in the not-a-participant error message (e.g. "sequential", "group chat"). + /// Action invoked when no explicit designation was made. + protected void ApplyOutputDesignations( + WorkflowBuilder builder, + IReadOnlyDictionary agentMap, + string orchestrationKind, + Action applyDefaults) + { + Throw.IfNull(builder); + Throw.IfNull(agentMap); + Throw.IfNull(applyDefaults); + + if (this.OutputDesignations is null) + { + applyDefaults(); + return; + } + + foreach (AIAgent agent in this.OutputDesignations.Keys) + { + if (!agentMap.TryGetValue(agent, out ExecutorBinding? binding)) + { + throw new InvalidOperationException( + $"Output designation references agent '{agent.Name ?? agent.Id}', which is not a participant in this {orchestrationKind} workflow."); + } + + HashSet tags = this.OutputDesignations[agent]; + if (tags.Count == 0) + { + builder.WithOutputFrom(binding); + } + else + { + foreach (OutputTag tag in tags) + { + builder.WithOutputFrom(binding, tag); + } + } + } + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/OutputTag.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/OutputTag.cs new file mode 100644 index 0000000000..5e5f71c22f --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/OutputTag.cs @@ -0,0 +1,52 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Text.Json.Serialization; +using Microsoft.Shared.Diagnostics; + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Identifies the kind of output that a represents. +/// A thin ChatRole-style wrapper around a normalized string , +/// with value equality and a closed set of well-known singletons (the constructor is +/// for now). +/// +[JsonConverter(typeof(OutputTagJsonConverter))] +public readonly struct OutputTag : IEquatable +{ + /// + /// The normalized string identifier of the tag. Compared with ordinal equality. + /// + public string? Value { get; } + + internal OutputTag(string value) + { + this.Value = Throw.IfNullOrEmpty(value); + } + + /// + /// The tag denoting an intermediate workflow output — emitted by executors + /// registered via . + /// Terminal (non-intermediate) outputs carry no tag. + /// + public static OutputTag Intermediate { get; } = new("intermediate"); + + /// + public bool Equals(OutputTag other) => string.Equals(this.Value, other.Value, StringComparison.Ordinal); + + /// + public override bool Equals(object? obj) => obj is OutputTag other && this.Equals(other); + + /// + public override int GetHashCode() => this.Value is null ? 0 : StringComparer.Ordinal.GetHashCode(this.Value); + + /// Determines whether two values are equal. + public static bool operator ==(OutputTag left, OutputTag right) => left.Equals(right); + + /// Determines whether two values are not equal. + public static bool operator !=(OutputTag left, OutputTag right) => !left.Equals(right); + + /// + public override string ToString() => this.Value ?? string.Empty; +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/OutputTagJsonConverter.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/OutputTagJsonConverter.cs new file mode 100644 index 0000000000..4b19e3b2cb --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/OutputTagJsonConverter.cs @@ -0,0 +1,43 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// JSON converter for that round-trips the underlying +/// as a bare JSON string. +/// +internal sealed class OutputTagJsonConverter : JsonConverter +{ + public override OutputTag Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + string? value = reader.GetString(); + if (string.IsNullOrEmpty(value)) + { + return default; + } + + // Reuse the well-known singleton where possible so callers can do reference + // comparisons on the common case without paying the extra allocation cost. + if (string.Equals(value, OutputTag.Intermediate.Value, StringComparison.Ordinal)) + { + return OutputTag.Intermediate; + } + + return new OutputTag(value!); + } + + public override void Write(Utf8JsonWriter writer, OutputTag value, JsonSerializerOptions options) + { + if (value.Value is null) + { + writer.WriteNullValue(); + return; + } + + writer.WriteStringValue(value.Value); + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/SequentialWorkflowBuilder.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/SequentialWorkflowBuilder.cs new file mode 100644 index 0000000000..cfe1d1e1d4 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/SequentialWorkflowBuilder.cs @@ -0,0 +1,84 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Linq; +using Microsoft.Shared.Diagnostics; + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Fluent builder for sequential agent workflows: a pipeline where the output of one +/// agent is the input to the next, terminating in an aggregator that yields the +/// accumulated s as the workflow output. +/// +/// +/// When no explicit output designations are made, the default is the Python-aligned +/// shape: the terminal aggregator is the workflow output, and every participating agent +/// is designated as an intermediate output source. Calling +/// +/// or +/// at all suppresses these defaults. +/// +public sealed class SequentialWorkflowBuilder : OrchestrationBuilderBase +{ + private readonly List _agents = []; + + /// + /// Initializes a new with the given pipeline + /// of . + /// + public SequentialWorkflowBuilder(params IEnumerable agents) + { + Throw.IfNull(agents); + foreach (AIAgent agent in agents) + { + Throw.IfNull(agent, nameof(agents)); + this._agents.Add(agent); + } + } + + /// Builds the configured sequential workflow. + public Workflow Build() + { + if (this._agents.Count == 0) + { + throw new ArgumentException("At least one agent must be provided to the SequentialWorkflowBuilder.", "agents"); + } + + AIAgentHostOptions options = new() + { + ReassignOtherAgentsAsUsers = true, + ForwardIncomingMessages = true, + }; + + Dictionary agentMap = new(AIAgentIDEqualityComparer.Instance); + List agentExecutors = new(this._agents.Count); + foreach (AIAgent agent in this._agents) + { + ExecutorBinding binding = agent.BindAsExecutor(options); + agentExecutors.Add(binding); + agentMap[agent] = binding; + } + + ExecutorBinding previous = agentExecutors[0]; + WorkflowBuilder builder = new(previous); + foreach (ExecutorBinding next in agentExecutors.Skip(1)) + { + builder.AddEdge(previous, next); + previous = next; + } + + OutputMessagesExecutor end = new(); + builder.AddEdge(previous, end).BindExecutor(end); + + this.ApplyMetadata(builder); + this.ApplyOutputDesignations(builder, agentMap, "sequential", () => + { + builder.WithOutputFrom(end); + builder.WithIntermediateOutputFrom(agentExecutors); + }); + + return builder.Build(); + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs index eff1cfb9a3..03c8f6a920 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs @@ -24,7 +24,7 @@ public class Workflow internal Dictionary ExecutorBindings { get; init; } = []; internal Dictionary> Edges { get; init; } = []; - internal HashSet OutputExecutors { get; init; } = []; + internal Dictionary> OutputExecutors { get; init; } = new(StringComparer.Ordinal); /// /// Gets the collection of edges grouped by their source node identifier. @@ -221,7 +221,7 @@ public async ValueTask DescribeProtocolAsync(CancellationTok startExecutor.AttachRequestContext(new NoOpExternalRequestContext()); ProtocolDescriptor inputProtocol = startExecutor.DescribeProtocol(); - IEnumerable> outputExecutorTasks = this.OutputExecutors.Select(executorId => this.ExecutorBindings[executorId].CreateInstanceAsync(string.Empty).AsTask()); + IEnumerable> outputExecutorTasks = this.OutputExecutors.Keys.Select(executorId => this.ExecutorBindings[executorId].CreateInstanceAsync(string.Empty).AsTask()); Executor[] outputExecutors = await Task.WhenAll(outputExecutorTasks).ConfigureAwait(false); IEnumerable yieldedTypes = outputExecutors.SelectMany(executor => executor.DescribeProtocol().Yields); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs index e29abca5ab..3acb0c3f61 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs @@ -33,7 +33,7 @@ private readonly record struct EdgeConnection(string SourceId, string TargetId) private readonly HashSet _unboundExecutors = []; private readonly HashSet _conditionlessConnections = []; private readonly Dictionary _requestPorts = []; - private readonly HashSet _outputExecutors = []; + private readonly Dictionary> _outputExecutors = new(StringComparer.Ordinal); private readonly string _startExecutorId; private string? _name; @@ -97,22 +97,89 @@ private ExecutorBinding Track(ExecutorBinding binding) } /// - /// Register executors as an output source. Executors can use to yield output values. - /// By default, message handlers with a non-void return type will also be yielded, unless - /// is set to . + /// Register executors as a source of terminal workflow outputs. Executors can use + /// to yield output values; yielded values from + /// registered executors are surfaced as (or one of its + /// subclasses) with an empty set. + /// By default, message handlers with a non-void return type will also be yielded, unless + /// is set to . /// - /// - /// + /// + /// AIAgent payloads ( / ) only + /// participate in this designation when + /// is + /// ; otherwise they are emitted unconditionally and untagged. + /// + /// The executors to register as output sources. + /// The current instance, enabling fluent configuration. public WorkflowBuilder WithOutputFrom(params ExecutorBinding[] executors) { foreach (ExecutorBinding executor in executors) { - this._outputExecutors.Add(this.Track(executor).Id); + this.EnsureOutputExecutor(this.Track(executor).Id); } return this; } + /// + /// Register executors as a source of workflow outputs carrying the given . + /// Tags accumulate across repeated calls; the registered id always exists with the union of all + /// tags applied across all calls (and an empty set if only the untagged + /// overload was used). + /// + /// + /// Forward-looking surface for when the constructor opens to + /// user-defined tags. Today, prefer + /// + /// for the case. + /// + /// The executors to register. + /// The tag to apply to events yielded by the listed executors. + /// The current instance, enabling fluent configuration. + public WorkflowBuilder WithOutputFrom(IEnumerable executors, OutputTag tag) + { + Throw.IfNull(executors); + + foreach (ExecutorBinding executor in executors) + { + this.EnsureOutputExecutor(this.Track(executor).Id).Add(tag); + } + + return this; + } + + /// + /// Register a single executor as a source of workflow outputs carrying the given . + /// Convenience overload for the single-executor case; equivalent to passing a one-element sequence + /// to . + /// + /// The executor to register. + /// The tag to apply to events yielded by the executor. + /// The current instance, enabling fluent configuration. + public WorkflowBuilder WithOutputFrom(ExecutorBinding executor, OutputTag tag) + { + Throw.IfNull(executor); + + this.EnsureOutputExecutor(this.Track(executor).Id).Add(tag); + + return this; + } + + /// + /// Ensures the executor id is present in ; if newly added, + /// initializes with an empty tag set. Returns the tag set for the id (mutable). + /// + private HashSet EnsureOutputExecutor(string executorId) + { + if (!this._outputExecutors.TryGetValue(executorId, out HashSet? tags)) + { + tags = []; + this._outputExecutors[executorId] = tags; + } + return tags; + } + /// /// Sets the human-readable name for the workflow. /// diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilderExtensions.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilderExtensions.cs index a22aa8e722..6db047255d 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilderExtensions.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilderExtensions.cs @@ -211,4 +211,28 @@ public static WorkflowBuilder AddSwitch(this WorkflowBuilder builder, ExecutorBi return switchBuilder.ReduceToFanOut(builder, source); } + + /// + /// Register executors as a source of intermediate workflow outputs. The resulting + /// s carry in their + /// set, and + /// returns + /// . Use this for progress updates, partial results, and other + /// non-terminal emissions that downstream consumers (DevUI, logging, Workflow-as-Agent + /// surfaces) should see distinctly from the workflow's final output. + /// + /// + /// AIAgent payloads ( / ) only + /// participate in this designation when + /// is + /// ; otherwise they bypass the filter and are emitted untagged. + /// + /// The workflow builder to register executors on. + /// The executors to register as intermediate output sources. + /// The , enabling fluent configuration. + public static WorkflowBuilder WithIntermediateOutputFrom(this WorkflowBuilder builder, IEnumerable executors) + { + Throw.IfNull(builder); + return builder.WithOutputFrom(executors, OutputTag.Intermediate); + } } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowOutputEvent.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowOutputEvent.cs index f0fe884f6d..73b60f9643 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowOutputEvent.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowOutputEvent.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft. All rights reserved. using System; +using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Text.Json.Serialization; @@ -14,13 +15,36 @@ namespace Microsoft.Agents.AI.Workflows; public class WorkflowOutputEvent : WorkflowEvent { /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class with no tags. /// /// The output data. /// The identifier of the executor that yielded this output. - public WorkflowOutputEvent(object data, string executorId) : base(data) + public WorkflowOutputEvent(object data, string executorId) : this(data, executorId, tags: null) + { + } + + /// + /// Initializes a new instance of the class carrying the + /// given output tag. + /// + /// The output data. + /// The identifier of the executor that yielded this output. + /// The single output tag to associate with this event. + public WorkflowOutputEvent(object data, string executorId, OutputTag tag) : this(data, executorId, tags: new[] { tag }) + { + } + + /// + /// Initializes a new instance of the class carrying the + /// given output tags (deduplicated). + /// + /// The output data. + /// The identifier of the executor that yielded this output. + /// The output tags to associate with this event. May be or empty (the event is then untagged). + public WorkflowOutputEvent(object data, string executorId, IEnumerable? tags) : base(data) { this.ExecutorId = executorId; + this.Tags = tags is null ? new HashSet() : new HashSet(tags); } /// @@ -32,8 +56,21 @@ public WorkflowOutputEvent(object data, string executorId) : base(data) /// The unique identifier of the executor that yielded this output. /// [Obsolete("Use ExecutorId instead.")] + [JsonIgnore] public string SourceId => this.ExecutorId; + /// + /// The set of output tags associated with this event. Never ; + /// empty for terminal/regular outputs. The presence of + /// marks this event as an intermediate output. + /// + public HashSet Tags { get; } + + /// + /// Returns if this event carries the given tag. + /// + public bool HasTag(OutputTag tag) => this.Tags.Contains(tag); + /// /// Determines whether the underlying data is of the specified type or a derived type. /// diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowOutputEventExtensions.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowOutputEventExtensions.cs new file mode 100644 index 0000000000..1bbad70f87 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowOutputEventExtensions.cs @@ -0,0 +1,21 @@ +// Copyright (c) Microsoft. All rights reserved. + +using Microsoft.Shared.Diagnostics; + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Extension helpers for inspecting tag membership. +/// +public static class WorkflowOutputEventExtensions +{ + /// + /// Returns if the event carries + /// in its . + /// + public static bool IsIntermediate(this WorkflowOutputEvent evt) + { + Throw.IfNull(evt); + return evt.HasTag(OutputTag.Intermediate); + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs index 719b72e112..2caba8f3ec 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs @@ -520,7 +520,12 @@ IAsyncEnumerable InvokeStageAsync( goto default; case AgentResponseEvent agentResponse: - if (!this._includeWorkflowOutputsInResponse) + // Under Futures.EnableAgentResponseOutputTaggingAndFiltering=true, mirror + // AgentResponseUpdateEvent's behavior: always forward, regardless of the + // _includeWorkflowOutputsInResponse host flag. Under the legacy default, + // keep today's behavior — gated by the include flag. + if (!Futures.EnableAgentResponseOutputTaggingAndFiltering && + !this._includeWorkflowOutputsInResponse) { goto default; } @@ -539,7 +544,11 @@ IAsyncEnumerable InvokeStageAsync( _ => null }; - if (!this._includeWorkflowOutputsInResponse || updateMessages == null) + // Same gating asymmetry as AgentResponseEvent: intermediate outputs are + // forwarded unconditionally; terminal/untagged outputs require the host + // to opt in via _includeWorkflowOutputsInResponse. + if (updateMessages == null || + (!output.IsIntermediate() && !this._includeWorkflowOutputsInResponse)) { goto default; } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowsJsonUtilities.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowsJsonUtilities.cs index 8b3d3e4ce8..7cf01aa0cd 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowsJsonUtilities.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowsJsonUtilities.cs @@ -80,9 +80,8 @@ private static JsonSerializerOptions CreateDefaultOptions() [JsonSerializable(typeof(ExecutorIdentity))] [JsonSerializable(typeof(RunnerStateData))] - // Workflow Representation Types - [JsonSerializable(typeof(WorkflowInfo))] - [JsonSerializable(typeof(EdgeConnection))] + // Workflow Output Types + [JsonSerializable(typeof(OutputTag))] // Workflow-as-Agent [JsonSerializable(typeof(WorkflowChatHistoryProvider.StoreState))] diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AgentWorkflowBuilderTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AgentWorkflowBuilderTests.cs index 9dcd928314..d615b8bc74 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AgentWorkflowBuilderTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AgentWorkflowBuilderTests.cs @@ -1,15 +1,12 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. using System; using System.Collections.Generic; using System.Linq; using System.Runtime.CompilerServices; -using System.Text; -using System.Text.Json; using System.Text.RegularExpressions; -using System.Threading; using System.Threading.Tasks; -using Microsoft.Agents.AI.Workflows.InProc; +using FluentAssertions; using Microsoft.Extensions.AI; #pragma warning disable SYSLIB1045 // Use GeneratedRegex @@ -17,325 +14,164 @@ namespace Microsoft.Agents.AI.Workflows.UnitTests; +/// +/// Tests targeting the static helper surface — +/// , +/// , +/// and the various Create*BuilderWith factories. Per-builder unit tests live in their own +/// files (, , etc.). +/// public class AgentWorkflowBuilderTests { [Fact] - public void BuildSequential_InvalidArguments_Throws() + public void Test_AgentWorkflowBuilder_BuildSequential_InvalidArguments_Throws() { Assert.Throws("agents", () => AgentWorkflowBuilder.BuildSequential(workflowName: null!, null!)); Assert.Throws("agents", () => AgentWorkflowBuilder.BuildSequential()); } - [Fact] - public void BuildConcurrent_InvalidArguments_Throws() - { - Assert.Throws("agents", () => AgentWorkflowBuilder.BuildConcurrent(null!)); - } - - [Fact] - public void BuildGroupChat_InvalidArguments_Throws() + [Theory] + [InlineData(1)] + [InlineData(2)] + [InlineData(3)] + public async Task Test_AgentWorkflowBuilder_BuildSequential_DelegatesToBuilderAsync(int numAgents) { - Assert.Throws("managerFactory", () => AgentWorkflowBuilder.CreateGroupChatBuilderWith(null!)); + Workflow workflow = AgentWorkflowBuilder.BuildSequential( + from i in Enumerable.Range(1, numAgents) + select new OrchestrationTestHelpers.DoubleEchoAgent($"agent{i}")); - var groupChat = AgentWorkflowBuilder.CreateGroupChatBuilderWith(_ => new RoundRobinGroupChatManager([new DoubleEchoAgent("a1")])); - Assert.NotNull(groupChat); - Assert.Throws("agents", () => groupChat.AddParticipants(null!)); - Assert.Throws("agents", () => groupChat.AddParticipants([null!])); - Assert.Throws("agents", () => groupChat.AddParticipants(new DoubleEchoAgent("a1"), null!)); + // Smoke: end-to-end run produces a non-empty result. Detailed pipeline-ordering + // assertions live in SequentialWorkflowBuilderTests. + (string updateText, List? result, _, _) = + await OrchestrationTestHelpers.RunWorkflowAsync(workflow, [new ChatMessage(ChatRole.User, "abc")]); - Assert.Throws("agents", () => new RoundRobinGroupChatManager(null!)); + Assert.NotNull(result); + Assert.Equal(numAgents + 1, result.Count); + Assert.NotEmpty(updateText); } [Fact] - public void GroupChatManager_MaximumIterationCount_Invalid_Throws() + public void Test_AgentWorkflowBuilder_BuildSequential_WithWorkflowNameSetsNameOnWorkflow() { - var manager = new RoundRobinGroupChatManager([new DoubleEchoAgent("a1")]); - - const int DefaultMaxIterations = 40; - Assert.Equal(DefaultMaxIterations, manager.MaximumIterationCount); - Assert.Throws("value", void () => manager.MaximumIterationCount = 0); - Assert.Throws("value", void () => manager.MaximumIterationCount = -1); - Assert.Equal(DefaultMaxIterations, manager.MaximumIterationCount); - - manager.MaximumIterationCount = 30; - Assert.Equal(30, manager.MaximumIterationCount); + Workflow workflow = AgentWorkflowBuilder.BuildSequential( + "static-sequential", + new OrchestrationTestHelpers.DoubleEchoAgent("agent1")); - manager.MaximumIterationCount = 1; - Assert.Equal(1, manager.MaximumIterationCount); - - manager.MaximumIterationCount = int.MaxValue; - Assert.Equal(int.MaxValue, manager.MaximumIterationCount); + workflow.Name.Should().Be("static-sequential"); } [Fact] - public void BuildGroupChat_WithNameAndDescription_SetsWorkflowNameAndDescription() + public void Test_AgentWorkflowBuilder_BuildConcurrent_InvalidArguments_Throws() { - const string WorkflowName = "Test Group Chat"; - const string WorkflowDescription = "A test group chat workflow"; - - var workflow = AgentWorkflowBuilder - .CreateGroupChatBuilderWith(agents => new RoundRobinGroupChatManager(agents) { MaximumIterationCount = 2 }) - .AddParticipants(new DoubleEchoAgent("agent1"), new DoubleEchoAgent("agent2")) - .WithName(WorkflowName) - .WithDescription(WorkflowDescription) - .Build(); - - Assert.Equal(WorkflowName, workflow.Name); - Assert.Equal(WorkflowDescription, workflow.Description); + Assert.Throws("agents", () => AgentWorkflowBuilder.BuildConcurrent(null!)); } [Fact] - public void BuildGroupChat_WithNameOnly_SetsWorkflowName() + public async Task Test_AgentWorkflowBuilder_BuildConcurrent_DelegatesToBuilderAsync() { - const string WorkflowName = "Named Group Chat"; + StrongBox> barrier = new(); + StrongBox remaining = new(); - var workflow = AgentWorkflowBuilder - .CreateGroupChatBuilderWith(agents => new RoundRobinGroupChatManager(agents) { MaximumIterationCount = 2 }) - .AddParticipants(new DoubleEchoAgent("agent1")) - .WithName(WorkflowName) - .Build(); + Workflow workflow = AgentWorkflowBuilder.BuildConcurrent( + [ + new OrchestrationTestHelpers.DoubleEchoAgentWithBarrier("agent1", barrier, remaining), + new OrchestrationTestHelpers.DoubleEchoAgentWithBarrier("agent2", barrier, remaining), + ]); - Assert.Equal(WorkflowName, workflow.Name); - Assert.Null(workflow.Description); - } + barrier.Value = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + remaining.Value = 2; - [Fact] - public void BuildGroupChat_WithoutNameOrDescription_DefaultsToNull() - { - var workflow = AgentWorkflowBuilder - .CreateGroupChatBuilderWith(agents => new RoundRobinGroupChatManager(agents) { MaximumIterationCount = 2 }) - .AddParticipants(new DoubleEchoAgent("agent1")) - .Build(); + (string updateText, List? result, _, _) = + await OrchestrationTestHelpers.RunWorkflowAsync(workflow, [new ChatMessage(ChatRole.User, "abc")]); - Assert.Null(workflow.Name); - Assert.Null(workflow.Description); + Assert.NotEmpty(updateText); + Assert.NotNull(result); + Assert.Equal(2, result.Count); + Assert.Single(Regex.Matches(updateText, "agent1")); + Assert.Single(Regex.Matches(updateText, "agent2")); } - [Theory] - [InlineData(1)] - [InlineData(2)] - [InlineData(3)] - [InlineData(4)] - [InlineData(5)] - public async Task BuildSequential_AgentsRunInOrderAsync(int numAgents) + [Fact] + public void Test_AgentWorkflowBuilder_BuildConcurrent_WithWorkflowNameSetsNameOnWorkflow() { - var workflow = AgentWorkflowBuilder.BuildSequential( - from i in Enumerable.Range(1, numAgents) - select new DoubleEchoAgent($"agent{i}")); - - for (int iter = 0; iter < 3; iter++) - { - const string UserInput = "abc"; - (string updateText, List? result, _, _) = await RunWorkflowAsync(workflow, [new ChatMessage(ChatRole.User, UserInput)]); - - Assert.NotNull(result); - Assert.Equal(numAgents + 1, result.Count); - - Assert.Equal(ChatRole.User, result[0].Role); - Assert.Null(result[0].AuthorName); - Assert.Equal(UserInput, result[0].Text); - - string[] texts = new string[numAgents + 1]; - texts[0] = UserInput; - string expectedTotal = string.Empty; - for (int i = 1; i < numAgents + 1; i++) - { - string id = $"agent{((i - 1) % numAgents) + 1}"; - texts[i] = $"{id}{Double(string.Concat(texts.Take(i)))}"; - Assert.Equal(ChatRole.Assistant, result[i].Role); - Assert.Equal(id, result[i].AuthorName); - Assert.Equal(texts[i], result[i].Text); - expectedTotal += texts[i]; - } - - Assert.Equal(expectedTotal, updateText); - Assert.Equal(UserInput + expectedTotal, string.Concat(result)); - - static string Double(string s) => s + s; - } + Workflow workflow = AgentWorkflowBuilder.BuildConcurrent( + "static-concurrent", + [new OrchestrationTestHelpers.DoubleEchoAgent("agent1")]); + + workflow.Name.Should().Be("static-concurrent"); } - private class DoubleEchoAgent(string name) : AIAgent + [Fact] + public async Task Test_AgentWorkflowBuilder_BuildConcurrent_AggregatorIsHonoredAsync() { - public override string Name => name; - - protected override ValueTask CreateSessionCoreAsync(CancellationToken cancellationToken = default) - => new(new DoubleEchoAgentSession()); - - protected override ValueTask DeserializeSessionCoreAsync(JsonElement serializedState, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default) - => new(new DoubleEchoAgentSession()); - - protected override ValueTask SerializeSessionCoreAsync(AgentSession session, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default) - => default; + // Replace the default ("last message from each agent") with a custom aggregator, + // and confirm the workflow yields its result. + List sentinel = [new(ChatRole.Assistant, "custom-aggregator-result")]; - protected override Task RunCoreAsync( - IEnumerable messages, AgentSession? session = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) => - throw new NotImplementedException(); + Workflow workflow = AgentWorkflowBuilder.BuildConcurrent( + [new OrchestrationTestHelpers.DoubleEchoAgent("agent1")], + aggregator: _ => sentinel); - protected override async IAsyncEnumerable RunCoreStreamingAsync( - IEnumerable messages, AgentSession? session = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - await Task.Yield(); + (_, List? result, _, _) = + await OrchestrationTestHelpers.RunWorkflowAsync(workflow, [new ChatMessage(ChatRole.User, "abc")]); - var contents = messages.SelectMany(m => m.Contents).ToList(); - string id = Guid.NewGuid().ToString("N"); - yield return new AgentResponseUpdate(ChatRole.Assistant, this.Name) { AuthorName = this.Name, MessageId = id }; - yield return new AgentResponseUpdate(ChatRole.Assistant, contents) { AuthorName = this.Name, MessageId = id }; - yield return new AgentResponseUpdate(ChatRole.Assistant, contents) { AuthorName = this.Name, MessageId = id }; - } + result.Should().NotBeNull().And.ContainSingle(); + result![0].Text.Should().Be("custom-aggregator-result"); } - private sealed class DoubleEchoAgentSession() : AgentSession(); + [Fact] + public void Test_AgentWorkflowBuilder_CreateSequentialBuilderWith_RejectsNull() + { + Assert.Throws("agents", () => AgentWorkflowBuilder.CreateSequentialBuilderWith(null!)); + } [Fact] - public async Task BuildConcurrent_AgentsRunInParallelAsync() + public void Test_AgentWorkflowBuilder_CreateSequentialBuilderWith_ReturnsConfigurableBuilder() { - StrongBox> barrier = new(); - StrongBox remaining = new(); + OrchestrationTestHelpers.DoubleEchoAgent agent = new("agent1"); - var workflow = AgentWorkflowBuilder.BuildConcurrent( - [ - new DoubleEchoAgentWithBarrier("agent1", barrier, remaining), - new DoubleEchoAgentWithBarrier("agent2", barrier, remaining), - ]); + SequentialWorkflowBuilder builder = AgentWorkflowBuilder.CreateSequentialBuilderWith(agent); + Workflow workflow = builder.WithName("via-factory").Build(); - for (int iter = 0; iter < 3; iter++) - { - barrier.Value = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - remaining.Value = 2; - - (string updateText, List? result, _, _) = await RunWorkflowAsync(workflow, [new ChatMessage(ChatRole.User, "abc")]); - Assert.NotEmpty(updateText); - Assert.NotNull(result); - - // TODO: https://github.com/microsoft/agent-framework/issues/784 - // These asserts are flaky until we guarantee message delivery order. - Assert.Single(Regex.Matches(updateText, "agent1")); - Assert.Single(Regex.Matches(updateText, "agent2")); - Assert.Equal(4, Regex.Matches(updateText, "abc").Count); - Assert.Equal(2, result.Count); - } + workflow.Name.Should().Be("via-factory"); } - [Theory] - [InlineData(1)] - [InlineData(2)] - [InlineData(3)] - [InlineData(4)] - [InlineData(5)] - public async Task BuildGroupChat_AgentsRunInOrderAsync(int maxIterations) + [Fact] + public void Test_AgentWorkflowBuilder_CreateConcurrentBuilderWith_RejectsNull() { - const int NumAgents = 3; - var workflow = AgentWorkflowBuilder.CreateGroupChatBuilderWith(agents => new RoundRobinGroupChatManager(agents) { MaximumIterationCount = maxIterations }) - .AddParticipants(new DoubleEchoAgent("agent1"), new DoubleEchoAgent("agent2")) - .AddParticipants(new DoubleEchoAgent("agent3")) - .Build(); - - for (int iter = 0; iter < 3; iter++) - { - const string UserInput = "abc"; - (string updateText, List? result, _, _) = await RunWorkflowAsync(workflow, [new ChatMessage(ChatRole.User, UserInput)]); - - Assert.NotNull(result); - Assert.Equal(maxIterations + 1, result.Count); - - Assert.Equal(ChatRole.User, result[0].Role); - Assert.Null(result[0].AuthorName); - Assert.Equal(UserInput, result[0].Text); - - string[] texts = new string[maxIterations + 1]; - texts[0] = UserInput; - string expectedTotal = string.Empty; - for (int i = 1; i < maxIterations + 1; i++) - { - string id = $"agent{((i - 1) % NumAgents) + 1}"; - texts[i] = $"{id}{Double(string.Concat(texts.Take(i)))}"; - Assert.Equal(ChatRole.Assistant, result[i].Role); - Assert.Equal(id, result[i].AuthorName); - Assert.Equal(texts[i], result[i].Text); - expectedTotal += texts[i]; - } - - Assert.Equal(expectedTotal, updateText); - Assert.Equal(UserInput + expectedTotal, string.Concat(result)); - - static string Double(string s) => s + s; - } + Assert.Throws("agents", () => AgentWorkflowBuilder.CreateConcurrentBuilderWith(null!)); } - private sealed record WorkflowRunResult(string UpdateText, List? Result, CheckpointInfo? LastCheckpoint, List PendingRequests); - - private static async Task RunWorkflowCheckpointedAsync( - Workflow workflow, List input, InProcessExecutionEnvironment environment, CheckpointInfo? fromCheckpoint = null) + [Fact] + public void Test_AgentWorkflowBuilder_CreateConcurrentBuilderWith_ReturnsConfigurableBuilder() { - await using StreamingRun run = - fromCheckpoint != null ? await environment.ResumeStreamingAsync(workflow, fromCheckpoint) - : await environment.OpenStreamingAsync(workflow); + OrchestrationTestHelpers.DoubleEchoAgent agent = new("agent1"); - await run.TrySendMessageAsync(input); - await run.TrySendMessageAsync(new TurnToken(emitEvents: true)); + ConcurrentWorkflowBuilder builder = AgentWorkflowBuilder.CreateConcurrentBuilderWith(agent); + Workflow workflow = builder.WithName("via-factory").Build(); - return await ProcessWorkflowRunAsync(run); + workflow.Name.Should().Be("via-factory"); } - private static async Task ProcessWorkflowRunAsync(StreamingRun run) + [Fact] + public void Test_AgentWorkflowBuilder_CreateHandoffBuilderWith_RejectsNull() { - StringBuilder sb = new(); - WorkflowOutputEvent? output = null; - CheckpointInfo? lastCheckpoint = null; - - List pendingRequests = []; - - await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false).ConfigureAwait(false)) - { - switch (evt) - { - case AgentResponseUpdateEvent responseUpdate: - sb.Append(responseUpdate.Data); - break; - - case RequestInfoEvent requestInfo: - pendingRequests.Add(requestInfo); - break; - - case WorkflowOutputEvent e: - output = e; - break; - - case WorkflowErrorEvent errorEvent: - Assert.Fail($"Workflow execution failed with error: {errorEvent.Exception}"); - break; - - case SuperStepCompletedEvent stepCompleted: - lastCheckpoint = stepCompleted.CompletionInfo?.Checkpoint; - break; - } - } - - return new(sb.ToString(), output?.As>(), lastCheckpoint, pendingRequests); +#pragma warning disable MAAIW001 + Assert.Throws("initialAgent", () => AgentWorkflowBuilder.CreateHandoffBuilderWith(null!)); +#pragma warning restore MAAIW001 } - private static Task RunWorkflowAsync( - Workflow workflow, List input, ExecutionEnvironment executionEnvironment = ExecutionEnvironment.InProcess_Lockstep) - => RunWorkflowCheckpointedAsync(workflow, input, executionEnvironment.ToWorkflowExecutionEnvironment()); + [Fact] + public void Test_AgentWorkflowBuilder_CreateGroupChatBuilderWith_RejectsNull() + { + Assert.Throws("managerFactory", () => AgentWorkflowBuilder.CreateGroupChatBuilderWith(null!)); + } - private sealed class DoubleEchoAgentWithBarrier(string name, StrongBox> barrier, StrongBox remaining) : DoubleEchoAgent(name) + [Fact] + public void Test_AgentWorkflowBuilder_CreateMagenticBuilderWith_RejectsNull() { - protected override async IAsyncEnumerable RunCoreStreamingAsync( - IEnumerable messages, AgentSession? session = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - if (Interlocked.Decrement(ref remaining.Value) == 0) - { - barrier.Value!.SetResult(true); - } - - await barrier.Value!.Task.ConfigureAwait(false); - - await foreach (var update in base.RunCoreStreamingAsync(messages, session, options, cancellationToken)) - { - await Task.Yield(); - yield return update; - } - } +#pragma warning disable MAAIW001 + Assert.Throws("managerAgent", () => AgentWorkflowBuilder.CreateMagenticBuilderWith(null!)); +#pragma warning restore MAAIW001 } } diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/BackwardsCompatibility/JsonCheckpointSerializationTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/BackwardsCompatibility/JsonCheckpointSerializationTests.cs new file mode 100644 index 0000000000..dadaddaabd --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/BackwardsCompatibility/JsonCheckpointSerializationTests.cs @@ -0,0 +1,121 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Collections.Generic; +using System.Text.Json; +using FluentAssertions; +using Microsoft.Agents.AI.Workflows.Checkpointing; + +namespace Microsoft.Agents.AI.Workflows.UnitTests.BackwardsCompatibility; + +/// +/// Tests pinning the JSON shape of checkpoint-adjacent types so older payloads keep +/// deserializing correctly after the Outputs overhaul (see implementation-plan §5.7). +/// +public class JsonCheckpointSerializationTests +{ + private static readonly JsonSerializerOptions s_options = WorkflowsJsonUtilities.DefaultOptions; + + private static WorkflowInfo BuildInfoWithOutputExecutors(Dictionary> outputs) + => new( + executors: new Dictionary(), + edges: new Dictionary>(), + requestPorts: [], + startExecutorId: "start", + outputExecutorIds: outputs); + + // ---------- WorkflowOutputEvent.Tags in-process round-trip (no JSON) ---------- + + [Fact] + public void Test_WorkflowOutputEvent_SingleTagCtorPopulatesTags() + { + WorkflowOutputEvent evt = new(data: "hello", executorId: "e1", tag: OutputTag.Intermediate); + + evt.ExecutorId.Should().Be("e1"); + evt.Tags.Should().BeEquivalentTo(new[] { OutputTag.Intermediate }); + evt.HasTag(OutputTag.Intermediate).Should().BeTrue(); + evt.IsIntermediate().Should().BeTrue(); + } + + [Fact] + public void Test_WorkflowOutputEvent_NoTagsCtorIsUntagged() + { + WorkflowOutputEvent evt = new(data: "hello", executorId: "e1"); + + evt.Tags.Should().BeEmpty(); + evt.IsIntermediate().Should().BeFalse("an event with no tags is a terminal/regular output"); + } + + [Fact] + public void Test_WorkflowOutputEvent_MultiTagCtorPreservesAllTags() + { + OutputTag customTag = JsonSerializer.Deserialize("\"custom\"", s_options); + + WorkflowOutputEvent evt = new(data: "hello", executorId: "e1", tags: new[] { OutputTag.Intermediate, customTag }); + + evt.Tags.Should().HaveCount(2); + evt.HasTag(OutputTag.Intermediate).Should().BeTrue(); + evt.HasTag(customTag).Should().BeTrue(); + evt.IsIntermediate().Should().BeTrue(); + } + + // ---------- WorkflowInfo.OutputExecutorIds shape ---------- + // + // Note: per the comment in WorkflowsJsonUtilities, WorkflowEvent / WorkflowOutputEvent + // is *not* currently a serialized checkpoint shape (events are not persisted into + // checkpoints today), so we do not pin a JSON round-trip for Tags on the event itself + // here. The tag JSON round-trip is exercised by OutputTagTests; the + // OutputExecutorIds map shape is the actually-load-bearing back-compat surface. + + [Fact] + public void Test_JsonCheckpoint_WorkflowOutputExecutorsReadsLegacyArrayShape() + { + const string LegacyJson = """ + { + "executors": {}, + "edges": {}, + "requestPorts": [], + "startExecutorId": "start", + "outputExecutorIds": ["a", "b"] + } + """; + + WorkflowInfo? info = JsonSerializer.Deserialize(LegacyJson, s_options); + + info.Should().NotBeNull(); + info!.OutputExecutorIds.Should().HaveCount(2); + info.OutputExecutorIds["a"].Should().BeEmpty("legacy ids are untagged regular outputs"); + info.OutputExecutorIds["b"].Should().BeEmpty(); + } + + [Fact] + public void Test_JsonCheckpoint_WorkflowOutputExecutorsWritesMapShape() + { + Dictionary> outputs = new() + { + ["a"] = [], + ["b"] = [OutputTag.Intermediate], + }; + + WorkflowInfo info = BuildInfoWithOutputExecutors(outputs); + + string json = JsonSerializer.Serialize(info, s_options); + + WorkflowInfo? back = JsonSerializer.Deserialize(json, s_options); + + back.Should().NotBeNull(); + back!.OutputExecutorIds.Should().HaveCount(2); + back.OutputExecutorIds["a"].Should().BeEmpty(); + back.OutputExecutorIds["b"].Should().BeEquivalentTo(new[] { OutputTag.Intermediate }); + + // The map shape is detectable in the serialized JSON: the property value starts with `{`, not `[`. + int idx = json.IndexOf("\"outputExecutorIds\"", System.StringComparison.Ordinal); + idx.Should().BeGreaterThan(-1); + int colon = json.IndexOf(':', idx); + int firstNonSpace = colon + 1; + while (firstNonSpace < json.Length && char.IsWhiteSpace(json[firstNonSpace])) + { + firstNonSpace++; + } + json[firstNonSpace].Should().Be('{', "OutputExecutorIds is written in the new map shape"); + } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ConcurrentWorkflowBuilderTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ConcurrentWorkflowBuilderTests.cs new file mode 100644 index 0000000000..c03a98bcec --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ConcurrentWorkflowBuilderTests.cs @@ -0,0 +1,165 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Text.RegularExpressions; +using System.Threading.Tasks; +using FluentAssertions; +using Microsoft.Agents.AI.Workflows.UnitTests.Futures; +using Microsoft.Extensions.AI; +using Xunit; + +#pragma warning disable SYSLIB1045 // Use GeneratedRegex +#pragma warning disable RCS1186 // Use Regex instance instead of static method + +namespace Microsoft.Agents.AI.Workflows.UnitTests; + +public class ConcurrentWorkflowBuilderTests +{ + [Fact] + public void Test_ConcurrentWorkflowBuilder_InvalidArguments_Throws() + { + Assert.Throws("agents", () => new ConcurrentWorkflowBuilder(null!)); + Assert.Throws("agents", () => new ConcurrentWorkflowBuilder().Build()); + + Assert.Throws("agents", () => AgentWorkflowBuilder.BuildConcurrent(null!)); + Assert.Throws("agents", () => AgentWorkflowBuilder.CreateConcurrentBuilderWith(null!)); + } + + [Fact] + public async Task Test_ConcurrentWorkflowBuilder_AgentsRunInParallelAsync() + { + StrongBox> barrier = new(); + StrongBox remaining = new(); + + var workflow = new ConcurrentWorkflowBuilder( + new OrchestrationTestHelpers.DoubleEchoAgentWithBarrier("agent1", barrier, remaining), + new OrchestrationTestHelpers.DoubleEchoAgentWithBarrier("agent2", barrier, remaining)) + .Build(); + + for (int iter = 0; iter < 3; iter++) + { + barrier.Value = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + remaining.Value = 2; + + (string updateText, List? result, _, _) = + await OrchestrationTestHelpers.RunWorkflowAsync(workflow, [new ChatMessage(ChatRole.User, "abc")]); + Assert.NotEmpty(updateText); + Assert.NotNull(result); + + // TODO: https://github.com/microsoft/agent-framework/issues/784 + // These asserts are flaky until we guarantee message delivery order. + Assert.Single(Regex.Matches(updateText, "agent1")); + Assert.Single(Regex.Matches(updateText, "agent2")); + Assert.Equal(4, Regex.Matches(updateText, "abc").Count); + Assert.Equal(2, result.Count); + } + } + + [Fact] + public void Test_ConcurrentWorkflowBuilder_DefaultDesignationsMatchSpec() + { + Workflow workflow = new ConcurrentWorkflowBuilder( + new OrchestrationTestHelpers.DoubleEchoAgent("agent1"), + new OrchestrationTestHelpers.DoubleEchoAgent("agent2"), + new OrchestrationTestHelpers.DoubleEchoAgent("agent3")) + .Build(); + + Dictionary> designations = workflow.OutputExecutors; + designations.Where(kvp => kvp.Value.Count == 0) + .Should().ContainSingle("ConcurrentEndExecutor is the sole terminal output by default"); + designations.Where(kvp => kvp.Value.Contains(OutputTag.Intermediate)) + .Should().HaveCount(6, "every agent (3) and per-agent accumulator (3) is designated intermediate by default"); + } + + [Fact] + public void Test_ConcurrentWorkflowBuilder_ExplicitDesignationsReplaceDefaults() + { + OrchestrationTestHelpers.DoubleEchoAgent a1 = new("agent1"); + OrchestrationTestHelpers.DoubleEchoAgent a2 = new("agent2"); + OrchestrationTestHelpers.DoubleEchoAgent a3 = new("agent3"); + + Workflow workflow = new ConcurrentWorkflowBuilder(a1, a2, a3) + .WithOutputFrom(a1) + .WithIntermediateOutputFrom([a2]) + .Build(); + + Dictionary> designations = workflow.OutputExecutors; + + designations.Should().HaveCount(2, + "only the two explicitly-designated agents land on the inner builder; the end + accumulator defaults are suppressed"); + designations.Values.Where(tags => tags.Count == 0) + .Should().ContainSingle("agent1 is the only terminal designation"); + designations.Values.Where(tags => tags.Contains(OutputTag.Intermediate)) + .Should().ContainSingle("agent2 is the only intermediate designation"); + } + + [Fact] + public void Test_ConcurrentWorkflowBuilder_DesignationForNonParticipantThrows() + { + OrchestrationTestHelpers.DoubleEchoAgent participant = new("p1"); + OrchestrationTestHelpers.DoubleEchoAgent stranger = new("stranger"); + + ConcurrentWorkflowBuilder builder = new ConcurrentWorkflowBuilder(participant) + .WithIntermediateOutputFrom([stranger]); + + Action build = () => builder.Build(); + build.Should().Throw().WithMessage("*stranger*"); + } + + [Fact] + public void Test_ConcurrentWorkflowBuilder_WithNamePropagatesToWorkflow() + { + Workflow workflow = new ConcurrentWorkflowBuilder(new OrchestrationTestHelpers.DoubleEchoAgent("agent1")) + .WithName("named-concurrent") + .Build(); + + workflow.Name.Should().Be("named-concurrent"); + } + + [Fact] + public void Test_ConcurrentWorkflowBuilder_WithDescriptionPropagatesToWorkflow() + { + Workflow workflow = new ConcurrentWorkflowBuilder(new OrchestrationTestHelpers.DoubleEchoAgent("agent1")) + .WithDescription("describes the concurrent fan-out/fan-in") + .Build(); + + workflow.Description.Should().Be("describes the concurrent fan-out/fan-in"); + } + + [Collection(FuturesSerialCollection.Name)] + public class AsAgentForwarding + { + [Fact] + public async Task Test_ConcurrentWorkflowBuilder_AsAgent_OnlyTerminalDesignationSurfacesAsync() + { + using FuturesScope _ = new(enabled: true); + + OrchestrationTestHelpers.DoubleEchoAgent agent1 = new("agent1"); + OrchestrationTestHelpers.DoubleEchoAgent agent2 = new("agent2"); + + // Designate only agent1 as a terminal output source — agent2 and the fan-in + // aggregator default-intermediate designations are suppressed. + Workflow workflow = new ConcurrentWorkflowBuilder(agent1, agent2) + .WithOutputFrom(agent1) + .Build(); + + List updates = await workflow + .AsAIAgent("WorkflowAgent") + .RunStreamingAsync(new ChatMessage(ChatRole.User, "abc")) + .ToListAsync(); + + HashSet authoredBy = updates + .Select(u => u.AuthorName) + .Where(n => !string.IsNullOrEmpty(n)) + .Select(n => n!) + .ToHashSet(); + + authoredBy.Should().Contain("agent1", "the designated agent must surface"); + authoredBy.Should().NotContain("agent2", + "the undesignated agent must not surface when only one is designated under Futures-on"); + } + } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Futures/Futures.AgentResponseOutputFilteringAndTaggingTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Futures/Futures.AgentResponseOutputFilteringAndTaggingTests.cs new file mode 100644 index 0000000000..ba364f7e64 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Futures/Futures.AgentResponseOutputFilteringAndTaggingTests.cs @@ -0,0 +1,290 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using FluentAssertions; +using Microsoft.Extensions.AI; +using Xunit; + +namespace Microsoft.Agents.AI.Workflows.UnitTests.Futures; + +/// +/// Runner-level coverage for . +/// Exercises every combination of (flag on/off) × (designation kind) × (payload shape) to pin the +/// runner's behavior in both the legacy bypass path and the unified filter-and-tag path. +/// +public static partial class FuturesTests +{ + [Collection(FuturesSerialCollection.Name)] + public class AgentResponseOutputFilteringAndTaggingTests + { + private const string SourceId = "yielder"; + + private static AgentResponse SampleResponse(string text = "hi") + => new(new ChatMessage(ChatRole.Assistant, text)); + + private static AgentResponseUpdate SampleUpdate(string text = "tick") + => new(ChatRole.Assistant, text); + + private static async Task> RunAsync(Workflow workflow, T input) where T : notnull + { + List events = []; + await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, input).ConfigureAwait(false); + await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false)) + { + events.Add(evt); + } + return events; + } + + private static Workflow BuildAgentResponseWorkflow(Action? designate = null) + { + YieldAgentResponseExecutor exec = new(SourceId); + WorkflowBuilder builder = new(exec); + designate?.Invoke(builder, exec); + return builder.Build(); + } + + private static Workflow BuildAgentResponseUpdateWorkflow(Action? designate = null) + { + YieldAgentResponseUpdateExecutor exec = new(SourceId); + WorkflowBuilder builder = new(exec); + designate?.Invoke(builder, exec); + return builder.Build(); + } + + private static Workflow BuildPocoWorkflow(Action? designate = null) + { + YieldPocoExecutor exec = new(SourceId); + WorkflowBuilder builder = new(exec); + designate?.Invoke(builder, exec); + return builder.Build(); + } + + // F1 + [Fact] + public async Task Test_Runner_LegacyAgentResponseBypass_RaisesUntaggedEventAsync() + { + using FuturesScope _ = new(enabled: false); + Workflow workflow = BuildAgentResponseWorkflow(designate: null); + + List events = await RunAsync(workflow, "go"); + + AgentResponseEvent emitted = events.OfType().Should().ContainSingle().Subject; + emitted.ExecutorId.Should().Be(SourceId); + emitted.Tags.Should().BeEmpty("legacy bypass attaches no tags"); + emitted.IsIntermediate().Should().BeFalse(); + } + + // F2 + [Fact] + public async Task Test_Runner_LegacyAgentResponseUpdateBypass_RaisesUntaggedEventAsync() + { + using FuturesScope _ = new(enabled: false); + Workflow workflow = BuildAgentResponseUpdateWorkflow(designate: null); + + List events = await RunAsync(workflow, "go"); + + AgentResponseUpdateEvent emitted = events.OfType().Should().ContainSingle().Subject; + emitted.Tags.Should().BeEmpty(); + } + + // F3 + [Fact] + public async Task Test_Runner_LegacyBypassIgnoresDesignationAsync() + { + using FuturesScope _ = new(enabled: false); + Workflow workflow = BuildAgentResponseWorkflow(static (b, e) => b.WithIntermediateOutputFrom([e])); + + List events = await RunAsync(workflow, "go"); + + AgentResponseEvent emitted = events.OfType().Should().ContainSingle().Subject; + emitted.Tags.Should().BeEmpty("legacy bypass ignores the designation entirely"); + emitted.IsIntermediate().Should().BeFalse("legacy bypass does not propagate tags"); + } + + // F4 + [Fact] + public async Task Test_Runner_LegacyPocoIsFilteredAsync() + { + using FuturesScope _ = new(enabled: false); + Workflow workflow = BuildPocoWorkflow(designate: null); + + List events = await RunAsync(workflow, "go"); + + events.OfType().Should().BeEmpty("POCO outputs always go through the filter; undesignated source is dropped"); + } + + // F5 + [Fact] + public async Task Test_Runner_UndesignatedAgentResponseIsFilteredWhenFuturesOnAsync() + { + using FuturesScope _ = new(enabled: true); + Workflow workflow = BuildAgentResponseWorkflow(designate: null); + + List events = await RunAsync(workflow, "go"); + + events.OfType().Should().BeEmpty( + "with the future on, AgentResponse must be designated to surface"); + } + + // F6 + [Fact] + public async Task Test_Runner_DesignatedTerminalAgentResponseHasEmptyTagsAsync() + { + using FuturesScope _ = new(enabled: true); + Workflow workflow = BuildAgentResponseWorkflow(static (b, e) => b.WithOutputFrom(e)); + + List events = await RunAsync(workflow, "go"); + + AgentResponseEvent emitted = events.OfType().Should().ContainSingle().Subject; + emitted.Tags.Should().BeEmpty("terminal designation carries no tag"); + emitted.IsIntermediate().Should().BeFalse(); + } + + // F7 + [Fact] + public async Task Test_Runner_DesignatedIntermediateAgentResponseHasIntermediateTagAsync() + { + using FuturesScope _ = new(enabled: true); + Workflow workflow = BuildAgentResponseWorkflow(static (b, e) => b.WithIntermediateOutputFrom([e])); + + List events = await RunAsync(workflow, "go"); + + AgentResponseEvent emitted = events.OfType().Should().ContainSingle().Subject; + emitted.Tags.Should().BeEquivalentTo(new[] { OutputTag.Intermediate }); + emitted.IsIntermediate().Should().BeTrue(); + } + + // F8 + [Fact] + public async Task Test_Runner_DesignatedIntermediateAgentResponseUpdateHasIntermediateTagAsync() + { + using FuturesScope _ = new(enabled: true); + Workflow workflow = BuildAgentResponseUpdateWorkflow(static (b, e) => b.WithIntermediateOutputFrom([e])); + + List events = await RunAsync(workflow, "go"); + + AgentResponseUpdateEvent emitted = events.OfType().Should().ContainSingle().Subject; + emitted.Tags.Should().BeEquivalentTo(new[] { OutputTag.Intermediate }); + emitted.IsIntermediate().Should().BeTrue(); + } + + // F9 + [Fact] + public async Task Test_Runner_TagsAccumulateOutputThenIntermediateAsync() + { + using FuturesScope _ = new(enabled: true); + Workflow workflow = BuildAgentResponseWorkflow(static (b, e) => + { + b.WithOutputFrom(e); + b.WithIntermediateOutputFrom([e]); + }); + + List events = await RunAsync(workflow, "go"); + + AgentResponseEvent emitted = events.OfType().Should().ContainSingle().Subject; + emitted.Tags.Should().BeEquivalentTo(new[] { OutputTag.Intermediate }, + "terminal+intermediate union is {{ Intermediate }} (terminal contributes the entry but no tag)"); + emitted.IsIntermediate().Should().BeTrue(); + } + + // F10 + [Fact] + public async Task Test_Runner_TagsAccumulateIntermediateThenOutputAsync() + { + using FuturesScope _ = new(enabled: true); + Workflow workflow = BuildAgentResponseWorkflow(static (b, e) => + { + b.WithIntermediateOutputFrom([e]); + b.WithOutputFrom(e); + }); + + List events = await RunAsync(workflow, "go"); + + AgentResponseEvent emitted = events.OfType().Should().ContainSingle().Subject; + emitted.Tags.Should().BeEquivalentTo(new[] { OutputTag.Intermediate }, "designation order is irrelevant"); + emitted.IsIntermediate().Should().BeTrue(); + } + + // F11 + [Fact] + public async Task Test_Runner_DesignatedIntermediatePocoHasIntermediateTagAsync() + { + using FuturesScope _ = new(enabled: true); + Workflow workflow = BuildPocoWorkflow(static (b, e) => b.WithIntermediateOutputFrom([e])); + + List events = await RunAsync(workflow, "go"); + + WorkflowOutputEvent emitted = events.OfType().Should().ContainSingle().Subject; + emitted.Should().NotBeOfType(); + emitted.Tags.Should().BeEquivalentTo(new[] { OutputTag.Intermediate }); + emitted.IsIntermediate().Should().BeTrue(); + } + + // F12 + [Fact] + public async Task Test_Runner_DesignatedTerminalPocoHasEmptyTagsAsync() + { + using FuturesScope _ = new(enabled: true); + Workflow workflow = BuildPocoWorkflow(static (b, e) => b.WithOutputFrom(e)); + + List events = await RunAsync(workflow, "go"); + + WorkflowOutputEvent emitted = events.OfType().Should().ContainSingle().Subject; + emitted.Tags.Should().BeEmpty(); + emitted.IsIntermediate().Should().BeFalse(); + } + + // F13 + [Fact] + public async Task Test_Runner_RepeatedTerminalDesignationDedupesAsync() + { + using FuturesScope _ = new(enabled: true); + Workflow workflow = BuildAgentResponseWorkflow(static (b, e) => + { + b.WithOutputFrom(e); + b.WithOutputFrom(e); + }); + + List events = await RunAsync(workflow, "go"); + + AgentResponseEvent emitted = events.OfType().Should().ContainSingle().Subject; + emitted.Tags.Should().BeEmpty("repeated terminal designation contributes no tag"); + } + + // ---- Executors ----------------------------------------------------------- + + internal sealed class YieldAgentResponseExecutor(string id) : Executor(id) + { + protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder) + => protocolBuilder.ConfigureRoutes(rb => rb.AddHandler(this.HandleAsync)); + + private ValueTask HandleAsync(string input, IWorkflowContext context, CancellationToken cancellationToken) + => new(SampleResponse(input)); + } + + internal sealed class YieldAgentResponseUpdateExecutor(string id) : Executor(id) + { + protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder) + => protocolBuilder.ConfigureRoutes(rb => rb.AddHandler(this.HandleAsync)); + + private ValueTask HandleAsync(string input, IWorkflowContext context, CancellationToken cancellationToken) + => new(SampleUpdate(input)); + } + + public sealed record Poco(string Value); + + internal sealed class YieldPocoExecutor(string id) : Executor(id) + { + protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder) + => protocolBuilder.ConfigureRoutes(rb => rb.AddHandler(this.HandleAsync)); + + private ValueTask HandleAsync(string input, IWorkflowContext context, CancellationToken cancellationToken) + => new(new Poco(input)); + } + } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Futures/FuturesScope.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Futures/FuturesScope.cs new file mode 100644 index 0000000000..9eba02cf06 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Futures/FuturesScope.cs @@ -0,0 +1,27 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; + +namespace Microsoft.Agents.AI.Workflows.UnitTests.Futures; + +/// +/// Sets for +/// the lifetime of the scope, restoring the prior value on dispose. Pair every use with +/// using and run inside the FuturesSerial xUnit collection to avoid leaking +/// state across parallel tests. +/// +internal sealed class FuturesScope : IDisposable +{ + private readonly bool _previous; + + public FuturesScope(bool enabled) + { + this._previous = Workflows.Futures.EnableAgentResponseOutputTaggingAndFiltering; + Workflows.Futures.EnableAgentResponseOutputTaggingAndFiltering = enabled; + } + + public void Dispose() + { + Workflows.Futures.EnableAgentResponseOutputTaggingAndFiltering = this._previous; + } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Futures/FuturesSerialCollection.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Futures/FuturesSerialCollection.cs new file mode 100644 index 0000000000..2725fe6a50 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Futures/FuturesSerialCollection.cs @@ -0,0 +1,19 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Diagnostics.CodeAnalysis; + +namespace Microsoft.Agents.AI.Workflows.UnitTests.Futures; + +/// +/// xUnit collection marker for tests that mutate the process-global +/// switches. Membership in this collection serializes +/// the tests against each other so that cannot leak state +/// into a concurrently running test. +/// +[CollectionDefinition(Name, DisableParallelization = true)] +[SuppressMessage("Naming", "CA1711:Identifiers should not have incorrect suffix", + Justification = "xUnit's [CollectionDefinition] pattern names the marker type after the collection's purpose; the 'Collection' suffix is idiomatic.")] +public sealed class FuturesSerialCollection +{ + public const string Name = "FuturesSerial"; +} diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/GroupChatWorkflowBuilderTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/GroupChatWorkflowBuilderTests.cs new file mode 100644 index 0000000000..a3588b2bc3 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/GroupChatWorkflowBuilderTests.cs @@ -0,0 +1,197 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using FluentAssertions; +using Microsoft.Extensions.AI; + +namespace Microsoft.Agents.AI.Workflows.UnitTests; + +public class GroupChatWorkflowBuilderTests +{ + [Fact] + public void BuildGroupChat_InvalidArguments_Throws() + { + Assert.Throws("managerFactory", () => AgentWorkflowBuilder.CreateGroupChatBuilderWith(null!)); + + var groupChat = AgentWorkflowBuilder.CreateGroupChatBuilderWith(_ => new RoundRobinGroupChatManager([new OrchestrationTestHelpers.DoubleEchoAgent("a1")])); + Assert.NotNull(groupChat); + Assert.Throws("agents", () => groupChat.AddParticipants(null!)); + Assert.Throws("agents", () => groupChat.AddParticipants([null!])); + Assert.Throws("agents", () => groupChat.AddParticipants(new OrchestrationTestHelpers.DoubleEchoAgent("a1"), null!)); + + Assert.Throws("agents", () => new RoundRobinGroupChatManager(null!)); + } + + [Fact] + public void GroupChatManager_MaximumIterationCount_Invalid_Throws() + { + var manager = new RoundRobinGroupChatManager([new OrchestrationTestHelpers.DoubleEchoAgent("a1")]); + + const int DefaultMaxIterations = 40; + Assert.Equal(DefaultMaxIterations, manager.MaximumIterationCount); + Assert.Throws("value", void () => manager.MaximumIterationCount = 0); + Assert.Throws("value", void () => manager.MaximumIterationCount = -1); + Assert.Equal(DefaultMaxIterations, manager.MaximumIterationCount); + + manager.MaximumIterationCount = 30; + Assert.Equal(30, manager.MaximumIterationCount); + + manager.MaximumIterationCount = 1; + Assert.Equal(1, manager.MaximumIterationCount); + + manager.MaximumIterationCount = int.MaxValue; + Assert.Equal(int.MaxValue, manager.MaximumIterationCount); + } + + [Fact] + public void BuildGroupChat_WithNameAndDescription_SetsWorkflowNameAndDescription() + { + const string WorkflowName = "Test Group Chat"; + const string WorkflowDescription = "A test group chat workflow"; + + var workflow = AgentWorkflowBuilder + .CreateGroupChatBuilderWith(agents => new RoundRobinGroupChatManager(agents) { MaximumIterationCount = 2 }) + .AddParticipants(new OrchestrationTestHelpers.DoubleEchoAgent("agent1"), new OrchestrationTestHelpers.DoubleEchoAgent("agent2")) + .WithName(WorkflowName) + .WithDescription(WorkflowDescription) + .Build(); + + Assert.Equal(WorkflowName, workflow.Name); + Assert.Equal(WorkflowDescription, workflow.Description); + } + + [Fact] + public void BuildGroupChat_WithNameOnly_SetsWorkflowName() + { + const string WorkflowName = "Named Group Chat"; + + var workflow = AgentWorkflowBuilder + .CreateGroupChatBuilderWith(agents => new RoundRobinGroupChatManager(agents) { MaximumIterationCount = 2 }) + .AddParticipants(new OrchestrationTestHelpers.DoubleEchoAgent("agent1")) + .WithName(WorkflowName) + .Build(); + + Assert.Equal(WorkflowName, workflow.Name); + Assert.Null(workflow.Description); + } + + [Fact] + public void BuildGroupChat_WithoutNameOrDescription_DefaultsToNull() + { + var workflow = AgentWorkflowBuilder + .CreateGroupChatBuilderWith(agents => new RoundRobinGroupChatManager(agents) { MaximumIterationCount = 2 }) + .AddParticipants(new OrchestrationTestHelpers.DoubleEchoAgent("agent1")) + .Build(); + + Assert.Null(workflow.Name); + Assert.Null(workflow.Description); + } + + [Theory] + [InlineData(1)] + [InlineData(2)] + [InlineData(3)] + [InlineData(4)] + [InlineData(5)] + public async Task BuildGroupChat_AgentsRunInOrderAsync(int maxIterations) + { + const int NumAgents = 3; + var workflow = AgentWorkflowBuilder.CreateGroupChatBuilderWith(agents => new RoundRobinGroupChatManager(agents) { MaximumIterationCount = maxIterations }) + .AddParticipants(new OrchestrationTestHelpers.DoubleEchoAgent("agent1"), new OrchestrationTestHelpers.DoubleEchoAgent("agent2")) + .AddParticipants(new OrchestrationTestHelpers.DoubleEchoAgent("agent3")) + .Build(); + + for (int iter = 0; iter < 3; iter++) + { + const string UserInput = "abc"; + (string updateText, List? result, _, _) = await OrchestrationTestHelpers.RunWorkflowAsync(workflow, [new ChatMessage(ChatRole.User, UserInput)]); + + Assert.NotNull(result); + Assert.Equal(maxIterations + 1, result.Count); + + Assert.Equal(ChatRole.User, result[0].Role); + Assert.Null(result[0].AuthorName); + Assert.Equal(UserInput, result[0].Text); + + string[] texts = new string[maxIterations + 1]; + texts[0] = UserInput; + string expectedTotal = string.Empty; + for (int i = 1; i < maxIterations + 1; i++) + { + string id = $"agent{((i - 1) % NumAgents) + 1}"; + texts[i] = $"{id}{Double(string.Concat(texts.Take(i)))}"; + Assert.Equal(ChatRole.Assistant, result[i].Role); + Assert.Equal(id, result[i].AuthorName); + Assert.Equal(texts[i], result[i].Text); + expectedTotal += texts[i]; + } + + Assert.Equal(expectedTotal, updateText); + Assert.Equal(UserInput + expectedTotal, string.Concat(result)); + + static string Double(string s) => s + s; + } + } + + [Fact] + public void Test_GroupChatWorkflowBuilder_DefaultDesignationsMatchSpec() + { + OrchestrationTestHelpers.DoubleEchoAgent a1 = new("agent1"); + OrchestrationTestHelpers.DoubleEchoAgent a2 = new("agent2"); + OrchestrationTestHelpers.DoubleEchoAgent a3 = new("agent3"); + + Workflow workflow = AgentWorkflowBuilder + .CreateGroupChatBuilderWith(agents => new RoundRobinGroupChatManager(agents) { MaximumIterationCount = 1 }) + .AddParticipants(a1, a2, a3) + .Build(); + + Dictionary> designations = workflow.OutputExecutors; + + designations.Where(kvp => kvp.Value.Count == 0) + .Should().ContainSingle("group-chat host is the sole terminal output executor by default"); + designations.Where(kvp => kvp.Value.Contains(OutputTag.Intermediate)) + .Should().HaveCount(3, "every participant is designated intermediate by default"); + } + + [Fact] + public void Test_GroupChatWorkflowBuilder_ExplicitDesignationsReplaceDefaults() + { + OrchestrationTestHelpers.DoubleEchoAgent a1 = new("agent1"); + OrchestrationTestHelpers.DoubleEchoAgent a2 = new("agent2"); + OrchestrationTestHelpers.DoubleEchoAgent a3 = new("agent3"); + + Workflow workflow = AgentWorkflowBuilder + .CreateGroupChatBuilderWith(agents => new RoundRobinGroupChatManager(agents) { MaximumIterationCount = 1 }) + .AddParticipants(a1, a2, a3) + .WithOutputFrom(a1) + .WithIntermediateOutputFrom([a2]) + .Build(); + + Dictionary> designations = workflow.OutputExecutors; + + designations.Should().HaveCount(2, + "only the two explicitly-designated agents land on the inner builder; the host default is suppressed"); + designations.Values.Where(tags => tags.Count == 0) + .Should().ContainSingle("agent1 is the only terminal designation"); + designations.Values.Where(tags => tags.Contains(OutputTag.Intermediate)) + .Should().ContainSingle("agent2 is the only intermediate designation"); + } + + [Fact] + public void Test_GroupChatWorkflowBuilder_DesignationForNonParticipantThrows() + { + OrchestrationTestHelpers.DoubleEchoAgent participant = new("p1"); + OrchestrationTestHelpers.DoubleEchoAgent stranger = new("stranger"); + + GroupChatWorkflowBuilder builder = AgentWorkflowBuilder + .CreateGroupChatBuilderWith(agents => new RoundRobinGroupChatManager(agents) { MaximumIterationCount = 1 }) + .AddParticipants(participant) + .WithOutputFrom(stranger); + + Action build = () => builder.Build(); + build.Should().Throw().WithMessage("*stranger*"); + } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/HandoffWorkflowBuilderTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/HandoffWorkflowBuilderTests.cs new file mode 100644 index 0000000000..c3b9eb6a90 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/HandoffWorkflowBuilderTests.cs @@ -0,0 +1,77 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Linq; +using FluentAssertions; + +namespace Microsoft.Agents.AI.Workflows.UnitTests; + +/// +/// Tests focused on 's output-designation surface — +/// the Python-aligned defaults applied at +/// when the user has not made explicit designations, and the memoized +/// WithOutputFrom / WithIntermediateOutputFrom replay otherwise. +/// +#pragma warning disable MAAIW001 // Experimental: HandoffWorkflowBuilder +public class HandoffWorkflowBuilderTests +{ + [Fact] + public void Test_HandoffWorkflowBuilder_DefaultDesignationsMatchSpec() + { + OrchestrationTestHelpers.DoubleEchoAgent coordinator = new("coordinator"); + OrchestrationTestHelpers.DoubleEchoAgent specialist = new("specialist"); + + Workflow workflow = AgentWorkflowBuilder + .CreateHandoffBuilderWith(coordinator) + .WithHandoff(coordinator, specialist) + .Build(); + + Dictionary> designations = workflow.OutputExecutors; + + designations.Where(kvp => kvp.Value.Count == 0) + .Should().ContainSingle("the handoff end executor is the sole terminal output by default"); + designations.Where(kvp => kvp.Value.Contains(OutputTag.Intermediate)) + .Should().HaveCount(2, "both the coordinator and the specialist are designated intermediate by default"); + } + + [Fact] + public void Test_HandoffWorkflowBuilder_ExplicitDesignationsReplaceDefaults() + { + OrchestrationTestHelpers.DoubleEchoAgent coordinator = new("coordinator"); + OrchestrationTestHelpers.DoubleEchoAgent specialist = new("specialist"); + + Workflow workflow = AgentWorkflowBuilder + .CreateHandoffBuilderWith(coordinator) + .WithHandoff(coordinator, specialist) + .WithOutputFrom(coordinator) + .WithIntermediateOutputFrom([specialist]) + .Build(); + + Dictionary> designations = workflow.OutputExecutors; + + designations.Should().HaveCount(2, + "only the user-specified designations land on the inner builder; the handoff-end default is suppressed"); + designations.Values.Where(tags => tags.Count == 0) + .Should().ContainSingle("coordinator is the only terminal designation"); + designations.Values.Where(tags => tags.Contains(OutputTag.Intermediate)) + .Should().ContainSingle("specialist is the only intermediate designation"); + } + + [Fact] + public void Test_HandoffWorkflowBuilder_DesignationForNonParticipantThrows() + { + OrchestrationTestHelpers.DoubleEchoAgent coordinator = new("coordinator"); + OrchestrationTestHelpers.DoubleEchoAgent specialist = new("specialist"); + OrchestrationTestHelpers.DoubleEchoAgent stranger = new("stranger"); + + HandoffWorkflowBuilder builder = AgentWorkflowBuilder + .CreateHandoffBuilderWith(coordinator) + .WithHandoff(coordinator, specialist) + .WithIntermediateOutputFrom([stranger]); + + Action build = () => builder.Build(); + build.Should().Throw().WithMessage("*stranger*"); + } +} +#pragma warning restore MAAIW001 diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InputWaiterAndOutputFilterTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InputWaiterTests.cs similarity index 73% rename from dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InputWaiterAndOutputFilterTests.cs rename to dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InputWaiterTests.cs index c7c231c63e..2b4d88dc9e 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InputWaiterAndOutputFilterTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InputWaiterTests.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. using System; using System.Threading; @@ -122,50 +122,3 @@ public async Task InputWaiter_WaitForInputAsync_CompletesWhenTimeoutExpiresAsync await waitTask; } } - -public class OutputFilterTests -{ - private static OutputFilter CreateFilterWithOutputFrom(string outputExecutorId) - { - NoOpExecutor start = new("start"); - NoOpExecutor end = new("end"); - - Workflow workflow = new WorkflowBuilder("start") - .AddEdge(start, end) - .WithOutputFrom(outputExecutorId == "end" ? end : start) - .Build(); - - return new OutputFilter(workflow); - } - - [Fact] - public void OutputFilter_CanOutput_ReturnsTrueForRegisteredExecutor() - { - OutputFilter filter = CreateFilterWithOutputFrom("end"); - - filter.CanOutput("end", "some output").Should().BeTrue("the executor was registered via WithOutputFrom"); - } - - [Fact] - public void OutputFilter_CanOutput_ReturnsFalseForUnregisteredExecutor() - { - OutputFilter filter = CreateFilterWithOutputFrom("end"); - - filter.CanOutput("start", "some output").Should().BeFalse("start was not registered as an output executor"); - } - - [Fact] - public void OutputFilter_CanOutput_ReturnsFalseForNonExistentExecutor() - { - OutputFilter filter = CreateFilterWithOutputFrom("end"); - - filter.CanOutput("nonexistent", "some output").Should().BeFalse("an executor not in the workflow should not be an output executor"); - } - - private sealed class NoOpExecutor(string id) : Executor(id) - { - protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder) - => protocolBuilder.ConfigureRoutes(routeBuilder => - routeBuilder.AddHandler((msg, ctx) => ctx.SendMessageAsync(msg))); - } -} diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/JsonSerializationTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/JsonSerializationTests.cs index 8fed6fca5b..983937c984 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/JsonSerializationTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/JsonSerializationTests.cs @@ -187,8 +187,12 @@ private static void ValidateWorkflowInfo(WorkflowInfo actual, WorkflowInfo proto actual.InputType.Should().Match(prototype.InputType.CreateValidator()); actual.StartExecutorId.Should().Be(prototype.StartExecutorId); - actual.OutputExecutorIds.Should().HaveCount(prototype.OutputExecutorIds.Count) - .And.AllSatisfy(id => prototype.OutputExecutorIds.Contains(id)); + actual.OutputExecutorIds.Should().HaveCount(prototype.OutputExecutorIds.Count); + foreach (KeyValuePair> kvp in prototype.OutputExecutorIds) + { + actual.OutputExecutorIds.Should().ContainKey(kvp.Key); + actual.OutputExecutorIds[kvp.Key].Should().BeEquivalentTo(kvp.Value); + } void ValidateExecutorDictionary(Dictionary expected, Dictionary> expectedEdges, diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/MagenticWorkflowBuilderTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/MagenticWorkflowBuilderTests.cs new file mode 100644 index 0000000000..1b83708aef --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/MagenticWorkflowBuilderTests.cs @@ -0,0 +1,79 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Linq; +using FluentAssertions; + +namespace Microsoft.Agents.AI.Workflows.UnitTests; + +/// +/// Tests focused on 's output-designation surface — +/// the Python-aligned defaults applied at when +/// the user has not made explicit designations, and the memoized +/// WithOutputFrom / WithIntermediateOutputFrom replay otherwise. +/// +#pragma warning disable MAAIW001 // Experimental: MagenticWorkflowBuilder +public class MagenticWorkflowBuilderTests +{ + [Fact] + public void Test_MagenticWorkflowBuilder_DefaultDesignationsMatchSpec() + { + TestReplayAgent manager = new(name: "Manager"); + TestEchoAgent member1 = new(name: "Worker1"); + TestEchoAgent member2 = new(name: "Worker2"); + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(member1, member2) + .RequirePlanSignoff(false) + .Build(); + + Dictionary> designations = workflow.OutputExecutors; + + designations.Where(kvp => kvp.Value.Count == 0) + .Should().ContainSingle("the Magentic orchestrator is the sole terminal output by default"); + designations.Where(kvp => kvp.Value.Contains(OutputTag.Intermediate)) + .Should().HaveCount(2, "every team member is designated intermediate by default"); + } + + [Fact] + public void Test_MagenticWorkflowBuilder_ExplicitDesignationsReplaceDefaults() + { + TestReplayAgent manager = new(name: "Manager"); + TestEchoAgent member1 = new(name: "Worker1"); + TestEchoAgent member2 = new(name: "Worker2"); + + Workflow workflow = new MagenticWorkflowBuilder(manager) + .AddParticipants(member1, member2) + .RequirePlanSignoff(false) + .WithOutputFrom(member1) + .WithIntermediateOutputFrom([member2]) + .Build(); + + Dictionary> designations = workflow.OutputExecutors; + + designations.Should().HaveCount(2, + "only the user-specified designations land on the inner builder; the orchestrator default is suppressed"); + designations.Values.Where(tags => tags.Count == 0) + .Should().ContainSingle("member1 is the only terminal designation"); + designations.Values.Where(tags => tags.Contains(OutputTag.Intermediate)) + .Should().ContainSingle("member2 is the only intermediate designation"); + } + + [Fact] + public void Test_MagenticWorkflowBuilder_DesignationForNonParticipantThrows() + { + TestReplayAgent manager = new(name: "Manager"); + TestEchoAgent member = new(name: "Worker"); + TestEchoAgent stranger = new(name: "Stranger"); + + MagenticWorkflowBuilder builder = new MagenticWorkflowBuilder(manager) + .AddParticipants(member) + .RequirePlanSignoff(false) + .WithIntermediateOutputFrom([stranger]); + + Action build = () => builder.Build(); + build.Should().Throw().WithMessage("*Stranger*"); + } +} +#pragma warning restore MAAIW001 diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/OrchestrationTestHelpers.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/OrchestrationTestHelpers.cs new file mode 100644 index 0000000000..1f6b56655d --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/OrchestrationTestHelpers.cs @@ -0,0 +1,131 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Text; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows.InProc; +using Microsoft.Extensions.AI; + +namespace Microsoft.Agents.AI.Workflows.UnitTests; + +/// +/// Container for shared test helpers used by every orchestration-builder test class — +/// the DoubleEchoAgent family and the RunWorkflow* methods. The actual +/// test methods live in per-builder files (SequentialWorkflowBuilderTests, +/// ConcurrentWorkflowBuilderTests, GroupChatWorkflowBuilderTests, etc.). +/// +public static class OrchestrationTestHelpers +{ + internal class DoubleEchoAgent(string name) : AIAgent + { + public override string Name => name; + + protected override ValueTask CreateSessionCoreAsync(CancellationToken cancellationToken = default) + => new(new DoubleEchoAgentSession()); + + protected override ValueTask DeserializeSessionCoreAsync(JsonElement serializedState, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default) + => new(new DoubleEchoAgentSession()); + + protected override ValueTask SerializeSessionCoreAsync(AgentSession session, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default) + => default; + + protected override Task RunCoreAsync( + IEnumerable messages, AgentSession? session = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) => + throw new NotImplementedException(); + + protected override async IAsyncEnumerable RunCoreStreamingAsync( + IEnumerable messages, AgentSession? session = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + await Task.Yield(); + + var contents = messages.SelectMany(m => m.Contents).ToList(); + string id = Guid.NewGuid().ToString("N"); + yield return new AgentResponseUpdate(ChatRole.Assistant, this.Name) { AuthorName = this.Name, MessageId = id }; + yield return new AgentResponseUpdate(ChatRole.Assistant, contents) { AuthorName = this.Name, MessageId = id }; + yield return new AgentResponseUpdate(ChatRole.Assistant, contents) { AuthorName = this.Name, MessageId = id }; + } + } + + internal sealed class DoubleEchoAgentSession() : AgentSession(); + + internal sealed class DoubleEchoAgentWithBarrier(string name, StrongBox> barrier, StrongBox remaining) : DoubleEchoAgent(name) + { + protected override async IAsyncEnumerable RunCoreStreamingAsync( + IEnumerable messages, AgentSession? session = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + if (Interlocked.Decrement(ref remaining.Value) == 0) + { + barrier.Value!.SetResult(true); + } + + await barrier.Value!.Task.ConfigureAwait(false); + + await foreach (var update in base.RunCoreStreamingAsync(messages, session, options, cancellationToken)) + { + await Task.Yield(); + yield return update; + } + } + } + + internal sealed record WorkflowRunResult(string UpdateText, List? Result, CheckpointInfo? LastCheckpoint, List PendingRequests); + + internal static async Task RunWorkflowCheckpointedAsync( + Workflow workflow, List input, InProcessExecutionEnvironment environment, CheckpointInfo? fromCheckpoint = null) + { + await using StreamingRun run = + fromCheckpoint != null ? await environment.ResumeStreamingAsync(workflow, fromCheckpoint) + : await environment.OpenStreamingAsync(workflow); + + await run.TrySendMessageAsync(input); + await run.TrySendMessageAsync(new TurnToken(emitEvents: true)); + + return await ProcessWorkflowRunAsync(run); + } + + internal static async Task ProcessWorkflowRunAsync(StreamingRun run) + { + StringBuilder sb = new(); + WorkflowOutputEvent? output = null; + CheckpointInfo? lastCheckpoint = null; + + List pendingRequests = []; + + await foreach (WorkflowEvent evt in run.WatchStreamAsync(blockOnPendingRequest: false).ConfigureAwait(false)) + { + switch (evt) + { + case AgentResponseUpdateEvent responseUpdate: + sb.Append(responseUpdate.Data); + break; + + case RequestInfoEvent requestInfo: + pendingRequests.Add(requestInfo); + break; + + case WorkflowOutputEvent e: + output = e; + break; + + case WorkflowErrorEvent errorEvent: + Assert.Fail($"Workflow execution failed with error: {errorEvent.Exception}"); + break; + + case SuperStepCompletedEvent stepCompleted: + lastCheckpoint = stepCompleted.CompletionInfo?.Checkpoint; + break; + } + } + + return new(sb.ToString(), output?.As>(), lastCheckpoint, pendingRequests); + } + + internal static Task RunWorkflowAsync( + Workflow workflow, List input, ExecutionEnvironment executionEnvironment = ExecutionEnvironment.InProcess_Lockstep) + => RunWorkflowCheckpointedAsync(workflow, input, executionEnvironment.ToWorkflowExecutionEnvironment()); +} \ No newline at end of file diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/OutputFilterTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/OutputFilterTests.cs new file mode 100644 index 0000000000..55bfacf9a1 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/OutputFilterTests.cs @@ -0,0 +1,108 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Collections.Generic; +using FluentAssertions; +using Microsoft.Agents.AI.Workflows.Execution; + +namespace Microsoft.Agents.AI.Workflows.UnitTests; + +public class OutputFilterTests +{ + private static OutputFilter CreateFilterWithOutputFrom(string outputExecutorId) + { + NoOpExecutor start = new("start"); + NoOpExecutor end = new("end"); + + Workflow workflow = new WorkflowBuilder("start") + .AddEdge(start, end) + .WithOutputFrom(outputExecutorId == "end" ? end : start) + .Build(); + + return new OutputFilter(workflow); + } + + [Fact] + public void OutputFilter_CanOutput_ReturnsTrueForRegisteredExecutor() + { + OutputFilter filter = CreateFilterWithOutputFrom("end"); + + filter.CanOutput("end", "some output").Should().BeTrue("the executor was registered via WithOutputFrom"); + } + + [Fact] + public void OutputFilter_CanOutput_ReturnsFalseForUnregisteredExecutor() + { + OutputFilter filter = CreateFilterWithOutputFrom("end"); + + filter.CanOutput("start", "some output").Should().BeFalse("start was not registered as an output executor"); + } + + [Fact] + public void OutputFilter_CanOutput_ReturnsFalseForNonExistentExecutor() + { + OutputFilter filter = CreateFilterWithOutputFrom("end"); + + filter.CanOutput("nonexistent", "some output").Should().BeFalse("an executor not in the workflow should not be an output executor"); + } + + [Fact] + public void Test_OutputFilter_ReturnsEmptyTagSetWhenRegisteredViaWithOutputFrom() + { + OutputFilter filter = CreateFilterWithOutputFrom("end"); + + filter.TryGetTags("end", out HashSet? tags).Should().BeTrue(); + tags.Should().NotBeNull().And.BeEmpty("terminal designation carries no tag"); + } + + [Fact] + public void Test_OutputFilter_ReturnsIntermediateTagWhenRegisteredViaWithIntermediateOutputFrom() + { + NoOpExecutor start = new("start"); + NoOpExecutor end = new("end"); + + Workflow workflow = new WorkflowBuilder("start") + .AddEdge(start, end) + .WithIntermediateOutputFrom([end]) + .Build(); + + OutputFilter filter = new(workflow); + + filter.TryGetTags("end", out HashSet? tags).Should().BeTrue(); + tags.Should().BeEquivalentTo(new[] { OutputTag.Intermediate }); + } + + [Fact] + public void Test_OutputFilter_ReturnsIntermediateTagForAccumulatedDesignation() + { + NoOpExecutor start = new("start"); + NoOpExecutor end = new("end"); + + Workflow workflow = new WorkflowBuilder("start") + .AddEdge(start, end) + .WithOutputFrom(end) + .WithIntermediateOutputFrom([end]) + .Build(); + + OutputFilter filter = new(workflow); + + filter.TryGetTags("end", out HashSet? tags).Should().BeTrue(); + tags.Should().BeEquivalentTo(new[] { OutputTag.Intermediate }, + "terminal designation contributes no tag; the union is the intermediate set"); + } + + [Fact] + public void Test_OutputFilter_TryGetTagsReturnsFalseForUnregisteredExecutor() + { + OutputFilter filter = CreateFilterWithOutputFrom("end"); + + filter.TryGetTags("start", out HashSet? tags).Should().BeFalse(); + tags.Should().BeNull(); + } + + private sealed class NoOpExecutor(string id) : Executor(id) + { + protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder) + => protocolBuilder.ConfigureRoutes(routeBuilder => + routeBuilder.AddHandler((msg, ctx) => ctx.SendMessageAsync(msg))); + } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/OutputTagTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/OutputTagTests.cs new file mode 100644 index 0000000000..da100baadf --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/OutputTagTests.cs @@ -0,0 +1,77 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Collections.Generic; +using System.Reflection; +using System.Text.Json; +using FluentAssertions; + +namespace Microsoft.Agents.AI.Workflows.UnitTests; + +public class OutputTagTests +{ + [Fact] + public void Test_OutputTag_KnownValues() + { + OutputTag.Intermediate.Value.Should().Be("intermediate"); + } + + [Fact] + public void Test_OutputTag_EqualityIsOrdinalOnValue() + { + OutputTag.Intermediate.Should().Be(OutputTag.Intermediate); + (OutputTag.Intermediate == OutputTag.Intermediate).Should().BeTrue(); + + // Same Value via independent construction (via JSON round-trip below) is equal. + OutputTag rebuilt = JsonSerializer.Deserialize("\"intermediate\"", WorkflowsJsonUtilities.DefaultOptions); + rebuilt.Should().Be(OutputTag.Intermediate); + } + + [Fact] + public void Test_OutputTag_DefaultStructValueIsDistinct() + { + OutputTag def = default; + def.Value.Should().BeNull(); + def.Should().NotBe(OutputTag.Intermediate); + def.GetHashCode().Should().Be(0); + + HashSet set = [OutputTag.Intermediate]; + set.Contains(def).Should().BeFalse("default(OutputTag) must not collide with the well-known singleton in a HashSet"); + } + + [Fact] + public void Test_OutputTag_GetHashCodeMatchesEquals() + { + OutputTag a = OutputTag.Intermediate; + OutputTag b = JsonSerializer.Deserialize("\"intermediate\"", WorkflowsJsonUtilities.DefaultOptions); + + a.Equals(b).Should().BeTrue(); + a.GetHashCode().Should().Be(b.GetHashCode()); + } + + [Fact] + public void Test_OutputTag_JsonConverter_RoundtripsValueAsString() + { + string intermediateJson = JsonSerializer.Serialize(OutputTag.Intermediate, WorkflowsJsonUtilities.DefaultOptions); + intermediateJson.Should().Be("\"intermediate\""); + + OutputTag back = JsonSerializer.Deserialize("\"intermediate\"", WorkflowsJsonUtilities.DefaultOptions); + back.Should().Be(OutputTag.Intermediate); + + OutputTag fromUnknown = JsonSerializer.Deserialize("\"custom\"", WorkflowsJsonUtilities.DefaultOptions); + fromUnknown.Value.Should().Be("custom"); + } + + [Fact] + public void Test_OutputTag_ConstructorIsInternal() + { + ConstructorInfo? ctor = typeof(OutputTag).GetConstructor( + BindingFlags.Instance | BindingFlags.NonPublic, + binder: null, + types: [typeof(string)], + modifiers: null); + + ctor.Should().NotBeNull("OutputTag(string) must exist as an internal constructor"); + ctor!.IsAssembly.Should().BeTrue("OutputTag(string) must be `internal` so external assemblies cannot synthesize tags"); + ctor.IsPublic.Should().BeFalse(); + } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SequentialWorkflowBuilderTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SequentialWorkflowBuilderTests.cs new file mode 100644 index 0000000000..dd3ce4dcfa --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SequentialWorkflowBuilderTests.cs @@ -0,0 +1,184 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using FluentAssertions; +using Microsoft.Agents.AI.Workflows.UnitTests.Futures; +using Microsoft.Extensions.AI; +using Xunit; + +namespace Microsoft.Agents.AI.Workflows.UnitTests; + +public class SequentialWorkflowBuilderTests +{ + [Fact] + public void Test_SequentialWorkflowBuilder_InvalidArguments_Throws() + { + Assert.Throws("agents", () => new SequentialWorkflowBuilder(null!)); + Assert.Throws("agents", () => new SequentialWorkflowBuilder().Build()); + + Assert.Throws("agents", () => AgentWorkflowBuilder.BuildSequential(workflowName: null!, null!)); + Assert.Throws("agents", () => AgentWorkflowBuilder.BuildSequential()); + Assert.Throws("agents", () => AgentWorkflowBuilder.CreateSequentialBuilderWith(null!)); + } + + [Theory] + [InlineData(1)] + [InlineData(2)] + [InlineData(3)] + [InlineData(4)] + [InlineData(5)] + public async Task Test_SequentialWorkflowBuilder_AgentsRunInOrderAsync(int numAgents) + { + var workflow = new SequentialWorkflowBuilder( + from i in Enumerable.Range(1, numAgents) + select new OrchestrationTestHelpers.DoubleEchoAgent($"agent{i}")) + .Build(); + + for (int iter = 0; iter < 3; iter++) + { + const string UserInput = "abc"; + (string updateText, List? result, _, _) = + await OrchestrationTestHelpers.RunWorkflowAsync(workflow, [new ChatMessage(ChatRole.User, UserInput)]); + + Assert.NotNull(result); + Assert.Equal(numAgents + 1, result.Count); + + Assert.Equal(ChatRole.User, result[0].Role); + Assert.Null(result[0].AuthorName); + Assert.Equal(UserInput, result[0].Text); + + string[] texts = new string[numAgents + 1]; + texts[0] = UserInput; + string expectedTotal = string.Empty; + for (int i = 1; i < numAgents + 1; i++) + { + string id = $"agent{((i - 1) % numAgents) + 1}"; + texts[i] = $"{id}{Double(string.Concat(texts.Take(i)))}"; + Assert.Equal(ChatRole.Assistant, result[i].Role); + Assert.Equal(id, result[i].AuthorName); + Assert.Equal(texts[i], result[i].Text); + expectedTotal += texts[i]; + } + + Assert.Equal(expectedTotal, updateText); + Assert.Equal(UserInput + expectedTotal, string.Concat(result)); + + static string Double(string s) => s + s; + } + } + + [Fact] + public void Test_SequentialWorkflowBuilder_DefaultDesignationsMatchSpec() + { + Workflow workflow = new SequentialWorkflowBuilder( + new OrchestrationTestHelpers.DoubleEchoAgent("agent1"), + new OrchestrationTestHelpers.DoubleEchoAgent("agent2"), + new OrchestrationTestHelpers.DoubleEchoAgent("agent3")) + .Build(); + + Dictionary> designations = workflow.OutputExecutors; + designations.Where(kvp => kvp.Value.Count == 0) + .Should().ContainSingle("OutputMessagesExecutor is the sole terminal output by default"); + designations.Where(kvp => kvp.Value.Contains(OutputTag.Intermediate)) + .Should().HaveCount(3, "every pipeline agent is designated intermediate by default"); + } + + [Fact] + public void Test_SequentialWorkflowBuilder_ExplicitDesignationsReplaceDefaults() + { + OrchestrationTestHelpers.DoubleEchoAgent a1 = new("agent1"); + OrchestrationTestHelpers.DoubleEchoAgent a2 = new("agent2"); + OrchestrationTestHelpers.DoubleEchoAgent a3 = new("agent3"); + + Workflow workflow = new SequentialWorkflowBuilder(a1, a2, a3) + .WithOutputFrom(a1) + .WithIntermediateOutputFrom([a2]) + .Build(); + + Dictionary> designations = workflow.OutputExecutors; + + designations.Should().HaveCount(2, + "only the two explicitly-designated agents land on the inner builder; the end default is suppressed"); + designations.Values.Where(tags => tags.Count == 0) + .Should().ContainSingle("agent1 is the only terminal designation"); + designations.Values.Where(tags => tags.Contains(OutputTag.Intermediate)) + .Should().ContainSingle("agent2 is the only intermediate designation"); + } + + [Fact] + public void Test_SequentialWorkflowBuilder_DesignationForNonParticipantThrows() + { + OrchestrationTestHelpers.DoubleEchoAgent participant = new("p1"); + OrchestrationTestHelpers.DoubleEchoAgent stranger = new("stranger"); + + SequentialWorkflowBuilder builder = new SequentialWorkflowBuilder(participant) + .WithIntermediateOutputFrom([stranger]); + + Action build = () => builder.Build(); + build.Should().Throw().WithMessage("*stranger*"); + } + + [Fact] + public void Test_SequentialWorkflowBuilder_WithNamePropagatesToWorkflow() + { + Workflow workflow = new SequentialWorkflowBuilder(new OrchestrationTestHelpers.DoubleEchoAgent("agent1")) + .WithName("named-sequential") + .Build(); + + workflow.Name.Should().Be("named-sequential"); + } + + [Fact] + public void Test_SequentialWorkflowBuilder_WithDescriptionPropagatesToWorkflow() + { + Workflow workflow = new SequentialWorkflowBuilder(new OrchestrationTestHelpers.DoubleEchoAgent("agent1")) + .WithDescription("describes the sequential pipeline") + .Build(); + + workflow.Description.Should().Be("describes the sequential pipeline"); + } + + [Collection(FuturesSerialCollection.Name)] + public class AsAgentForwarding + { + [Fact] + public async Task Test_SequentialWorkflowBuilder_AsAgent_OnlyTerminalDesignationSurfacesAsync() + { + using FuturesScope _ = new(enabled: true); + + OrchestrationTestHelpers.DoubleEchoAgent agent1 = new("agent1"); + OrchestrationTestHelpers.DoubleEchoAgent agent2 = new("agent2"); + OrchestrationTestHelpers.DoubleEchoAgent agent3 = new("agent3"); + + // Explicitly designate ONLY the last agent — defaults (which would tag every agent + // intermediate) are suppressed, so under Futures-on, agent1/agent2 produce no + // AgentResponse(Update)Events and nothing of theirs reaches the AsAgent stream. + Workflow workflow = new SequentialWorkflowBuilder(agent1, agent2, agent3) + .WithOutputFrom(agent3) + .Build(); + + List updates = await workflow + .AsAIAgent("WorkflowAgent") + .RunStreamingAsync(new ChatMessage(ChatRole.User, "abc")) + .ToListAsync(); + + // Filter by AuthorName — distinguishes which agent originated each update + // (text-content checks are unreliable because agent3 echoes earlier agents' markers + // as part of the cumulative pipeline payload). + HashSet authoredBy = updates + .Select(u => u.AuthorName) + .Where(n => !string.IsNullOrEmpty(n)) + .Select(n => n!) + .ToHashSet(); + + authoredBy.Should().Contain("agent3", "the terminal agent must surface"); + authoredBy.Should().NotContain("agent1", + "the intermediate agent must not surface when only the terminal is designated"); + authoredBy.Should().NotContain("agent2", + "the intermediate agent must not surface when only the terminal is designated"); + } + } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowBuilderSmokeTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowBuilderTests.cs similarity index 82% rename from dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowBuilderSmokeTests.cs rename to dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowBuilderTests.cs index c2b855b8bf..bc56a5ecef 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowBuilderSmokeTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowBuilderTests.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. using System; using System.Collections.Generic; @@ -6,7 +6,7 @@ namespace Microsoft.Agents.AI.Workflows.UnitTests; -public partial class WorkflowBuilderSmokeTests +public partial class WorkflowBuilderTests { private sealed class NoOpExecutor(string id) : Executor(id) { @@ -455,4 +455,112 @@ public void SwitchBuilder_InvalidArguments_Throw() /// private static Edge GetSingleEdge(Workflow workflow, string sourceId) => workflow.Edges[sourceId].Should().ContainSingle().Subject; + + // --- Tag-aware WithOutputFrom / WithIntermediateOutputFrom tests --- + + [Fact] + public void Test_WithOutputFrom_RegistersWithEmptyTagSet() + { + NoOpExecutor a = new("a"); + NoOpExecutor b = new("b"); + Workflow workflow = new WorkflowBuilder("a") + .AddEdge(a, b) + .WithOutputFrom(b) + .Build(); + + workflow.OutputExecutors.Should().ContainKey("b"); + workflow.OutputExecutors["b"].Should().BeEmpty("regular outputs are untagged"); + } + + [Fact] + public void Test_WithIntermediateOutputFrom_AddsIntermediateTag() + { + NoOpExecutor a = new("a"); + NoOpExecutor b = new("b"); + Workflow workflow = new WorkflowBuilder("a") + .AddEdge(a, b) + .WithIntermediateOutputFrom([b]) + .Build(); + + workflow.OutputExecutors["b"].Should().BeEquivalentTo(new[] { OutputTag.Intermediate }); + } + + [Fact] + public void Test_WithOutputFrom_MultipleExecutorsAllUntagged() + { + NoOpExecutor a = new("a"); + NoOpExecutor b = new("b"); + NoOpExecutor c = new("c"); + + Workflow workflow = new WorkflowBuilder("a") + .AddEdge(a, b).AddEdge(a, c) + .WithOutputFrom(b, c) + .Build(); + + workflow.OutputExecutors.Should().HaveCount(2); + workflow.OutputExecutors["b"].Should().BeEmpty(); + workflow.OutputExecutors["c"].Should().BeEmpty(); + } + + [Fact] + public void Test_WithOutputFrom_ThenIntermediate_AccumulatesTags() + { + NoOpExecutor a = new("a"); + NoOpExecutor b = new("b"); + Workflow workflow = new WorkflowBuilder("a") + .AddEdge(a, b) + .WithOutputFrom(b) + .WithIntermediateOutputFrom([b]) + .Build(); + + // WithOutputFrom doesn't add a tag; WithIntermediateOutputFrom adds Intermediate. + workflow.OutputExecutors["b"].Should().BeEquivalentTo(new[] { OutputTag.Intermediate }); + } + + [Fact] + public void Test_WithIntermediateOutputFrom_RepeatedDedupes() + { + NoOpExecutor a = new("a"); + NoOpExecutor b = new("b"); + Workflow workflow = new WorkflowBuilder("a") + .AddEdge(a, b) + .WithIntermediateOutputFrom([b]) + .WithIntermediateOutputFrom([b]) + .Build(); + + workflow.OutputExecutors["b"].Should().BeEquivalentTo(new[] { OutputTag.Intermediate }); + } + + [Fact] + public void Test_WithIntermediateOutputFrom_OnlyRegistersWithoutPriorWithOutputFrom() + { + // WithIntermediateOutputFrom on its own is sufficient to register the executor as an + // output source — the call ensures the id is in the dict with the Intermediate tag. + NoOpExecutor a = new("a"); + NoOpExecutor b = new("b"); + Workflow workflow = new WorkflowBuilder("a") + .AddEdge(a, b) + .WithIntermediateOutputFrom([b]) + .Build(); + + workflow.OutputExecutors.Should().ContainKey("b"); + workflow.OutputExecutors["b"].Should().BeEquivalentTo(new[] { OutputTag.Intermediate }); + } + + [Fact] + public void Test_WithOutputFrom_TracksExecutorBinding() + { + // A placeholder binding referenced via WithOutputFrom must end up bound by the time we Build. + NoOpExecutor a = new("a"); + NoOpExecutor future = new("future"); + + Workflow workflow = new WorkflowBuilder("a") + .AddEdge(a, "future") + .WithIntermediateOutputFrom(["future"]) + .BindExecutor(future) + .Build(); + + workflow.OutputExecutors.Should().ContainKey("future"); + workflow.OutputExecutors["future"].Should().BeEquivalentTo(new[] { OutputTag.Intermediate }); + } } diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs index 7b9b428871..4402142fae 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowHostSmokeTests.cs @@ -824,4 +824,130 @@ public Task Test_Handoffs_AsAgent_OutgoingMessagesInHistoryAsync(bool runAsync) Workflow handoffWorkflow = new HandoffWorkflowBuilder(agent).Build(); return this.Run_AsAgent_OutgoingMessagesInHistoryAsync(handoffWorkflow, runAsync); } + + // ----- Phase 5: Workflow-as-Agent intermediate forwarding ----------------- + + [Collection(Futures.FuturesSerialCollection.Name)] + public class IntermediateForwarding + { + private const string InterText = "progress"; + private const string FinalText = "final"; + + private static async Task> RunStreamingAsync( + Workflow workflow, + bool includeWorkflowOutputsInResponse = false) + { + return await workflow + .AsAIAgent("WorkflowAgent", includeWorkflowOutputsInResponse: includeWorkflowOutputsInResponse) + .RunStreamingAsync(new ChatMessage(ChatRole.User, "hi")) + .ToListAsync(); + } + + [Fact] + public async Task Test_WorkflowHostAgent_IntermediateAgentResponseForwardedInStreamingAsync() + { + using Futures.FuturesScope _ = new(enabled: true); + TestReplayAgent agent = new(TestReplayAgent.ToChatMessages(InterText)); + ExecutorBinding binding = agent.BindAsExecutor(new AIAgentHostOptions { EmitAgentResponseEvents = true }); + Workflow workflow = new WorkflowBuilder(binding) + .WithIntermediateOutputFrom([binding]) + .Build(); + + // Under Futures-on, AgentResponseEvent mirrors AgentResponseUpdateEvent: always + // forwarded regardless of the include flag. The intermediate tag is observable on + // the surfaced event for consumers that care to distinguish. + List updates = await RunStreamingAsync(workflow, includeWorkflowOutputsInResponse: false); + + updates.Any(u => u.RawRepresentation is AgentResponseEvent are && are.IsIntermediate() && u.Text == InterText) + .Should().BeTrue("AgentResponseEvent is forwarded under Futures-on regardless of the include flag"); + } + + [Fact] + public async Task Test_WorkflowHostAgent_TerminalAgentResponseForwardedUnconditionallyWhenFuturesOnAsync() + { + using Futures.FuturesScope _ = new(enabled: true); + TestReplayAgent agent = new(TestReplayAgent.ToChatMessages(FinalText)); + ExecutorBinding binding = agent.BindAsExecutor(new AIAgentHostOptions { EmitAgentResponseEvents = true }); + Workflow workflow = new WorkflowBuilder(binding) + .WithOutputFrom(binding) + .Build(); + + // Even a terminal-only designation surfaces without the include flag — the gating + // asymmetry between AgentResponse and AgentResponseUpdate is gone under Futures-on. + List updates = await RunStreamingAsync(workflow, includeWorkflowOutputsInResponse: false); + + updates.Any(u => u.RawRepresentation is AgentResponseEvent && u.Text == FinalText) + .Should().BeTrue("terminal AgentResponseEvent is forwarded under Futures-on regardless of the include flag"); + } + + [Fact] + public async Task Test_WorkflowHostAgent_TerminalAgentResponseGatedWhenFuturesOffAsync() + { + using Futures.FuturesScope _ = new(enabled: false); + + static Workflow Build() + { + TestReplayAgent agent = new(TestReplayAgent.ToChatMessages(FinalText)); + ExecutorBinding binding = agent.BindAsExecutor(new AIAgentHostOptions { EmitAgentResponseEvents = true }); + return new WorkflowBuilder(binding).WithOutputFrom(binding).Build(); + } + + // Legacy semantics: AgentResponseEvent stays behind the include flag when Futures + // is off. Two fresh workflows because in-process runs aren't reentrant. + List gated = await RunStreamingAsync(Build(), includeWorkflowOutputsInResponse: false); + gated.Any(u => u.RawRepresentation is AgentResponseEvent && u.Text == FinalText) + .Should().BeFalse("terminal AgentResponseEvent stays gated under Futures-off"); + + List included = await RunStreamingAsync(Build(), includeWorkflowOutputsInResponse: true); + included.Any(u => u.RawRepresentation is AgentResponseEvent && u.Text == FinalText) + .Should().BeTrue("opting in via includeWorkflowOutputsInResponse surfaces it"); + } + + [Fact] + public async Task Test_WorkflowHostAgent_UndesignatedExecutorEmitsNoAgentResponseEventWhenFuturesOnAsync() + { + using Futures.FuturesScope _ = new(enabled: true); + TestReplayAgent agent = new(TestReplayAgent.ToChatMessages(InterText)); + ExecutorBinding binding = agent.BindAsExecutor(new AIAgentHostOptions { EmitAgentResponseEvents = true }); + // No designation — under Futures-on, the AgentResponse is dropped by the filter. + Workflow workflow = new WorkflowBuilder(binding).Build(); + + List updates = await RunStreamingAsync(workflow, includeWorkflowOutputsInResponse: true); + + updates.Any(u => u.RawRepresentation is AgentResponseEvent) + .Should().BeFalse("an undesignated AIAgent executor produces no AgentResponseEvent under Futures-on"); + } + + [Fact] + public async Task Test_WorkflowHostAgent_UndesignatedAgentResponseSurfacesWhenFuturesOffAsync() + { + using Futures.FuturesScope _ = new(enabled: false); + TestReplayAgent agent = new(TestReplayAgent.ToChatMessages(InterText)); + ExecutorBinding binding = agent.BindAsExecutor(new AIAgentHostOptions { EmitAgentResponseEvents = true }); + Workflow workflow = new WorkflowBuilder(binding).Build(); + + List updates = await RunStreamingAsync(workflow, includeWorkflowOutputsInResponse: true); + + updates.Any(u => u.RawRepresentation is AgentResponseEvent && u.Text == InterText) + .Should().BeTrue("legacy bypass still emits AgentResponseEvent regardless of designation"); + } + + [Fact] + public async Task Test_WorkflowHostAgent_IntermediateTagAvailableViaRawRepresentationAsync() + { + using Futures.FuturesScope _ = new(enabled: true); + TestReplayAgent agent = new(TestReplayAgent.ToChatMessages(InterText)); + ExecutorBinding binding = agent.BindAsExecutor(new AIAgentHostOptions { EmitAgentResponseEvents = true }); + Workflow workflow = new WorkflowBuilder(binding) + .WithIntermediateOutputFrom([binding]) + .Build(); + + List updates = await RunStreamingAsync(workflow); + + AgentResponseUpdate progress = updates.First(u => u.RawRepresentation is AgentResponseEvent && u.Text == InterText); + AgentResponseEvent raw = (AgentResponseEvent)progress.RawRepresentation!; + raw.IsIntermediate().Should().BeTrue(); + raw.Tags.Should().BeEquivalentTo(new[] { OutputTag.Intermediate }); + } + } }