Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/examples/fulfillment-priority-queue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Fulfillment Priority Queue

The fulfillment priority queue example schedules enterprise or expedited orders ahead of standard fulfillment work.

```csharp
services.AddFulfillmentPriorityQueueDemo();

var service = provider.GetRequiredService<FulfillmentPriorityQueueService>();
var summary = service.Schedule(
new FulfillmentPriorityWork("order-standard", "standard", expedited: false),
new FulfillmentPriorityWork("order-enterprise", "enterprise", expedited: false));
```
Comment on lines +8 to +12

The example includes fluent and source-generated construction, stable ordering for matching priorities, and `IServiceCollection` registration for existing .NET applications.
3 changes: 3 additions & 0 deletions docs/examples/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ Welcome! This section collects small, focused demos that show **how to compose b
* **Inventory Service Activator**
Shows fluent and source-generated message-to-service activation with an importable `IServiceCollection` extension. See [Inventory Service Activator](inventory-service-activator.md).

* **Fulfillment Priority Queue**
Shows fluent and source-generated business-priority queues with an importable `IServiceCollection` extension. See [Fulfillment Priority Queue](fulfillment-priority-queue.md).

* **Generated Message Envelope**
Shows fluent and source-generated message envelope contracts side by side, with an importable `IServiceCollection` extension. See [Generated Message Envelope](generated-message-envelope.md).

Expand Down
3 changes: 3 additions & 0 deletions docs/examples/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@
- name: Fulfillment Queue Load Leveling
href: fulfillment-queue-load-leveling.md

- name: Fulfillment Priority Queue
href: fulfillment-priority-queue.md

- name: Product Catalog Cache-Aside
href: product-catalog-cache-aside.md

Expand Down
1 change: 1 addition & 0 deletions docs/generators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ PatternKit includes a Roslyn incremental generator package (`PatternKit.Generato
| [**Circuit Breaker**](circuit-breaker.md) | Dependency isolation policy factories with open and half-open states | `[GenerateCircuitBreakerPolicy]` |
| [**Bulkhead**](bulkhead.md) | Bounded concurrency and queue isolation policy factories | `[GenerateBulkheadPolicy]` |
| [**Queue Load Leveling**](queue-load-leveling.md) | Bounded worker queue policy factories | `[GenerateQueueLoadLevelingPolicy]` |
| [**Priority Queue**](priority-queue.md) | Business-priority queue factories | `[GeneratePriorityQueue]` |
| [**Cache-Aside**](cache-aside.md) | Read-through cache policy factories with TTL and cache predicates | `[GenerateCacheAsidePolicy]` |
| [**Rate Limiting**](rate-limiting.md) | Key-partitioned fixed-window rate limit policy factories | `[GenerateRateLimitPolicy]` |
| [**External Configuration Store**](external-configuration-store.md) | Typed centralized configuration loaders | `[GenerateExternalConfigurationStore]` |
Expand Down
20 changes: 20 additions & 0 deletions docs/generators/priority-queue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Priority Queue Generator

`[GeneratePriorityQueue]` creates a typed `PriorityQueuePolicy<TItem, TPriority>` factory from a priority selector method.

```csharp
[GeneratePriorityQueue(typeof(FulfillmentPriorityWork), typeof(int), FactoryMethodName = "Create", QueueName = "fulfillment-priority")]
public static partial class FulfillmentPriorityQueue
{
[PriorityQueuePrioritySelector]
private static int GetPriority(FulfillmentPriorityWork work) => work.Expedited ? 10 : 1;
}
```

The generated factory is parameterless, so applications can register it directly in `IServiceCollection` and inject the resulting queue into hosted workers or application services.

Diagnostics:

- `PKPQ001`: host type must be partial.
- `PKPQ002`: exactly one priority selector is required.
- `PKPQ003`: priority selector signature is invalid.
3 changes: 3 additions & 0 deletions docs/generators/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@
- name: Queue Load Leveling
href: queue-load-leveling.md

- name: Priority Queue
href: priority-queue.md

- name: Repository
href: repository.md

Expand Down
1 change: 1 addition & 0 deletions docs/guides/pattern-coverage.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ The source of truth is `PatternKitPatternCatalog` in `src/PatternKit.Examples/Pr
| Cloud Architecture | Circuit Breaker | `CircuitBreakerPolicy<T>` | Circuit Breaker generator |
| Cloud Architecture | Bulkhead | `BulkheadPolicy<T>` | Bulkhead generator |
| Cloud Architecture | Queue-Based Load Leveling | `QueueLoadLevelingPolicy<T>` | Queue Load Leveling generator |
| Cloud Architecture | Priority Queue | `PriorityQueuePolicy<TItem, TPriority>` | Priority Queue generator |
| Cloud Architecture | Cache-Aside | `CacheAsidePolicy<T>` | Cache-Aside generator |
| Cloud Architecture | Rate Limiting | `RateLimitPolicy<T>` | Rate Limiting generator |
| Cloud Architecture | External Configuration Store | `ExternalConfigurationStore<TSettings>` | External Configuration Store generator |
Expand Down
18 changes: 18 additions & 0 deletions docs/patterns/cloud/priority-queue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Priority Queue

Priority Queue schedules queued work by business priority instead of arrival order alone.

```csharp
var queue = PriorityQueuePolicy<FulfillmentPriorityWork, int>
.Create("fulfillment-priority")
.WithPrioritySelector(work => work.Expedited ? 10 : 1)
.DequeueHighestPriorityFirst()
.Build();

queue.Enqueue(new FulfillmentPriorityWork("order-100", "enterprise", expedited: false));
var next = queue.Dequeue();
Comment on lines +12 to +13
```

Use it when high-value, urgent, or otherwise prioritized work should be processed ahead of normal work while preserving FIFO ordering for matching priorities. The fluent API supports custom comparers and highest-first or lowest-first ordering.

The source-generated path uses `[GeneratePriorityQueue]` and `[PriorityQueuePrioritySelector]`. Import the fulfillment example through `AddFulfillmentPriorityQueueDemo()` or `AddPatternKitExamples()`.
2 changes: 2 additions & 0 deletions docs/patterns/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,8 @@
href: cloud/bulkhead.md
- name: Queue-Based Load Leveling
href: cloud/queue-load-leveling.md
- name: Priority Queue
href: cloud/priority-queue.md
- name: Cache-Aside
href: cloud/cache-aside.md
- name: Rate Limiting
Expand Down
187 changes: 187 additions & 0 deletions src/PatternKit.Core/Cloud/PriorityQueue/PriorityQueuePolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
namespace PatternKit.Cloud.PriorityQueue;

public sealed class PriorityQueueEnqueueResult<TItem, TPriority>
{
public PriorityQueueEnqueueResult(string queueName, TItem item, TPriority priority, int count)
=> (QueueName, Item, Priority, Count) = (queueName, item, priority, count);

public string QueueName { get; }

public TItem Item { get; }

public TPriority Priority { get; }

public int Count { get; }
}

public sealed class PriorityQueueDequeueResult<TItem, TPriority>
{
private PriorityQueueDequeueResult(string queueName, TItem? item, TPriority? priority, bool hasItem, int remainingCount)
=> (QueueName, Item, Priority, HasItem, RemainingCount) = (queueName, item, priority, hasItem, remainingCount);

public string QueueName { get; }

public TItem? Item { get; }

public TPriority? Priority { get; }

public bool HasItem { get; }

public int RemainingCount { get; }

public static PriorityQueueDequeueResult<TItem, TPriority> Empty(string queueName)
=> new(queueName, default, default, false, 0);

public static PriorityQueueDequeueResult<TItem, TPriority> ItemDequeued(string queueName, TItem item, TPriority priority, int remainingCount)
=> new(queueName, item, priority, true, remainingCount);
}

public sealed class PriorityQueuePolicy<TItem, TPriority>
{
private readonly object _gate = new();
private readonly List<Entry> _items = [];
private readonly Func<TItem, TPriority> _prioritySelector;
private readonly IComparer<TPriority> _comparer;
private readonly bool _dequeueHighestPriorityFirst;
private long _nextSequence;

Comment on lines +41 to +47
private PriorityQueuePolicy(
string name,
Func<TItem, TPriority> prioritySelector,
IComparer<TPriority> comparer,
bool dequeueHighestPriorityFirst)
{
if (string.IsNullOrWhiteSpace(name))
throw new ArgumentException("Priority queue name is required.", nameof(name));

Name = name;
_prioritySelector = prioritySelector ?? throw new ArgumentNullException(nameof(prioritySelector));
_comparer = comparer ?? throw new ArgumentNullException(nameof(comparer));
_dequeueHighestPriorityFirst = dequeueHighestPriorityFirst;
}

public string Name { get; }

public int Count
{
get
{
lock (_gate)
return _items.Count;
}
}

public PriorityQueueEnqueueResult<TItem, TPriority> Enqueue(TItem item)
{
if (item is null)
throw new ArgumentNullException(nameof(item));

var priority = _prioritySelector(item);
lock (_gate)
{
_items.Add(new(item, priority, _nextSequence++));
return new(Name, item, priority, _items.Count);
}
}

public PriorityQueueDequeueResult<TItem, TPriority> Dequeue()
{
lock (_gate)
{
if (_items.Count == 0)
return PriorityQueueDequeueResult<TItem, TPriority>.Empty(Name);

var index = FindBestIndex();
var entry = _items[index];
_items.RemoveAt(index);
return PriorityQueueDequeueResult<TItem, TPriority>.ItemDequeued(Name, entry.Item, entry.Priority, _items.Count);
}
}

public PriorityQueueDequeueResult<TItem, TPriority> Peek()
{
lock (_gate)
{
if (_items.Count == 0)
return PriorityQueueDequeueResult<TItem, TPriority>.Empty(Name);

var entry = _items[FindBestIndex()];
return PriorityQueueDequeueResult<TItem, TPriority>.ItemDequeued(Name, entry.Item, entry.Priority, _items.Count);
}
}

public static Builder Create(string name = "priority-queue") => new(name);

private int FindBestIndex()
{
var best = 0;
for (var i = 1; i < _items.Count; i++)
{
var comparison = _comparer.Compare(_items[i].Priority, _items[best].Priority);
if (_dequeueHighestPriorityFirst ? comparison > 0 : comparison < 0)
{
best = i;
continue;
}

if (comparison == 0 && _items[i].Sequence < _items[best].Sequence)
best = i;
}

return best;
}

public sealed class Builder
{
private readonly string _name;
private Func<TItem, TPriority>? _prioritySelector;
private IComparer<TPriority> _comparer = Comparer<TPriority>.Default;
private bool _dequeueHighestPriorityFirst = true;

internal Builder(string name) => _name = name;

public Builder WithPrioritySelector(Func<TItem, TPriority> prioritySelector)
{
_prioritySelector = prioritySelector ?? throw new ArgumentNullException(nameof(prioritySelector));
return this;
}

public Builder WithComparer(IComparer<TPriority> comparer)
{
_comparer = comparer ?? throw new ArgumentNullException(nameof(comparer));
return this;
}

public Builder DequeueHighestPriorityFirst()
{
_dequeueHighestPriorityFirst = true;
return this;
}

public Builder DequeueLowestPriorityFirst()
{
_dequeueHighestPriorityFirst = false;
return this;
}

public PriorityQueuePolicy<TItem, TPriority> Build()
{
if (_prioritySelector is null)
throw new InvalidOperationException("Priority queue requires a priority selector.");

return new(_name, _prioritySelector, _comparer, _dequeueHighestPriorityFirst);
}
}

private sealed class Entry
{
public Entry(TItem item, TPriority priority, long sequence)
=> (Item, Priority, Sequence) = (item, priority, sequence);

public TItem Item { get; }

public TPriority Priority { get; }

public long Sequence { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using PatternKit.Cloud.Bulkhead;
using PatternKit.Cloud.CacheAside;
using PatternKit.Cloud.CircuitBreaker;
using PatternKit.Cloud.PriorityQueue;
using PatternKit.Cloud.RateLimiting;
using PatternKit.Cloud.QueueLoadLeveling;
using PatternKit.Cloud.Retry;
Expand Down Expand Up @@ -43,6 +44,7 @@
using PatternKit.Examples.PointOfSale;
using PatternKit.Examples.Pricing;
using PatternKit.Examples.ProductionReadiness;
using PatternKit.Examples.PriorityQueueDemo;
using PatternKit.Examples.PrototypeDemo;
using PatternKit.Examples.ProxyDemo;
using PatternKit.Examples.QueueLoadLevelingDemo;
Expand Down Expand Up @@ -185,6 +187,7 @@ public sealed record InventoryRetryExample(RetryPolicy<InventoryResponse> Policy
public sealed record FulfillmentCircuitBreakerExample(CircuitBreakerPolicy<FulfillmentResponse> Policy, FulfillmentCircuitBreakerService Service);
public sealed record ShippingBulkheadExample(BulkheadPolicy<ShippingAllocation> Policy, ShippingBulkheadService Service);
public sealed record FulfillmentQueueLoadLevelingExample(QueueLoadLevelingPolicy<FulfillmentQueueResult> Policy, FulfillmentQueueLoadLevelingService Service);
public sealed record FulfillmentPriorityQueueExample(PriorityQueuePolicy<FulfillmentPriorityWork, int> Queue, FulfillmentPriorityQueueService Service);
public sealed record ProductCatalogCacheAsideExample(CacheAsidePolicy<ProductReadModel> Policy, ProductCatalogCacheAsideService Service);
public sealed record ProductSearchRateLimitingExample(RateLimitPolicy<SearchResponse> Policy, ProductSearchRateLimitService Service);
public sealed record TenantExternalConfigurationStoreExample(TenantExternalConfigurationStoreDemoRunner Runner, TenantExternalConfigurationService Service);
Expand Down Expand Up @@ -269,6 +272,7 @@ public static IServiceCollection AddPatternKitExamples(this IServiceCollection s
.AddFulfillmentCircuitBreakerExample()
.AddShippingBulkheadExample()
.AddFulfillmentQueueLoadLevelingExample()
.AddFulfillmentPriorityQueueExample()
.AddProductCatalogCacheAsideExample()
.AddProductSearchRateLimitingExample()
.AddTenantExternalConfigurationStoreExample();
Expand Down Expand Up @@ -928,6 +932,15 @@ public static IServiceCollection AddFulfillmentQueueLoadLevelingExample(this ISe
return services.RegisterExample<FulfillmentQueueLoadLevelingExample>("Fulfillment Queue Load Leveling", ExampleIntegrationSurface.LibraryOnly | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection);
}

public static IServiceCollection AddFulfillmentPriorityQueueExample(this IServiceCollection services)
{
services.AddFulfillmentPriorityQueueDemo();
services.AddSingleton<FulfillmentPriorityQueueExample>(sp => new(
sp.GetRequiredService<PriorityQueuePolicy<FulfillmentPriorityWork, int>>(),
sp.GetRequiredService<FulfillmentPriorityQueueService>()));
return services.RegisterExample<FulfillmentPriorityQueueExample>("Fulfillment Priority Queue", ExampleIntegrationSurface.LibraryOnly | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection | ExampleIntegrationSurface.GenericHost);
}

public static IServiceCollection AddProductCatalogCacheAsideExample(this IServiceCollection services)
{
services.AddProductCatalogCacheAsideDemo();
Expand Down
Loading
Loading