-
Notifications
You must be signed in to change notification settings - Fork 65
fix(stream): decode SSE/hex audio for --stream and handle EPIPE #55
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,7 @@ import { musicEndpoint } from '../../client/endpoints'; | |
| import { formatOutput, detectOutputFormat } from '../../output/formatter'; | ||
| import { saveAudioOutput } from '../../output/audio'; | ||
| import { readTextFromPathOrStdin } from '../../utils/fs'; | ||
| import { pipeAudioSseToStdout, NoResponseBodyError } from '../../utils/audio-stream'; | ||
| import type { Config } from '../../config/schema'; | ||
| import type { GlobalFlags } from '../../types/flags'; | ||
| import type { MusicRequest, MusicResponse } from '../../types/api'; | ||
|
|
@@ -149,14 +150,14 @@ export default defineCommand({ | |
|
|
||
| if (flags.stream) { | ||
| const res = await request(config, { url, method: 'POST', body, stream: true }); | ||
| const reader = res.body?.getReader(); | ||
| if (!reader) throw new CLIError('No response body', ExitCode.GENERAL); | ||
| while (true) { | ||
| const { done, value } = await reader.read(); | ||
| if (done) break; | ||
| process.stdout.write(value); | ||
| try { | ||
| await pipeAudioSseToStdout(res); | ||
| } catch (err) { | ||
| if (err instanceof NoResponseBodyError) { | ||
| throw new CLIError('No response body', ExitCode.GENERAL); | ||
| } | ||
| throw err; | ||
| } | ||
|
Comment on lines
+153
to
160
|
||
| reader.releaseLock(); | ||
| return; | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,7 @@ import { speechEndpoint } from '../../client/endpoints'; | |
| import { detectOutputFormat, formatOutput } from '../../output/formatter'; | ||
| import { saveAudioOutput } from '../../output/audio'; | ||
| import { readTextFromPathOrStdin } from '../../utils/fs'; | ||
| import { pipeAudioSseToStdout, NoResponseBodyError } from '../../utils/audio-stream'; | ||
| import type { Config } from '../../config/schema'; | ||
| import type { GlobalFlags } from '../../types/flags'; | ||
| import type { SpeechRequest, SpeechResponse } from '../../types/api'; | ||
|
|
@@ -98,14 +99,14 @@ export default defineCommand({ | |
|
|
||
| if (flags.stream) { | ||
| const res = await request(config, { url, method: 'POST', body, stream: true }); | ||
| const reader = res.body?.getReader(); | ||
| if (!reader) throw new CLIError('No response body', ExitCode.GENERAL); | ||
| while (true) { | ||
| const { done, value } = await reader.read(); | ||
| if (done) break; | ||
| process.stdout.write(value); | ||
| try { | ||
| await pipeAudioSseToStdout(res); | ||
| } catch (err) { | ||
|
Comment on lines
100
to
+104
|
||
| if (err instanceof NoResponseBodyError) { | ||
| throw new CLIError('No response body', ExitCode.GENERAL); | ||
| } | ||
| throw err; | ||
|
Comment on lines
+102
to
+108
|
||
| } | ||
| reader.releaseLock(); | ||
| return; | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,92 @@ | ||
| /** | ||
| * Helpers for piping streamed TTS / music responses to stdout as raw audio. | ||
| * | ||
| * The MiniMax streaming endpoints return a Server-Sent Events stream of JSON | ||
| * envelopes whose `data.audio` field is a hex-encoded chunk of the target | ||
| * audio format. The `--stream` CLI flag is documented as writing *raw audio* | ||
| * to stdout (so it can be piped directly into players such as `mpv -`), so | ||
| * this helper consumes the SSE stream, decodes the hex payloads, and writes | ||
| * the decoded bytes to stdout. | ||
| * | ||
| * The stream contains N incremental chunk events followed by a terminal | ||
| * "summary" event that re-sends the full audio plus metadata (this is what | ||
| * `--out` saves). The summary must be skipped in streaming mode, otherwise | ||
| * the complete file gets appended after the incremental frames and the | ||
| * resulting MP3 contains duplicated audio with broken framing. The summary | ||
| * is identified by the presence of a top-level `extra_info` field — note | ||
| * that `trace_id` is on every event, so it cannot be used as the | ||
| * discriminator. | ||
| */ | ||
|
|
||
| import { parseSSE } from '../client/stream'; | ||
|
|
||
| /** Thrown when the upstream response has no body to stream from. */ | ||
| export class NoResponseBodyError extends Error { | ||
| constructor() { | ||
| super('No response body'); | ||
| this.name = 'NoResponseBodyError'; | ||
| } | ||
| } | ||
|
|
||
| interface SseEnvelope { | ||
| data?: { audio?: string; status?: number }; | ||
| extra_info?: unknown; | ||
| trace_id?: string; | ||
| } | ||
|
|
||
| let stdoutEpipeHandlerInstalled = false; | ||
|
|
||
| /** | ||
| * Install (idempotently) an EPIPE handler on stdout so that downstream | ||
| * consumers closing the pipe early (e.g. `... | head`, or a player that | ||
| * exits) cause a clean exit instead of an unhandled `'error'` event. | ||
| */ | ||
| export function installStdoutEpipeHandler(): void { | ||
| if (stdoutEpipeHandlerInstalled) return; | ||
| stdoutEpipeHandlerInstalled = true; | ||
| process.stdout.on('error', (err: NodeJS.ErrnoException) => { | ||
| if (err && err.code === 'EPIPE') { | ||
| process.exit(0); | ||
| } | ||
| throw err; | ||
| }); | ||
|
Comment on lines
+44
to
+52
|
||
| } | ||
|
|
||
| /** | ||
| * Consume a fetch-style SSE response and write the decoded raw audio bytes | ||
| * (from `data.audio` hex fields) to stdout, honoring backpressure. | ||
| * | ||
| * @throws {NoResponseBodyError} if `response.body` is missing. | ||
| */ | ||
| export async function pipeAudioSseToStdout(response: Response): Promise<void> { | ||
| if (!response.body) { | ||
| throw new NoResponseBodyError(); | ||
| } | ||
|
|
||
| installStdoutEpipeHandler(); | ||
|
|
||
| for await (const event of parseSSE(response)) { | ||
| const payload = event.data; | ||
| if (!payload || payload === '[DONE]') continue; | ||
|
|
||
| let parsed: SseEnvelope; | ||
| try { | ||
| parsed = JSON.parse(payload) as SseEnvelope; | ||
| } catch { | ||
| // Non-JSON keepalive — skip. | ||
| continue; | ||
| } | ||
|
|
||
| // Skip the terminal summary event (it re-sends the entire audio). | ||
| if (parsed.extra_info !== undefined) continue; | ||
|
|
||
| const hex = parsed.data?.audio; | ||
| if (typeof hex !== 'string' || hex.length === 0) continue; | ||
|
|
||
| const chunk = Buffer.from(hex, 'hex'); | ||
| if (!process.stdout.write(chunk)) { | ||
| // Honor backpressure: pause until stdout drains. | ||
| await new Promise<void>((resolve) => process.stdout.once('drain', resolve)); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
--streampath now decodes SSE->JSON->hex->bytes, but there’s no automated test coverage for this behavior. Consider adding a test that serves an SSE response via the existing mock server helper and verifies the generated bytes written to stdout formusic generate --stream(including chunk-boundary buffering).