From b8c41b7fc57916fd778754926c42e851bff7e937 Mon Sep 17 00:00:00 2001 From: moxi000 Date: Thu, 9 Apr 2026 12:45:18 +0800 Subject: [PATCH 1/3] fix(stream): decode SSE/hex audio for --stream and handle EPIPE MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `speech synthesize --stream` and `music generate --stream` were writing the upstream Server-Sent Events stream verbatim to stdout — JSON envelopes containing hex-encoded audio chunks — so the documented `| mpv -` pipe example produced "Failed to recognize file format". The decode logic already exists for `--out`; the stream path just never used it. This adds a shared `pipeAudioSseToStdout` helper that parses the SSE frames, hex-decodes `data.audio`, and writes raw audio bytes to stdout, buffering across chunk boundaries. Both stream commands now use it. Also installs an EPIPE handler on stdout so consumers that close the pipe early (`... | head`, a player exiting, mpv missing) cause a clean exit instead of an unhandled `'error'` event crash with a Node stack trace. 
Fixes #54 --- src/commands/music/generate.ts | 15 +++--- src/commands/speech/synthesize.ts | 15 +++--- src/utils/audio-stream.ts | 88 +++++++++++++++++++++++++++++++ 3 files changed, 104 insertions(+), 14 deletions(-) create mode 100644 src/utils/audio-stream.ts diff --git a/src/commands/music/generate.ts b/src/commands/music/generate.ts index 0dd023c..8baf407 100644 --- a/src/commands/music/generate.ts +++ b/src/commands/music/generate.ts @@ -6,6 +6,7 @@ import { musicEndpoint } from '../../client/endpoints'; import { formatOutput, detectOutputFormat } from '../../output/formatter'; import { saveAudioOutput } from '../../output/audio'; import { readTextFromPathOrStdin } from '../../utils/fs'; +import { pipeAudioSseToStdout } from '../../utils/audio-stream'; import type { Config } from '../../config/schema'; import type { GlobalFlags } from '../../types/flags'; import type { MusicRequest, MusicResponse } from '../../types/api'; @@ -149,14 +150,14 @@ export default defineCommand({ if (flags.stream) { const res = await request(config, { url, method: 'POST', body, stream: true }); - const reader = res.body?.getReader(); - if (!reader) throw new CLIError('No response body', ExitCode.GENERAL); - while (true) { - const { done, value } = await reader.read(); - if (done) break; - process.stdout.write(value); + try { + await pipeAudioSseToStdout(res.body); + } catch (err) { + if (err instanceof Error && err.message === 'No response body') { + throw new CLIError('No response body', ExitCode.GENERAL); + } + throw err; } - reader.releaseLock(); return; } diff --git a/src/commands/speech/synthesize.ts b/src/commands/speech/synthesize.ts index 5ad788e..9cb865b 100644 --- a/src/commands/speech/synthesize.ts +++ b/src/commands/speech/synthesize.ts @@ -6,6 +6,7 @@ import { speechEndpoint } from '../../client/endpoints'; import { detectOutputFormat, formatOutput } from '../../output/formatter'; import { saveAudioOutput } from '../../output/audio'; import { readTextFromPathOrStdin } 
from '../../utils/fs'; +import { pipeAudioSseToStdout } from '../../utils/audio-stream'; import type { Config } from '../../config/schema'; import type { GlobalFlags } from '../../types/flags'; import type { SpeechRequest, SpeechResponse } from '../../types/api'; @@ -98,14 +99,14 @@ export default defineCommand({ if (flags.stream) { const res = await request(config, { url, method: 'POST', body, stream: true }); - const reader = res.body?.getReader(); - if (!reader) throw new CLIError('No response body', ExitCode.GENERAL); - while (true) { - const { done, value } = await reader.read(); - if (done) break; - process.stdout.write(value); + try { + await pipeAudioSseToStdout(res.body); + } catch (err) { + if (err instanceof Error && err.message === 'No response body') { + throw new CLIError('No response body', ExitCode.GENERAL); + } + throw err; } - reader.releaseLock(); return; } diff --git a/src/utils/audio-stream.ts b/src/utils/audio-stream.ts new file mode 100644 index 0000000..08b382e --- /dev/null +++ b/src/utils/audio-stream.ts @@ -0,0 +1,88 @@ +/** + * Helpers for piping streamed TTS / music responses to stdout as raw audio. + * + * The MiniMax streaming endpoints return a Server-Sent Events stream of JSON + * envelopes whose `data.audio` field is a hex-encoded chunk of the target + * audio format. The `--stream` CLI flag is documented as writing *raw audio* + * to stdout (so it can be piped directly into players such as `mpv -`), so + * this helper parses the SSE frames, decodes the hex payloads, and writes + * the decoded bytes to stdout. + */ + +/** + * Install a one-shot EPIPE handler on stdout so that downstream consumers + * closing the pipe early (e.g. `... | head`, or a player that exits) does + * not crash the process with an unhandled `'error'` event. 
+ */ +export function installStdoutEpipeHandler(): void { + process.stdout.on('error', (err: NodeJS.ErrnoException) => { + if (err && err.code === 'EPIPE') { + process.exit(0); + } + throw err; + }); +} + +/** + * Consume a fetch-style ReadableStream of SSE bytes and write the decoded + * raw audio bytes (from `data.audio` hex fields) to stdout. + */ +export async function pipeAudioSseToStdout( + body: ReadableStream | null | undefined, +): Promise { + const reader = body?.getReader(); + if (!reader) { + throw new Error('No response body'); + } + + installStdoutEpipeHandler(); + + const decoder = new TextDecoder(); + let buffer = ''; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + + // SSE events are separated by blank lines. + let sep: number; + while ((sep = buffer.indexOf('\n\n')) >= 0) { + const event = buffer.slice(0, sep); + buffer = buffer.slice(sep + 2); + writeEvent(event); + } + } + + // Flush any trailing event without a terminating blank line. + buffer += decoder.decode(); + if (buffer.length > 0) { + writeEvent(buffer); + } + } finally { + reader.releaseLock(); + } +} + +function writeEvent(event: string): void { + for (const rawLine of event.split('\n')) { + if (!rawLine.startsWith('data:')) continue; + // Per SSE spec, an optional single space after `data:` should be stripped. + const payload = rawLine.slice(5).replace(/^ /, ''); + if (!payload || payload === '[DONE]') continue; + + let parsed: { data?: { audio?: string } }; + try { + parsed = JSON.parse(payload); + } catch { + // Non-JSON keepalive or comment — skip. 
+ continue; + } + + const hex = parsed?.data?.audio; + if (typeof hex === 'string' && hex.length > 0) { + process.stdout.write(Buffer.from(hex, 'hex')); + } + } +} From 84920bcefe757d4d93d6d02107f91b13b6df34d1 Mon Sep 17 00:00:00 2001 From: moxi000 Date: Thu, 9 Apr 2026 12:50:19 +0800 Subject: [PATCH 2/3] fix(stream): skip terminal summary event to avoid duplicated audio MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The MiniMax streaming TTS endpoint emits N incremental SSE events followed by a final summary event that re-sends the *entire* audio along with extra_info / metadata (this is what --out saves). Naively concatenating all events produced an MP3 with the full file appended after the streamed frames, leaving broken framing in the middle — ffprobe accepted the file but mpv reported 'mp3float: Header missing / Error decoding audio'. Discriminate the summary event by the presence of top-level 'extra_info' (every event carries 'trace_id', so it can't be used). Verified end-to-end: piping --stream into mpv now plays cleanly with no decoder warnings, and the resulting bytes contain only the incremental frames. --- src/utils/audio-stream.ts | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/utils/audio-stream.ts b/src/utils/audio-stream.ts index 08b382e..1b1c2c6 100644 --- a/src/utils/audio-stream.ts +++ b/src/utils/audio-stream.ts @@ -65,6 +65,18 @@ export async function pipeAudioSseToStdout( } } +interface SseEnvelope { + data?: { audio?: string; status?: number }; + // `extra_info` is only present on the terminal "summary" event, which + // re-sends the full audio plus metadata. We must skip that event in + // streaming mode, otherwise the complete file gets appended after the + // incremental frames and the resulting MP3 contains duplicated audio + // with broken framing. (Note: every event carries `trace_id`, so it + // can't be used as the discriminator.) 
+ extra_info?: unknown; + trace_id?: string; +} + function writeEvent(event: string): void { for (const rawLine of event.split('\n')) { if (!rawLine.startsWith('data:')) continue; @@ -72,14 +84,19 @@ function writeEvent(event: string): void { const payload = rawLine.slice(5).replace(/^ /, ''); if (!payload || payload === '[DONE]') continue; - let parsed: { data?: { audio?: string } }; + let parsed: SseEnvelope; try { - parsed = JSON.parse(payload); + parsed = JSON.parse(payload) as SseEnvelope; } catch { // Non-JSON keepalive or comment — skip. continue; } + // Skip the terminal summary event (it re-sends the entire audio). + if (parsed.extra_info !== undefined) { + continue; + } + const hex = parsed?.data?.audio; if (typeof hex === 'string' && hex.length > 0) { process.stdout.write(Buffer.from(hex, 'hex')); From 070699ad60276ba6a68da8ba24ae0bc78a0ccf16 Mon Sep 17 00:00:00 2001 From: moxi000 Date: Thu, 9 Apr 2026 12:54:20 +0800 Subject: [PATCH 3/3] refactor(stream): address Copilot review feedback - Reuse the existing parseSSE generator from src/client/stream.ts instead of hand-rolling SSE framing. parseSSE already handles multi-line data fields, comments, and trailing buffer flush correctly, eliminating duplicated and less spec-complete parsing. - Make installStdoutEpipeHandler idempotent via a module-level flag, so repeated calls don't register multiple listeners and trigger MaxListenersExceededWarning in tests or long-lived processes. - Export a dedicated NoResponseBodyError class instead of relying on err.message string matching for control flow in callers. - Honor stdout backpressure: await 'drain' when write() returns false, so long streams don't accumulate large buffers in memory. - pipeAudioSseToStdout now takes the Response directly (parseSSE owns the body reader), simplifying both call sites. 
--- src/commands/music/generate.ts | 6 +- src/commands/speech/synthesize.ts | 6 +- src/utils/audio-stream.ts | 115 +++++++++++++----------------- 3 files changed, 57 insertions(+), 70 deletions(-) diff --git a/src/commands/music/generate.ts b/src/commands/music/generate.ts index 8baf407..49f2ed1 100644 --- a/src/commands/music/generate.ts +++ b/src/commands/music/generate.ts @@ -6,7 +6,7 @@ import { musicEndpoint } from '../../client/endpoints'; import { formatOutput, detectOutputFormat } from '../../output/formatter'; import { saveAudioOutput } from '../../output/audio'; import { readTextFromPathOrStdin } from '../../utils/fs'; -import { pipeAudioSseToStdout } from '../../utils/audio-stream'; +import { pipeAudioSseToStdout, NoResponseBodyError } from '../../utils/audio-stream'; import type { Config } from '../../config/schema'; import type { GlobalFlags } from '../../types/flags'; import type { MusicRequest, MusicResponse } from '../../types/api'; @@ -151,9 +151,9 @@ export default defineCommand({ if (flags.stream) { const res = await request(config, { url, method: 'POST', body, stream: true }); try { - await pipeAudioSseToStdout(res.body); + await pipeAudioSseToStdout(res); } catch (err) { - if (err instanceof Error && err.message === 'No response body') { + if (err instanceof NoResponseBodyError) { throw new CLIError('No response body', ExitCode.GENERAL); } throw err; diff --git a/src/commands/speech/synthesize.ts b/src/commands/speech/synthesize.ts index 9cb865b..0f46066 100644 --- a/src/commands/speech/synthesize.ts +++ b/src/commands/speech/synthesize.ts @@ -6,7 +6,7 @@ import { speechEndpoint } from '../../client/endpoints'; import { detectOutputFormat, formatOutput } from '../../output/formatter'; import { saveAudioOutput } from '../../output/audio'; import { readTextFromPathOrStdin } from '../../utils/fs'; -import { pipeAudioSseToStdout } from '../../utils/audio-stream'; +import { pipeAudioSseToStdout, NoResponseBodyError } from '../../utils/audio-stream'; 
import type { Config } from '../../config/schema'; import type { GlobalFlags } from '../../types/flags'; import type { SpeechRequest, SpeechResponse } from '../../types/api'; @@ -100,9 +100,9 @@ export default defineCommand({ if (flags.stream) { const res = await request(config, { url, method: 'POST', body, stream: true }); try { - await pipeAudioSseToStdout(res.body); + await pipeAudioSseToStdout(res); } catch (err) { - if (err instanceof Error && err.message === 'No response body') { + if (err instanceof NoResponseBodyError) { throw new CLIError('No response body', ExitCode.GENERAL); } throw err; diff --git a/src/utils/audio-stream.ts b/src/utils/audio-stream.ts index 1b1c2c6..72506e8 100644 --- a/src/utils/audio-stream.ts +++ b/src/utils/audio-stream.ts @@ -5,16 +5,45 @@ * envelopes whose `data.audio` field is a hex-encoded chunk of the target * audio format. The `--stream` CLI flag is documented as writing *raw audio* * to stdout (so it can be piped directly into players such as `mpv -`), so - * this helper parses the SSE frames, decodes the hex payloads, and writes + * this helper consumes the SSE stream, decodes the hex payloads, and writes * the decoded bytes to stdout. + * + * The stream contains N incremental chunk events followed by a terminal + * "summary" event that re-sends the full audio plus metadata (this is what + * `--out` saves). The summary must be skipped in streaming mode, otherwise + * the complete file gets appended after the incremental frames and the + * resulting MP3 contains duplicated audio with broken framing. The summary + * is identified by the presence of a top-level `extra_info` field — note + * that `trace_id` is on every event, so it cannot be used as the + * discriminator. */ +import { parseSSE } from '../client/stream'; + +/** Thrown when the upstream response has no body to stream from. 
*/ +export class NoResponseBodyError extends Error { + constructor() { + super('No response body'); + this.name = 'NoResponseBodyError'; + } +} + +interface SseEnvelope { + data?: { audio?: string; status?: number }; + extra_info?: unknown; + trace_id?: string; +} + +let stdoutEpipeHandlerInstalled = false; + /** - * Install a one-shot EPIPE handler on stdout so that downstream consumers - * closing the pipe early (e.g. `... | head`, or a player that exits) does - * not crash the process with an unhandled `'error'` event. + * Install (idempotently) an EPIPE handler on stdout so that downstream + * consumers closing the pipe early (e.g. `... | head`, or a player that + * exits) cause a clean exit instead of an unhandled `'error'` event. */ export function installStdoutEpipeHandler(): void { + if (stdoutEpipeHandlerInstalled) return; + stdoutEpipeHandlerInstalled = true; process.stdout.on('error', (err: NodeJS.ErrnoException) => { if (err && err.code === 'EPIPE') { process.exit(0); @@ -24,82 +53,40 @@ export function installStdoutEpipeHandler(): void { } /** - * Consume a fetch-style ReadableStream of SSE bytes and write the decoded - * raw audio bytes (from `data.audio` hex fields) to stdout. + * Consume a fetch-style SSE response and write the decoded raw audio bytes + * (from `data.audio` hex fields) to stdout, honoring backpressure. + * + * @throws {NoResponseBodyError} if `response.body` is missing. 
+ */ -export async function pipeAudioSseToStdout( - body: ReadableStream | null | undefined, -): Promise { - const reader = body?.getReader(); - if (!reader) { - throw new Error('No response body'); +export async function pipeAudioSseToStdout(response: Response): Promise<void> { + if (!response.body) { + throw new NoResponseBodyError(); } installStdoutEpipeHandler(); - const decoder = new TextDecoder(); - let buffer = ''; - - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - buffer += decoder.decode(value, { stream: true }); - - // SSE events are separated by blank lines. - let sep: number; - while ((sep = buffer.indexOf('\n\n')) >= 0) { - const event = buffer.slice(0, sep); - buffer = buffer.slice(sep + 2); - writeEvent(event); - } - } - - // Flush any trailing event without a terminating blank line. - buffer += decoder.decode(); - if (buffer.length > 0) { - writeEvent(buffer); - } - } finally { - reader.releaseLock(); - } -} - -interface SseEnvelope { - data?: { audio?: string; status?: number }; - // `extra_info` is only present on the terminal "summary" event, which - // re-sends the full audio plus metadata. We must skip that event in - // streaming mode, otherwise the complete file gets appended after the - // incremental frames and the resulting MP3 contains duplicated audio - // with broken framing. (Note: every event carries `trace_id`, so it - // can't be used as the discriminator.) - extra_info?: unknown; - trace_id?: string; -} - -function writeEvent(event: string): void { - for (const rawLine of event.split('\n')) { - if (!rawLine.startsWith('data:')) continue; - // Per SSE spec, an optional single space after `data:` should be stripped. 
- const payload = rawLine.slice(5).replace(/^ /, ''); + for await (const event of parseSSE(response)) { + const payload = event.data; if (!payload || payload === '[DONE]') continue; let parsed: SseEnvelope; try { parsed = JSON.parse(payload) as SseEnvelope; } catch { - // Non-JSON keepalive or comment — skip. + // Non-JSON keepalive — skip. continue; } // Skip the terminal summary event (it re-sends the entire audio). - if (parsed.extra_info !== undefined) { - continue; - } + if (parsed.extra_info !== undefined) continue; + + const hex = parsed.data?.audio; + if (typeof hex !== 'string' || hex.length === 0) continue; - const hex = parsed?.data?.audio; - if (typeof hex === 'string' && hex.length > 0) { - process.stdout.write(Buffer.from(hex, 'hex')); + const chunk = Buffer.from(hex, 'hex'); + if (!process.stdout.write(chunk)) { + // Honor backpressure: pause until stdout drains. + await new Promise((resolve) => process.stdout.once('drain', resolve)); } } }