Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions .changeset/sse-solid2-async-reactivity.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
---
"@solid-primitives/sse": minor
---

Align `createSSE` with Solid 2.0 async reactivity patterns

### Breaking changes

**`pending` removed from `SSEReturn`**

Use `<Loading>` for initial load UI and `isPending(() => data())` for stale-while-revalidating. Both are re-exported from this package.

```tsx
// Before
const { data, pending } = createSSE(url);
<Show when={!pending()}><p>{data()}</p></Show>

// After — declarative initial load
<Loading fallback={<p>Connecting…</p>}>
<p>{data()}</p>
</Loading>

// After — stale-while-revalidating (only true once a value exists and new data is pending)
<Show when={isPending(() => data())}><p>Refreshing…</p></Show>
```

**`error` removed from `SSEReturn`**

Terminal errors (connection CLOSED with no retries left) now propagate through `data()` to `<Errored>`. Non-terminal errors (browser reconnecting) are still surfaced via `onError` callback.

```tsx
// Before
const { data, error } = createSSE(url);
<Show when={error()}><p>Error: {error()?.type}</p></Show>

// After — single error path via Errored boundary
<Errored fallback={err => <p>Connection failed</p>}>
<Loading fallback={<p>Connecting…</p>}>
<p>{data()}</p>
</Loading>
</Errored>
```

**`data` type narrowed from `Accessor<T | undefined>` to `Accessor<T>`**

The `| undefined` loading hole is removed. When `data()` is not ready it throws `NotReadyError` (caught by `<Loading>`) or the terminal error (caught by `<Errored>`); it never returns `undefined` due to pending state.

**SSR stub**: `data()` now throws `NotReadyError` on the server when no `initialValue` is provided (consistent with browser behaviour). Provide `initialValue` for a non-throwing SSR default.

### New primitives

**`makeSSEAsyncIterable<T>(url, options?)`**

Wraps an SSE endpoint as a standard `AsyncIterable<T>`. Each message is one yielded value; terminal errors are thrown. Cleanup runs automatically when the iterator is abandoned.

```ts
for await (const msg of makeSSEAsyncIterable<string>(url)) {
console.log(msg);
}
```

**`createSSEStream<T>(url, options?)`**

Minimal reactive alternative to `createSSE` — returns only a `data: Accessor<T>` backed by an async iterable. Same `<Loading>` / `<Errored>` integration, no `source` / `readyState` / `close` / `reconnect`.

```ts
const data = createSSEStream<{ msg: string }>(url, { transform: JSON.parse });
```
183 changes: 156 additions & 27 deletions packages/sse/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
[![version](https://img.shields.io/npm/v/@solid-primitives/sse?style=for-the-badge)](https://www.npmjs.com/package/@solid-primitives/sse)
[![stage](https://img.shields.io/endpoint?style=for-the-badge&url=https%3A%2F%2Fraw.githubusercontent.com%2Fsolidjs-community%2Fsolid-primitives%2Fmain%2Fassets%2Fbadges%2Fstage-2.json)](https://github.com/solidjs-community/solid-primitives#contribution-process)

Primitives for [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) using the browser's built-in `EventSource` API.
Primitives for [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) using the browser's built-in `EventSource` API. Designed for Solid 2.0's async reactivity model.

- [`makeSSE`](#makesse) — Base non-reactive primitive. Creates an `EventSource` and returns a cleanup function. No Solid lifecycle.
- [`createSSE`](#createsse) — Reactive primitive. Accepts a reactive URL, integrates with Solid's owner lifecycle, and returns signals for `data`, `error`, and `readyState`.
- [`createSSE`](#createsse) — Reactive primitive. Accepts a reactive URL, integrates with Solid's owner lifecycle, and returns signals for `data` and `readyState`.
- [`makeSSEAsyncIterable`](#makesseasynciterable) — Wraps an SSE endpoint as an `AsyncIterable<T>`. Non-reactive foundation.
- [`createSSEStream`](#createssesstream) — Minimal reactive stream: just a `data` accessor backed by an async iterable.
- [`makeSSEWorker`](#running-sse-in-a-worker) — Runs the SSE connection inside a Web Worker or SharedWorker.
- [Built-in transformers](#built-in-transformers) — `json`, `ndjson`, `lines`, `number`, `safe`, `pipe`.

Expand Down Expand Up @@ -70,42 +72,87 @@ Reactive SSE primitive. Connects on creation, closes when the owner is disposed,
```ts
import { createSSE, SSEReadyState } from "@solid-primitives/sse";

const { data, readyState, error, close, reconnect } = createSSE<{ message: string }>(
const { data, readyState, close, reconnect } = createSSE<{ message: string }>(
"https://api.example.com/events",
{
transform: JSON.parse,
reconnect: { retries: 3, delay: 2000 },
},
);
```

### Loading and error boundaries

`data()` integrates with Solid 2.0's async reactivity:

- **`<Loading>`** — shows fallback while `data()` is pending (before the first message arrives).
- **`<Errored>`** — catches terminal errors (connection CLOSED with no retries left) thrown through `data()`.

```tsx
import { Loading, Errored } from "solid-js";
import { createSSE } from "@solid-primitives/sse";

const { data, close, reconnect } = createSSE<{ message: string }>(
"https://api.example.com/events",
{ transform: JSON.parse },
);

return (
<div>
<Show when={readyState() === SSEReadyState.OPEN} fallback={<p>Connecting…</p>}>
<p>Latest: {data()?.message ?? "—"}</p>
</Show>
<Show when={error()}>
<p style="color:red">Connection error</p>
<Errored fallback={err => <p style="color:red">Connection failed</p>}>
<Loading fallback={<p>Connecting…</p>}>
<p>Latest: {data().message}</p>
</Loading>
</Errored>
);
```

Non-terminal errors (while the browser is reconnecting automatically) are surfaced via the `onError` callback only — they don't interrupt the reactive graph.

### Stale-while-revalidating with `isPending`

After the first message has arrived, subsequent reconnects (URL change, `reconnect()` call) put the connection back into a pending state. Use `isPending` from Solid to show a subtle "refreshing" indicator without replacing the whole subtree:

```tsx
import { isPending } from "solid-js";
import { createSSE } from "@solid-primitives/sse";

const { data } = createSSE<{ msg: string }>(url, { transform: JSON.parse });

return (
<>
<Show when={isPending(() => data())}>
<p>Refreshing…</p>
</Show>
<button onClick={close}>Disconnect</button>
<button onClick={reconnect}>Reconnect</button>
</div>
<Loading fallback={<p>Connecting…</p>}>
<p>{data().msg}</p>
</Loading>
</>
);
```

### Reactive URL
> **Note:** `isPending` is `false` during the initial `<Loading>` fallback (no stale value yet). It becomes `true` only when a stale value exists and new data is pending — i.e., after a URL change or reconnect.

When the URL is a signal accessor, the connection is replaced whenever the URL changes:
### Reactive URL with `<Loading on=…>`

```ts
When the URL is a signal accessor, the connection is replaced whenever the URL changes. Use `<Loading>`'s `on` prop to re-show the fallback on each URL change:

```tsx
const [userId, setUserId] = createSignal("user-1");

const { data } = createSSE<Notification>(
() => `https://api.example.com/notifications/${userId()}`,
{ transform: JSON.parse },
);

return (
// on={userId()} re-shows the fallback each time userId changes while pending
<Loading on={userId()} fallback={<p>Connecting…</p>}>
<p>{data().message}</p>
</Loading>
);
```

Changing `userId()` will close the existing connection and open a new one to the updated URL.
Without `on`, `<Loading>` keeps showing stale content during revalidation. With `on`, it re-shows the fallback whenever the key changes and a new connection is establishing.

### Options

Expand All @@ -114,7 +161,7 @@ Changing `userId()` will close the existing connection and open a new one to the
| `withCredentials` | `boolean` | `false` | Send credentials with the request |
| `onOpen` | `(e: Event) => void` | — | Called when the connection opens |
| `onMessage` | `(e: MessageEvent) => void` | — | Called on each unnamed `message` event |
| `onError` | `(e: Event) => void` | — | Called on error |
| `onError` | `(e: Event) => void` | — | Called on error (terminal and transient) |
| `events` | `Record<string, (e: MessageEvent) => void>` | — | Handlers for named SSE event types |
| `initialValue` | `T` | `undefined` | Initial value of the `data` signal |
| `transform` | `(raw: string) => T` | identity | Parse raw string data, e.g. `JSON.parse` |
Expand All @@ -129,14 +176,22 @@ Changing `userId()` will close the existing connection and open a new one to the

### Return value

| Property | Type | Description |
| ------------ | ---------------------------------------- | ------------------------------------------------ |
| `source` | `Accessor<SSESourceHandle \| undefined>` | Underlying source instance; `undefined` on SSR |
| `data` | `Accessor<T \| undefined>` | Latest message data |
| `error` | `Accessor<Event \| undefined>` | Latest error event |
| `readyState` | `Accessor<SSEReadyState>` | `SSEReadyState.CONNECTING` / `.OPEN` / `.CLOSED` |
| `close` | `VoidFunction` | Close the connection |
| `reconnect` | `VoidFunction` | Force-close and reopen |
| Property | Type | Description |
| ------------ | ---------------------------------------- | ---------------------------------------------------------------------------------------- |
| `source` | `Accessor<SSESourceHandle \| undefined>` | Underlying source instance; `undefined` on SSR |
| `data` | `Accessor<T>` | Latest message data; throws `NotReadyError` until first message, terminal errors thereafter |
| `readyState` | `Accessor<SSEReadyState>` | `SSEReadyState.CONNECTING` / `.OPEN` / `.CLOSED` |
| `close` | `VoidFunction` | Close the connection |
| `reconnect` | `VoidFunction` | Force-close and reopen; resets `data` to pending |

### Initial value

Provide `initialValue` to skip the pending state entirely — `data()` returns it immediately with no `<Loading>` fallback needed:

```ts
const { data } = createSSE(url, { initialValue: [] as string[] });
// data() === [] immediately, no Loading needed
```

### `SSEReadyState`

Expand All @@ -154,6 +209,80 @@ SSEReadyState.CLOSED; // 2

`EventSource` has native browser-level reconnection built in. For transient network drops the browser automatically retries. The `reconnect` option in `createSSE` is for _application-level_ reconnection — it fires only when `readyState` becomes `SSEReadyState.CLOSED`, meaning the browser has given up entirely. You generally do not need `reconnect: true` for normal usage.

## `makeSSEAsyncIterable`

Wraps an SSE endpoint as a standard `AsyncIterable<T>`. Each SSE message becomes one yielded value; terminal errors (connection CLOSED) are thrown by the iterator. Cleanup runs automatically when the iterator is abandoned via `return()`.

Use this as a non-reactive building block: integrate it with a `for await…of` loop, pass it to your own `createMemo`, or compose it with other async utilities.

```ts
import { makeSSEAsyncIterable } from "@solid-primitives/sse";

const iterable = makeSSEAsyncIterable<string>("https://api.example.com/events");

for await (const msg of iterable) {
console.log(msg);
}
```

### Definition

```ts
function makeSSEAsyncIterable<T = string>(
url: string | URL,
options?: CreateSSEStreamOptions<T>,
): AsyncIterable<T>;

type CreateSSEStreamOptions<T> = {
withCredentials?: boolean;
onOpen?: (event: Event) => void;
onError?: (event: Event) => void;
transform?: (raw: string) => T;
events?: Record<string, (event: MessageEvent) => void>;
source?: SSESourceFn;
};
```

## `createSSEStream`

A minimal reactive alternative to `createSSE` that returns only a `data` accessor. Internally it drives an `AsyncIterable` produced by `makeSSEAsyncIterable`, giving the same `<Loading>` / `<Errored>` integration with less API surface.

Use this when you only need the stream values and don't need access to `source`, `readyState`, `close`, or `reconnect`.

```ts
import { createSSEStream } from "@solid-primitives/sse";

const data = createSSEStream<{ msg: string }>(url, { transform: JSON.parse });

return (
<Errored fallback={err => <p>Connection failed</p>}>
<Loading fallback={<p>Connecting…</p>}>
<p>{data().msg}</p>
</Loading>
</Errored>
);
```

Reactive URL is supported — the stream reconnects automatically when the URL signal changes:

```ts
const [userId, setUserId] = createSignal("user-1");

const data = createSSEStream<Notification>(
() => `https://api.example.com/notifications/${userId()}`,
{ transform: JSON.parse },
);
```

### Definition

```ts
function createSSEStream<T = string>(
url: MaybeAccessor<string>,
options?: CreateSSEStreamOptions<T>,
): Accessor<T>;
```

## Integration with `@solid-primitives/event-bus`

Because `bus.emit` matches the `(event: MessageEvent) => void` shape of `onMessage`, you can wire them directly:
Expand Down Expand Up @@ -214,7 +343,7 @@ return <For each={messages}>{msg => <p>{msg}</p>}</For>;

## Built-in transformers

Ready-made `transform` functions for the most common SSE data formats. Pass one as the `transform` option to `createSSE`:
Ready-made `transform` functions for the most common SSE data formats. Pass one as the `transform` option to `createSSE` or `createSSEStream`:

```ts
import { createSSE, json } from "@solid-primitives/sse";
Expand Down Expand Up @@ -357,7 +486,7 @@ const worker = new Worker(new URL("@solid-primitives/sse/worker-handler", import
type: "module",
});

const { data, readyState, error, close, reconnect } = createSSE<{ msg: string }>(
const { data, readyState, close, reconnect } = createSSE<{ msg: string }>(
"https://api.example.com/events",
{
source: makeSSEWorker(worker),
Expand Down
6 changes: 3 additions & 3 deletions packages/sse/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,17 @@
"scripts": {
"dev": "node --import=@nothing-but/node-resolve-ts --experimental-transform-types ../../scripts/dev.ts",
"build": "node --import=@nothing-but/node-resolve-ts --experimental-transform-types ../../scripts/build.ts",
"vitest": "vitest -c ../../configs/vitest.config.ts",
"vitest": "vitest -c vitest.config.ts",
"test": "pnpm run vitest",
"test:ssr": "pnpm run vitest --mode ssr"
},
"dependencies": {
"@solid-primitives/utils": "workspace:^"
},
"peerDependencies": {
"solid-js": "^1.6.12"
"solid-js": "2.0.0-beta.7"
},
"devDependencies": {
"solid-js": "^1.9.7"
"solid-js": "2.0.0-beta.7"
}
}
9 changes: 9 additions & 0 deletions packages/sse/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
export {
makeSSE,
createSSE,
makeSSEAsyncIterable,
createSSEStream,
SSEReadyState,
type SSEOptions,
type SSEReconnectOptions,
Expand All @@ -9,6 +11,13 @@ export {
type SSEReadyStateValue,
type CreateSSEOptions,
type SSEReturn,
type CreateSSEStreamOptions,
} from "./sse.js";

export { json, ndjson, lines, number, safe, pipe } from "./transform.js";

// Re-export Solid 2.0 async primitives commonly used with SSE primitives:
// - isPending(() => data()) — true during stale-while-revalidating (not initial load)
// - onSettled(() => ...) — runs when the first message arrives
// - NotReadyError — thrown by data() while pending (caught by <Loading>)
export { isPending, onSettled, NotReadyError } from "solid-js";
Loading