fix(stream): decode SSE/hex audio for --stream and handle EPIPE#55
fix(stream): decode SSE/hex audio for --stream and handle EPIPE#55moxi000 wants to merge 3 commits intoMiniMax-AI:mainfrom
Conversation
`speech synthesize --stream` and `music generate --stream` were writing the upstream Server-Sent Events stream verbatim to stdout — JSON envelopes containing hex-encoded audio chunks — so the documented `| mpv -` pipe example produced "Failed to recognize file format". The decode logic already exists for `--out`; the stream path just never used it. This adds a shared `pipeAudioSseToStdout` helper that parses the SSE frames, hex-decodes `data.audio`, and writes raw audio bytes to stdout, buffering across chunk boundaries. Both stream commands now use it. Also installs an EPIPE handler on stdout so consumers that close the pipe early (`... | head`, a player exiting, mpv missing) cause a clean exit instead of an unhandled `'error'` event crash with a Node stack trace. Fixes MiniMax-AI#54
There was a problem hiding this comment.
Pull request overview
This PR fixes the --stream behavior for speech/music commands so they output decoded raw audio bytes to stdout (instead of the upstream SSE/JSON/hex envelopes) and adds stdout EPIPE handling to avoid crashes when downstream pipes close early.
Changes:
- Added a shared streaming helper to parse SSE frames, JSON-decode
data:payloads, hex-decodedata.audio, and write raw bytes to stdout. - Updated
speech synthesize --streamandmusic generate --streamto use the shared helper. - Added stdout
EPIPEhandling so early pipe-close exits cleanly.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
src/utils/audio-stream.ts |
New helper for decoding SSE hex audio to raw stdout + stdout EPIPE handling. |
src/commands/speech/synthesize.ts |
Routes --stream output through the new audio streaming helper. |
src/commands/music/generate.ts |
Routes --stream output through the new audio streaming helper. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| export function installStdoutEpipeHandler(): void { | ||
| process.stdout.on('error', (err: NodeJS.ErrnoException) => { | ||
| if (err && err.code === 'EPIPE') { | ||
| process.exit(0); | ||
| } | ||
| throw err; | ||
| }); |
There was a problem hiding this comment.
installStdoutEpipeHandler is described as “one-shot”, but it uses process.stdout.on('error', ...) and is called from pipeAudioSseToStdout, so repeated uses will register multiple listeners (risking MaxListenersExceededWarning in tests or long-lived processes). Consider guarding with a module-level flag and/or using once instead of on.
src/utils/audio-stream.ts
Outdated
| // SSE events are separated by blank lines. | ||
| let sep: number; | ||
| while ((sep = buffer.indexOf('\n\n')) >= 0) { | ||
| const event = buffer.slice(0, sep); | ||
| buffer = buffer.slice(sep + 2); | ||
| writeEvent(event); | ||
| } |
There was a problem hiding this comment.
SSE framing here only looks for \n\n as the event separator. If the upstream stream uses CRLF (\r\n\r\n), events may never be split and writeEvent will receive strings containing \r, causing JSON.parse failures. Consider normalizing \r\n to \n before parsing and/or reusing the existing parseSSE helper in src/client/stream.ts to avoid duplicate (and more spec-complete) SSE parsing logic.
src/utils/audio-stream.ts
Outdated
| function writeEvent(event: string): void { | ||
| for (const rawLine of event.split('\n')) { | ||
| if (!rawLine.startsWith('data:')) continue; | ||
| // Per SSE spec, an optional single space after `data:` should be stripped. | ||
| const payload = rawLine.slice(5).replace(/^ /, ''); | ||
| if (!payload || payload === '[DONE]') continue; | ||
|
|
||
| let parsed: { data?: { audio?: string } }; | ||
| try { | ||
| parsed = JSON.parse(payload); | ||
| } catch { | ||
| // Non-JSON keepalive or comment — skip. | ||
| continue; |
There was a problem hiding this comment.
writeEvent parses each data: line independently. Per SSE spec, a single event can contain multiple data: lines that must be concatenated with \n; parsing line-by-line can drop valid payloads (e.g., pretty-printed JSON) and also fails if lines end with \r. Using the shared parseSSE generator (which already concatenates data lines) or trimming rawLine/payload would make this more robust.
src/utils/audio-stream.ts
Outdated
| const hex = parsed?.data?.audio; | ||
| if (typeof hex === 'string' && hex.length > 0) { | ||
| process.stdout.write(Buffer.from(hex, 'hex')); | ||
| } |
There was a problem hiding this comment.
process.stdout.write(Buffer.from(hex, 'hex')) ignores backpressure. For long streams this can buffer large amounts in memory if stdout can’t keep up. Consider making the write path async and awaiting drain when stdout.write() returns false (similar to how file download handles backpressure elsewhere in the codebase).
| try { | ||
| await pipeAudioSseToStdout(res.body); | ||
| } catch (err) { | ||
| if (err instanceof Error && err.message === 'No response body') { | ||
| throw new CLIError('No response body', ExitCode.GENERAL); | ||
| } | ||
| throw err; |
There was a problem hiding this comment.
This error handling relies on matching err.message === 'No response body' from the helper. Using error-message strings for control flow is brittle; consider exporting a dedicated error class/type (e.g., NoResponseBodyError) from audio-stream.ts, or have pipeAudioSseToStdout throw CLIError directly so callers don’t need to pattern-match messages.
| if (flags.stream) { | ||
| const res = await request(config, { url, method: 'POST', body, stream: true }); | ||
| const reader = res.body?.getReader(); | ||
| if (!reader) throw new CLIError('No response body', ExitCode.GENERAL); | ||
| while (true) { | ||
| const { done, value } = await reader.read(); | ||
| if (done) break; | ||
| process.stdout.write(value); | ||
| try { | ||
| await pipeAudioSseToStdout(res.body); | ||
| } catch (err) { |
There was a problem hiding this comment.
The --stream path now decodes SSE->JSON->hex->bytes, but there’s no automated test coverage for this behavior. Consider adding a test that mocks an SSE response (the test suite already has sseResponse in test/helpers/mock-server.ts) and asserts that the command writes the expected raw bytes to stdout and stops on [DONE].
| try { | ||
| await pipeAudioSseToStdout(res.body); | ||
| } catch (err) { | ||
| if (err instanceof Error && err.message === 'No response body') { | ||
| throw new CLIError('No response body', ExitCode.GENERAL); | ||
| } | ||
| throw err; | ||
| } |
There was a problem hiding this comment.
This error handling relies on matching err.message === 'No response body' from the helper. Using error-message strings for control flow is brittle; consider exporting a dedicated error class/type (e.g., NoResponseBodyError) from audio-stream.ts, or have pipeAudioSseToStdout throw CLIError directly so callers don’t need to pattern-match messages.
| if (flags.stream) { | ||
| const res = await request(config, { url, method: 'POST', body, stream: true }); | ||
| const reader = res.body?.getReader(); | ||
| if (!reader) throw new CLIError('No response body', ExitCode.GENERAL); | ||
| while (true) { | ||
| const { done, value } = await reader.read(); | ||
| if (done) break; | ||
| process.stdout.write(value); | ||
| try { | ||
| await pipeAudioSseToStdout(res.body); | ||
| } catch (err) { |
There was a problem hiding this comment.
The --stream path now decodes SSE->JSON->hex->bytes, but there’s no automated test coverage for this behavior. Consider adding a test that serves an SSE response via the existing mock server helper and verifies the generated bytes written to stdout for music generate --stream (including chunk-boundary buffering).
The MiniMax streaming TTS endpoint emits N incremental SSE events followed by a final summary event that re-sends the *entire* audio along with extra_info / metadata (this is what --out saves). Naively concatenating all events produced an MP3 with the full file appended after the streamed frames, leaving broken framing in the middle — ffprobe accepted the file but mpv reported 'mp3float: Header missing / Error decoding audio'. Discriminate the summary event by the presence of top-level 'extra_info' (every event carries 'trace_id', so it can't be used). Verified end-to-end: piping --stream into mpv now plays cleanly with no decoder warnings, and the resulting bytes contain only the incremental frames.
|
Pushed a follow-up fix (84920bc) after reproducing a remaining decode error reported by mpv: Root cause: the streaming endpoint emits N incremental SSE events followed by a terminal "summary" event that re-sends the entire audio plus metadata (this is what Captured raw SSE structure for Note that Re-verified after rebuild:
Worth confirming with maintainers whether the same envelope shape applies to |
- Reuse the existing parseSSE generator from src/client/stream.ts instead of hand-rolling SSE framing. parseSSE already handles multi-line data fields, comments, and trailing buffer flush correctly, eliminating duplicated and less spec-complete parsing. - Make installStdoutEpipeHandler idempotent via a module-level flag, so repeated calls don't register multiple listeners and trigger MaxListenersExceededWarning in tests or long-lived processes. - Export a dedicated NoResponseBodyError class instead of relying on err.message string matching for control flow in callers. - Honor stdout backpressure: await 'drain' when write() returns false, so long streams don't accumulate large buffers in memory. - pipeAudioSseToStdout now takes the Response directly (parseSSE owns the body reader), simplifying both call sites.
|
Pushed 070699a addressing the Copilot review:
Skipped for this round:
Re-verified end-to-end after rebuild:
|
|
Thanks for commit. Please solve conflicts and commit again. |
Summary
Fixes #54.
speech synthesize --streamandmusic generate --streamnow write decoded raw audio bytes to stdout, matching the documentation and making the existing| mpv --no-terminal -example actually work. Previously they wrote the upstream SSE stream verbatim (data: {"data":{"audio":"<hex>"}}envelopes), so any audio player saw garbage and bailed with "Failed to recognize file format".src/utils/audio-stream.ts(pipeAudioSseToStdout) parses SSE frames, JSON-decodes eachdata:line, hex-decodesdata.audio, and writes the bytes to stdout. Buffers across chunk boundaries so partial events are handled correctly.EPIPEhandler onprocess.stdout, so consumers closing the pipe early (... | head, a player that exits, ormpvnot being installed) result in a cleanexit(0)instead of an unhandled'error'event and Node stack trace.commands/speech/synthesize.tsandcommands/music/generate.tshad the same bug; both now use the shared helper.Test plan
Built locally with
bun run buildand verified against the live API:mmx speech synthesize --text "..." --stream > out.mp3produces a valid MP3 (filereportsID3 v2.4 / MPEG ADTS layer III, 128 kbps, 32 kHz).mmx speech synthesize --text "..." --stream | mpv --no-terminal -plays correctly end-to-end.mmx speech synthesize --text "..." --stream | head -c 5 > /dev/nullexits 0 with no EPIPE crash.music generate --streamcall since I only smoke-tested speech.Notes
[DONE]sentinels, comment/keepalive lines, and the optional single space afterdata:per the SSE spec.