diff --git a/src/commands/music/generate.ts b/src/commands/music/generate.ts index 0dd023c..49f2ed1 100644 --- a/src/commands/music/generate.ts +++ b/src/commands/music/generate.ts @@ -6,6 +6,7 @@ import { musicEndpoint } from '../../client/endpoints'; import { formatOutput, detectOutputFormat } from '../../output/formatter'; import { saveAudioOutput } from '../../output/audio'; import { readTextFromPathOrStdin } from '../../utils/fs'; +import { pipeAudioSseToStdout, NoResponseBodyError } from '../../utils/audio-stream'; import type { Config } from '../../config/schema'; import type { GlobalFlags } from '../../types/flags'; import type { MusicRequest, MusicResponse } from '../../types/api'; @@ -149,14 +150,14 @@ export default defineCommand({ if (flags.stream) { const res = await request(config, { url, method: 'POST', body, stream: true }); - const reader = res.body?.getReader(); - if (!reader) throw new CLIError('No response body', ExitCode.GENERAL); - while (true) { - const { done, value } = await reader.read(); - if (done) break; - process.stdout.write(value); + try { + await pipeAudioSseToStdout(res); + } catch (err) { + if (err instanceof NoResponseBodyError) { + throw new CLIError('No response body', ExitCode.GENERAL); + } + throw err; } - reader.releaseLock(); return; } diff --git a/src/commands/speech/synthesize.ts b/src/commands/speech/synthesize.ts index 5ad788e..0f46066 100644 --- a/src/commands/speech/synthesize.ts +++ b/src/commands/speech/synthesize.ts @@ -6,6 +6,7 @@ import { speechEndpoint } from '../../client/endpoints'; import { detectOutputFormat, formatOutput } from '../../output/formatter'; import { saveAudioOutput } from '../../output/audio'; import { readTextFromPathOrStdin } from '../../utils/fs'; +import { pipeAudioSseToStdout, NoResponseBodyError } from '../../utils/audio-stream'; import type { Config } from '../../config/schema'; import type { GlobalFlags } from '../../types/flags'; import type { SpeechRequest, SpeechResponse } from 
'../../types/api'; @@ -98,14 +99,14 @@ export default defineCommand({ if (flags.stream) { const res = await request(config, { url, method: 'POST', body, stream: true }); - const reader = res.body?.getReader(); - if (!reader) throw new CLIError('No response body', ExitCode.GENERAL); - while (true) { - const { done, value } = await reader.read(); - if (done) break; - process.stdout.write(value); + try { + await pipeAudioSseToStdout(res); + } catch (err) { + if (err instanceof NoResponseBodyError) { + throw new CLIError('No response body', ExitCode.GENERAL); + } + throw err; } - reader.releaseLock(); return; } diff --git a/src/utils/audio-stream.ts b/src/utils/audio-stream.ts new file mode 100644 index 0000000..72506e8 --- /dev/null +++ b/src/utils/audio-stream.ts @@ -0,0 +1,92 @@ +/** + * Helpers for piping streamed TTS / music responses to stdout as raw audio. + * + * The MiniMax streaming endpoints return a Server-Sent Events stream of JSON + * envelopes whose `data.audio` field is a hex-encoded chunk of the target + * audio format. The `--stream` CLI flag is documented as writing *raw audio* + * to stdout (so it can be piped directly into players such as `mpv -`), so + * this helper consumes the SSE stream, decodes the hex payloads, and writes + * the decoded bytes to stdout. + * + * The stream contains N incremental chunk events followed by a terminal + * "summary" event that re-sends the full audio plus metadata (this is what + * `--out` saves). The summary must be skipped in streaming mode, otherwise + * the complete file gets appended after the incremental frames and the + * resulting MP3 contains duplicated audio with broken framing. The summary + * is identified by the presence of a top-level `extra_info` field — note + * that `trace_id` is on every event, so it cannot be used as the + * discriminator. + */ + +import { parseSSE } from '../client/stream'; + +/** Thrown when the upstream response has no body to stream from. 
/**
 * Error raised when the upstream response carries no body to stream from.
 *
 * Exported so command code can recognise it with `instanceof` and translate
 * it into its own error type.
 */
export class NoResponseBodyError extends Error {
  constructor() {
    super('No response body');
    this.name = 'NoResponseBodyError';
  }
}

/**
 * Fields this module reads from one JSON envelope of the SSE stream.
 * Everything is optional because the payload is external input.
 */
interface SseEnvelope {
  /** Incremental event body; `audio` is a hex-encoded audio chunk. */
  data?: { audio?: string; status?: number };
  /** Present only on the terminal summary event; used as its discriminator. */
  extra_info?: unknown;
  /** Present on every event, so it cannot identify the summary. */
  trace_id?: string;
}

/** Guards against registering the stdout `'error'` listener more than once. */
let stdoutEpipeHandlerInstalled = false;

/**
 * Install (idempotently) an EPIPE handler on stdout so that a downstream
 * consumer closing the pipe early (e.g. `... | head`, or a player that
 * exits) causes a clean exit instead of an unhandled `'error'` event.
 *
 * Any other stdout error is rethrown from the listener unchanged.
 */
export function installStdoutEpipeHandler(): void {
  if (stdoutEpipeHandlerInstalled) return;
  stdoutEpipeHandlerInstalled = true;

  process.stdout.on('error', (err: NodeJS.ErrnoException) => {
    if (err?.code === 'EPIPE') {
      process.exit(0);
    }
    throw err;
  });
}

/**
 * Decode the raw audio bytes carried by a single SSE `data` payload.
 *
 * @param payload - The `data` field of one SSE event.
 * @returns The decoded bytes, or `undefined` when the event carries nothing
 *   to emit: empty payload, `[DONE]`, non-JSON keepalive, JSON that is not an
 *   object, the terminal summary event, or a missing/empty `data.audio`.
 */
function decodeAudioPayload(payload: string | null | undefined): Buffer | undefined {
  if (!payload || payload === '[DONE]') return undefined;

  let parsed: unknown;
  try {
    parsed = JSON.parse(payload);
  } catch {
    // Non-JSON keepalive — skip.
    return undefined;
  }

  // JSON.parse may legally yield `null` or a primitive (e.g. payload `null`);
  // reading a property off `null` would throw and abort the whole stream.
  if (typeof parsed !== 'object' || parsed === null) return undefined;
  const envelope = parsed as SseEnvelope;

  // Skip the terminal summary event (it re-sends the entire audio).
  if (envelope.extra_info !== undefined) return undefined;

  const hex = envelope.data?.audio;
  if (typeof hex !== 'string' || hex.length === 0) return undefined;

  // Hex decoding stops at the first invalid pair, so malformed input can
  // decode to zero bytes; treat that as "nothing to write".
  const bytes = Buffer.from(hex, 'hex');
  return bytes.length > 0 ? bytes : undefined;
}

/**
 * Consume a fetch-style SSE response and write the decoded raw audio bytes
 * (from `data.audio` hex fields) to stdout, honoring backpressure.
 *
 * @param response - Streaming response whose events are read via `parseSSE`.
 * @returns A promise that settles once the event stream has been exhausted.
 * @throws {NoResponseBodyError} if `response.body` is missing.
 */
export async function pipeAudioSseToStdout(response: Response): Promise<void> {
  if (!response.body) {
    throw new NoResponseBodyError();
  }

  installStdoutEpipeHandler();

  for await (const event of parseSSE(response)) {
    const chunk = decodeAudioPayload(event.data);
    if (chunk === undefined) continue;

    if (!process.stdout.write(chunk)) {
      // Honor backpressure: pause until stdout drains.
      await new Promise<void>((resolve) => process.stdout.once('drain', resolve));
    }
  }
}