Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/examples/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
- name: Inventory Message Channel
href: inventory-message-channel.md

- name: Warehouse Polling Consumer
href: warehouse-polling-consumer.md

- name: Patterns Showcase — Integrated Order Processing
href: patterns-showcase.md

Expand Down
13 changes: 13 additions & 0 deletions docs/examples/warehouse-polling-consumer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Warehouse Polling Consumer

The warehouse polling consumer example explicitly pulls replenishment requests from a message source.

```csharp
services.AddWarehousePollingConsumerDemo();

GeneratedWarehousePollingConsumer.Enqueue(new ReplenishmentRequest("SKU-100", 4));
var service = provider.GetRequiredService<WarehousePollingConsumerService>();
var summary = service.Poll();
```

The example includes fluent and source-generated construction, a pull-based service boundary, and `IServiceCollection` registration for existing .NET applications.
1 change: 1 addition & 0 deletions docs/generators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ PatternKit includes a Roslyn incremental generator package (`PatternKit.Generato
|---|---|---|
| [**Dispatcher**](dispatcher.md) | Mediator pattern with commands, notifications, and streams | `[GenerateDispatcher]` |
| [**Message Channel**](message-channel.md) | Typed channel factories for in-process message queues | `[GenerateMessageChannel]` |
| [**Polling Consumer**](polling-consumer.md) | Pull-based message consumer factories | `[GeneratePollingConsumer]` |
| [**Message Envelope**](messaging.md#generated-message-envelope) | Required message metadata contracts | `[GenerateMessageEnvelope]` |
| [**Message Translator**](message-translator.md) | Partner and transport event normalization | `[GenerateMessageTranslator]` |
| [**Claim Check**](claim-check.md) | External payload storage references | `[GenerateClaimCheck]` |
Expand Down
20 changes: 20 additions & 0 deletions docs/generators/polling-consumer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Polling Consumer Generator

`[GeneratePollingConsumer]` creates a typed `PollingConsumer<TPayload>` factory.

```csharp
[GeneratePollingConsumer(typeof(ReplenishmentRequest), FactoryName = "Create", ConsumerName = "warehouse-replenishment-poller")]
public static partial class WarehousePoller
{
[PollingConsumerSource]
private static Message<ReplenishmentRequest>? Poll(MessageContext context) => TryReadNext();
}
```

The source method must be static, return `Message<TPayload>?`, and accept a `MessageContext`.

Diagnostics:

- `PKPOLL001`: host type must be partial.
- `PKPOLL002`: exactly one polling source is required.
- `PKPOLL003`: polling source signature is invalid.
3 changes: 3 additions & 0 deletions docs/generators/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@
- name: Message Channel
href: message-channel.md

- name: Polling Consumer
href: polling-consumer.md

- name: Message Translator
href: message-translator.md

Expand Down
1 change: 1 addition & 0 deletions docs/guides/pattern-coverage.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ The source of truth is `PatternKitPatternCatalog` in `src/PatternKit.Examples/Pr
| Family | Pattern | Fluent/runtime path | Source-generated path |
| --- | --- | --- | --- |
| Enterprise Integration | Message Channel | `MessageChannel<TPayload>` | Message Channel generator |
| Enterprise Integration | Polling Consumer | `PollingConsumer<TPayload>` | Polling Consumer generator |
| Enterprise Integration | Message Envelope | `Message<TPayload>`, headers, context | Messaging generator |
| Enterprise Integration | Message Translator | `MessageTranslator<TInput, TOutput>` | Message Translator generator |
| Enterprise Integration | Claim Check | `ClaimCheck<TPayload>` | Claim Check generator |
Expand Down
16 changes: 16 additions & 0 deletions docs/patterns/messaging/polling-consumer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Polling Consumer

Polling Consumer explicitly asks a message source for the next available message.

```csharp
var consumer = PollingConsumer<ReplenishmentRequest>
.Create("warehouse-replenishment-poller")
.From(context => channel.TryReceive().Message)
.Build();

var result = consumer.Poll();
```

Use it when application code controls the polling cadence, backoff, and transaction boundary. The fluent runtime path accepts any synchronous poll source and returns typed empty or received results.

The source-generated path uses `[GeneratePollingConsumer]` and `[PollingConsumerSource]`. Import the warehouse example through `AddWarehousePollingConsumerDemo()` or `AddPatternKitExamples()`.
2 changes: 2 additions & 0 deletions docs/patterns/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@
items:
- name: Message Channel
href: messaging/message-channel.md
- name: Polling Consumer
href: messaging/polling-consumer.md
- name: Message Envelope and Context
href: messaging/message-envelope.md
- name: Message Translator
Expand Down
69 changes: 69 additions & 0 deletions src/PatternKit.Core/Messaging/Consumers/PollingConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
namespace PatternKit.Messaging.Consumers;

/// <summary>Pull-based consumer that explicitly polls a message source.</summary>
public sealed class PollingConsumer<TPayload>
{
public delegate Message<TPayload>? PollSource(MessageContext context);

private readonly PollSource _source;

private PollingConsumer(string name, PollSource source)
=> (Name, _source) = (name, source);

public string Name { get; }

public PollingConsumerResult<TPayload> Poll(MessageContext? context = null)
{
var effectiveContext = context ?? MessageContext.Empty;
var message = _source(effectiveContext);
return message is null
? PollingConsumerResult<TPayload>.Empty(Name)
: PollingConsumerResult<TPayload>.Success(Name, message);
}

public static Builder Create(string name = "polling-consumer") => new(name);

public sealed class Builder
{
private readonly string _name;
private PollSource? _source;

internal Builder(string name)
{
if (string.IsNullOrWhiteSpace(name))
throw new ArgumentException("Polling consumer name cannot be null, empty, or whitespace.", nameof(name));

_name = name;
}

public Builder From(PollSource source)
{
_source = source ?? throw new ArgumentNullException(nameof(source));
return this;
}

public PollingConsumer<TPayload> Build()
{
if (_source is null)
throw new InvalidOperationException("Polling consumer requires a message source.");

return new(_name, _source);
}
}
}

public sealed class PollingConsumerResult<TPayload>
{
private PollingConsumerResult(string consumerName, bool received, Message<TPayload>? message)
=> (ConsumerName, Received, Message) = (consumerName, received, message);

public string ConsumerName { get; }

public bool Received { get; }

public Message<TPayload>? Message { get; }

internal static PollingConsumerResult<TPayload> Success(string consumerName, Message<TPayload> message) => new(consumerName, true, message);

internal static PollingConsumerResult<TPayload> Empty(string consumerName) => new(consumerName, false, null);
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
using PatternKit.Examples.UnitOfWorkDemo;
using PatternKit.Examples.VisitorDemo;
using PatternKit.Messaging.Channels;
using PatternKit.Messaging.Consumers;
using PatternKit.Messaging.Routing;
using PatternKit.Messaging.Storage;
using PatternKit.Messaging.ControlBus;
Expand Down Expand Up @@ -125,6 +126,7 @@ public sealed record ApiExceptionMappingVisitorExample(Func<Task> RunAsync);
public sealed record EventProcessingVisitorExample(Func<Task> RunAsync);
public sealed record MessageRouterVisitorExample(Func<RoutingSummary> Run);
public sealed record InventoryMessageChannelExampleService(MessageChannel<InventoryAdjustment> Channel, InventoryMessageChannelService Service);
public sealed record WarehousePollingConsumerExampleService(PollingConsumer<ReplenishmentRequest> Consumer, WarehousePollingConsumerService Service);
public sealed record GeneratedMessageEnvelopeExample(MessageEnvelopeExampleRunner Runner);
public sealed record GeneratedMessageTranslatorExample(PartnerEventTranslatorExampleRunner Runner, PartnerOrderImportService Service);
public sealed record GeneratedClaimCheckExample(LargeDocumentClaimCheckExampleRunner Runner, LargeDocumentWorkflow Workflow);
Expand Down Expand Up @@ -204,6 +206,7 @@ public static IServiceCollection AddPatternKitExamples(this IServiceCollection s
.AddEventProcessingVisitorExample()
.AddMessageRouterVisitorExample()
.AddInventoryMessageChannelExample()
.AddWarehousePollingConsumerExample()
.AddGeneratedMessageEnvelopeExample()
.AddGeneratedMessageTranslatorExample()
.AddGeneratedClaimCheckExample()
Expand Down Expand Up @@ -463,6 +466,15 @@ public static IServiceCollection AddInventoryMessageChannelExample(this IService
return services.RegisterExample<InventoryMessageChannelExampleService>("Inventory Message Channel", ExampleIntegrationSurface.Messaging | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection | ExampleIntegrationSurface.GenericHost);
}

public static IServiceCollection AddWarehousePollingConsumerExample(this IServiceCollection services)
{
services.AddWarehousePollingConsumerDemo();
services.AddSingleton<WarehousePollingConsumerExampleService>(sp => new(
sp.GetRequiredService<PollingConsumer<ReplenishmentRequest>>(),
sp.GetRequiredService<WarehousePollingConsumerService>()));
return services.RegisterExample<WarehousePollingConsumerExampleService>("Warehouse Polling Consumer", ExampleIntegrationSurface.Messaging | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection | ExampleIntegrationSurface.GenericHost);
}

public static IServiceCollection AddGeneratedMessageEnvelopeExample(this IServiceCollection services)
{
services.AddMessageEnvelopeExample();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using Microsoft.Extensions.DependencyInjection;
using PatternKit.Generators.Messaging;
using PatternKit.Messaging;
using PatternKit.Messaging.Channels;
using PatternKit.Messaging.Consumers;

namespace PatternKit.Examples.Messaging;

public sealed record ReplenishmentRequest(string Sku, int Quantity);

public sealed record WarehousePollingSummary(bool Received, string? Sku);

public sealed class WarehousePollingConsumerService(PollingConsumer<ReplenishmentRequest> consumer)
{
public WarehousePollingSummary Poll()
{
var result = consumer.Poll();
return new(result.Received, result.Message?.Payload.Sku);
}
}

public static class WarehousePollingConsumers
{
public static PollingConsumer<ReplenishmentRequest> Create(MessageChannel<ReplenishmentRequest> channel)
=> PollingConsumer<ReplenishmentRequest>.Create("warehouse-replenishment-poller")
.From(_ => channel.TryReceive().Message)
.Build();
}

[GeneratePollingConsumer(typeof(ReplenishmentRequest), FactoryName = "Create", ConsumerName = "warehouse-replenishment-poller")]
public static partial class GeneratedWarehousePollingConsumer
{
private static readonly Queue<Message<ReplenishmentRequest>> Messages = new();

public static void Enqueue(ReplenishmentRequest request) => Messages.Enqueue(Message<ReplenishmentRequest>.Create(request));

[PollingConsumerSource]
private static Message<ReplenishmentRequest>? Poll(MessageContext context) => Messages.Count == 0 ? null : Messages.Dequeue();
Comment on lines +33 to +38
}

public sealed class WarehousePollingConsumerExampleRunner(WarehousePollingConsumerService service)
{
public WarehousePollingSummary RunGenerated() => service.Poll();

public static WarehousePollingSummary RunFluent(ReplenishmentRequest request)
{
var channel = MessageChannel<ReplenishmentRequest>.Create("warehouse-replenishment").Build();
channel.Send(Message<ReplenishmentRequest>.Create(request));
return new WarehousePollingConsumerService(WarehousePollingConsumers.Create(channel)).Poll();
}
}

public static class WarehousePollingConsumerExampleServiceCollectionExtensions
{
public static IServiceCollection AddWarehousePollingConsumerDemo(this IServiceCollection services)
{
services.AddSingleton(_ => GeneratedWarehousePollingConsumer.Create());
services.AddSingleton<WarehousePollingConsumerService>();
services.AddSingleton<WarehousePollingConsumerExampleRunner>();
return services;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,14 @@ public sealed class PatternKitExampleCatalog : IPatternKitExampleCatalog
ExampleIntegrationSurface.Messaging | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection | ExampleIntegrationSurface.GenericHost,
["MessageChannel"],
["typed queue boundary", "source-generated channel factory", "DI composition"]),
Descriptor(
"Warehouse Polling Consumer",
"src/PatternKit.Examples/Messaging/WarehousePollingConsumerExample.cs",
"test/PatternKit.Examples.Tests/Messaging/WarehousePollingConsumerExampleTests.cs",
"docs/examples/warehouse-polling-consumer.md",
ExampleIntegrationSurface.Messaging | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection | ExampleIntegrationSurface.GenericHost,
["PollingConsumer", "MessageChannel"],
["pull-based replenishment workflow", "source-generated polling consumer factory", "DI composition"]),
Descriptor(
"Patterns Showcase",
"src/PatternKit.Examples/PatternShowcase/PatternShowcase.cs",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,19 @@ public sealed class PatternKitPatternCatalog : IPatternKitPatternCatalog
"test/PatternKit.Examples.Tests/Messaging/InventoryMessageChannelExampleTests.cs",
["fluent typed message queue", "generated channel factory", "DI-importable inventory example"]),

Pattern("Polling Consumer", PatternFamily.EnterpriseIntegration,
"docs/patterns/messaging/polling-consumer.md",
"src/PatternKit.Core/Messaging/Consumers/PollingConsumer.cs",
"test/PatternKit.Tests/Messaging/Consumers/PollingConsumerTests.cs",
"docs/generators/polling-consumer.md",
"src/PatternKit.Generators/Messaging/PollingConsumerGenerator.cs",
"test/PatternKit.Generators.Tests/PollingConsumerGeneratorTests.cs",
null,
"docs/examples/warehouse-polling-consumer.md",
"src/PatternKit.Examples/Messaging/WarehousePollingConsumerExample.cs",
"test/PatternKit.Examples.Tests/Messaging/WarehousePollingConsumerExampleTests.cs",
["fluent pull consumer", "generated polling source factory", "DI-importable warehouse replenishment example"]),

Pattern("Message Envelope", PatternFamily.EnterpriseIntegration,
"docs/patterns/messaging/message-envelope.md",
"src/PatternKit.Core/Messaging/Message.cs",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System;

namespace PatternKit.Generators.Messaging;

[AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct, AllowMultiple = false, Inherited = false)]
public sealed class GeneratePollingConsumerAttribute : Attribute
{
public GeneratePollingConsumerAttribute(Type payloadType)
=> PayloadType = payloadType ?? throw new ArgumentNullException(nameof(payloadType));

public Type PayloadType { get; }

public string FactoryName { get; set; } = "Create";

public string ConsumerName { get; set; } = "polling-consumer";
}

[AttributeUsage(AttributeTargets.Method, AllowMultiple = false, Inherited = false)]
public sealed class PollingConsumerSourceAttribute : Attribute
{
}
3 changes: 3 additions & 0 deletions src/PatternKit.Generators/AnalyzerReleases.Unshipped.md
Original file line number Diff line number Diff line change
Expand Up @@ -314,3 +314,6 @@ PKRSEQ002 | PatternKit.Generators.Messaging | Error | Resequencer must declare e
PKRSEQ003 | PatternKit.Generators.Messaging | Error | Resequencer sequence selector signature is invalid.
PKCHN001 | PatternKit.Generators.Messaging | Error | Message Channel host type must be partial.
PKCHN002 | PatternKit.Generators.Messaging | Error | Message Channel capacity is invalid.
PKPOLL001 | PatternKit.Generators.Messaging | Error | Polling Consumer host type must be partial.
PKPOLL002 | PatternKit.Generators.Messaging | Error | Polling Consumer must declare exactly one source.
PKPOLL003 | PatternKit.Generators.Messaging | Error | Polling Consumer source signature is invalid.
Loading
Loading