diff --git a/README.md b/README.md
index 37259015..6a74dc89 100644
--- a/README.md
+++ b/README.md
@@ -100,7 +100,6 @@ variants close the gap: they recompile the same source with `ISequencer` mapped
13. [Migration guides](#systemreactive-to-reactiveuiprimitives-migration-guide)
14. [Benchmarks and performance posture](#benchmarks-and-performance-posture)
15. [Repository layout](#repository-layout)
-16. [Validation commands](#validation-commands)
## Install
@@ -127,10 +126,15 @@ integration point. Every package below ships at the same version and targets the
| [ReactiveUI.Primitives.Extensions][Ext] | [![ExtB]][Ext] | The migrated non-async `ReactiveUI.Extensions` helper operators on lean Primitives. |
| [ReactiveUI.Primitives.Extensions.Reactive][ExtRx] | [![ExtRxB]][ExtRx] | Migrated extension helpers compiled against System.Reactive `Unit` and `IScheduler`. |
| [ReactiveUI.Primitives.Wpf][Wpf] | [![WpfB]][Wpf] | WPF dispatcher sequencer integration. |
+| [ReactiveUI.Primitives.Wpf.Reactive][WpfRx] | [![WpfRxB]][WpfRx] | WPF dispatcher scheduler integration for System.Reactive-first projects. |
| [ReactiveUI.Primitives.WinForms][WinForms] | [![WinFormsB]][WinForms] | Windows Forms control sequencer integration. |
+| [ReactiveUI.Primitives.WinForms.Reactive][WinFormsRx] | [![WinFormsRxB]][WinFormsRx] | Windows Forms control scheduler integration for System.Reactive-first projects. |
| [ReactiveUI.Primitives.WinUI][WinUI] | [![WinUIB]][WinUI] | WinUI dispatcher-queue sequencer integration. |
+| [ReactiveUI.Primitives.WinUI.Reactive][WinUIRx] | [![WinUIRxB]][WinUIRx] | WinUI dispatcher-queue scheduler integration for System.Reactive-first projects. |
| [ReactiveUI.Primitives.Blazor][Blazor] | [![BlazorB]][Blazor] | Blazor renderer sequencer integration. |
+| [ReactiveUI.Primitives.Blazor.Reactive][BlazorRx] | [![BlazorRxB]][BlazorRx] | Blazor renderer scheduler integration for System.Reactive-first projects. |
| [ReactiveUI.Primitives.Maui][Maui] | [![MauiB]][Maui] | MAUI dispatcher sequencer integration. |
+| [ReactiveUI.Primitives.Maui.Reactive][MauiRx] | [![MauiRxB]][MauiRx] | MAUI dispatcher scheduler integration for System.Reactive-first projects. |
[Disp]: https://www.nuget.org/packages/ReactiveUI.Disposables/
[DispB]: https://img.shields.io/nuget/v/ReactiveUI.Disposables.svg
@@ -154,14 +158,24 @@ integration point. Every package below ships at the same version and targets the
[ExtRxB]: https://img.shields.io/nuget/v/ReactiveUI.Primitives.Extensions.Reactive.svg
[Wpf]: https://www.nuget.org/packages/ReactiveUI.Primitives.Wpf/
[WpfB]: https://img.shields.io/nuget/v/ReactiveUI.Primitives.Wpf.svg
+[WpfRx]: https://www.nuget.org/packages/ReactiveUI.Primitives.Wpf.Reactive/
+[WpfRxB]: https://img.shields.io/nuget/v/ReactiveUI.Primitives.Wpf.Reactive.svg
[WinForms]: https://www.nuget.org/packages/ReactiveUI.Primitives.WinForms/
[WinFormsB]: https://img.shields.io/nuget/v/ReactiveUI.Primitives.WinForms.svg
+[WinFormsRx]: https://www.nuget.org/packages/ReactiveUI.Primitives.WinForms.Reactive/
+[WinFormsRxB]: https://img.shields.io/nuget/v/ReactiveUI.Primitives.WinForms.Reactive.svg
[WinUI]: https://www.nuget.org/packages/ReactiveUI.Primitives.WinUI/
[WinUIB]: https://img.shields.io/nuget/v/ReactiveUI.Primitives.WinUI.svg
+[WinUIRx]: https://www.nuget.org/packages/ReactiveUI.Primitives.WinUI.Reactive/
+[WinUIRxB]: https://img.shields.io/nuget/v/ReactiveUI.Primitives.WinUI.Reactive.svg
[Blazor]: https://www.nuget.org/packages/ReactiveUI.Primitives.Blazor/
[BlazorB]: https://img.shields.io/nuget/v/ReactiveUI.Primitives.Blazor.svg
+[BlazorRx]: https://www.nuget.org/packages/ReactiveUI.Primitives.Blazor.Reactive/
+[BlazorRxB]: https://img.shields.io/nuget/v/ReactiveUI.Primitives.Blazor.Reactive.svg
[Maui]: https://www.nuget.org/packages/ReactiveUI.Primitives.Maui/
[MauiB]: https://img.shields.io/nuget/v/ReactiveUI.Primitives.Maui.svg
+[MauiRx]: https://www.nuget.org/packages/ReactiveUI.Primitives.Maui.Reactive/
+[MauiRxB]: https://img.shields.io/nuget/v/ReactiveUI.Primitives.Maui.Reactive.svg
### How the packages layer
@@ -169,7 +183,7 @@ Each family (base, async, extensions) follows the same shape: a type-agnostic `.
**lean** leaf binds the abstract `RxVoid`/`ISequencer` types to its own lightweight implementations, while the
`.Reactive` leaf recompiles the same source against System.Reactive's `Unit`/`IScheduler`. Pick the lean leaf for a
dependency-free build, or the `.Reactive` leaf to interoperate with an existing System.Reactive codebase. The platform
-packages build on the lean base. (Arrows point from a package to what it depends on.)
+packages also come in lean and `.Reactive` leaves. (Arrows point from a package to what it depends on.)
```mermaid
graph TD
@@ -185,6 +199,7 @@ graph TD
Ext["...Extensions (lean)"]
ExtRx["...Extensions.Reactive"]
Plat["Wpf / WinForms / WinUI Blazor / Maui"]
+ PlatRx["Wpf.Reactive / WinForms.Reactive WinUI.Reactive / Blazor.Reactive / Maui.Reactive"]
Core --> Disp
Prim --> Core
@@ -202,6 +217,7 @@ graph TD
ExtRx --> Rx
ExtRx --> ExtCore
Plat --> Prim
+ PlatRx --> Rx
```
@@ -287,14 +303,17 @@ Package TFM groups are:
- `ReactiveUI.Primitives`: `$(LibraryTargetFrameworks)` plus `net10.0-android`, `net11.0-android`, and Apple platform
TFMs (`net10.0-ios`, `net11.0-ios`, `net10.0-tvos`, `net11.0-tvos`, `net10.0-macos`, `net11.0-macos`,
`net10.0-maccatalyst`, `net11.0-maccatalyst`) when the build OS supports restoring those workloads.
-- `ReactiveUI.Primitives.Wpf`: `net8.0-windows`, `net9.0-windows`, `net10.0-windows`, `net11.0-windows`, `net462`,
- `net472`, `net48`, `net481`.
-- `ReactiveUI.Primitives.WinForms`: `net8.0-windows`, `net9.0-windows`, `net10.0-windows`, `net11.0-windows`, `net462`,
- `net472`, `net48`, `net481`.
-- `ReactiveUI.Primitives.WinUI`: `net8.0-windows10.0.19041.0`, `net9.0-windows10.0.19041.0`,
- `net10.0-windows10.0.19041.0`, `net11.0-windows10.0.19041.0`.
-- `ReactiveUI.Primitives.Blazor`: `net8.0`, `net9.0`, `net10.0`, `net11.0`.
-- `ReactiveUI.Primitives.Maui`: `net9.0`, `net10.0`, `net11.0`.
+- `ReactiveUI.Primitives.Reactive`: the same matrix as `ReactiveUI.Primitives`, compiled with System.Reactive `Unit` and
+ `IScheduler` aliases.
+- `ReactiveUI.Primitives.Wpf` and `ReactiveUI.Primitives.Wpf.Reactive`: `net8.0-windows`, `net9.0-windows`,
+ `net10.0-windows`, `net11.0-windows`, `net462`, `net472`, `net48`, `net481`.
+- `ReactiveUI.Primitives.WinForms` and `ReactiveUI.Primitives.WinForms.Reactive`: `net8.0-windows`,
+ `net9.0-windows`, `net10.0-windows`, `net11.0-windows`, `net462`, `net472`, `net48`, `net481`.
+- `ReactiveUI.Primitives.WinUI` and `ReactiveUI.Primitives.WinUI.Reactive`: `net8.0-windows10.0.19041.0`,
+ `net9.0-windows10.0.19041.0`, `net10.0-windows10.0.19041.0`, `net11.0-windows10.0.19041.0`.
+- `ReactiveUI.Primitives.Blazor` and `ReactiveUI.Primitives.Blazor.Reactive`: `net8.0`, `net9.0`, `net10.0`,
+ `net11.0`.
+- `ReactiveUI.Primitives.Maui` and `ReactiveUI.Primitives.Maui.Reactive`: `net9.0`, `net10.0`, `net11.0`.
Runtime package dependencies are intentionally small. The default production packages do not depend on System.Reactive,
R3, or R3Async. `ReactiveUI.Primitives` references `ReactiveUI.Disposables`, `ReactiveUI.Primitives.Core`, and the
@@ -314,9 +333,10 @@ System.ComponentModel.Annotations, System.Buffers, System.Memory, and System.Col
package also embeds `ReactiveUI.Primitives.R3Bridge.Generator.dll` so R3/R3Async bridge methods can be generated in
consuming projects that already reference those external libraries.
-`ReactiveUI.Primitives.Blazor` references `Microsoft.AspNetCore.Components`, `ReactiveUI.Primitives.Maui` references
-`Microsoft.Maui.Core` and Microsoft.Extensions infrastructure packages, and `ReactiveUI.Primitives.WinUI` references
-`Microsoft.WindowsAppSDK`. The remaining shared package references are analyzer, SourceLink, versioning, ILLink,
+`ReactiveUI.Primitives.Blazor` and `ReactiveUI.Primitives.Blazor.Reactive` reference `Microsoft.AspNetCore.Components`.
+`ReactiveUI.Primitives.Maui` and `ReactiveUI.Primitives.Maui.Reactive` reference `Microsoft.Maui.Core` and
+Microsoft.Extensions infrastructure packages. `ReactiveUI.Primitives.WinUI` and `ReactiveUI.Primitives.WinUI.Reactive`
+reference `Microsoft.WindowsAppSDK`. The remaining shared package references are analyzer, SourceLink, versioning, ILLink,
reference-assembly, or build-time support packages such as Blazor.Common.Analyzers, Microsoft.SourceLink.GitHub, MinVer,
Roslynator.Analyzers, SonarAnalyzer.CSharp, StyleSharp.Analyzers, Microsoft.NET.ILLink.Tasks, and
Microsoft.NETFramework.ReferenceAssemblies. Benchmark projects may reference System.Reactive,
@@ -584,6 +604,7 @@ using var subscription = labels.Subscribe(Console.WriteLine);
| filter-null + project + switch to latest inner | `SwitchSelect` |
| pairwise zip | `Pair` |
| latest-value combination | `SyncLatest` |
+| System.Reactive-named latest combination | `CombineLatest` |
| combine left emission with latest right value | `Latch` |
| latest-fusion alias | `PairLatest`, `FuseLatest` |
| last values after both complete | `ForkJoin` |
@@ -620,6 +641,31 @@ width.Value = 800;
height.Value = 600;
```
+`SyncLatest` and the System.Reactive-named `CombineLatest` overloads support multi-source projections up to 16 total
+sources. The `.Reactive` package variants expose the same overloads with `System.Reactive.Unit` and `IScheduler`
+conventions, which keeps migrated Rx code using familiar `CombineLatest` names while running on the Primitives
+implementation.
+
+Multi-source latest example:
+
+```csharp
+using ReactiveUI.Primitives;
+using ReactiveUI.Primitives.Signals;
+
+var first = new StateSignal(1);
+var second = new StateSignal(2);
+var third = new StateSignal(3);
+
+using var total = first
+ .SyncLatest(second, third, static (a, b, c) => a + b + c)
+ .Subscribe(value => Console.WriteLine($"total={value}"));
+
+third.Value = 10;
+```
+
+The Rx-name `SelectMany` observable overloads keep concurrent merge semantics. Use `FlatMap` or `Bind` when you want the
+Primitives name, and use `SelectMany` when porting existing Rx code or keeping LINQ query syntax.
+
Fused projection example (`Choose` and `SwitchSelect`):
```csharp
@@ -863,6 +909,9 @@ using IDisposable subscription = names.Subscribe(Console.WriteLine);
The Extensions project is intended for applications that already use the helper operators from `ReactiveUI.Extensions`
and want the same shapes without pulling System.Reactive or R3 into the production dependency graph.
+`Filter(string pattern)` creates a regex with a 30-second match timeout so ordinary filters remain stable under
+instrumented CI runs while still protecting against runaway patterns. Use `Filter(Regex regex)` when a caller-specified
+regex timeout or options set must be preserved exactly.
## Stateful signals and subject-like types
@@ -1100,6 +1149,101 @@ When a project must keep System.Reactive `Unit` or `IScheduler` in its public su
`ReactiveUI.Primitives.Extensions.Reactive`. When the goal is to migrate away from those public System.Reactive types,
use the lean packages and the mappings below.
+### Migration track: existing `xyz` project
+
+Use this track when the project should eventually stop exposing System.Reactive types and use the lean
+ReactiveUI.Primitives package family.
+
+1. Inventory references and public API. Mark each project that exposes `System.Reactive.Unit`, `IScheduler`,
+ `IObservable` extension methods, UI schedulers, `Subject` types, or ReactiveUI.Extensions helpers.
+2. Add the lean packages needed by the existing project:
+
+```bash
+dotnet add xyz/xyz.csproj package ReactiveUI.Primitives
+dotnet add xyz/xyz.csproj package ReactiveUI.Primitives.Extensions
+dotnet add xyz/xyz.csproj package ReactiveUI.Primitives.Async
+```
+
+3. Add only the matching UI integration package when the project owns UI-thread dispatch:
+
+```bash
+dotnet add xyz/xyz.csproj package ReactiveUI.Primitives.Wpf
+dotnet add xyz/xyz.csproj package ReactiveUI.Primitives.WinForms
+dotnet add xyz/xyz.csproj package ReactiveUI.Primitives.WinUI
+dotnet add xyz/xyz.csproj package ReactiveUI.Primitives.Blazor
+dotnet add xyz/xyz.csproj package ReactiveUI.Primitives.Maui
+```
+
+4. Convert boundary types deliberately: `System.Reactive.Unit` to `RxVoid`, `IScheduler` to `ISequencer`, Rx subjects to
+ `Signal`, `StateSignal`, `ReplaySignal`, or `FinalSignal`, and composite disposable types to
+ `MultipleDisposable`, `Pocket`, `Slot`, or `AssignmentSlot`.
+5. Keep code compiling during the first pass by using the Rx-name compatibility layer (`Select`, `Where`, `Aggregate`,
+ `Scan`, `Merge`, `Concat`, `CombineLatest`, `SelectMany`, and related aliases). Then move hot paths to Primitives
+ names (`Map`, `Keep`, `Reduce`, `Fold`, `Blend`, `Chain`, `SyncLatest`, `FlatMap`) where that makes the code clearer.
+6. Replace scheduler construction and tests: use `Sequencer.Immediate`, `Sequencer.CurrentThread`,
+ `ThreadPoolSequencer.Instance`, `TaskPoolSequencer.Instance`, UI sequencers, and `VirtualClock`.
+7. Remove `System.Reactive` and `ReactiveUI.Extensions` package references only after the project builds without
+ `System.Reactive.Linq`, `System.Reactive.Subjects`, `System.Reactive.Disposables`, or
+ `System.Reactive.Concurrency` imports.
+8. Run tests and package/API approval checks. For time-sensitive tests, use virtual time rather than real sleeps.
+
+### Migration track: new `xyz.Reactive` project
+
+Use this track when an existing Rx-based source base must remain source-compatible for consumers while the repository
+moves implementation work onto ReactiveUI.Primitives. The pattern is to keep or create a `xyz` lean package and add a
+new `xyz.Reactive` package that references the `.Reactive` Primitives range.
+
+1. Move shared implementation files into a shared source folder that can be linked by both projects.
+2. In shared source, use the neutral identifiers `RxVoid` and `ISequencer`. In the lean project they bind to
+ ReactiveUI.Primitives types; in the `.Reactive` project they bind to `System.Reactive.Unit` and
+ `System.Reactive.Concurrency.IScheduler`.
+3. Gate namespaces when the public namespace must differ:
+
+```csharp
+#if REACTIVE_SHIM
+namespace xyz.Reactive;
+#else
+namespace xyz;
+#endif
+```
+
+4. Reference the `.Reactive` packages from `xyz.Reactive`:
+
+```bash
+dotnet add xyz.Reactive/xyz.Reactive.csproj package ReactiveUI.Primitives.Reactive
+dotnet add xyz.Reactive/xyz.Reactive.csproj package ReactiveUI.Primitives.Extensions.Reactive
+dotnet add xyz.Reactive/xyz.Reactive.csproj package ReactiveUI.Primitives.Async.Reactive
+```
+
+5. Add the matching reactive UI package only when the project exposes UI scheduling:
+
+```bash
+dotnet add xyz.Reactive/xyz.Reactive.csproj package ReactiveUI.Primitives.Wpf.Reactive
+dotnet add xyz.Reactive/xyz.Reactive.csproj package ReactiveUI.Primitives.WinForms.Reactive
+dotnet add xyz.Reactive/xyz.Reactive.csproj package ReactiveUI.Primitives.WinUI.Reactive
+dotnet add xyz.Reactive/xyz.Reactive.csproj package ReactiveUI.Primitives.Blazor.Reactive
+dotnet add xyz.Reactive/xyz.Reactive.csproj package ReactiveUI.Primitives.Maui.Reactive
+```
+
+6. Configure the reactive project to define `REACTIVE_SHIM` and alias the System.Reactive types if your repository does
+ not already centralize this in `Directory.Build.props`:
+
+```xml
+
+ $(DefineConstants);REACTIVE_SHIM
+
+
+
+
+
+```
+
+7. Prefer zero source changes in the first `xyz.Reactive` pass: keep Rx names such as `Select`, `Where`, `SelectMany`,
+ `CombineLatest`, `Merge`, `Concat`, `Throttle`, and `WithLatestFrom` where compatibility matters. The `.Reactive`
+ Primitives packages supply those names over the Primitives implementation.
+8. Build both packages side by side. `xyz` should have no System.Reactive runtime dependency; `xyz.Reactive` should keep
+ System.Reactive-facing APIs for existing consumers.
+
### Factory mapping
| System.Reactive | ReactiveUI.Primitives | Notes |
@@ -1139,7 +1283,7 @@ use the lean packages and the mappings below.
|----------------------------------|--------------------------------------------------------|-----------------------------------------------------------------------------------------------------|
| `Select` | `Map` | Prefer `Map` for distinct Primitives style. |
| `Where` | `Keep` | Predicate filtering. |
-| `SelectMany` | `FlatMap` or `Bind` | `Bind` is a Primitives alias for flat mapping. |
+| `SelectMany` | `FlatMap`, `Bind`, or Rx-name `SelectMany` | Observable overloads preserve concurrent merge semantics; enumerable overloads flatten inline. |
| `Aggregate` | `Reduce` | Emits final accumulated value on completion. |
| `Scan` | `Fold` | Emits every accumulated value. |
| `Do` | `Tap` | Side effect while preserving values. |
@@ -1158,7 +1302,7 @@ use the lean packages and the mappings below.
| `Switch` | `SwitchTo` | Latest inner observable wins. |
| `Select` + `Switch` | `SwitchSelect` | Filters null source values, projects each to an inner observable, and mirrors only the latest. |
| `Zip` | `Pair` or `Signal.Pair` | Pair values by index. |
-| `CombineLatest` | `SyncLatest` or `Signal.SyncLatest` | Latest values after both sources have emitted. |
+| `CombineLatest` | `SyncLatest`, Rx-name `CombineLatest`, or `Signal.SyncLatest` | Latest values after all sources have emitted; overloads support up to 16 total sources. |
| `WithLatestFrom` | `Latch` | Left emission paired with latest right value. |
| `ForkJoin` | `ForkJoin` | Last values after completion. |
| `Throttle` | `Calm` / `Stabilize` | Quiet-period emission. |
@@ -1575,10 +1719,15 @@ duration is indistinguishable from empty method overhead; the benchmark run stil
| `src/ReactiveUI.Primitives.Extensions` | Migrated non-async ReactiveUI.Extensions helper operators backed by lean Primitives. |
| `src/ReactiveUI.Primitives.Extensions.Reactive` | System.Reactive-flavoured Extensions helper leaf. |
| `src/ReactiveUI.Primitives.Wpf` | Optional WPF dispatcher integration library. |
+| `src/ReactiveUI.Primitives.Wpf.Reactive` | Optional WPF dispatcher scheduler integration library for System.Reactive consumers. |
| `src/ReactiveUI.Primitives.WinForms` | Optional Windows Forms control integration library. |
+| `src/ReactiveUI.Primitives.WinForms.Reactive` | Optional Windows Forms control scheduler integration library for System.Reactive consumers. |
| `src/ReactiveUI.Primitives.WinUI` | Optional WinUI dispatcher queue integration library. |
+| `src/ReactiveUI.Primitives.WinUI.Reactive` | Optional WinUI dispatcher queue scheduler integration library for System.Reactive consumers. |
| `src/ReactiveUI.Primitives.Blazor` | Optional Blazor renderer integration library. |
+| `src/ReactiveUI.Primitives.Blazor.Reactive` | Optional Blazor renderer scheduler integration library for System.Reactive consumers. |
| `src/ReactiveUI.Primitives.Maui` | Optional MAUI dispatcher integration library. |
+| `src/ReactiveUI.Primitives.Maui.Reactive` | Optional MAUI dispatcher scheduler integration library for System.Reactive consumers. |
| `src/ReactiveUI.Primitives.R3Bridge.Generator` | Non-packable analyzer project embedded by the lean base and async packages for R3 bridges. |
| `src/Primitives.Shared` | Linked lean/Reactive synchronous source. |
| `src/Primitives.Async.Shared` | Linked lean/Reactive async source. |
diff --git a/Skill.md b/Skill.md
index 57408aa7..16ab7c4f 100644
--- a/Skill.md
+++ b/Skill.md
@@ -1,6 +1,6 @@
---
name: reactiveui-primitives
-description: Use when working with ReactiveUI.Primitives NuGet packages in .NET projects, including ReactiveUI.Disposables, ReactiveUI.Primitives.Core, ReactiveUI.Primitives, ReactiveUI.Primitives.Reactive, ReactiveUI.Primitives.Async.Core, ReactiveUI.Primitives.Async, ReactiveUI.Primitives.Async.Reactive, ReactiveUI.Primitives.Extensions.Core, ReactiveUI.Primitives.Extensions, ReactiveUI.Primitives.Extensions.Reactive, ReactiveUI.Primitives.Wpf, ReactiveUI.Primitives.WinForms, ReactiveUI.Primitives.WinUI, ReactiveUI.Primitives.Blazor, or ReactiveUI.Primitives.Maui; choosing Core vs lean vs System.Reactive package variants; using IObservable, IObservableAsync, signals, sequencers, disposable helpers, UI dispatch adapters, R3/R3Async generated bridges, or migration guidance from System.Reactive/R3.
+description: Use when working with ReactiveUI.Primitives NuGet packages in .NET projects, including ReactiveUI.Disposables, ReactiveUI.Primitives.Core, ReactiveUI.Primitives, ReactiveUI.Primitives.Reactive, ReactiveUI.Primitives.Async.Core, ReactiveUI.Primitives.Async, ReactiveUI.Primitives.Async.Reactive, ReactiveUI.Primitives.Extensions.Core, ReactiveUI.Primitives.Extensions, ReactiveUI.Primitives.Extensions.Reactive, ReactiveUI.Primitives.Wpf, ReactiveUI.Primitives.Wpf.Reactive, ReactiveUI.Primitives.WinForms, ReactiveUI.Primitives.WinForms.Reactive, ReactiveUI.Primitives.WinUI, ReactiveUI.Primitives.WinUI.Reactive, ReactiveUI.Primitives.Blazor, ReactiveUI.Primitives.Blazor.Reactive, ReactiveUI.Primitives.Maui, or ReactiveUI.Primitives.Maui.Reactive; choosing Core vs lean vs System.Reactive package variants; using IObservable, IObservableAsync, signals, sequencers, disposable helpers, UI dispatch adapters, R3/R3Async generated bridges, or migration guidance from System.Reactive/R3/R3Async repositories to Primitives or .Reactive package variants.
---
# ReactiveUI.Primitives
@@ -24,10 +24,15 @@ Default to the non-Core leaf packages for applications. Choose Core packages onl
| `ReactiveUI.Primitives.Extensions` | The app needs convenience operators over lean BCL `IObservable` pipelines. | `ReactiveUI.Primitives.Extensions`; `ReactiveExtensions`, `ObservableSubscriptionExtensions`, buffering, debounce/throttle, stale detection, retry/backoff, heartbeat, observe-on helpers, pairwise/partition, `ToHotTask`, `ToHotValueTask`, `SubscribeAsync`, `WaitUntil`, `AsSignal`. Depends on `ReactiveUI.Primitives` and `ReactiveUI.Primitives.Extensions.Core`. |
| `ReactiveUI.Primitives.Extensions.Reactive` | The app needs the Extensions surface in System.Reactive-first code. | Same extension family as `ReactiveUI.Primitives.Extensions`, recompiled under `.Reactive` namespaces and `System.Reactive` scheduler/unit conventions. Depends on `ReactiveUI.Primitives.Reactive` and `ReactiveUI.Primitives.Extensions.Core`. |
| `ReactiveUI.Primitives.Wpf` | WPF UI code needs dispatcher marshalling. | `ReactiveUI.Primitives.Concurrency.DispatcherSequencer`. Depends on `ReactiveUI.Primitives`. |
+| `ReactiveUI.Primitives.Wpf.Reactive` | WPF UI code is System.Reactive-first and needs dispatcher scheduling. | `ReactiveUI.Primitives.Reactive.Concurrency.DispatcherSequencer` implements System.Reactive scheduling conventions. Depends on `ReactiveUI.Primitives.Reactive`. |
| `ReactiveUI.Primitives.WinForms` | Windows Forms UI code needs control-thread marshalling. | `ReactiveUI.Primitives.Concurrency.ControlSequencer`. Depends on `ReactiveUI.Primitives`. |
+| `ReactiveUI.Primitives.WinForms.Reactive` | Windows Forms UI code is System.Reactive-first and needs control-thread scheduling. | `ReactiveUI.Primitives.Reactive.Concurrency.ControlSequencer`. Depends on `ReactiveUI.Primitives.Reactive`. |
| `ReactiveUI.Primitives.WinUI` | WinUI code needs `DispatcherQueue` marshalling. | `ReactiveUI.Primitives.Concurrency.DispatcherQueueSequencer` and `DispatcherQueueSequencerExtensions.ToSequencer()`. Depends on `ReactiveUI.Primitives` and `Microsoft.WindowsAppSDK`. |
+| `ReactiveUI.Primitives.WinUI.Reactive` | WinUI code is System.Reactive-first and needs `DispatcherQueue` scheduling. | `ReactiveUI.Primitives.Reactive.Concurrency.DispatcherQueueSequencer` and `DispatcherQueueSequencerExtensions.ToSequencer()`. Depends on `ReactiveUI.Primitives.Reactive` and `Microsoft.WindowsAppSDK`. |
| `ReactiveUI.Primitives.Blazor` | Blazor components need render-thread sequencing and component-bound subscriptions. | `ReactiveUI.Primitives.Blazor.Components.ReactiveComponentBase`, `Observe`, `Track`, `InvalidateAsync`, `ReactiveUI.Primitives.Blazor.Concurrency.BlazorRendererSequencer`. Depends on `ReactiveUI.Primitives` and `Microsoft.AspNetCore.Components`. |
+| `ReactiveUI.Primitives.Blazor.Reactive` | Blazor components are System.Reactive-first and need render-thread scheduling. | `ReactiveUI.Primitives.Blazor.Reactive.Components.ReactiveComponentBase` and `ReactiveUI.Primitives.Blazor.Reactive.Concurrency.BlazorRendererSequencer`. Depends on `ReactiveUI.Primitives.Reactive` and `Microsoft.AspNetCore.Components`. |
| `ReactiveUI.Primitives.Maui` | .NET MAUI code needs dispatcher marshalling. | `ReactiveUI.Primitives.Concurrency.MauiDispatcherSequencer` and `MauiDispatcherSequencerExtensions.ToSequencer()`. Depends on `ReactiveUI.Primitives` and `Microsoft.Maui.Core`. |
+| `ReactiveUI.Primitives.Maui.Reactive` | .NET MAUI code is System.Reactive-first and needs dispatcher scheduling. | `ReactiveUI.Primitives.Reactive.Concurrency.MauiDispatcherSequencer` and `MauiDispatcherSequencerExtensions.ToSequencer()`. Depends on `ReactiveUI.Primitives.Reactive` and `Microsoft.Maui.Core`. |
Install examples:
@@ -52,6 +57,7 @@ Use `.Reactive` packages when the project already uses System.Reactive idioms:
dotnet add package ReactiveUI.Primitives.Reactive
dotnet add package ReactiveUI.Primitives.Async.Reactive
dotnet add package ReactiveUI.Primitives.Extensions.Reactive
+dotnet add package ReactiveUI.Primitives.Wpf.Reactive
```
Do not add `ReactiveUI.Primitives.R3Bridge.Generator` as a package. It is a non-packable Roslyn component whose output is packed as an analyzer by `ReactiveUI.Primitives` and `ReactiveUI.Primitives.Async`.
@@ -61,8 +67,8 @@ Do not add `ReactiveUI.Primitives.R3Bridge.Generator` as a package. It is a non-
- Prefer `ReactiveUI.Primitives` for new app code that uses BCL `IObservable`.
- Prefer `ReactiveUI.Primitives.Async` when subscription, notification, or completion work must be asynchronous.
- Prefer `ReactiveUI.Primitives.Extensions` for migrated helper operators from ReactiveUI.Extensions-style code.
-- Prefer UI packages only in the matching UI framework.
-- Prefer `.Reactive` variants only when public APIs should expose `System.Reactive.Unit`, `IScheduler`, or `.Reactive` namespaces.
+- Prefer UI packages only in the matching UI framework; use the lean UI package for `ISequencer` and the `.Reactive` UI package for `IScheduler`.
+- Prefer `.Reactive` variants when public APIs should expose `System.Reactive.Unit`, `IScheduler`, `.Reactive` namespaces, or existing Rx source should compile with minimal code changes.
- Prefer `.Core` variants only when composing packages or minimizing a library dependency layer. Most apps should reference the leaf package.
- Avoid mixing lean and `.Reactive` variants in the same pipeline without an explicit boundary; their namespaces and scheduler/unit conventions differ.
- Do not add `System.Reactive` just to use core Primitives. The `.Reactive` packages add it for System.Reactive-first projects.
@@ -105,6 +111,8 @@ using ReactiveUI.Primitives.Reactive.Signals;
using ReactiveUI.Primitives.Extensions.Reactive;
```
+Reactive UI packages mostly expose sequencers under `ReactiveUI.Primitives.Reactive.Concurrency`. Blazor also exposes component helpers under `ReactiveUI.Primitives.Blazor.Reactive.Components`.
+
R3 generated bridges:
```csharp
@@ -118,11 +126,95 @@ Use these landmarks to choose APIs quickly; rely on IntelliSense/PublicAPI for e
- Signals and factories: `Signal`, `Signal`, `BehaviorSignal`, `StateSignal`, `ReplaySignal`, `ScheduledSignal`, `PrioritySemaphoreSignal`, `AnonymousSignal`, `ConnectableSignal`.
- State and commands: `ReadOnlyState`, `CommandSignal`, `CommandExecution`, `TaskSignal`, `TaskSignal`.
- Sequencing: `ISequencer`, `CurrentThreadSequencer`, `ImmediateSequencer`, `SynchronizationContextSequencer`, `TaskPoolSequencer`, `ThreadPoolSequencer`, `VirtualTimeSequencer`, `VirtualClock`.
-- Operators: `Map`, `FlatMap`, `SelectMany`, `Where`, `Keep`, `KeepNotNull`, `Cast`, `Concat`, `Merge`, `Amb`, `Race`, `Switch`, `Retry`, `Recover`, `Rescue`, `Distinct`, `Take`, `Skip`, `Collect`, `Materialize`, `Dematerialize`, `ForkJoin`, `Pair`, `Latch`, `Synchronize`, `Timeout`, `Shift`, `Spark`, `Unspark`.
+- Operators: `Map`, `FlatMap`, `Bind`, `SelectMany`, `Where`, `Keep`, `KeepNotNull`, `Cast`, `Concat`, `Merge`, `Amb`, `Race`, `Switch`, `Retry`, `Recover`, `Rescue`, `Distinct`, `Take`, `Skip`, `Collect`, `Materialize`, `Dematerialize`, `ForkJoin`, `Pair`, `Latch`, `SyncLatest`, `CombineLatest`, `Synchronize`, `Timeout`, `Shift`, `Spark`, `Unspark`. `SyncLatest` and Rx-name `CombineLatest` support up to 16 total sources; Rx-name `SelectMany` observable overloads preserve concurrent merge semantics.
- Connectable helpers: `Replay`, `ReplayLive`, `Share`, `ShareLatest`, `AutoConnect`, `AutoShare`.
- Async core: `IObservableAsync`, `IObserverAsync`, `SignalAsync`, `SignalAsync`, `WitnessAsync`, `ConnectableSignalAsync`, `ConcurrentWitnessCallsException`.
- Async factories/operators: `SignalAsync.Create`, `Return`, `Range`, `FromAsync`, `FromAsyncEnumerable`, `Every`, `Interval`, `Timer`, `Using`, `Blend`, `Chain`, `Map`, `Merge`, `CombineLatest`, `Switch`, `ForEachAsync`, `Collect*Async`, `WaitCompletionAsync`, `OnErrorResumeAsFailure`.
-- Extension helpers: `AsSignal`, `BufferUntil`, `BufferUntilIdle`, `DebounceImmediate`, `DebounceUntil`, `DetectStale`, `Heartbeat`, `ObserveOnSafe`, `ObserveOnIf`, `Pairwise`, `Partition`, `ReplayLastOnSubscribe`, `RetryWithBackoff`, `RetryWithDelay`, `RetryForeverWithDelay`, `RunAll`, `Shuffle`, `SwitchIfEmpty`, `Throttle*`, `ToHotTask`, `ToHotValueTask`, `WaitUntil`.
+- Extension helpers: `AsSignal`, `BufferUntil`, `BufferUntilIdle`, `DebounceImmediate`, `DebounceUntil`, `DetectStale`, `Filter`, `Heartbeat`, `ObserveOnSafe`, `ObserveOnIf`, `Pairwise`, `Partition`, `ReplayLastOnSubscribe`, `RetryWithBackoff`, `RetryWithDelay`, `RetryForeverWithDelay`, `RunAll`, `Shuffle`, `SwitchIfEmpty`, `Throttle*`, `ToHotTask`, `ToHotValueTask`, `WaitUntil`. `Filter(string)` uses a 30-second regex timeout; `Filter(Regex)` preserves caller-supplied options and timeout.
+
+## Rx Migration Process
+
+Use this process when moving an Rx-based repository to ReactiveUI.Primitives. Decide first whether the immediate target is a lean Primitives project, a System.Reactive-compatible `.Reactive` project, or both.
+
+### Existing `xyz` Project To Lean Primitives
+
+Use this path when the existing project should stop exposing System.Reactive `Unit`, `IScheduler`, and package dependencies.
+
+1. Inventory package references and public APIs. Find `System.Reactive`, `System.Reactive.Linq`, `System.Reactive.Subjects`, `System.Reactive.Disposables`, `System.Reactive.Concurrency`, `ReactiveUI.Extensions`, `Unit`, `IScheduler`, `Subject`, `BehaviorSubject`, `ReplaySubject`, and `TestScheduler`.
+2. Add the smallest lean package set:
+
+```bash
+dotnet add xyz/xyz.csproj package ReactiveUI.Primitives
+dotnet add xyz/xyz.csproj package ReactiveUI.Primitives.Extensions
+dotnet add xyz/xyz.csproj package ReactiveUI.Primitives.Async
+```
+
+3. Add only the matching lean UI package when UI dispatch is used:
+
+```bash
+dotnet add xyz/xyz.csproj package ReactiveUI.Primitives.Wpf
+dotnet add xyz/xyz.csproj package ReactiveUI.Primitives.WinForms
+dotnet add xyz/xyz.csproj package ReactiveUI.Primitives.WinUI
+dotnet add xyz/xyz.csproj package ReactiveUI.Primitives.Blazor
+dotnet add xyz/xyz.csproj package ReactiveUI.Primitives.Maui
+```
+
+4. Convert types at boundaries: `Unit` -> `RxVoid`, `IScheduler` -> `ISequencer`, `Subject` -> `Signal`, `BehaviorSubject` -> `StateSignal`, `ReplaySubject` -> `ReplaySignal`, `AsyncSubject` -> `FinalSignal`, `CompositeDisposable` -> `MultipleDisposable` or `Pocket`, `SerialDisposable` -> `SingleReplaceableDisposable` or `Slot`.
+5. Convert factories: `Observable.Return` -> `Signal.Emit`, `Empty` -> `Signal.None`, `Never` -> `Signal.Silent`, `Throw` -> `Signal.Fail`, `Range` -> `Signal.Sequence`, `Defer` -> `Signal.Lazy`, `Timer` -> `Signal.After`, `Interval` -> `Signal.Pulse` or `Signal.Every`, `Create` -> `Signal.Create` or `Signal.CreateSafe`.
+6. Keep code compiling with Rx-name compatibility aliases where useful: `Select`, `Where`, `Aggregate`, `Scan`, `Merge`, `Concat`, `SelectMany`, `CombineLatest`, `WithLatestFrom`, `Throttle`, and related names exist on the Primitives surface.
+7. Prefer Primitives names in new or hot-path code: `Map`, `Keep`, `Reduce`, `Fold`, `Blend`, `Chain`, `FlatMap`/`Bind`, `SyncLatest`, `Latch`, `Calm`/`Stabilize`, `Probe`, `Shift`, `Expire`.
+8. Replace schedulers with `Sequencer.Immediate`, `Sequencer.CurrentThread`, `ThreadPoolSequencer.Instance`, `TaskPoolSequencer.Instance`, `SynchronizationContextSequencer`, UI sequencers, or `VirtualClock` for tests.
+9. Remove System.Reactive and ReactiveUI.Extensions package references only after imports and public APIs no longer require them. Keep bridge conversions at package edges.
+
+### New `xyz.Reactive` Project Consuming `.Reactive` Packages
+
+Use this path when an Rx-based source base should keep System.Reactive-facing APIs for existing consumers while sharing implementation with a lean `xyz` package.
+
+1. Create `xyz.Reactive` as a sibling project to `xyz`. Link or share implementation files instead of copying logic.
+2. In shared implementation code, use neutral identifiers `RxVoid` and `ISequencer`. The lean project binds them to Primitives types; the reactive project aliases them to `System.Reactive.Unit` and `System.Reactive.Concurrency.IScheduler`.
+3. Gate namespaces when package namespaces differ:
+
+```csharp
+#if REACTIVE_SHIM
+namespace xyz.Reactive;
+#else
+namespace xyz;
+#endif
+```
+
+4. Add the `.Reactive` package set that matches the source surface:
+
+```bash
+dotnet add xyz.Reactive/xyz.Reactive.csproj package ReactiveUI.Primitives.Reactive
+dotnet add xyz.Reactive/xyz.Reactive.csproj package ReactiveUI.Primitives.Extensions.Reactive
+dotnet add xyz.Reactive/xyz.Reactive.csproj package ReactiveUI.Primitives.Async.Reactive
+```
+
+5. Add only the matching `.Reactive` UI package when the project exposes UI scheduling:
+
+```bash
+dotnet add xyz.Reactive/xyz.Reactive.csproj package ReactiveUI.Primitives.Wpf.Reactive
+dotnet add xyz.Reactive/xyz.Reactive.csproj package ReactiveUI.Primitives.WinForms.Reactive
+dotnet add xyz.Reactive/xyz.Reactive.csproj package ReactiveUI.Primitives.WinUI.Reactive
+dotnet add xyz.Reactive/xyz.Reactive.csproj package ReactiveUI.Primitives.Blazor.Reactive
+dotnet add xyz.Reactive/xyz.Reactive.csproj package ReactiveUI.Primitives.Maui.Reactive
+```
+
+6. If the repository does not centralize this already, configure `xyz.Reactive`:
+
+```xml
+
+ $(DefineConstants);REACTIVE_SHIM
+
+
+
+
+
+```
+
+7. Preserve Rx source compatibility first. Keep names such as `Select`, `Where`, `SelectMany`, `CombineLatest`, `Merge`, `Concat`, `Throttle`, and `WithLatestFrom` when that avoids unnecessary code churn. The `.Reactive` packages provide those names over the Primitives implementation.
+8. Build `xyz` and `xyz.Reactive` side by side. The lean `xyz` package should avoid System.Reactive runtime dependencies; `xyz.Reactive` intentionally references System.Reactive and exposes `Unit`/`IScheduler`.
+9. Update tests to run both package variants. Prefer TUnit/Microsoft Testing Platform in this repository and use only TUnit assertions.
## R3 And R3Async Bridges
@@ -148,11 +240,11 @@ Use bridge methods only at boundaries. Keep internal pipelines in one model afte
## Framework And Platform Notes
- General libraries target `net8.0`, `net9.0`, `net10.0`, `net11.0`, `net462`, `net472`, `net48`, and `net481`.
-- `ReactiveUI.Primitives` also builds Android TFMs and Apple TFMs for platform sequencers in the base package.
-- WPF and WinForms packages target Windows TFMs plus .NET Framework.
-- WinUI targets `net*-windows10.0.19041.0`.
-- MAUI targets `net9.0`, `net10.0`, and `net11.0`.
-- Blazor targets the modern .NET TFMs.
+- `ReactiveUI.Primitives` and `ReactiveUI.Primitives.Reactive` also build Android TFMs and Apple TFMs for platform sequencers in the base package family.
+- WPF and WinForms packages, including `.Reactive` variants, target Windows TFMs plus .NET Framework.
+- WinUI packages, including `.Reactive` variants, target `net*-windows10.0.19041.0`.
+- MAUI packages, including `.Reactive` variants, target `net9.0`, `net10.0`, and `net11.0`.
+- Blazor packages, including `.Reactive` variants, target the modern .NET TFMs.
## Repository Maintenance
@@ -162,5 +254,5 @@ When editing this repository:
- Build and test from `src`.
- Tests use Microsoft.Testing.Platform with TUnit; write TUnit tests and TUnit assertions only.
- Shipping public API baselines live under each package's `PublicAPI//PublicAPI.Shipped.txt` and `PublicAPI.Unshipped.txt`.
-- `Skill.md` is the canonical skill file. `ReactiveUI.Primitives.csproj` packs it both at package root as `Skill.md` and at `.agents/skills/reactiveui-primitives/SKILL.md`.
+- `Skill.md` is the canonical skill file. `ReactiveUI.Primitives.csproj` packs it both at package root as `Skill.md` and at `.agents/skills/reactiveui-primitives/SKILL.md`; keep this filename singular and casing intact.
- Keep package guidance synchronized with packable projects in the solution. Tests, benchmarks, and `ReactiveUI.Primitives.R3Bridge.Generator` are not NuGet packages to add directly.
diff --git a/src/Directory.Build.props b/src/Directory.Build.props
index 0b909d63..82aa9213 100644
--- a/src/Directory.Build.props
+++ b/src/Directory.Build.props
@@ -96,6 +96,7 @@
+
diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props
index f57e8f25..0948536f 100644
--- a/src/Directory.Packages.props
+++ b/src/Directory.Packages.props
@@ -9,7 +9,7 @@
-
+
diff --git a/src/Polyfills/AllowNullAttribute.cs b/src/Polyfills/AllowNullAttribute.cs
new file mode 100644
index 00000000..8c890bb6
--- /dev/null
+++ b/src/Polyfills/AllowNullAttribute.cs
@@ -0,0 +1,14 @@
+// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+// Polyfill implementation adapted from SimonCropp/Polyfill (https://github.com/SimonCropp/Polyfill).
+#if !NETCOREAPP3_0_OR_GREATER && !NETSTANDARD2_1_OR_GREATER
+namespace System.Diagnostics.CodeAnalysis;
+
+/// Specifies that is allowed as an input even if the corresponding type disallows it.
+[ExcludeFromCodeCoverage]
+[DebuggerNonUserCode]
+[AttributeUsage(AttributeTargets.Field | AttributeTargets.Parameter | AttributeTargets.Property)]
+internal sealed class AllowNullAttribute : Attribute;
+#endif
diff --git a/src/Primitives.Extensions.Shared/ReactiveExtensions.cs b/src/Primitives.Extensions.Shared/ReactiveExtensions.cs
index e76662fd..66cb013c 100644
--- a/src/Primitives.Extensions.Shared/ReactiveExtensions.cs
+++ b/src/Primitives.Extensions.Shared/ReactiveExtensions.cs
@@ -22,6 +22,9 @@ public static class ReactiveExtensions
/// Default backoff factor for : each retry doubles the previous delay.
private const double DefaultBackoffFactor = 2.0;
+ /// Default match timeout for regex filters created from string patterns.
+ private static readonly TimeSpan DefaultRegexMatchTimeout = TimeSpan.FromSeconds(30);
+
/// Boolean reduction operators for a set of boolean observable sources.
/// The sources.
extension(IEnumerable> sources)
@@ -906,7 +909,7 @@ public IObservable BufferUntil(char startsWith, char endsWith) =>
/// Regex pattern.
/// Filtered sequence.
public IObservable Filter(string regexPattern) =>
- source.Filter(new Regex(regexPattern, RegexOptions.None, TimeSpan.FromSeconds(1)));
+ source.Filter(new Regex(regexPattern, RegexOptions.None, DefaultRegexMatchTimeout));
/// Filters strings by regex.
/// Regex.
diff --git a/src/Primitives.Shared/Advanced/AsyncSubscriptionLifetime.cs b/src/Primitives.Shared/Advanced/AsyncSubscriptionLifetime.cs
new file mode 100644
index 00000000..eb045fd8
--- /dev/null
+++ b/src/Primitives.Shared/Advanced/AsyncSubscriptionLifetime.cs
@@ -0,0 +1,90 @@
+// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+#if REACTIVE_SHIM
+namespace ReactiveUI.Primitives.Reactive.Advanced;
+#else
+namespace ReactiveUI.Primitives.Advanced;
+#endif
+
+/// Owns cancellation and the eventual inner disposable for asynchronous signal subscriptions.
+///
+/// Call exactly once after asynchronous setup has finished, even when setup faults or is canceled.
+///
+public sealed class AsyncSubscriptionLifetime : IDisposable
+{
+ /// The cancellation source passed to the asynchronous subscription.
+ private readonly CancellationTokenSource _cts = new();
+
+ /// The inner disposable returned by the asynchronous subscription.
+ private readonly SingleDisposable _subscription = new();
+
+ /// Non-zero once the outer subscription has been disposed.
+ private int _disposed;
+
+ /// Non-zero when disposal requested cancellation before the asynchronous subscription completed.
+ private int _canceled;
+
+ /// Non-zero once the asynchronous subscription task has completed.
+ private int _completed;
+
+ /// Gets the token supplied to the asynchronous subscription.
+ public CancellationToken Token => _cts.Token;
+
+ /// Gets a value indicating whether disposal requested cancellation before asynchronous setup completed.
+ public bool IsCancellationRequested => Volatile.Read(ref _canceled) != 0;
+
+ /// Assigns the disposable returned by asynchronous setup.
+ /// The returned disposable, or for an empty lifetime.
+ public void SetSubscription(IDisposable? disposable) =>
+ _subscription.Create(disposable ?? EmptyDisposable.Instance);
+
+ /// Marks asynchronous setup complete and releases the cancellation source when still owned here.
+ public void Complete()
+ {
+ Volatile.Write(ref _completed, 1);
+ if (Volatile.Read(ref _disposed) != 0)
+ {
+ return;
+ }
+
+ _cts.Dispose();
+ }
+
+ ///
+ public void Dispose()
+ {
+ if (Interlocked.Exchange(ref _disposed, 1) != 0)
+ {
+ return;
+ }
+
+ if (Volatile.Read(ref _completed) != 0)
+ {
+ _subscription.Dispose();
+ _cts.Dispose();
+ return;
+ }
+
+ Volatile.Write(ref _canceled, 1);
+ CancelIgnoringDisposed(_cts);
+ _subscription.Dispose();
+ _cts.Dispose();
+ }
+
+ /// Cancels the source while tolerating a concurrent completion disposing it first.
+ /// The cancellation source to cancel.
+ [System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
+ private static void CancelIgnoringDisposed(CancellationTokenSource cts)
+ {
+ try
+ {
+ cts.Cancel();
+ }
+ catch (ObjectDisposedException)
+ {
+ // Completion can release the CTS concurrently; disposal still continues with the inner subscription.
+ }
+ }
+}
diff --git a/src/Primitives.Shared/Advanced/CurrentThreadRequirement.cs b/src/Primitives.Shared/Advanced/CurrentThreadRequirement.cs
new file mode 100644
index 00000000..8a833550
--- /dev/null
+++ b/src/Primitives.Shared/Advanced/CurrentThreadRequirement.cs
@@ -0,0 +1,27 @@
+// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+#if REACTIVE_SHIM
+namespace ReactiveUI.Primitives.Reactive.Advanced;
+#else
+namespace ReactiveUI.Primitives.Advanced;
+#endif
+
+/// Shared current-thread requirement checks for advanced signals.
+internal static class CurrentThreadRequirement
+{
+ /// Determines whether a source requires current-thread subscription.
+ /// The source value type.
+ /// The source observable.
+ /// when the source requires current-thread subscription.
+ public static bool IsRequired(IObservable source)
+ {
+ if (source is not IRequireCurrentThread currentThread)
+ {
+ return false;
+ }
+
+ return currentThread.IsRequiredSubscribeOnCurrentThread();
+ }
+}
diff --git a/src/Primitives.Shared/Advanced/MapIndexedSignal{TSource,TResult}.cs b/src/Primitives.Shared/Advanced/MapIndexedSignal{TSource,TResult}.cs
new file mode 100644
index 00000000..0e9f7206
--- /dev/null
+++ b/src/Primitives.Shared/Advanced/MapIndexedSignal{TSource,TResult}.cs
@@ -0,0 +1,42 @@
+// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+#if REACTIVE_SHIM
+namespace ReactiveUI.Primitives.Reactive.Advanced;
+#else
+namespace ReactiveUI.Primitives.Advanced;
+#endif
+
+/// Indexed map signal.
+/// The source value type.
+/// The projected value type.
+public sealed class MapIndexedSignal : IRequireCurrentThread
+{
+ /// The source observable.
+ private readonly IObservable _source;
+
+ /// The indexed selector.
+ private readonly Func _selector;
+
+ /// Initializes a new instance of the class.
+ /// The source observable.
+ /// The indexed selector.
+ public MapIndexedSignal(IObservable source, Func selector)
+ {
+ _source = source;
+ _selector = selector;
+ }
+
+ ///
+ public bool IsRequiredSubscribeOnCurrentThread() =>
+ CurrentThreadRequirement.IsRequired(_source);
+
+ ///
+ public IDisposable Subscribe(IObserver observer)
+ {
+ ArgumentExceptionHelper.ThrowIfNull(observer);
+
+ return _source.Subscribe(new MapIndexedWitness(observer, _selector));
+ }
+}
diff --git a/src/Primitives.Shared/Advanced/MapIndexedWitness{TSource,TResult}.cs b/src/Primitives.Shared/Advanced/MapIndexedWitness{TSource,TResult}.cs
new file mode 100644
index 00000000..93c601a2
--- /dev/null
+++ b/src/Primitives.Shared/Advanced/MapIndexedWitness{TSource,TResult}.cs
@@ -0,0 +1,82 @@
+// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+#if REACTIVE_SHIM
+namespace ReactiveUI.Primitives.Reactive.Advanced;
+#else
+namespace ReactiveUI.Primitives.Advanced;
+#endif
+
+/// Applies an indexed selector to source values.
+/// The source value type.
+/// The projected value type.
+public sealed class MapIndexedWitness : IObserver
+{
+ /// The downstream observer.
+ private readonly IObserver _observer;
+
+ /// The indexed selector.
+ private readonly Func _selector;
+
+ /// The next zero-based index.
+ private int _index;
+
+ /// Whether a terminal notification has been forwarded.
+ private bool _stopped;
+
+ /// Initializes a new instance of the class.
+ /// The downstream observer.
+ /// The indexed selector.
+ public MapIndexedWitness(IObserver observer, Func selector)
+ {
+ _observer = observer;
+ _selector = selector;
+ }
+
+ ///
+ public void OnNext(TSource value)
+ {
+ if (_stopped)
+ {
+ return;
+ }
+
+ TResult result;
+ try
+ {
+ result = _selector(value, _index++);
+ }
+ catch (Exception error) when (!FatalExceptionHelper.IsFatal(error))
+ {
+ OnError(error);
+ return;
+ }
+
+ _observer.OnNext(result);
+ }
+
+ ///
+ public void OnError(Exception error)
+ {
+ if (_stopped)
+ {
+ return;
+ }
+
+ _stopped = true;
+ _observer.OnError(error);
+ }
+
+ ///
+ public void OnCompleted()
+ {
+ if (_stopped)
+ {
+ return;
+ }
+
+ _stopped = true;
+ _observer.OnCompleted();
+ }
+}
diff --git a/src/Primitives.Shared/Advanced/MaxConcurrentBlendCoordinator{T}.cs b/src/Primitives.Shared/Advanced/MaxConcurrentBlendCoordinator{T}.cs
new file mode 100644
index 00000000..b05794c4
--- /dev/null
+++ b/src/Primitives.Shared/Advanced/MaxConcurrentBlendCoordinator{T}.cs
@@ -0,0 +1,210 @@
+// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+#if REACTIVE_SHIM
+namespace ReactiveUI.Primitives.Reactive.Advanced;
+#else
+namespace ReactiveUI.Primitives.Advanced;
+#endif
+
+/// Coordinates bounded-concurrency merging of enumerable observable sources.
+/// The value type.
+public sealed class MaxConcurrentBlendCoordinator : IDisposable
+{
+ /// Serializes enumeration, counters, and downstream callbacks.
+ private readonly Lock _gate = new();
+
+ /// Active subscriptions and enumerable lifetime.
+ private readonly MultipleDisposable _subscriptions = [];
+
+ /// The downstream observer.
+ private readonly IObserver _observer;
+
+ /// The source enumerator.
+ private IEnumerator>? _enumerator;
+
+ /// The number of active inner sources.
+ private int _active;
+
+ /// Whether all enumerable sources have been consumed.
+ private bool _enumerationCompleted;
+
+ /// Whether a terminal notification has been emitted.
+ private bool _done;
+
+ /// Initializes a new instance of the class.
+ /// The downstream observer.
+ public MaxConcurrentBlendCoordinator(IObserver observer) => _observer = observer;
+
+ ///
+ public void Dispose()
+ {
+ var enumerator = _enumerator;
+ _enumerator = null;
+ enumerator?.Dispose();
+ _subscriptions.Dispose();
+ }
+
+ /// Starts bounded-concurrency merging.
+ /// The enumerable sources.
+ /// The maximum number of active inner subscriptions.
+ /// The subscription cleanup.
+ public MaxConcurrentBlendCoordinator Run(IEnumerable> sources, int maxConcurrent)
+ {
+ _enumerator = sources.GetEnumerator();
+
+ for (var i = 0; i < maxConcurrent; i++)
+ {
+ if (!SubscribeNext())
+ {
+ break;
+ }
+ }
+
+ return this;
+ }
+
+ /// Subscribes to the next enumerable source when one is available.
+ /// when a new inner source was subscribed.
+ private bool SubscribeNext()
+ {
+ var next = TakeNextSource(out var failed);
+ if (failed)
+ {
+ Dispose();
+ }
+
+ if (next is null)
+ {
+ return false;
+ }
+
+ OnceDisposable inner = new();
+ _subscriptions.Add(inner);
+ inner.Disposable = next.Subscribe(OnInnerNext, OnAnyError, () => OnInnerCompleted(inner));
+ return true;
+ }
+
+ /// Reads the next source from the enumerable under the gate.
+ /// Set to when reading the next source failed.
+ /// The next source, or when no source should be subscribed.
+ private IObservable? TakeNextSource(out bool failed)
+ {
+ failed = false;
+ lock (_gate)
+ {
+ if (_done || _enumerationCompleted)
+ {
+ return null;
+ }
+
+ var enumerator = _enumerator;
+ try
+ {
+ if (enumerator?.MoveNext() != true)
+ {
+ _enumerationCompleted = true;
+ DisposeEnumerator();
+ TryCompleteCore();
+ return null;
+ }
+ }
+ catch (Exception error) when (!FatalExceptionHelper.IsFatal(error))
+ {
+ FailCore(error);
+ failed = true;
+ return null;
+ }
+
+ var next = enumerator!.Current;
+ if (next is null)
+ {
+ FailCore(new InvalidOperationException("Blend source contained null."));
+ failed = true;
+ return null;
+ }
+
+ _active++;
+ return next;
+ }
+ }
+
+ /// Forwards an inner value under the serialization gate.
+ /// The value to forward.
+ private void OnInnerNext(T value)
+ {
+ lock (_gate)
+ {
+ if (!_done)
+ {
+ _observer.OnNext(value);
+ }
+ }
+ }
+
+ /// Forwards the first terminal error and releases active subscriptions.
+ /// The error to forward.
+ private void OnAnyError(Exception error)
+ {
+ lock (_gate)
+ {
+ if (_done)
+ {
+ return;
+ }
+
+ FailCore(error);
+ }
+
+ Dispose();
+ }
+
+ /// Completes one inner source and starts another if possible.
+ /// The completed inner subscription.
+ private void OnInnerCompleted(OnceDisposable inner)
+ {
+ _subscriptions.Remove(inner);
+
+ lock (_gate)
+ {
+ if (_done)
+ {
+ return;
+ }
+
+ _active--;
+ TryCompleteCore();
+ }
+
+ SubscribeNext();
+ }
+
+ /// Marks the coordinator failed and forwards the error. Caller must hold the gate.
+ /// The terminal error.
+ private void FailCore(Exception error)
+ {
+ _done = true;
+ _observer.OnError(error);
+ }
+
+ /// Completes downstream once enumeration and all active sources have completed. Caller must hold the gate.
+ private void TryCompleteCore()
+ {
+ if (_done || !_enumerationCompleted || _active != 0)
+ {
+ return;
+ }
+
+ _done = true;
+ _observer.OnCompleted();
+ }
+
+ /// Disposes the enumerable source exactly once.
+ private void DisposeEnumerator()
+ {
+ var enumerator = _enumerator;
+ _enumerator = null;
+ enumerator?.Dispose();
+ }
+}
diff --git a/src/Primitives.Shared/Advanced/MaxConcurrentEnumerableBlendSignal{T}.cs b/src/Primitives.Shared/Advanced/MaxConcurrentEnumerableBlendSignal{T}.cs
new file mode 100644
index 00000000..2e019512
--- /dev/null
+++ b/src/Primitives.Shared/Advanced/MaxConcurrentEnumerableBlendSignal{T}.cs
@@ -0,0 +1,37 @@
+// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+#if REACTIVE_SHIM
+namespace ReactiveUI.Primitives.Reactive.Advanced;
+#else
+namespace ReactiveUI.Primitives.Advanced;
+#endif
+
+/// Enumerable Blend signal with bounded concurrency.
+/// The value type.
+public sealed class MaxConcurrentEnumerableBlendSignal : IObservable
+{
+ /// The sources to merge.
+ private readonly IEnumerable> _sources;
+
+ /// The maximum number of active inner subscriptions.
+ private readonly int _maxConcurrent;
+
+ /// Initializes a new instance of the class.
+ /// The sources to merge.
+ /// The maximum number of active inner subscriptions.
+ public MaxConcurrentEnumerableBlendSignal(IEnumerable> sources, int maxConcurrent)
+ {
+ _sources = sources;
+ _maxConcurrent = maxConcurrent;
+ }
+
+ ///
+ public IDisposable Subscribe(IObserver observer)
+ {
+ ArgumentExceptionHelper.ThrowIfNull(observer);
+
+ return new MaxConcurrentBlendCoordinator(observer).Run(_sources, _maxConcurrent);
+ }
+}
diff --git a/src/Primitives.Shared/Advanced/ObserverSinkLifetime.cs b/src/Primitives.Shared/Advanced/ObserverSinkLifetime.cs
new file mode 100644
index 00000000..aa65f113
--- /dev/null
+++ b/src/Primitives.Shared/Advanced/ObserverSinkLifetime.cs
@@ -0,0 +1,82 @@
+// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+#if REACTIVE_SHIM
+namespace ReactiveUI.Primitives.Reactive.Advanced;
+#else
+namespace ReactiveUI.Primitives.Advanced;
+#endif
+
+/// Shared terminal and subscription handling for advanced observer sinks.
+internal static class ObserverSinkLifetime
+{
+ /// Disposes a sink and its upstream subscription.
+ /// The stopped flag.
+ /// The upstream subscription slot.
+ public static void Dispose(ref int stopped, SingleReplaceableDisposable subscription)
+ {
+ Interlocked.Exchange(ref stopped, 1);
+ subscription.Dispose();
+ }
+
+ /// Gets a value indicating whether the sink has stopped.
+ /// The stopped flag.
+ /// when the sink has stopped.
+ public static bool IsStopped(ref int stopped) => Volatile.Read(ref stopped) != 0;
+
+ /// Assigns the upstream subscription and disposes it when the sink has already stopped.
+ /// The stopped flag.
+ /// The upstream subscription slot.
+ /// The upstream subscription.
+ public static void SetSubscription(ref int stopped, SingleReplaceableDisposable slot, IDisposable subscription)
+ {
+ slot.Create(subscription);
+ if (Volatile.Read(ref stopped) == 0)
+ {
+ return;
+ }
+
+ slot.Dispose();
+ }
+
+ /// Forwards completion exactly once and disposes the upstream subscription.
+ /// The observer value type.
+ /// The stopped flag.
+ /// The upstream subscription slot.
+ /// The downstream observer.
+ public static void Complete(
+ ref int stopped,
+ SingleReplaceableDisposable subscription,
+ IObserver observer)
+ {
+ if (Interlocked.Exchange(ref stopped, 1) != 0)
+ {
+ return;
+ }
+
+ using var _ = subscription;
+ observer.OnCompleted();
+ }
+
+ /// Forwards an error exactly once and disposes the upstream subscription.
+ /// The observer value type.
+ /// The stopped flag.
+ /// The upstream subscription slot.
+ /// The downstream observer.
+ /// The terminal error.
+ public static void Error(
+ ref int stopped,
+ SingleReplaceableDisposable subscription,
+ IObserver observer,
+ Exception terminalError)
+ {
+ if (Interlocked.Exchange(ref stopped, 1) != 0)
+ {
+ return;
+ }
+
+ using var _ = subscription;
+ observer.OnError(terminalError);
+ }
+}
diff --git a/src/Primitives.Shared/Advanced/SelectManyEnumerableObserver{TSource,TResult}.cs b/src/Primitives.Shared/Advanced/SelectManyEnumerableObserver{TSource,TResult}.cs
new file mode 100644
index 00000000..e9c543a4
--- /dev/null
+++ b/src/Primitives.Shared/Advanced/SelectManyEnumerableObserver{TSource,TResult}.cs
@@ -0,0 +1,87 @@
+// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+#if REACTIVE_SHIM
+namespace ReactiveUI.Primitives.Reactive.Advanced;
+#else
+namespace ReactiveUI.Primitives.Advanced;
+#endif
+
+/// Observer for enumerable SelectMany.
+/// The source value type.
+/// The result value type.
+public sealed class SelectManyEnumerableObserver : IObserver, IDisposable
+{
+ /// The downstream observer.
+ private readonly IObserver _observer;
+
+ /// The enumerable projection.
+ private readonly Func> _selector;
+
+ /// The upstream subscription.
+ private readonly SingleReplaceableDisposable _subscription = new();
+
+ /// Non-zero after terminal notification or disposal.
+ private int _stopped;
+
+ /// Initializes a new instance of the class.
+ /// The downstream observer.
+ /// The enumerable projection.
+ public SelectManyEnumerableObserver(IObserver observer, Func> selector)
+ {
+ _observer = observer;
+ _selector = selector;
+ }
+
+ ///
+ public void Dispose() => ObserverSinkLifetime.Dispose(ref _stopped, _subscription);
+
+ ///
+ public void OnCompleted() => ObserverSinkLifetime.Complete(ref _stopped, _subscription, _observer);
+
+ ///
+ public void OnError(Exception error) => ObserverSinkLifetime.Error(ref _stopped, _subscription, _observer, error);
+
+ ///
+ public void OnNext(TSource value)
+ {
+ if (ObserverSinkLifetime.IsStopped(ref _stopped))
+ {
+ return;
+ }
+
+ try
+ {
+ var values = _selector(value);
+ if (values is null)
+ {
+ ObserverSinkLifetime.Error(
+ ref _stopped,
+ _subscription,
+ _observer,
+ new InvalidOperationException("SelectMany selector returned null."));
+ return;
+ }
+
+ foreach (var result in values)
+ {
+ if (ObserverSinkLifetime.IsStopped(ref _stopped))
+ {
+ return;
+ }
+
+ _observer.OnNext(result);
+ }
+ }
+ catch (Exception error) when (!FatalExceptionHelper.IsFatal(error))
+ {
+ ObserverSinkLifetime.Error(ref _stopped, _subscription, _observer, error);
+ }
+ }
+
+ /// Assigns the upstream subscription.
+ /// The upstream subscription.
+ public void SetSubscription(IDisposable subscription) =>
+ ObserverSinkLifetime.SetSubscription(ref _stopped, _subscription, subscription);
+}
diff --git a/src/Primitives.Shared/Advanced/SelectManyEnumerableSignal{TSource,TResult}.cs b/src/Primitives.Shared/Advanced/SelectManyEnumerableSignal{TSource,TResult}.cs
new file mode 100644
index 00000000..9f4a06c7
--- /dev/null
+++ b/src/Primitives.Shared/Advanced/SelectManyEnumerableSignal{TSource,TResult}.cs
@@ -0,0 +1,40 @@
+// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+#if REACTIVE_SHIM
+namespace ReactiveUI.Primitives.Reactive.Advanced;
+#else
+namespace ReactiveUI.Primitives.Advanced;
+#endif
+
+/// Signal that projects each value to an enumerable sequence and emits the enumerable values.
+/// The source value type.
+/// The result value type.
+public sealed class SelectManyEnumerableSignal : IObservable
+{
+ /// The source observable.
+ private readonly IObservable _source;
+
+ /// The enumerable projection.
+ private readonly Func> _selector;
+
+ /// Initializes a new instance of the class.
+ /// The source observable.
+ /// The enumerable projection.
+ public SelectManyEnumerableSignal(IObservable source, Func> selector)
+ {
+ _source = source;
+ _selector = selector;
+ }
+
+ ///
+ public IDisposable Subscribe(IObserver observer)
+ {
+ ArgumentExceptionHelper.ThrowIfNull(observer);
+
+ SelectManyEnumerableObserver sink = new(observer, _selector);
+ sink.SetSubscription(_source.Subscribe(sink));
+ return sink;
+ }
+}
diff --git a/src/Primitives.Shared/Advanced/SubscribeSafeObserver{T}.cs b/src/Primitives.Shared/Advanced/SubscribeSafeObserver{T}.cs
new file mode 100644
index 00000000..b302f5f4
--- /dev/null
+++ b/src/Primitives.Shared/Advanced/SubscribeSafeObserver{T}.cs
@@ -0,0 +1,59 @@
+// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+#if REACTIVE_SHIM
+namespace ReactiveUI.Primitives.Reactive.Advanced;
+#else
+namespace ReactiveUI.Primitives.Advanced;
+#endif
+
+/// Observer that turns downstream OnNext exceptions into a terminal error and upstream disposal.
+/// The value type.
+public sealed class SubscribeSafeObserver : IObserver, IDisposable
+{
+ /// The wrapped observer.
+ private readonly IObserver _observer;
+
+ /// The upstream subscription.
+ private readonly SingleReplaceableDisposable _subscription = new();
+
+ /// Non-zero after terminal notification or disposal.
+ private int _stopped;
+
+ /// Initializes a new instance of the class.
+ /// The wrapped observer.
+ public SubscribeSafeObserver(IObserver observer) => _observer = observer;
+
+ ///
+ public void Dispose() => ObserverSinkLifetime.Dispose(ref _stopped, _subscription);
+
+ ///
+ public void OnCompleted() => ObserverSinkLifetime.Complete(ref _stopped, _subscription, _observer);
+
+ ///
+ public void OnError(Exception error) => ObserverSinkLifetime.Error(ref _stopped, _subscription, _observer, error);
+
+ ///
+ public void OnNext(T value)
+ {
+ if (ObserverSinkLifetime.IsStopped(ref _stopped))
+ {
+ return;
+ }
+
+ try
+ {
+ _observer.OnNext(value);
+ }
+ catch (Exception error) when (!FatalExceptionHelper.IsFatal(error))
+ {
+ ObserverSinkLifetime.Error(ref _stopped, _subscription, _observer, error);
+ }
+ }
+
+ /// Assigns the upstream subscription.
+ /// The upstream subscription.
+ public void SetSubscription(IDisposable subscription) =>
+ ObserverSinkLifetime.SetSubscription(ref _stopped, _subscription, subscription);
+}
diff --git a/src/Primitives.Shared/Advanced/SynchronizeObjectSignal{T}.cs b/src/Primitives.Shared/Advanced/SynchronizeObjectSignal{T}.cs
new file mode 100644
index 00000000..6528c51f
--- /dev/null
+++ b/src/Primitives.Shared/Advanced/SynchronizeObjectSignal{T}.cs
@@ -0,0 +1,39 @@
+// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+#if REACTIVE_SHIM
+namespace ReactiveUI.Primitives.Reactive.Advanced;
+#else
+namespace ReactiveUI.Primitives.Advanced;
+#endif
+
+/// Object-gated Synchronize compatibility signal.
+/// The value type.
+public sealed class SynchronizeObjectSignal : IObservable
+{
+ /// The source observable.
+ private readonly IObservable _source;
+
+ /// The shared gate.
+ private readonly object _gate;
+
+ /// Initializes a new instance of the class.
+ /// The source observable.
+ /// The gate shared across subscriptions and other synchronized sequences.
+ public SynchronizeObjectSignal(IObservable source, object gate)
+ {
+ _source = source;
+ _gate = gate;
+ }
+
+ ///
+ public IDisposable Subscribe(IObserver observer)
+ {
+ ArgumentExceptionHelper.ThrowIfNull(observer);
+
+ SynchronizeObjectWitness sink = new(observer, _gate);
+ sink.SetSubscription(_source.Subscribe(sink));
+ return sink;
+ }
+}
diff --git a/src/Primitives.Shared/Advanced/SynchronizeObjectWitness{T}.cs b/src/Primitives.Shared/Advanced/SynchronizeObjectWitness{T}.cs
new file mode 100644
index 00000000..ff4b349b
--- /dev/null
+++ b/src/Primitives.Shared/Advanced/SynchronizeObjectWitness{T}.cs
@@ -0,0 +1,66 @@
+// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+#if REACTIVE_SHIM
+namespace ReactiveUI.Primitives.Reactive.Advanced;
+#else
+namespace ReactiveUI.Primitives.Advanced;
+#endif
+
+/// Observer that serializes notifications using an object gate.
+/// The value type.
+public sealed class SynchronizeObjectWitness : IObserver, IDisposable
+{
+ /// The downstream observer.
+ private readonly IObserver _observer;
+
+ /// The gate that serializes every forwarded notification.
+ private readonly object _gate;
+
+ /// The upstream subscription.
+ private IDisposable? _subscription;
+
+ /// Initializes a new instance of the class.
+ /// The downstream observer.
+ /// The gate shared with other synchronized observers.
+ public SynchronizeObjectWitness(IObserver observer, object gate)
+ {
+ _observer = observer;
+ _gate = gate;
+ }
+
+ ///
+ public void OnNext(T value)
+ {
+ lock (_gate)
+ {
+ _observer.OnNext(value);
+ }
+ }
+
+ ///
+ public void OnError(Exception error)
+ {
+ lock (_gate)
+ {
+ _observer.OnError(error);
+ }
+ }
+
+ ///
+ public void OnCompleted()
+ {
+ lock (_gate)
+ {
+ _observer.OnCompleted();
+ }
+ }
+
+ /// Assigns the upstream subscription.
+ /// The upstream subscription.
+ public void SetSubscription(IDisposable subscription) => SinkSubscription.Set(ref _subscription, subscription);
+
+ ///
+ public void Dispose() => SinkSubscription.Dispose(ref _subscription);
+}
diff --git a/src/Primitives.Shared/Advanced/TaskChainCoordinatorState.cs b/src/Primitives.Shared/Advanced/TaskChainCoordinatorState.cs
new file mode 100644
index 00000000..e7a0a376
--- /dev/null
+++ b/src/Primitives.Shared/Advanced/TaskChainCoordinatorState.cs
@@ -0,0 +1,38 @@
+// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+#if REACTIVE_SHIM
+namespace ReactiveUI.Primitives.Reactive.Advanced;
+#else
+namespace ReactiveUI.Primitives.Advanced;
+#endif
+
+/// State transitions for .
+internal static class TaskChainCoordinatorState
+{
+ /// Marks the active inner source complete and drains when the coordinator has not already stopped.
+ /// The task result type.
+ /// The coordinator state gate.
+ /// A value indicating whether a terminal notification has already been emitted.
+ /// A value indicating whether an inner source is active.
+ /// The coordinator to drain.
+ public static void OnInnerCompleted(
+ Lock gate,
+ ref bool done,
+ ref bool active,
+ TaskChainCoordinator coordinator)
+ {
+ lock (gate)
+ {
+ if (done)
+ {
+ return;
+ }
+
+ active = false;
+ }
+
+ coordinator.Drain();
+ }
+}
diff --git a/src/Primitives.Shared/Advanced/TaskChainCoordinator{T}.cs b/src/Primitives.Shared/Advanced/TaskChainCoordinator{T}.cs
new file mode 100644
index 00000000..cc1bdcc3
--- /dev/null
+++ b/src/Primitives.Shared/Advanced/TaskChainCoordinator{T}.cs
@@ -0,0 +1,148 @@
+// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+#if REACTIVE_SHIM
+namespace ReactiveUI.Primitives.Reactive.Advanced;
+#else
+namespace ReactiveUI.Primitives.Advanced;
+#endif
+
+/// Coordinates sequential task-source concatenation without a map adapter.
+/// The task result type.
+public sealed class TaskChainCoordinator : IDisposable
+{
+ /// Guards the queue and active/completed flags.
+ private readonly Lock _gate = new();
+
+ /// Queued task signals awaiting the active one to complete.
+ private readonly Queue> _queue = new();
+
+ /// Active subscriptions.
+ private readonly MultipleDisposable _pocket = [];
+
+ /// The downstream observer.
+ private readonly IObserver _observer;
+
+ /// A value indicating whether an inner task signal is active.
+ private bool _active;
+
+ /// A value indicating whether the outer task source completed.
+ private bool _outerCompleted;
+
+ /// A value indicating whether a terminal notification has been emitted.
+ private bool _done;
+
+ /// Initializes a new instance of the class.
+ /// The downstream observer.
+ public TaskChainCoordinator(IObserver observer) => _observer = observer;
+
+ ///
+ public void Dispose() => _pocket.Dispose();
+
+ /// Subscribes to the outer task source.
+ /// The outer task source.
+ /// The coordinator that owns the subscription cleanup.
+ public TaskChainCoordinator Run(IObservable> sources)
+ {
+ _pocket.Add(sources.Subscribe(OnTask, OnError, OnOuterCompleted));
+ return this;
+ }
+
+ /// Subscribes the next queued task signal, or completes when the task chain is drained.
+ internal void Drain()
+ {
+ IObservable? next = null;
+ lock (_gate)
+ {
+ if (_done || _active)
+ {
+ return;
+ }
+
+ if (_queue.Count > 0)
+ {
+ _active = true;
+ next = _queue.Dequeue();
+ }
+ else if (_outerCompleted)
+ {
+ _done = true;
+ _observer.OnCompleted();
+ }
+ }
+
+ if (next is null)
+ {
+ return;
+ }
+
+ _pocket.Add(next.Subscribe(_observer.OnNext, OnError, OnInnerCompleted));
+ }
+
+ /// Queues a task as a task-backed signal and pumps the drain.
+ /// The task to observe in source order.
+ private void OnTask(Task task)
+ {
+ IObservable source;
+ try
+ {
+ source = Signal.FromTask(task);
+ }
+ catch (Exception error) when (!FatalExceptionHelper.IsFatal(error))
+ {
+ OnError(error);
+ return;
+ }
+
+ lock (_gate)
+ {
+ if (_done)
+ {
+ return;
+ }
+
+ _queue.Enqueue(source);
+ }
+
+ Drain();
+ }
+
+ /// Marks the outer task source complete and pumps the drain.
+ private void OnOuterCompleted()
+ {
+ lock (_gate)
+ {
+ if (_done)
+ {
+ return;
+ }
+
+ _outerCompleted = true;
+ }
+
+ Drain();
+ }
+
+ /// Marks the active task signal complete and pumps the drain.
+ private void OnInnerCompleted() =>
+ TaskChainCoordinatorState.OnInnerCompleted(_gate, ref _done, ref _active, this);
+
+ /// Forwards an error and terminates active subscriptions.
+ /// The terminal error.
+ private void OnError(Exception error)
+ {
+ lock (_gate)
+ {
+ if (_done)
+ {
+ return;
+ }
+
+ _done = true;
+ _observer.OnError(error);
+ }
+
+ Dispose();
+ }
+}
diff --git a/src/Primitives.Shared/Advanced/TaskChainSignal{T}.cs b/src/Primitives.Shared/Advanced/TaskChainSignal{T}.cs
new file mode 100644
index 00000000..87717831
--- /dev/null
+++ b/src/Primitives.Shared/Advanced/TaskChainSignal{T}.cs
@@ -0,0 +1,29 @@
+// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+#if REACTIVE_SHIM
+namespace ReactiveUI.Primitives.Reactive.Advanced;
+#else
+namespace ReactiveUI.Primitives.Advanced;
+#endif
+
+/// Task-source Chain/Concat signal.
+/// The task result type.
+public sealed class TaskChainSignal : IObservable
+{
+ /// The outer task source.
+ private readonly IObservable> _sources;
+
+ /// Initializes a new instance of the class.
+ /// The outer task source.
+ public TaskChainSignal(IObservable> sources) => _sources = sources;
+
+ ///
+ public IDisposable Subscribe(IObserver observer)
+ {
+ ArgumentExceptionHelper.ThrowIfNull(observer);
+
+ return new TaskChainCoordinator(observer).Run(_sources);
+ }
+}
diff --git a/src/Primitives.Shared/ConnectableSignalExtensions.cs b/src/Primitives.Shared/ConnectableSignalExtensions.cs
index 473c4284..6da1480f 100644
--- a/src/Primitives.Shared/ConnectableSignalExtensions.cs
+++ b/src/Primitives.Shared/ConnectableSignalExtensions.cs
@@ -25,6 +25,11 @@ public IObservable AutoShare()
return AutoShareGate.For(source);
}
+ /// Shares a single subscription while observers are present. System.Reactive name for AutoShare.
+ /// A reference-counted sequence.
+ public IObservable RefCount() =>
+ source.AutoShare();
+
/// Connects on the first observer subscription.
/// A sequence that connects after the first subscription.
public IObservable AutoConnect() =>
@@ -70,6 +75,53 @@ public ConnectableSignal ShareLive() =>
public ConnectableSignal Share() =>
source.ShareLive();
+ /// Creates a connectable live signal. System.Reactive name for ShareLive.
+ /// A connectable live signal.
+ public ConnectableSignal Publish() =>
+ source.ShareLive();
+
+ /// Multicasts source values through a live hub and applies a selector.
+ /// The selected value type.
+ /// The selector applied to the connectable signal before it is connected.
+ /// A sequence returned by while the source is connected.
+ /// or is .
+ public IObservable Publish(Func, IObservable> selector)
+ {
+ ArgumentExceptionHelper.ThrowIfNull(source);
+
+ ArgumentExceptionHelper.ThrowIfNull(selector);
+
+ return Signal.Create(observer =>
+ {
+ var connectable = source.Publish();
+ IObservable selected;
+ try
+ {
+ selected = selector(connectable);
+ }
+ catch (Exception error) when (!FatalExceptionHelper.IsFatal(error))
+ {
+ observer.OnError(error);
+ return EmptyDisposable.Instance;
+ }
+
+ if (selected is null)
+ {
+ observer.OnError(new InvalidOperationException("Publish selector returned null."));
+ return EmptyDisposable.Instance;
+ }
+
+ var subscription = selected.Subscribe(observer);
+ var connection = connectable.Connect();
+ return new MultipleDisposable(subscription, connection);
+ });
+ }
+
+ /// Replays all source values through an unbounded replay hub.
+ /// A connectable replay signal.
+ public ConnectableSignal ReplayLive() =>
+ source.Multicast(new ReplaySignal());
+
/// Replays source values through a bounded replay hub.
/// Maximum number of values to replay.
/// A connectable replay signal.
@@ -83,6 +135,11 @@ public ConnectableSignal ReplayLive(int bufferSize) =>
public ConnectableSignal ReplayLive(int bufferSize, TimeSpan window) =>
source.Multicast(new ReplaySignal(bufferSize, window));
+ /// Replays all source values through an unbounded replay hub.
+ /// A connectable replay signal.
+ public ConnectableSignal Replay() =>
+ source.ReplayLive();
+
/// Replays source values through a bounded replay hub.
/// Maximum number of values to replay.
/// A connectable replay signal.
diff --git a/src/Primitives.Shared/FatalExceptionHelper.cs b/src/Primitives.Shared/FatalExceptionHelper.cs
new file mode 100644
index 00000000..f58d0a79
--- /dev/null
+++ b/src/Primitives.Shared/FatalExceptionHelper.cs
@@ -0,0 +1,27 @@
+// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+#if REACTIVE_SHIM
+namespace ReactiveUI.Primitives.Reactive;
+#else
+namespace ReactiveUI.Primitives;
+#endif
+
+/// Identifies runtime exceptions that should not be converted into observable error notifications.
+internal static class FatalExceptionHelper
+{
+ /// Determines whether an exception represents a fatal runtime failure.
+ /// The exception to classify.
+ /// when the exception should be allowed to propagate.
+ internal static bool IsFatal(Exception error) =>
+ error is
+ StackOverflowException or
+ AccessViolationException or
+ AppDomainUnloadedException or
+ BadImageFormatException or
+ CannotUnloadAppDomainException or
+ InvalidProgramException or
+ System.Threading.ThreadAbortException or
+ (OutOfMemoryException and not InsufficientMemoryException);
+}
diff --git a/src/Primitives.Shared/LinqExtensions.BlendEnumerable.cs b/src/Primitives.Shared/LinqExtensions.BlendEnumerable.cs
index 732492cb..432e40c0 100644
--- a/src/Primitives.Shared/LinqExtensions.BlendEnumerable.cs
+++ b/src/Primitives.Shared/LinqExtensions.BlendEnumerable.cs
@@ -24,6 +24,18 @@ public IObservable Blend()
return new EnumerableBlendSignal(sources);
}
+
+ /// Concurrently merges the supplied observable sources with a maximum number of active subscriptions.
+ /// The maximum number of sources to subscribe to at the same time.
+ /// An observable that forwards values from every source.
+ public IObservable Blend(int maxConcurrent)
+ {
+ ArgumentExceptionHelper.ThrowIfNull(sources);
+
+ ArgumentOutOfRangeExceptionHelper.ThrowIfNegativeOrZero(maxConcurrent);
+
+ return maxConcurrent == int.MaxValue ? sources.Blend() : new MaxConcurrentEnumerableBlendSignal(sources, maxConcurrent);
+ }
}
/// Dedicated signal for enumerable Blend sources.
diff --git a/src/Primitives.Shared/SignalOperatorMixins.cs b/src/Primitives.Shared/SignalOperatorMixins.cs
index 2a6f24ad..c2fe255d 100644
--- a/src/Primitives.Shared/SignalOperatorMixins.cs
+++ b/src/Primitives.Shared/SignalOperatorMixins.cs
@@ -114,6 +114,20 @@ public IObservable Map(Func selector)
return new MapSignal(source, selector);
}
+ /// Projects each element and its zero-based index into a new form.
+ /// The type of the elements in the result sequence.
+ /// A transform function to apply to each source element and its index.
+ /// An observable sequence whose elements are the result of invoking the transform on each source element and index.
+ /// or is .
+ public IObservable MapIndexed(Func selector)
+ {
+ ArgumentExceptionHelper.ThrowIfNull(source);
+
+ ArgumentExceptionHelper.ThrowIfNull(selector);
+
+ return new MapIndexedSignal(source, selector);
+ }
+
///
/// Projects each element of an observable sequence into a new form by incorporating state that is passed to the
/// selector function.
@@ -530,6 +544,22 @@ public IObservable KeepNotNull()
}
}
+ /// Combining operators for an observable source of tasks.
+ /// The outer sequence of task sources.
+ /// The task result type.
+ extension(IObservable> sources)
+ {
+ /// Subscribes to task results one at a time in source order.
+ /// A sequence that emits each task result after the previous task signal completes.
+ /// is .
+ public IObservable Chain()
+ {
+ ArgumentExceptionHelper.ThrowIfNull(sources);
+
+ return new TaskChainSignal(sources);
+ }
+ }
+
/// Type-filtering and casting operators for an untyped observable source.
/// The source sequence.
extension(IObservable
public static partial class LinqExtensions
{
+ /// System.Reactive-named combining operators for enumerable observable sources.
+ /// The observable sources.
+ /// The value type.
+ extension(IEnumerable> sources)
+ {
+ /// Concurrently merges the supplied observable sources. System.Reactive name for Blend.
+ /// An observable that forwards values from every source.
+ /// is .
+ public IObservable Merge()
+ {
+ ArgumentExceptionHelper.ThrowIfNull(sources);
+
+ return sources.Blend();
+ }
+
+ /// Concurrently merges observable sources with a maximum number of active subscriptions.
+ /// The maximum number of sources to subscribe to at the same time.
+ /// An observable that forwards values from every source.
+ /// is .
+ /// is less than or equal to zero.
+ public IObservable Merge(int maxConcurrent)
+ {
+ ArgumentExceptionHelper.ThrowIfNull(sources);
+
+ ArgumentOutOfRangeExceptionHelper.ThrowIfNegativeOrZero(maxConcurrent);
+
+ return sources.Blend(maxConcurrent);
+ }
+ }
+
/// System.Reactive-named combining operators for an observable source of inner observable sequences.
/// The outer sequence of inner sequences.
/// The value type.
@@ -87,6 +117,53 @@ public IObservable Dematerialize()
/// The value type.
extension(IObservable source)
{
+ /// Subscribes an observer with downstream exception protection.
+ /// The observer to subscribe.
+ /// A disposable that cancels the subscription.
+ /// or is .
+ public IDisposable SubscribeSafe(IObserver observer)
+ {
+ ArgumentExceptionHelper.ThrowIfNull(source);
+
+ ArgumentExceptionHelper.ThrowIfNull(observer);
+
+ SubscribeSafeObserver safe = new(observer);
+ safe.SetSubscription(source.Subscribe(safe));
+ return safe;
+ }
+
+ /// Subscribes callbacks with downstream exception protection.
+ /// The action to invoke for each value.
+ /// The action to invoke for an error.
+ /// A disposable that cancels the subscription.
+ /// A required argument is .
+ public IDisposable SubscribeSafe(Action onNext, Action onError) =>
+ source.SubscribeSafe(Witness.Create(onNext, onError));
+
+ /// Subscribes callbacks with downstream exception protection.
+ /// The action to invoke for each value.
+ /// The action to invoke for an error.
+ /// The action to invoke when the sequence completes.
+ /// A disposable that cancels the subscription.
+ /// A required argument is .
+ public IDisposable SubscribeSafe(Action onNext, Action onError, Action onCompleted) =>
+ source.SubscribeSafe(Witness.Create(onNext, onError, onCompleted));
+
+ /// Subscribes terminal callbacks with downstream exception protection.
+ /// The action to invoke for an error.
+ /// A disposable that cancels the subscription.
+ /// A required argument is .
+ public IDisposable SubscribeSafe(Action onError) =>
+ source.SubscribeSafe(Witness.Create(static _ => { }, onError));
+
+ /// Subscribes terminal callbacks with downstream exception protection.
+ /// The action to invoke for an error.
+ /// The action to invoke when the sequence completes.
+ /// A disposable that cancels the subscription.
+ /// A required argument is .
+ public IDisposable SubscribeSafe(Action onError, Action onCompleted) =>
+ source.SubscribeSafe(Witness.Create(static _ => { }, onError, onCompleted));
+
/// Invokes an action for each value while preserving the sequence. System.Reactive name for Tap.
/// The action to invoke for each value.
/// The source values after the action has run.
@@ -132,6 +209,28 @@ public IObservable Do(Action onNext, Action onCompleted)
return new TapSignal(source, onNext, static _ => { }, onCompleted);
}
+ /// Invokes actions for each value, error, and completion while preserving the sequence. System.Reactive name for Tap.
+ /// The action to invoke for each value.
+ /// The action to invoke for an error.
+ /// The action to invoke when the sequence completes.
+ /// The source values after the actions have run.
+ /// A required argument is .
+ public IObservable Do(
+ Action onNext,
+ Action onError,
+ Action onCompleted)
+ {
+ ArgumentExceptionHelper.ThrowIfNull(source);
+
+ ArgumentExceptionHelper.ThrowIfNull(onNext);
+
+ ArgumentExceptionHelper.ThrowIfNull(onError);
+
+ ArgumentExceptionHelper.ThrowIfNull(onCompleted);
+
+ return new TapSignal(source, onNext, onError, onCompleted);
+ }
+
///
/// Serializes notifications behind a gate so downstream operators observe the single-threaded
/// OnNext* then OnError|OnCompleted grammar even when the source delivers
@@ -154,8 +253,6 @@ public IObservable Synchronize()
/// The gate shared with other synchronized sequences.
/// A sequence that forwards the source notifications one at a time under the shared gate.
/// or is .
- [System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE0001:Simplify Names", Justification = "The argument validation uses ArgumentExceptionHelper")]
- [System.Diagnostics.CodeAnalysis.SuppressMessage("Modernization", "SST2000:Use ArgumentNullException.ThrowIfNull", Justification = "Uses variable types")]
public IObservable Synchronize(Lock gate)
{
ArgumentExceptionHelper.ThrowIfNull(source);
@@ -168,6 +265,38 @@ public IObservable Synchronize(Lock gate)
return new SynchronizeSignal(source, gate);
}
+#if NET9_0_OR_GREATER
+ /// Serializes notifications behind an object gate. System.Reactive name for object-gated synchronization.
+ /// The gate shared with other synchronized sequences.
+ /// A sequence that forwards the source notifications one at a time under the shared gate.
+ /// or is .
+ public IObservable Synchronize(object gate)
+ {
+ ArgumentExceptionHelper.ThrowIfNull(source);
+
+ ArgumentExceptionHelper.ThrowIfNull(gate);
+
+ return new SynchronizeObjectSignal(source, gate);
+ }
+#endif
+
+ /// Serializes notifications behind an object gate when a caller cannot provide a dedicated lock gate.
+ /// The gate shared with other synchronized sequences.
+ /// A sequence that forwards the source notifications one at a time under the shared gate.
+ /// or is .
+ public IObservable SynchronizeObject(object gate)
+ {
+#if NET9_0_OR_GREATER
+ return LinqExtensions.Synchronize(source, gate);
+#else
+ ArgumentExceptionHelper.ThrowIfNull(source);
+
+ ArgumentExceptionHelper.ThrowIfNull(gate);
+
+ return new SynchronizeObjectSignal(source, gate);
+#endif
+ }
+
/// Invokes a stateful action for each value while preserving the sequence. State-carrying name for TapWith.
/// The state type.
/// The state passed to .
@@ -265,7 +394,67 @@ public IObservable IgnoreElements()
return new IgnoreValuesSignal(source);
}
- /// Projects each value to an inner sequence and merges the results. LINQ name for FlatMap.
+ /// Prepends values before the source sequence. System.Reactive name for Prepend.
+ /// The values to emit before the source.
+ /// A sequence that emits before the source values.
+ /// or is .
+ public IObservable StartWith(params T[] values) =>
+ source.Prepend(values);
+
+ /// Prepends values before the source sequence. System.Reactive name for Prepend.
+ /// The values to emit before the source.
+ /// A sequence that emits before the source values.
+ /// or is .
+ public IObservable StartWith(IEnumerable values) =>
+ source.Prepend(values);
+
+ /// Collects values into time-windowed batches. System.Reactive name for Collect.
+ /// The duration of each buffer window.
+ /// A sequence that emits non-empty batches of source values.
+ /// is .
+ public IObservable> Buffer(TimeSpan timeSpan) =>
+ source.Collect(timeSpan);
+
+ /// Collects values into time-windowed batches on the supplied scheduler.
+ /// The duration of each buffer window.
+ /// The scheduler used to schedule buffer flushes.
+ /// A sequence that emits non-empty batches of source values.
+ /// or is .
+ public IObservable> Buffer(TimeSpan timeSpan, ISequencer scheduler) =>
+ source.Collect(timeSpan, scheduler);
+
+ /// Invokes an action when the subscription terminates or is disposed. System.Reactive name for OnCleanup.
+ /// The action to invoke exactly once.
+ /// A sequence that mirrors the source and invokes on cleanup.
+ /// or is .
+ public IObservable Finally(Action finallyAction) =>
+ source.OnCleanup(finallyAction);
+
+ /// Emits a value only after no newer value arrives within the quiet period. System.Reactive name for Calm.
+ /// The quiet period.
+ /// A sequence that emits the latest value after each quiet period.
+ /// is .
+ public IObservable Throttle(TimeSpan dueTime) =>
+ source.Calm(dueTime);
+
+ /// Emits a value only after no newer value arrives within the scheduler quiet period. System.Reactive name for Calm.
+ /// The quiet period.
+ /// The scheduler used to schedule quiet-period timers.
+ /// A sequence that emits the latest value after each quiet period.
+ /// or is .
+ public IObservable Throttle(TimeSpan dueTime, ISequencer scheduler) =>
+ source.Calm(dueTime, scheduler);
+
+ /// Handles errors of the specified type by switching to a replacement sequence.
+ /// The exception type to handle.
+ /// The function that produces a replacement sequence for handled errors.
+ /// A sequence that switches to the handler result for matching errors.
+ /// or is .
+ public IObservable Catch(Func> handler)
+ where TException : Exception =>
+ source.Recover(handler);
+
+ /// Projects each value to an inner sequence and merges the results. LINQ name for concurrent flattening.
/// The inner value type.
/// The function that projects each source value to an inner sequence.
/// A sequence containing the merged values of every inner sequence.
@@ -276,10 +465,38 @@ public IObservable SelectMany(Func> se
ArgumentExceptionHelper.ThrowIfNull(selector);
- return new FlatMapSignal(source, selector);
+ return source.Map(selector).Merge();
+ }
+
+ /// Projects each value to the same inner sequence and merges the results.
+ /// The inner value type.
+ /// The inner sequence used for each source value.
+ /// A sequence containing the merged values of every inner sequence.
+ /// or is .
+ public IObservable SelectMany(IObservable other)
+ {
+ ArgumentExceptionHelper.ThrowIfNull(source);
+
+ ArgumentExceptionHelper.ThrowIfNull(other);
+
+ return source.Map(_ => other).Merge();
}
- /// Projects each value to an inner sequence and combines each pair with a result selector. LINQ name for FlatMap.
+ /// Projects each value to an enumerable sequence and emits the projected values.
+ /// The projected value type.
+ /// The function that projects each source value to enumerable values.
+ /// A sequence containing the projected enumerable values.
+ /// or is .
+ public IObservable SelectMany(Func> selector)
+ {
+ ArgumentExceptionHelper.ThrowIfNull(source);
+
+ ArgumentExceptionHelper.ThrowIfNull(selector);
+
+ return new SelectManyEnumerableSignal(source, selector);
+ }
+
+ /// Projects each value to an inner sequence and concurrently combines each pair with a result selector.
/// The inner value type.
/// The result value type.
/// The function that projects each source value to an inner sequence.
@@ -294,7 +511,20 @@ public IObservable SelectMany(
ArgumentExceptionHelper.ThrowIfNull(resultSelector);
- return new FlatMapResultSignal(source, collectionSelector, resultSelector);
+ return source.Map(value => collectionSelector(value).Map(inner => resultSelector(value, inner))).Merge();
+ }
+
+ /// Merges this sequence with another observable sequence. System.Reactive name for Blend.
+ /// The second sequence to merge.
+ /// A sequence containing values from both sources as they arrive.
+ /// or is .
+ public IObservable Merge(IObservable second)
+ {
+ ArgumentExceptionHelper.ThrowIfNull(source);
+
+ ArgumentExceptionHelper.ThrowIfNull(second);
+
+ return new[] { source, second }.Blend();
}
/// Concatenates two sequences. System.Reactive name for Chain.
@@ -509,6 +739,14 @@ public IObservable Select(Func selector)
return new MapSignal(source, selector);
}
+ /// Projects each element and its zero-based index into a new form. LINQ name for MapIndexed.
+ /// The type of the elements in the result sequence.
+ /// A transform function to apply to each element and its index.
+ /// An observable sequence whose elements are the result of invoking the transform on each source element and index.
+ /// or is .
+ public IObservable Select(Func selector) =>
+ source.MapIndexed(selector);
+
/// Projects each element into a new form using external state passed to the selector. State-carrying name for MapWith.
/// The type of the state used in the selector function.
/// The type of the elements in the result sequence.
@@ -553,4 +791,20 @@ public IObservable WhereWith(TState state, Func(source, state, predicate);
}
}
+
+ /// System.Reactive-named combining operators for an observable source of tasks.
+ /// The outer sequence of task sources.
+ /// The task result type.
+ extension(IObservable> sources)
+ {
+ /// Subscribes to task results one at a time in source order. System.Reactive name for Chain.
+ /// A sequence that emits each task result after the previous task signal completes.
+ /// is .
+ public IObservable Concat()
+ {
+ ArgumentExceptionHelper.ThrowIfNull(sources);
+
+ return new TaskChainSignal(sources);
+ }
+ }
}
diff --git a/src/Primitives.Shared/SignalOperatorParityMixins.cs b/src/Primitives.Shared/SignalOperatorParityMixins.cs
index e9f0e478..f03e7c30 100644
--- a/src/Primitives.Shared/SignalOperatorParityMixins.cs
+++ b/src/Primitives.Shared/SignalOperatorParityMixins.cs
@@ -21,6 +21,30 @@ public static partial class LinqExtensions
/// The receiver sequence is .
public IObservable ToObservable() => Signal.FromEnumerable(values);
+ /// Converts an enumerable sequence to a Primitives signal on the supplied scheduler.
+ /// The scheduler used to enumerate and emit the values.
+ /// A signal that emits the enumerable values on the supplied scheduler.
+ /// The receiver sequence or is .
+ public IObservable ToObservable(ISequencer scheduler)
+ {
+ ArgumentExceptionHelper.ThrowIfNull(values);
+
+ ArgumentExceptionHelper.ThrowIfNull(scheduler);
+
+ if (scheduler == Sequencer.Immediate || scheduler == Sequencer.CurrentThread)
+ {
+ return Signal.FromEnumerable(values);
+ }
+
+ return Signal.Create(observer =>
+ {
+ CancellationDisposable cancel = new();
+ var scheduled = scheduler.Schedule(() => EmitEnumerable(values, observer, cancel));
+
+ return new MultipleDisposable(cancel, scheduled);
+ });
+ }
+
/// Converts an enumerable sequence to a Primitives signal using the System.Reactive conversion name.
/// The token used to stop enumeration.
/// A signal that emits the enumerable values until enumeration completes or cancellation is requested.
@@ -952,6 +976,11 @@ private Task FirstOrDefaultCoreAsync(bool hasDefault, T defaultValue)
/// The task result type.
extension(Task task)
{
+ /// Converts a task to an observable sequence that emits the task result.
+ /// An observable sequence that emits the completed task result or faults with the task error.
+ /// The receiver task is .
+ public IObservable ToObservable() => Signal.FromTask(task);
+
/// Identity helper that keeps source-compatible FirstAsync().ToTask() migrations compiling.
/// The supplied task.
/// The receiver task is .
@@ -959,6 +988,34 @@ private Task FirstOrDefaultCoreAsync(bool hasDefault, T defaultValue)
public Task ToTask() => task ?? throw new ArgumentNullException(nameof(task));
}
+ /// Emits enumerable values while honoring a scheduled subscription cancellation.
+ /// The values to emit.
+ /// The destination observer.
+ /// The subscription cancellation.
+ /// The value type.
+ private static void EmitEnumerable(
+ IEnumerable values,
+ IObserver observer,
+ CancellationDisposable cancel)
+ {
+ foreach (var value in values)
+ {
+ if (cancel.IsDisposed)
+ {
+ return;
+ }
+
+ observer.OnNext(value);
+ }
+
+ if (cancel.IsDisposed)
+ {
+ return;
+ }
+
+ observer.OnCompleted();
+ }
+
/// Returns the final value as a completed task when the source is a readable range, avoiding a subscription.
/// The value type.
/// The source sequence.
diff --git a/src/Primitives.Shared/Signals/SignalExtensions{Recover}.cs b/src/Primitives.Shared/Signals/SignalExtensions{Recover}.cs
index e2047aa4..34e28a49 100644
--- a/src/Primitives.Shared/Signals/SignalExtensions{Recover}.cs
+++ b/src/Primitives.Shared/Signals/SignalExtensions{Recover}.cs
@@ -54,7 +54,13 @@ public IObservable Recover(FuncFinallies the specified finally action.
/// The finally action.
/// An observable sequence containing elements from consecutive source sequences until a source sequence terminates successfully.
- public IObservable OnCleanup(Action finallyAction) =>
- new FinallySignal(source, finallyAction);
+ public IObservable OnCleanup(Action finallyAction)
+ {
+ ArgumentExceptionHelper.ThrowIfNull(source);
+
+ ArgumentExceptionHelper.ThrowIfNull(finallyAction);
+
+ return new FinallySignal(source, finallyAction);
+ }
}
}
diff --git a/src/Primitives.Shared/Signals/Signal{Create}.cs b/src/Primitives.Shared/Signals/Signal{Create}.cs
index 3766dfa8..c0958e1c 100644
--- a/src/Primitives.Shared/Signals/Signal{Create}.cs
+++ b/src/Primitives.Shared/Signals/Signal{Create}.cs
@@ -3,8 +3,12 @@
// See the LICENSE file in the project root for full license information.
#if REACTIVE_SHIM
+using ReactiveUI.Primitives.Reactive.Advanced;
+
namespace ReactiveUI.Primitives.Reactive.Signals;
#else
+using ReactiveUI.Primitives.Advanced;
+
namespace ReactiveUI.Primitives.Signals;
#endif
@@ -27,6 +31,35 @@ public static IObservable Create(Func, IDisposable> subscribe
return new CreateSignal(subscribe);
}
+ /// Creates an observable from an asynchronous subscription function.
+ /// The type.
+ /// The asynchronous subscription function.
+ /// An observable sequence backed by the asynchronous subscription.
+ /// is .
+ public static IObservable Create(Func, Task> subscribe)
+ {
+ ArgumentExceptionHelper.ThrowIfNull(subscribe);
+
+ return Create((observer, _) => subscribe(observer));
+ }
+
+ /// Creates an observable from a cancellable asynchronous subscription function.
+ /// The type.
+ /// The asynchronous subscription function.
+ /// An observable sequence backed by the asynchronous subscription.
+ /// is .
+ public static IObservable Create(Func, CancellationToken, Task> subscribe)
+ {
+ ArgumentExceptionHelper.ThrowIfNull(subscribe);
+
+ return Create(observer =>
+ {
+ AsyncSubscriptionLifetime subscription = new();
+ _ = RunAsyncSubscription(subscribe, observer, subscription);
+ return subscription;
+ });
+ }
+
///
/// Create anonymous Signals. Observer has exception durability.
/// This is recommended for make operator and event, generating a HotSignals.
@@ -150,4 +183,89 @@ public static IObservable Defer(Func> observableFactory)
return source.Subscribe(observer);
});
}
+
+ /// Creates a signal whose source is produced asynchronously for each subscription.
+ /// The value type.
+ /// The asynchronous factory that creates the source signal for a subscription.
+ /// A signal that subscribes to the factory-produced source for each observer.
+ /// is .
+ public static IObservable Defer(Func>> observableFactory)
+ {
+ ArgumentExceptionHelper.ThrowIfNull(observableFactory);
+
+ return Create(
+ async (observer, cancellationToken) =>
+ {
+ IObservable source;
+ try
+ {
+ source = await observableFactory().ConfigureAwait(false);
+ }
+ catch (Exception error)
+ {
+ observer.OnError(error);
+ return EmptyDisposable.Instance;
+ }
+
+ return cancellationToken.IsCancellationRequested ? EmptyDisposable.Instance : source.Subscribe(observer);
+ });
+ }
+
+ /// Creates a signal whose source is produced asynchronously for each subscription.
+ /// The value type.
+ /// The asynchronous factory that creates the source signal for a subscription.
+ /// A signal that subscribes to the factory-produced source for each observer.
+ /// is .
+ public static IObservable Defer(Func>> observableFactory)
+ {
+ ArgumentExceptionHelper.ThrowIfNull(observableFactory);
+
+ return Create(
+ async (observer, cancellationToken) =>
+ {
+ IObservable source;
+ try
+ {
+ source = await observableFactory(cancellationToken).ConfigureAwait(false);
+ }
+ catch (Exception error)
+ {
+ observer.OnError(error);
+ return EmptyDisposable.Instance;
+ }
+
+ return cancellationToken.IsCancellationRequested ? EmptyDisposable.Instance : source.Subscribe(observer);
+ });
+ }
+
+ /// Completes an asynchronous subscription and assigns its disposable lifetime.
+ /// The value type.
+ /// The asynchronous subscription function.
+ /// The downstream observer.
+ /// The subscription slot.
+ /// A task that completes when the asynchronous subscription has assigned its disposable.
+ private static async Task RunAsyncSubscription(
+ Func, CancellationToken, Task> subscribe,
+ IObserver observer,
+ AsyncSubscriptionLifetime subscription)
+ {
+ try
+ {
+ var disposable = await subscribe(observer, subscription.Token).ConfigureAwait(false);
+ subscription.SetSubscription(disposable);
+ }
+ catch (OperationCanceledException) when (subscription.IsCancellationRequested)
+ {
+ subscription.SetSubscription(EmptyDisposable.Instance);
+ }
+ catch (Exception error)
+ {
+ observer.OnError(error);
+ subscription.SetSubscription(EmptyDisposable.Instance);
+ }
+ finally
+ {
+ subscription.Complete();
+ }
+ }
}
diff --git a/src/Primitives.Shared/Signals/Signal{Factories}.cs b/src/Primitives.Shared/Signals/Signal{Factories}.cs
index 21a41781..6f06aa18 100644
--- a/src/Primitives.Shared/Signals/Signal{Factories}.cs
+++ b/src/Primitives.Shared/Signals/Signal{Factories}.cs
@@ -2,6 +2,7 @@
// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.
+using System.Collections.Specialized;
using System.ComponentModel;
#if REACTIVE_SHIM
@@ -209,6 +210,18 @@ public static IObservable> FromEventPattern
+ observer.OnNext(new(sender, (TEventArgs)(EventArgs)args));
+ handler = (TEventHandler)(object)typed;
+ }
+ else if (typeof(TEventHandler) == typeof(ListChangedEventHandler))
+ {
+ ListChangedEventHandler typed = (sender, args) =>
+ observer.OnNext(new(sender, (TEventArgs)(EventArgs)args));
+ handler = (TEventHandler)(object)typed;
+ }
else if (typeof(TEventHandler) == typeof(EventHandler))
{
EventHandler typed = (sender, args) =>
diff --git a/src/Primitives.Shared/Signals/Signal{RxAliases}.cs b/src/Primitives.Shared/Signals/Signal{RxAliases}.cs
new file mode 100644
index 00000000..0a7e00b2
--- /dev/null
+++ b/src/Primitives.Shared/Signals/Signal{RxAliases}.cs
@@ -0,0 +1,169 @@
+// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved.
+// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+#if REACTIVE_SHIM
+namespace ReactiveUI.Primitives.Reactive.Signals;
+#else
+namespace ReactiveUI.Primitives.Signals;
+#endif
+
+/// System.Reactive factory aliases for the Primitives signal factory vocabulary.
+public static partial class Signal
+{
+ /// Returns an observable sequence that contains a single value.
+ /// The value type.
+ /// The value to emit.
+ /// An observable sequence that emits and completes.
+ public static IObservable Return(T value) => Emit(value);
+
+ /// Returns an observable sequence that contains a single scheduled value.
+ /// The value type.
+ /// The value to emit.
+ /// The scheduler used to emit the value.
+ /// An observable sequence that emits and completes.
+ public static IObservable Return(T value, ISequencer scheduler) => Emit(value, scheduler);
+
+ /// Returns an empty observable sequence.
+ /// The value type.
+ /// An observable sequence that completes without values.
+ [System.Diagnostics.CodeAnalysis.SuppressMessage(
+ "Major Code Smell",
+ "S4018:Generic methods should provide type parameters",
+ Justification = "The type parameter defines the element type for this Rx-style factory and cannot be inferred from the arguments.")]
+ public static IObservable Empty() => None();
+
+ /// Returns an empty observable sequence on the supplied scheduler.
+ /// The value type.
+ /// The scheduler used to complete the sequence.
+ /// An observable sequence that completes without values.
+ [System.Diagnostics.CodeAnalysis.SuppressMessage(
+ "Major Code Smell",
+ "S4018:Generic methods should provide type parameters",
+ Justification = "The type parameter defines the element type for this Rx-style factory and cannot be inferred from the arguments.")]
+ public static IObservable Empty(ISequencer scheduler) => None(scheduler);
+
+ /// Returns a non-terminating observable sequence.
+ /// The value type.
+ /// An observable sequence that never emits and never terminates.
+ [System.Diagnostics.CodeAnalysis.SuppressMessage(
+ "Major Code Smell",
+ "S4018:Generic methods should provide type parameters",
+ Justification = "The type parameter defines the element type for this Rx-style factory and cannot be inferred from the arguments.")]
+ public static IObservable Never() => Silent();
+
+ /// Returns an observable sequence that terminates with an error.
+ /// The value type.
+ /// The error used to terminate the sequence.
+ /// An observable sequence that terminates with .
+ [System.Diagnostics.CodeAnalysis.SuppressMessage(
+ "Major Code Smell",
+ "S4018:Generic methods should provide type parameters",
+ Justification = "The type parameter defines the element type for this Rx-style factory and cannot be inferred from the arguments.")]
+ public static IObservable Throw(Exception error) => Fail(error);
+
+ /// Returns an observable sequence that terminates with a scheduled error.
+ /// The value type.
+ /// The error used to terminate the sequence.
+ /// The scheduler used to emit the error.
+ /// An observable sequence that terminates with .
+ [System.Diagnostics.CodeAnalysis.SuppressMessage(
+ "Major Code Smell",
+ "S4018:Generic methods should provide type parameters",
+ Justification = "The type parameter defines the element type for this Rx-style factory and cannot be inferred from the arguments.")]
+ public static IObservable Throw(Exception error, ISequencer scheduler) => Fail(error, scheduler);
+
+ /// Returns an observable sequence that emits a range of integral values.
+ /// The first value to emit.
+ /// The number of values to emit.
+ /// An observable sequence that emits the requested range and completes.
+ public static IObservable Range(int start, int count) => Sequence(start, count);
+
+ /// Returns an observable sequence that emits a scheduled range of integral values.
+ /// The first value to emit.
+ /// The number of values to emit.
+ /// The scheduler used to emit the values.
+ /// An observable sequence that emits the requested range and completes.
+ public static IObservable Range(int start, int count, ISequencer scheduler) => Sequence(start, count, scheduler);
+
+ /// Returns an observable sequence that emits a single tick after the due time.
+ /// The relative time after which to emit the tick.
+ /// An observable sequence that emits one tick and completes.
+ public static IObservable Timer(TimeSpan dueTime) => After(dueTime);
+
+ /// Returns an observable sequence that emits a single tick after the due time on a scheduler.
+ /// The relative time after which to emit the tick.
+ /// The scheduler used to emit the tick.
+ /// An observable sequence that emits one tick and completes.
+ public static IObservable Timer(TimeSpan dueTime, ISequencer scheduler) => After(dueTime, scheduler);
+
+ /// Returns an observable sequence that emits a single tick at an absolute due time.
+ /// The absolute time at which to emit the tick.
+ /// An observable sequence that emits one tick and completes.
+ public static IObservable Timer(DateTimeOffset dueTime) => After(dueTime);
+
+ /// Returns an observable sequence that emits a single tick at an absolute due time on a scheduler.
+ /// The absolute time at which to emit the tick.
+ /// The scheduler used to emit the tick.
+ /// An observable sequence that emits one tick and completes.
+ public static IObservable Timer(DateTimeOffset dueTime, ISequencer scheduler) => After(dueTime, scheduler);
+
+ /// Returns an observable sequence that emits ticks periodically after an initial due time.
+ /// The relative time before the first tick.
+ /// The period between subsequent ticks.
+ /// An observable sequence that emits periodic ticks.
+ public static IObservable Timer(TimeSpan dueTime, TimeSpan period) => After(dueTime, period);
+
+ /// Returns an observable sequence that emits scheduled ticks periodically after an initial due time.
+ /// The relative time before the first tick.
+ /// The period between subsequent ticks.
+ /// The scheduler used to emit ticks.
+ /// An observable sequence that emits periodic ticks.
+ public static IObservable Timer(TimeSpan dueTime, TimeSpan period, ISequencer scheduler) => After(dueTime, period, scheduler);
+
+ /// Returns an observable sequence that emits monotonically increasing ticks at the specified interval.
+ /// The period between ticks.
+ /// An observable sequence that emits periodic ticks.
+ public static IObservable Interval(TimeSpan period) => Every(period);
+
+ /// Returns an observable sequence that emits scheduled, monotonically increasing ticks at the specified interval.
+ /// The period between ticks.
+ /// The scheduler used to emit ticks.
+ /// An observable sequence that emits periodic ticks.
+ public static IObservable Interval(TimeSpan period, ISequencer scheduler) => Every(period, scheduler);
+
+ /// Concatenates the supplied observable sources.
+ /// The value type.
+ /// The sources to concatenate.
+ /// An observable sequence that subscribes to each source after the previous one completes.
+ public static IObservable Concat(params IObservable[] sources) => Chain(sources);
+
+ /// Concatenates the supplied observable sources.
+ /// The value type.
+ /// The sources to concatenate.
+ /// An observable sequence that subscribes to each source after the previous one completes.
+ public static IObservable Concat(IEnumerable> sources)
+ {
+ ArgumentExceptionHelper.ThrowIfNull(sources);
+
+ return FromEnumerable(sources).Chain();
+ }
+
+ ///