Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/examples/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ Welcome! This section collects small, focused demos that show **how to compose b
* **Enterprise Messaging Workflow Suite**
End-to-end messaging examples for envelopes, content routing, recipient lists, splitters, aggregators, routing slips, sagas, mailboxes, idempotent receivers, inboxes, outboxes, and generated messaging factories. See [Enterprise Messaging Workflow Suite](enterprise-messaging-workflows.md).

* **Order Event-Driven Consumer**
Shows fluent and source-generated push consumers side by side, with an importable `IServiceCollection` extension. See [Order Event-Driven Consumer](order-event-driven-consumer.md).

* **Generated Message Envelope**
Shows fluent and source-generated message envelope contracts side by side, with an importable `IServiceCollection` extension. See [Generated Message Envelope](generated-message-envelope.md).

Expand Down
12 changes: 12 additions & 0 deletions docs/examples/order-event-driven-consumer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Order Event-Driven Consumer

The order event-driven consumer example handles pushed order-accepted events and writes an audit entry.

```csharp
services.AddOrderEventDrivenConsumerDemo();

var service = provider.GetRequiredService<OrderEventDrivenConsumerService>();
var summary = service.Accept(new OrderAcceptedEvent("ORDER-100", 42.50m));
```

The example includes fluent and source-generated construction, a push-based service boundary, and `IServiceCollection` registration for existing .NET applications.
3 changes: 3 additions & 0 deletions docs/examples/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
- name: Warehouse Polling Consumer
href: warehouse-polling-consumer.md

- name: Order Event-Driven Consumer
href: order-event-driven-consumer.md

- name: Patterns Showcase — Integrated Order Processing
href: patterns-showcase.md

Expand Down
21 changes: 21 additions & 0 deletions docs/generators/event-driven-consumer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Event-Driven Consumer Generator

`[GenerateEventDrivenConsumer]` creates a typed `EventDrivenConsumer<TPayload>` factory.

```csharp
[GenerateEventDrivenConsumer(typeof(OrderAcceptedEvent), FactoryName = "Create", ConsumerName = "order-accepted-consumer")]
public static partial class OrderAcceptedConsumer
{
[EventDrivenConsumerHandler("audit")]
private static EventDrivenConsumerHandlerResult Audit(Message<OrderAcceptedEvent> message, MessageContext context)
=> EventDrivenConsumerHandlerResult.Success("audit");
}
```

Handler methods must be static, return `EventDrivenConsumerHandlerResult`, and accept `Message<TPayload>` plus `MessageContext`.

Diagnostics:

- `PKEVT001`: host type must be partial.
- `PKEVT002`: at least one event-driven consumer handler is required.
- `PKEVT003`: event-driven consumer handler signature is invalid.
1 change: 1 addition & 0 deletions docs/generators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ PatternKit includes a Roslyn incremental generator package (`PatternKit.Generato
| [**Dispatcher**](dispatcher.md) | Mediator pattern with commands, notifications, and streams | `[GenerateDispatcher]` |
| [**Message Channel**](message-channel.md) | Typed channel factories for in-process message queues | `[GenerateMessageChannel]` |
| [**Polling Consumer**](polling-consumer.md) | Pull-based message consumer factories | `[GeneratePollingConsumer]` |
| [**Event-Driven Consumer**](event-driven-consumer.md) | Push-based message consumer factories | `[GenerateEventDrivenConsumer]` |
| [**Message Envelope**](messaging.md#generated-message-envelope) | Required message metadata contracts | `[GenerateMessageEnvelope]` |
| [**Message Translator**](message-translator.md) | Partner and transport event normalization | `[GenerateMessageTranslator]` |
| [**Claim Check**](claim-check.md) | External payload storage references | `[GenerateClaimCheck]` |
Expand Down
3 changes: 3 additions & 0 deletions docs/generators/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@
- name: Polling Consumer
href: polling-consumer.md

- name: Event-Driven Consumer
href: event-driven-consumer.md

- name: Message Translator
href: message-translator.md

Expand Down
1 change: 1 addition & 0 deletions docs/guides/pattern-coverage.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ The source of truth is `PatternKitPatternCatalog` in `src/PatternKit.Examples/Pr
| --- | --- | --- | --- |
| Enterprise Integration | Message Channel | `MessageChannel<TPayload>` | Message Channel generator |
| Enterprise Integration | Polling Consumer | `PollingConsumer<TPayload>` | Polling Consumer generator |
| Enterprise Integration | Event-Driven Consumer | `EventDrivenConsumer<TPayload>` | Event-Driven Consumer generator |
| Enterprise Integration | Message Envelope | `Message<TPayload>`, headers, context | Messaging generator |
| Enterprise Integration | Message Translator | `MessageTranslator<TInput, TOutput>` | Message Translator generator |
| Enterprise Integration | Claim Check | `ClaimCheck<TPayload>` | Claim Check generator |
Expand Down
6 changes: 6 additions & 0 deletions docs/patterns/messaging/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ Bounded or unbounded in-process inboxes serialize async message handling through

[Learn More](mailbox.md)

## Event-Driven Consumer

Push-based consumers handle messages when a broker callback, background service, webhook, in-memory bus, or application event source delivers them.

[Learn More](event-driven-consumer.md)

## Idempotent Receiver, Inbox, and Outbox

Idempotency and handoff helpers compose message handlers with pluggable stores, inbox boundaries, and outbox records without claiming broker durability or exactly-once delivery.
Expand Down
20 changes: 20 additions & 0 deletions docs/patterns/messaging/event-driven-consumer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Event-Driven Consumer

Event-Driven Consumer reacts when application code delivers a message to the consumer.

```csharp
var consumer = EventDrivenConsumer<OrderAcceptedEvent>
.Create("order-accepted-consumer")
.Handle("audit", (message, context) =>
{
audit.Append(message.Payload.OrderId);
return EventDrivenConsumerHandlerResult.Success("audit");
})
.Build();

var result = consumer.Accept(Message<OrderAcceptedEvent>.Create(orderAccepted));
```

Use it when the message arrival cadence is controlled by a broker callback, background service, webhook, in-memory bus, or application event source. The runtime path records handler failures and can either stop on the first failure or continue invoking remaining handlers.

The source-generated path uses `[GenerateEventDrivenConsumer]` and `[EventDrivenConsumerHandler]`. Import the order event example through `AddOrderEventDrivenConsumerDemo()` or `AddPatternKitExamples()`.
2 changes: 2 additions & 0 deletions docs/patterns/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@
href: messaging/message-channel.md
- name: Polling Consumer
href: messaging/polling-consumer.md
- name: Event-Driven Consumer
href: messaging/event-driven-consumer.md
- name: Message Envelope and Context
href: messaging/message-envelope.md
- name: Message Translator
Expand Down
168 changes: 168 additions & 0 deletions src/PatternKit.Core/Messaging/Consumers/EventDrivenConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
namespace PatternKit.Messaging.Consumers;

/// <summary>Push-based consumer that handles messages when application code delivers them.</summary>
public sealed class EventDrivenConsumer<TPayload>
{
public delegate EventDrivenConsumerHandlerResult Handler(Message<TPayload> message, MessageContext context);

private readonly IReadOnlyList<HandlerRegistration> _handlers;
private readonly EventDrivenConsumerErrorPolicy _errorPolicy;

private EventDrivenConsumer(string name, IReadOnlyList<HandlerRegistration> handlers, EventDrivenConsumerErrorPolicy errorPolicy)
=> (Name, _handlers, _errorPolicy) = (name, handlers, errorPolicy);

public string Name { get; }

public EventDrivenConsumerResult<TPayload> Accept(Message<TPayload> message, MessageContext? context = null)
{
if (message is null)
throw new ArgumentNullException(nameof(message));

var effectiveContext = context ?? MessageContext.From(message);
var failures = new List<EventDrivenConsumerHandlerResult>();
var invoked = 0;

foreach (var registration in _handlers)
{
invoked++;
EventDrivenConsumerHandlerResult result;
try
{
result = registration.Handler(message, effectiveContext);
}
catch (Exception ex)
{
result = EventDrivenConsumerHandlerResult.Failure(registration.Name, ex.Message, ex);
}

if (!result.Succeeded)
{
failures.Add(result);
if (_errorPolicy == EventDrivenConsumerErrorPolicy.StopOnFirstFailure)
break;
}
}

return new EventDrivenConsumerResult<TPayload>(Name, message, invoked, failures.AsReadOnly());
}

public static Builder Create(string name = "event-driven-consumer") => new(name);

public sealed class Builder
{
private readonly string _name;
private readonly List<HandlerRegistration> _handlers = new();
private EventDrivenConsumerErrorPolicy _errorPolicy = EventDrivenConsumerErrorPolicy.StopOnFirstFailure;

internal Builder(string name)
{
if (string.IsNullOrWhiteSpace(name))
throw new ArgumentException("Event-driven consumer name cannot be null, empty, or whitespace.", nameof(name));

_name = name;
}

public Builder Handle(string handlerName, Handler handler)
{
if (string.IsNullOrWhiteSpace(handlerName))
throw new ArgumentException("Handler name cannot be null, empty, or whitespace.", nameof(handlerName));

_handlers.Add(new HandlerRegistration(handlerName, handler ?? throw new ArgumentNullException(nameof(handler))));
return this;
}

public Builder Handle(string handlerName, Action<Message<TPayload>, MessageContext> handler)
{
if (handler is null)
throw new ArgumentNullException(nameof(handler));

return Handle(handlerName, (message, context) =>
{
handler(message, context);
return EventDrivenConsumerHandlerResult.Success(handlerName);
});
}

public Builder OnError(EventDrivenConsumerErrorPolicy policy)
{
_errorPolicy = policy;
return this;
}

public EventDrivenConsumer<TPayload> Build()
{
if (_handlers.Count == 0)
throw new InvalidOperationException("Event-driven consumer requires at least one handler.");

return new(_name, _handlers.ToArray(), _errorPolicy);
}
}

private sealed class HandlerRegistration
{
public HandlerRegistration(string name, Handler handler)
=> (Name, Handler) = (name, handler);

public string Name { get; }

public Handler Handler { get; }
}
}

public enum EventDrivenConsumerErrorPolicy
{
StopOnFirstFailure,
Continue
}

public sealed class EventDrivenConsumerHandlerResult
{
private EventDrivenConsumerHandlerResult(string handlerName, bool succeeded, string? reason, Exception? exception)
=> (HandlerName, Succeeded, Reason, Exception) = (handlerName, succeeded, reason, exception);

public string HandlerName { get; }

public bool Succeeded { get; }

public string? Reason { get; }

public Exception? Exception { get; }

public static EventDrivenConsumerHandlerResult Success(string handlerName)
{
if (string.IsNullOrWhiteSpace(handlerName))
throw new ArgumentException("Handler name cannot be null, empty, or whitespace.", nameof(handlerName));

return new(handlerName, true, null, null);
}

public static EventDrivenConsumerHandlerResult Failure(string handlerName, string reason, Exception? exception = null)
{
if (string.IsNullOrWhiteSpace(handlerName))
throw new ArgumentException("Handler name cannot be null, empty, or whitespace.", nameof(handlerName));
if (string.IsNullOrWhiteSpace(reason))
throw new ArgumentException("Failure reason cannot be null, empty, or whitespace.", nameof(reason));

return new(handlerName, false, reason, exception);
}
}

public sealed class EventDrivenConsumerResult<TPayload>
{
internal EventDrivenConsumerResult(
string consumerName,
Message<TPayload> message,
int handlerCount,
IReadOnlyList<EventDrivenConsumerHandlerResult> failures)
=> (ConsumerName, Message, HandlerCount, Failures) = (consumerName, message, handlerCount, failures);

public string ConsumerName { get; }

public Message<TPayload> Message { get; }

public int HandlerCount { get; }

public IReadOnlyList<EventDrivenConsumerHandlerResult> Failures { get; }

public bool Accepted => Failures.Count == 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public sealed record EventProcessingVisitorExample(Func<Task> RunAsync);
public sealed record MessageRouterVisitorExample(Func<RoutingSummary> Run);
public sealed record InventoryMessageChannelExampleService(MessageChannel<InventoryAdjustment> Channel, InventoryMessageChannelService Service);
public sealed record WarehousePollingConsumerExampleService(PollingConsumer<ReplenishmentRequest> Consumer, WarehousePollingConsumerService Service);
public sealed record OrderEventDrivenConsumerExampleService(EventDrivenConsumer<OrderAcceptedEvent> Consumer, OrderEventDrivenConsumerService Service);
public sealed record GeneratedMessageEnvelopeExample(MessageEnvelopeExampleRunner Runner);
public sealed record GeneratedMessageTranslatorExample(PartnerEventTranslatorExampleRunner Runner, PartnerOrderImportService Service);
public sealed record GeneratedClaimCheckExample(LargeDocumentClaimCheckExampleRunner Runner, LargeDocumentWorkflow Workflow);
Expand Down Expand Up @@ -207,6 +208,7 @@ public static IServiceCollection AddPatternKitExamples(this IServiceCollection s
.AddMessageRouterVisitorExample()
.AddInventoryMessageChannelExample()
.AddWarehousePollingConsumerExample()
.AddOrderEventDrivenConsumerExample()
.AddGeneratedMessageEnvelopeExample()
.AddGeneratedMessageTranslatorExample()
.AddGeneratedClaimCheckExample()
Expand Down Expand Up @@ -475,6 +477,15 @@ public static IServiceCollection AddWarehousePollingConsumerExample(this IServic
return services.RegisterExample<WarehousePollingConsumerExampleService>("Warehouse Polling Consumer", ExampleIntegrationSurface.Messaging | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection | ExampleIntegrationSurface.GenericHost);
}

public static IServiceCollection AddOrderEventDrivenConsumerExample(this IServiceCollection services)
{
services.AddOrderEventDrivenConsumerDemo();
services.AddSingleton<OrderEventDrivenConsumerExampleService>(sp => new(
sp.GetRequiredService<EventDrivenConsumer<OrderAcceptedEvent>>(),
sp.GetRequiredService<OrderEventDrivenConsumerService>()));
return services.RegisterExample<OrderEventDrivenConsumerExampleService>("Order Event-Driven Consumer", ExampleIntegrationSurface.Messaging | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection | ExampleIntegrationSurface.GenericHost);
}

public static IServiceCollection AddGeneratedMessageEnvelopeExample(this IServiceCollection services)
{
services.AddMessageEnvelopeExample();
Expand Down
Loading
Loading