Uniflight

This crate was developed as part of the Oxidizer project.

Coalesces duplicate async tasks into a single execution.

This crate provides Merger, a mechanism for deduplicating concurrent async operations. When multiple tasks request the same work (identified by a key), only the first task (the “leader”) performs the actual work while subsequent tasks (the “followers”) wait and receive a clone of the result.
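The leader/follower split described above can be sketched with plain threads. The following is a blocking analogue built only on the standard library, not uniflight's implementation: `BlockingMerger`, `Flight`, and this `execute` signature are hypothetical names chosen for illustration, and the sketch deliberately ignores panics and cancellation.

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::{Arc, Condvar, Mutex};

/// One in-flight computation: the leader fills `result`, followers wait on `ready`.
struct Flight<V> {
    result: Mutex<Option<V>>,
    ready: Condvar,
}

/// Hypothetical blocking analogue of a merger (not part of uniflight).
pub struct BlockingMerger<K, V> {
    flights: Mutex<HashMap<K, Arc<Flight<V>>>>,
}

impl<K: Hash + Eq + Clone, V: Clone> BlockingMerger<K, V> {
    pub fn new() -> Self {
        Self { flights: Mutex::new(HashMap::new()) }
    }

    pub fn execute(&self, key: K, work: impl FnOnce() -> V) -> V {
        // Decide the role under the map lock: the first caller for a key leads.
        let (flight, is_leader) = {
            let mut flights = self.flights.lock().unwrap();
            match flights.get(&key) {
                Some(existing) => (Arc::clone(existing), false),
                None => {
                    let flight = Arc::new(Flight {
                        result: Mutex::new(None),
                        ready: Condvar::new(),
                    });
                    flights.insert(key.clone(), Arc::clone(&flight));
                    (flight, true)
                }
            }
        };

        if is_leader {
            // Leader: run the work outside the map lock, publish the value,
            // retire the entry, then wake every waiting follower.
            let value = work();
            *flight.result.lock().unwrap() = Some(value.clone());
            self.flights.lock().unwrap().remove(&key);
            flight.ready.notify_all();
            value
        } else {
            // Follower: block until the leader has published, then clone.
            let mut slot = flight.result.lock().unwrap();
            while slot.is_none() {
                slot = flight.ready.wait(slot).unwrap();
            }
            (*slot).clone().expect("leader published a value")
        }
    }
}
```

In this sketch, sequential calls each run their own work because the entry is retired as soon as the leader publishes; only calls that overlap in time share a result.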

When to Use

Use Merger when you have expensive or rate-limited operations that may be requested concurrently with the same parameters:

  • Cache population: Prevent thundering herd when a cache entry expires
  • API calls: Deduplicate concurrent requests to the same endpoint
  • Database queries: Coalesce identical queries issued simultaneously
  • File I/O: Avoid reading the same file multiple times concurrently

Example

use uniflight::Merger;

let merger: Merger<String, String> = Merger::new();

// Multiple concurrent calls with the same key will share a single execution.
// Note: you can pass &str directly when the key type is String.
let result = merger
    .execute("user:123", || async {
        // This expensive operation runs only once, even if called concurrently
        "expensive_result".to_string()
    })
    .await
    .expect("leader should not panic");

Flexible Key Types

The Merger::execute method accepts keys using Borrow semantics, allowing you to pass borrowed forms of the key type. For example, with Merger<String, T>, you can pass &str directly without allocating:

let merger: Merger<String, i32> = Merger::new();

// Pass &str directly - no need to call .to_string()
let result = merger.execute("my-key", || async { 42 }).await;
assert_eq!(result, Ok(42));
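For readers unfamiliar with Borrow-based lookups, here is a minimal standard-library sketch of the pattern. `Registry` and its methods are hypothetical and exist only to show the trait bounds; the exact bounds on Merger::execute may differ.

```rust
use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical map wrapper; illustrates the `Borrow` bound only.
pub struct Registry<K, V> {
    entries: HashMap<K, V>,
}

impl<K: Hash + Eq, V: Clone> Registry<K, V> {
    pub fn new() -> Self {
        Self { entries: HashMap::new() }
    }

    /// Looks up by any borrowed form `Q` of the key type, e.g. `&str` for `String`.
    pub fn get<Q>(&self, key: &Q) -> Option<V>
    where
        K: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        self.entries.get(key).cloned()
    }

    /// Returns the stored value, or builds and stores one on a miss.
    /// The owned key is only allocated on the miss path.
    pub fn get_or_insert_with<Q>(&mut self, key: &Q, make: impl FnOnce() -> V) -> V
    where
        K: Borrow<Q>,
        Q: Hash + Eq + ToOwned<Owned = K> + ?Sized,
    {
        if let Some(existing) = self.entries.get(key) {
            return existing.clone();
        }
        let value = make();
        self.entries.insert(key.to_owned(), value.clone());
        value
    }
}
```

With `K = String`, the compiler picks `Q = str`, so a string literal can be passed as the key and a `String` is created only when a new entry has to be stored.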

Thread-Aware Scoping

Merger supports thread-aware scoping via a Strategy type parameter. This controls how the internal state is partitioned across threads/NUMA nodes:

  • PerProcess (default): Single global state, maximum deduplication
  • PerNuma: Separate state per NUMA node, NUMA-local memory access
  • PerCore: Separate state per core, no deduplication across cores (useful for already-partitioned work)

use thread_aware::PerNuma;
use uniflight::Merger;

// NUMA-aware merger - each NUMA node gets its own deduplication scope
let merger: Merger<String, String, PerNuma> = Merger::new_per_numa();
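The trade-off between the scopes can be made concrete with a toy model. Everything below is hypothetical (`Scope`, `Location`, `partition`, and the 2-node, 8-core layout are stand-ins, not the thread_aware types); it only counts how many separate state partitions a set of callers would land in, which bounds how many times the work for one key could run.

```rust
use std::collections::HashSet;

/// Hypothetical stand-ins for the strategies listed above.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Scope {
    PerProcess,
    PerNuma,
    PerCore,
}

/// Where a caller runs. Hypothetical; real code would ask the runtime or OS.
#[derive(Clone, Copy, Debug)]
pub struct Location {
    pub numa_node: usize,
    pub core: usize,
}

/// Which state partition a caller uses under the given scope.
pub fn partition(scope: Scope, at: Location) -> usize {
    match scope {
        Scope::PerProcess => 0,
        Scope::PerNuma => at.numa_node,
        Scope::PerCore => at.core,
    }
}

/// Upper bound on executions for one key when all callers overlap:
/// at most one leader per distinct partition.
pub fn max_executions(scope: Scope, callers: &[Location]) -> usize {
    callers
        .iter()
        .map(|&at| partition(scope, at))
        .collect::<HashSet<_>>()
        .len()
}

/// Eight callers, one per core, on an assumed machine with 2 NUMA nodes of 4 cores.
pub fn sample_callers() -> Vec<Location> {
    (0..8)
        .map(|core| Location { numa_node: core / 4, core })
        .collect()
}
```

Under this model the eight callers share one execution with the per-process scope, at most two with the per-NUMA scope, and at most eight with the per-core scope.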

Cancellation and Panic Handling

Merger handles task cancellation and panics explicitly:

  • If the leader task is cancelled or dropped, a follower becomes the new leader
  • If the leader task panics, followers receive a LeaderPanicked error with the panic message
  • Followers that join before the leader completes receive the value the leader returns

When a panic occurs, followers are notified via the error type rather than silently retrying. The panic message is captured and available via LeaderPanicked::message:

let merger: Merger<String, String> = Merger::new();
match merger
    .execute("key", || async { "result".to_string() })
    .await
{
    Ok(value) => println!("got {value}"),
    Err(err) => {
        println!("leader panicked: {}", err.message());
        // Decide whether to retry
    }
}
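As background on how a panic message can be captured at all, the standard library exposes the panic payload through `std::panic::catch_unwind`. The sketch below shows that general technique; `run_catching` and `panic_message` are hypothetical helpers, and this is not necessarily how uniflight builds its LeaderPanicked error.

```rust
use std::any::Any;
use std::panic::{self, AssertUnwindSafe};

/// Extracts a readable message from a panic payload.
/// `panic!` payloads are usually a `&'static str` or a `String`.
fn panic_message(payload: &(dyn Any + Send)) -> String {
    if let Some(s) = payload.downcast_ref::<&'static str>() {
        (*s).to_string()
    } else if let Some(s) = payload.downcast_ref::<String>() {
        s.clone()
    } else {
        "unknown panic payload".to_string()
    }
}

/// Runs `work` and converts a panic into `Err(message)` instead of unwinding further.
pub fn run_catching<T>(work: impl FnOnce() -> T) -> Result<T, String> {
    panic::catch_unwind(AssertUnwindSafe(work)).map_err(|payload| panic_message(&*payload))
}
```

A caller can then treat the error like any other value, for example logging the message and deciding whether a retry makes sense.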

Memory Management

Completed entries are automatically removed from the internal map when the last caller finishes. This ensures no stale entries accumulate over time.
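One common way to get this "last caller cleans up" behavior is a per-key caller count paired with a drop guard. The sketch below assumes that approach purely for illustration; `InFlight`, `CallerGuard`, `join`, and `tracked_keys` are hypothetical names, and the crate's internal bookkeeping may work differently.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical tracker: maps each key to the number of callers currently attached.
#[derive(Default)]
pub struct InFlight {
    callers: Mutex<HashMap<String, usize>>,
}

/// Held by each caller; dropping it detaches the caller from its key.
pub struct CallerGuard {
    owner: Arc<InFlight>,
    key: String,
}

impl InFlight {
    /// Registers one more caller for `key`, creating the entry if needed.
    pub fn join(self: &Arc<Self>, key: &str) -> CallerGuard {
        *self
            .callers
            .lock()
            .unwrap()
            .entry(key.to_string())
            .or_insert(0) += 1;
        CallerGuard { owner: Arc::clone(self), key: key.to_string() }
    }

    /// Number of keys that still have at least one attached caller.
    pub fn tracked_keys(&self) -> usize {
        self.callers.lock().unwrap().len()
    }
}

impl Drop for CallerGuard {
    fn drop(&mut self) {
        let mut callers = self.owner.callers.lock().unwrap();
        // Decrement the count and note whether this was the last caller.
        let last = match callers.get_mut(&self.key) {
            Some(count) => {
                *count -= 1;
                *count == 0
            }
            None => false,
        };
        if last {
            // The last caller removes the entry so the map cannot grow stale.
            callers.remove(&self.key);
        }
    }
}
```

Because cleanup lives in `Drop`, it also runs when a caller goes away early, which is why this pattern pairs naturally with cancellation.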

Type Requirements

The value type T must implement Clone because followers receive a clone of the leader’s result. The key type K must implement Hash and Eq.
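If cloning the result is expensive, or the result type does not implement Clone at all, one option is to make the value type an `Arc`. The sketch below uses only the standard library; `Report` and `fan_out` are hypothetical and simply mimic handing one result to several followers.

```rust
use std::sync::Arc;

/// Hypothetical large result that would be costly to deep-copy per follower.
/// Note that it does not implement `Clone` itself.
pub struct Report {
    pub rows: Vec<u64>,
}

/// Hands each of `followers` callers a clone of the shared result.
/// Cloning an `Arc` bumps a reference count; the rows are not copied.
pub fn fan_out(result: Arc<Report>, followers: usize) -> Vec<Arc<Report>> {
    (0..followers).map(|_| Arc::clone(&result)).collect()
}
```

With this shape the value type would be `Arc<Report>`, which satisfies the Clone requirement while every caller reads the same allocation.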

Thread Safety

Merger is Send and Sync, and can be shared across threads. The returned futures are Send when the closure, future, key, and value types are Send.

Performance

Run benchmarks with cargo bench -p uniflight. The suite covers:

  • single_call: Baseline latency with no contention
  • high_contention_100: 100 concurrent tasks on the same key
  • distributed_10x10: 10 keys with 10 tasks each

Use the --save-baseline and --baseline flags to track regressions over time.

