Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/examples/inventory-message-channel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Inventory Message Channel

The inventory message channel example queues inventory adjustments and lets an application service explicitly process the next available message.

```csharp
services.AddInventoryMessageChannelDemo();

var service = provider.GetRequiredService<InventoryMessageChannelService>();
service.Enqueue(new InventoryAdjustment("SKU-100", 3, "cycle-count"));
var processed = service.TryProcessNext();
```

The example includes fluent and source-generated construction, bounded channel configuration, and `IServiceCollection` registration for existing .NET applications.
3 changes: 3 additions & 0 deletions docs/examples/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
- name: Visitor — Message Router (Background Worker)
href: message-router-visitor.md

- name: Inventory Message Channel
href: inventory-message-channel.md

- name: Patterns Showcase — Integrated Order Processing
href: patterns-showcase.md

Expand Down
1 change: 1 addition & 0 deletions docs/generators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ PatternKit includes a Roslyn incremental generator package (`PatternKit.Generato
| Generator | Description | Attribute |
|---|---|---|
| [**Dispatcher**](dispatcher.md) | Mediator pattern with commands, notifications, and streams | `[GenerateDispatcher]` |
| [**Message Channel**](message-channel.md) | Typed channel factories for in-process message queues | `[GenerateMessageChannel]` |
| [**Message Envelope**](messaging.md#generated-message-envelope) | Required message metadata contracts | `[GenerateMessageEnvelope]` |
| [**Message Translator**](message-translator.md) | Partner and transport event normalization | `[GenerateMessageTranslator]` |
| [**Claim Check**](claim-check.md) | External payload storage references | `[GenerateClaimCheck]` |
Expand Down
15 changes: 15 additions & 0 deletions docs/generators/message-channel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Message Channel Generator

`[GenerateMessageChannel]` creates a typed `MessageChannel<TPayload>` factory.

```csharp
[GenerateMessageChannel(typeof(InventoryAdjustment), FactoryName = "Create", ChannelName = "inventory-adjustments", Capacity = 32)]
public static partial class InventoryChannel;
```

Set `Capacity` to a positive value for bounded channels, or leave it at `-1` for unbounded channels. `BackpressurePolicy` maps to `MessageChannelBackpressurePolicy` and defaults to `Reject`.

Diagnostics:

- `PKCHN001`: host type must be partial.
- `PKCHN002`: capacity must be `-1` or greater than zero.
3 changes: 3 additions & 0 deletions docs/generators/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@
- name: Messaging Generators
href: messaging.md

- name: Message Channel
href: message-channel.md

- name: Message Translator
href: message-translator.md

Expand Down
1 change: 1 addition & 0 deletions docs/guides/pattern-coverage.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ The source of truth is `PatternKitPatternCatalog` in `src/PatternKit.Examples/Pr

| Family | Pattern | Fluent/runtime path | Source-generated path |
| --- | --- | --- | --- |
| Enterprise Integration | Message Channel | `MessageChannel<TPayload>` | Message Channel generator |
| Enterprise Integration | Message Envelope | `Message<TPayload>`, headers, context | Messaging generator |
| Enterprise Integration | Message Translator | `MessageTranslator<TInput, TOutput>` | Message Translator generator |
| Enterprise Integration | Claim Check | `ClaimCheck<TPayload>` | Claim Check generator |
Expand Down
17 changes: 17 additions & 0 deletions docs/patterns/messaging/message-channel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Message Channel

Message Channel provides a typed queue between message producers and consumers.

```csharp
var channel = MessageChannel<InventoryAdjustment>
.Create("inventory-adjustments")
.WithCapacity(32)
.Build();

channel.Send(Message<InventoryAdjustment>.Create(new("SKU-100", 3, "cycle-count")));
var next = channel.TryReceive();
```

Use it when an application needs an explicit messaging boundary between code that produces work and code that processes work. The fluent runtime path supports unbounded channels, bounded channels, reject or drop-oldest backpressure, snapshots, and typed receive results.

The source-generated path uses `[GenerateMessageChannel]`. Import the inventory example through `AddInventoryMessageChannelDemo()` or `AddPatternKitExamples()`.
2 changes: 2 additions & 0 deletions docs/patterns/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@
- name: Messaging
href: messaging/README.md
items:
- name: Message Channel
href: messaging/message-channel.md
- name: Message Envelope and Context
href: messaging/message-envelope.md
- name: Message Translator
Expand Down
133 changes: 133 additions & 0 deletions src/PatternKit.Core/Messaging/Channels/MessageChannel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
namespace PatternKit.Messaging.Channels;

/// <summary>Thread-safe in-memory channel for typed PatternKit messages.</summary>
public sealed class MessageChannel<TPayload>
{
private readonly object _gate = new();
private readonly Queue<Message<TPayload>> _messages = new();
private readonly int? _capacity;
private readonly MessageChannelBackpressurePolicy _backpressure;

private MessageChannel(string name, int? capacity, MessageChannelBackpressurePolicy backpressure)
=> (Name, _capacity, _backpressure) = (name, capacity, backpressure);

public string Name { get; }

public int Count
{
get
{
lock (_gate)
return _messages.Count;
}
}

public MessageChannelSendResult Send(Message<TPayload> message)
{
if (message is null)
throw new ArgumentNullException(nameof(message));

lock (_gate)
{
if (_capacity.HasValue && _messages.Count >= _capacity.Value)
{
if (_backpressure == MessageChannelBackpressurePolicy.Reject)
return MessageChannelSendResult.Failed(Name, _messages.Count, "Channel capacity has been reached.");

_messages.Dequeue();
}

_messages.Enqueue(message);
return MessageChannelSendResult.Success(Name, _messages.Count);
}
}

public MessageChannelReceiveResult<TPayload> TryReceive()
{
lock (_gate)
{
if (_messages.Count == 0)
return MessageChannelReceiveResult<TPayload>.Empty(Name);

var message = _messages.Dequeue();
return MessageChannelReceiveResult<TPayload>.Success(Name, message, _messages.Count);
}
}

public IReadOnlyList<Message<TPayload>> Snapshot()
{
lock (_gate)
return _messages.ToArray();
}

public static Builder Create(string name = "message-channel") => new(name);

public sealed class Builder
{
private readonly string _name;
private int? _capacity;
private MessageChannelBackpressurePolicy _backpressure = MessageChannelBackpressurePolicy.Reject;

internal Builder(string name)
{
if (string.IsNullOrWhiteSpace(name))
throw new ArgumentException("Message channel name cannot be null, empty, or whitespace.", nameof(name));

_name = name;
}

public Builder WithCapacity(int capacity, MessageChannelBackpressurePolicy backpressure = MessageChannelBackpressurePolicy.Reject)
{
if (capacity <= 0)
throw new ArgumentOutOfRangeException(nameof(capacity), "Channel capacity must be greater than zero.");

_capacity = capacity;
_backpressure = backpressure;
return this;
}

public MessageChannel<TPayload> Build() => new(_name, _capacity, _backpressure);
}
}

public enum MessageChannelBackpressurePolicy
{
Reject,
DropOldest
}

public sealed class MessageChannelSendResult
{
private MessageChannelSendResult(string channelName, bool accepted, int count, string? rejectionReason)
=> (ChannelName, Accepted, Count, RejectionReason) = (channelName, accepted, count, rejectionReason);

public string ChannelName { get; }

public bool Accepted { get; }

public int Count { get; }

public string? RejectionReason { get; }

internal static MessageChannelSendResult Success(string channelName, int count) => new(channelName, true, count, null);

internal static MessageChannelSendResult Failed(string channelName, int count, string reason) => new(channelName, false, count, reason);
}

public sealed class MessageChannelReceiveResult<TPayload>
{
private MessageChannelReceiveResult(string channelName, bool received, Message<TPayload>? message, int count)
=> (ChannelName, Received, Message, Count) = (channelName, received, message, count);

public string ChannelName { get; }

public bool Received { get; }

public Message<TPayload>? Message { get; }

public int Count { get; }

internal static MessageChannelReceiveResult<TPayload> Success(string channelName, Message<TPayload> message, int count) => new(channelName, true, message, count);

internal static MessageChannelReceiveResult<TPayload> Empty(string channelName) => new(channelName, false, null, 0);
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
using PatternKit.Examples.TransactionScriptDemo;
using PatternKit.Examples.UnitOfWorkDemo;
using PatternKit.Examples.VisitorDemo;
using PatternKit.Messaging.Channels;
using PatternKit.Messaging.Routing;
using PatternKit.Messaging.Storage;
using PatternKit.Messaging.ControlBus;
Expand Down Expand Up @@ -123,6 +124,7 @@ public sealed record PosTenderVisitorExample(TypeDispatcher<VisitorTender, strin
public sealed record ApiExceptionMappingVisitorExample(Func<Task> RunAsync);
public sealed record EventProcessingVisitorExample(Func<Task> RunAsync);
public sealed record MessageRouterVisitorExample(Func<RoutingSummary> Run);
public sealed record InventoryMessageChannelExampleService(MessageChannel<InventoryAdjustment> Channel, InventoryMessageChannelService Service);
public sealed record GeneratedMessageEnvelopeExample(MessageEnvelopeExampleRunner Runner);
public sealed record GeneratedMessageTranslatorExample(PartnerEventTranslatorExampleRunner Runner, PartnerOrderImportService Service);
public sealed record GeneratedClaimCheckExample(LargeDocumentClaimCheckExampleRunner Runner, LargeDocumentWorkflow Workflow);
Expand Down Expand Up @@ -201,6 +203,7 @@ public static IServiceCollection AddPatternKitExamples(this IServiceCollection s
.AddApiExceptionMappingVisitorExample()
.AddEventProcessingVisitorExample()
.AddMessageRouterVisitorExample()
.AddInventoryMessageChannelExample()
.AddGeneratedMessageEnvelopeExample()
.AddGeneratedMessageTranslatorExample()
.AddGeneratedClaimCheckExample()
Expand Down Expand Up @@ -451,6 +454,15 @@ public static IServiceCollection AddMessageRoutingExample(this IServiceCollectio
return services;
}

public static IServiceCollection AddInventoryMessageChannelExample(this IServiceCollection services)
{
services.AddInventoryMessageChannelDemo();
services.AddSingleton<InventoryMessageChannelExampleService>(sp => new(
sp.GetRequiredService<MessageChannel<InventoryAdjustment>>(),
sp.GetRequiredService<InventoryMessageChannelService>()));
return services.RegisterExample<InventoryMessageChannelExampleService>("Inventory Message Channel", ExampleIntegrationSurface.Messaging | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection | ExampleIntegrationSurface.GenericHost);
}

public static IServiceCollection AddGeneratedMessageEnvelopeExample(this IServiceCollection services)
{
services.AddMessageEnvelopeExample();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using Microsoft.Extensions.DependencyInjection;
using PatternKit.Generators.Messaging;
using PatternKit.Messaging;
using PatternKit.Messaging.Channels;

namespace PatternKit.Examples.Messaging;

public sealed record InventoryAdjustment(string Sku, int Quantity, string Reason);

public sealed record InventoryChannelSummary(bool Accepted, string? ReceivedSku, int RemainingMessages, string? RejectionReason);

public sealed class InventoryMessageChannelService(MessageChannel<InventoryAdjustment> channel)
{
public InventoryChannelSummary Enqueue(InventoryAdjustment adjustment)
{
var result = channel.Send(Message<InventoryAdjustment>.Create(adjustment).WithCorrelationId(adjustment.Sku));
return new(result.Accepted, null, result.Count, result.RejectionReason);
}

public InventoryChannelSummary TryProcessNext()
{
var result = channel.TryReceive();
return new(true, result.Message?.Payload.Sku, result.Count, null);
}
}

public static class InventoryMessageChannels
{
public static MessageChannel<InventoryAdjustment> Create()
=> MessageChannel<InventoryAdjustment>.Create("inventory-adjustments")
.WithCapacity(32)
.Build();
}

[GenerateMessageChannel(typeof(InventoryAdjustment), FactoryName = "Create", ChannelName = "inventory-adjustments", Capacity = 32)]
public static partial class GeneratedInventoryMessageChannel;

public sealed class InventoryMessageChannelExampleRunner(InventoryMessageChannelService service)
{
public InventoryChannelSummary RunGenerated(InventoryAdjustment adjustment)
{
service.Enqueue(adjustment);
return service.TryProcessNext();
}

public static InventoryChannelSummary RunFluent(InventoryAdjustment adjustment)
{
var service = new InventoryMessageChannelService(InventoryMessageChannels.Create());
service.Enqueue(adjustment);
return service.TryProcessNext();
}
}

public static class InventoryMessageChannelExampleServiceCollectionExtensions
{
public static IServiceCollection AddInventoryMessageChannelDemo(this IServiceCollection services)
{
services.AddSingleton(_ => GeneratedInventoryMessageChannel.Create());
services.AddSingleton<InventoryMessageChannelService>();
services.AddSingleton<InventoryMessageChannelExampleRunner>();
return services;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,14 @@ public sealed class PatternKitExampleCatalog : IPatternKitExampleCatalog
ExampleIntegrationSurface.Messaging,
["Visitor", "ContentRouter"],
["message dispatch", "route fallback", "typed handlers"]),
Descriptor(
"Inventory Message Channel",
"src/PatternKit.Examples/Messaging/InventoryMessageChannelExample.cs",
"test/PatternKit.Examples.Tests/Messaging/InventoryMessageChannelExampleTests.cs",
"docs/examples/inventory-message-channel.md",
ExampleIntegrationSurface.Messaging | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection | ExampleIntegrationSurface.GenericHost,
["MessageChannel"],
["typed queue boundary", "source-generated channel factory", "DI composition"]),
Descriptor(
"Patterns Showcase",
"src/PatternKit.Examples/PatternShowcase/PatternShowcase.cs",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,19 @@ public sealed class PatternKitPatternCatalog : IPatternKitPatternCatalog
"test/PatternKit.Examples.Tests/Generators/VisitorGeneratorExamplesTests.cs",
["fluent visitor", "generated visitor", "document processing example"]),

Pattern("Message Channel", PatternFamily.EnterpriseIntegration,
"docs/patterns/messaging/message-channel.md",
"src/PatternKit.Core/Messaging/Channels/MessageChannel.cs",
"test/PatternKit.Tests/Messaging/Channels/MessageChannelTests.cs",
"docs/generators/message-channel.md",
"src/PatternKit.Generators/Messaging/MessageChannelGenerator.cs",
"test/PatternKit.Generators.Tests/MessageChannelGeneratorTests.cs",
null,
"docs/examples/inventory-message-channel.md",
"src/PatternKit.Examples/Messaging/InventoryMessageChannelExample.cs",
"test/PatternKit.Examples.Tests/Messaging/InventoryMessageChannelExampleTests.cs",
["fluent typed message queue", "generated channel factory", "DI-importable inventory example"]),

Pattern("Message Envelope", PatternFamily.EnterpriseIntegration,
"docs/patterns/messaging/message-envelope.md",
"src/PatternKit.Core/Messaging/Message.cs",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;

namespace PatternKit.Generators.Messaging;

[AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct, AllowMultiple = false, Inherited = false)]
public sealed class GenerateMessageChannelAttribute : Attribute
{
public GenerateMessageChannelAttribute(Type payloadType)
=> PayloadType = payloadType ?? throw new ArgumentNullException(nameof(payloadType));

public Type PayloadType { get; }

public string FactoryName { get; set; } = "Create";

public string ChannelName { get; set; } = "message-channel";

public int Capacity { get; set; } = -1;

public string BackpressurePolicy { get; set; } = "Reject";
}
2 changes: 2 additions & 0 deletions src/PatternKit.Generators/AnalyzerReleases.Unshipped.md
Original file line number Diff line number Diff line change
Expand Up @@ -312,3 +312,5 @@ PKSCG005 | PatternKit.Generators.Messaging | Error | Scatter-Gather recipient na
PKRSEQ001 | PatternKit.Generators.Messaging | Error | Resequencer host type must be partial.
PKRSEQ002 | PatternKit.Generators.Messaging | Error | Resequencer must declare exactly one sequence selector.
PKRSEQ003 | PatternKit.Generators.Messaging | Error | Resequencer sequence selector signature is invalid.
PKCHN001 | PatternKit.Generators.Messaging | Error | Message Channel host type must be partial.
PKCHN002 | PatternKit.Generators.Messaging | Error | Message Channel capacity is invalid.
Loading
Loading