From 43219af52eba317d4a6fb77a3dbb0323224c52e1 Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Wed, 24 Jun 2026 16:02:21 -0700 Subject: [PATCH] =?UTF-8?q?Reapply=20"[APIE-1040]=20Add=20generic=20--wait?= =?UTF-8?q?=20framework;=20refactor=20Flink=20stat=E2=80=A6=20(#3386)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 7d98f0c1dafa4326af402a4ff4b5f26c0a40d5cc. --- internal/flink/command_statement_create.go | 68 +++- .../flink/command_statement_create_onprem.go | 49 ++- pkg/wait/wait.go | 142 +++++++++ pkg/wait/wait_test.go | 300 ++++++++++++++++++ .../flink/statement/create-help-onprem.golden | 3 +- .../output/flink/statement/create-help.golden | 3 +- ...create-missing-compute-pool-failure.golden | 3 +- .../create-missing-sql-failure.golden | 3 +- .../flink/statement/create-wait-failed.golden | 4 + .../flink/statement/create-wait-onprem.golden | 11 + .../statement/create-wait-timeout.golden | 4 + test/flink_onprem_test.go | 4 + test/flink_test.go | 2 + test/test-server/flink_gateway_router.go | 14 +- test/test-server/flink_onprem_handler.go | 23 ++ 15 files changed, 595 insertions(+), 38 deletions(-) create mode 100644 pkg/wait/wait.go create mode 100644 pkg/wait/wait_test.go create mode 100644 test/fixtures/output/flink/statement/create-wait-failed.golden create mode 100644 test/fixtures/output/flink/statement/create-wait-onprem.golden create mode 100644 test/fixtures/output/flink/statement/create-wait-timeout.golden diff --git a/internal/flink/command_statement_create.go b/internal/flink/command_statement_create.go index eda49f8606..ae9254f5e3 100644 --- a/internal/flink/command_statement_create.go +++ b/internal/flink/command_statement_create.go @@ -16,9 +16,23 @@ import ( "github.com/confluentinc/cli/v4/pkg/flink/types" "github.com/confluentinc/cli/v4/pkg/output" "github.com/confluentinc/cli/v4/pkg/properties" - "github.com/confluentinc/cli/v4/pkg/retry" + 
"github.com/confluentinc/cli/v4/pkg/wait" ) +// Phase enum from flink-gateway/v1 SqlV1StatementStatus.Phase. PENDING / +// FAILING / STOPPING / DELETING are transitioning states; FAILED is the only +// terminal failure; RUNNING / COMPLETED / STOPPED / DEGRADED are terminal +// success. The generator will source the same sets from AsyncConfig. +var ( + flinkStatementPendingPhases = []string{"PENDING", "FAILING", "STOPPING", "DELETING"} + flinkStatementFailedPhases = []string{"FAILED"} +) + +// flinkStatementCreateWaitTimeout aligns with terraform-provider-confluent's +// statementsAPICreateTimeout in internal/provider/constants.go. CLI users can +// shorten it with --wait-timeout; TF has no equivalent override. +const flinkStatementCreateWaitTimeout = 6 * time.Hour + func (c *command) newStatementCreateCommand() *cobra.Command { cmd := &cobra.Command{ Use: "create [name]", @@ -41,7 +55,8 @@ func (c *command) newStatementCreateCommand() *cobra.Command { c.addComputePoolFlag(cmd) pcmd.AddServiceAccountFlag(cmd, c.AuthenticatedCLICommand) c.addDatabaseFlag(cmd) - cmd.Flags().Bool("wait", false, "Block until the statement is running or has failed.") + cmd.Flags().Bool("wait", false, "Block until the statement reaches a terminal state.") + cmd.Flags().Duration("wait-timeout", flinkStatementCreateWaitTimeout, "Maximum time to wait when --wait is set.") cmd.Flags().StringSlice("property", []string{}, "A mechanism to pass properties in the form key=value when creating a Flink statement.") pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) pcmd.AddContextFlag(cmd, c.CLICommand) @@ -151,26 +166,47 @@ func (c *command) statementCreate(cmd *cobra.Command, args []string) error { if err != nil { return err } - wait, err := cmd.Flags().GetBool("wait") + shouldWait, err := cmd.Flags().GetBool("wait") if err != nil { return err } - if wait { - err := retry.Retry(time.Second, time.Minute, func() error { - statement, err = client.GetStatement(environmentId, name, c.Context.LastOrgId) 
- if err != nil { - return err - } - - if statement.Status.GetPhase() == "PENDING" { - return fmt.Errorf(`statement phase is "%s"`, statement.Status.GetPhase()) - } - - return nil - }) + if shouldWait { + timeout, err := cmd.Flags().GetDuration("wait-timeout") if err != nil { return err } + statement, err = wait.PollPhases(cmd.Context(), wait.PhaseOptions[flinkgatewayv1.SqlV1Statement]{ + Fetch: func() (flinkgatewayv1.SqlV1Statement, error) { + return client.GetStatement(environmentId, name, c.Context.LastOrgId) + }, + Phase: func(s flinkgatewayv1.SqlV1Statement) string { return s.Status.GetPhase() }, + PendingPhases: flinkStatementPendingPhases, + FailedPhases: flinkStatementFailedPhases, + PollInterval: time.Second, + Timeout: timeout, + }) + if err != nil { + // wait.ErrFailed and wait.ErrTimeout are package-level sentinels + // returned directly from Poll (never wrapped); a direct == compare + // is correct and avoids importing stdlib errors alongside the CLI + // errors package. + switch err { + case wait.ErrFailed: + return errors.NewErrorWithSuggestions( + fmt.Sprintf(`statement "%s" entered failed phase %q: %s`, name, statement.Status.GetPhase(), statement.Status.GetDetail()), + fmt.Sprintf("Inspect the statement with `confluent flink statement describe %s`.", name), + ) + case wait.ErrTimeout: + return errors.NewErrorWithSuggestions( + fmt.Sprintf(`wait timed out: statement "%s" is still in phase %q`, name, statement.Status.GetPhase()), + "Increase `--wait-timeout` or omit `--wait`.", + ) + default: + // Fetch error or context cancellation — surface the underlying + // cause unmodified; suggesting a longer timeout would mislead. 
+ return err + } + } } table := output.NewTable(cmd) diff --git a/internal/flink/command_statement_create_onprem.go b/internal/flink/command_statement_create_onprem.go index d6405eff7a..0567c8e6af 100644 --- a/internal/flink/command_statement_create_onprem.go +++ b/internal/flink/command_statement_create_onprem.go @@ -16,7 +16,7 @@ import ( "github.com/confluentinc/cli/v4/pkg/errors" "github.com/confluentinc/cli/v4/pkg/flink/types" "github.com/confluentinc/cli/v4/pkg/output" - "github.com/confluentinc/cli/v4/pkg/retry" + "github.com/confluentinc/cli/v4/pkg/wait" ) func (c *command) newStatementCreateCommandOnPrem() *cobra.Command { @@ -36,7 +36,8 @@ func (c *command) newStatementCreateCommandOnPrem() *cobra.Command { cmd.Flags().String("catalog", "", "The name of the default catalog.") cmd.Flags().String("database", "", "The name of the default database.") cmd.Flags().String("flink-configuration", "", "The file path to hold the Flink configuration for the statement.") - cmd.Flags().Bool("wait", false, "Boolean flag to block until the statement is running or has failed.") + cmd.Flags().Bool("wait", false, "Block until the statement reaches a terminal state.") + cmd.Flags().Duration("wait-timeout", flinkStatementCreateWaitTimeout, "Maximum time to wait when --wait is set.") addCmfFlagSet(cmd) pcmd.AddOutputFlag(cmd) @@ -114,7 +115,7 @@ func (c *command) statementCreateOnPrem(cmd *cobra.Command, args []string) error Stopped: cmfsdk.PtrBool(false), }, } - wait, err := cmd.Flags().GetBool("wait") + shouldWait, err := cmd.Flags().GetBool("wait") if err != nil { return err } @@ -124,22 +125,38 @@ func (c *command) statementCreateOnPrem(cmd *cobra.Command, args []string) error return err } - if wait { - err := retry.Retry(time.Second*2, time.Minute, func() error { - polledStatement, err := client.GetStatement(c.createContext(), environment, name) - if err != nil { - return err - } - if polledStatement.GetStatus().Phase == "PENDING" { - return fmt.Errorf(`statement phase is 
"%s"`, polledStatement.GetStatus().Phase) - } - // Update the finalStatement with the completed state - finalStatement = polledStatement - return nil - }) + if shouldWait { + timeout, err := cmd.Flags().GetDuration("wait-timeout") if err != nil { return err } + finalStatement, err = wait.PollPhases(cmd.Context(), wait.PhaseOptions[cmfsdk.Statement]{ + Fetch: func() (cmfsdk.Statement, error) { + return client.GetStatement(c.createContext(), environment, name) + }, + Phase: func(s cmfsdk.Statement) string { return s.GetStatus().Phase }, + PendingPhases: flinkStatementPendingPhases, + FailedPhases: flinkStatementFailedPhases, + PollInterval: 2 * time.Second, + Timeout: timeout, + }) + if err != nil { + status := finalStatement.GetStatus() + switch err { + case wait.ErrFailed: + return errors.NewErrorWithSuggestions( + fmt.Sprintf(`statement "%s" entered failed phase %q: %s`, name, status.Phase, status.GetDetail()), + fmt.Sprintf("Inspect the statement with `confluent flink statement describe %s`.", name), + ) + case wait.ErrTimeout: + return errors.NewErrorWithSuggestions( + fmt.Sprintf(`wait timed out: statement "%s" is still in phase %q`, name, status.Phase), + "Increase `--wait-timeout` or omit `--wait`.", + ) + default: + return err + } + } } if output.GetFormat(cmd) == output.Human { diff --git a/pkg/wait/wait.go b/pkg/wait/wait.go new file mode 100644 index 0000000000..d002696593 --- /dev/null +++ b/pkg/wait/wait.go @@ -0,0 +1,142 @@ +package wait + +import ( + "context" + "errors" + "time" +) + +// Options describes a single Poll invocation. T is the polled resource type. +// +// Delay (if non-zero) sleeps before the first Fetch. Use it to give the +// resource a moment to materialize on the server after a POST; mirrors +// retry.StateChangeConf.Delay in terraform-provider-confluent. +// +// PollInterval is the gap between successive Fetch calls after the first. 
+type Options[T any] struct { + Fetch func() (T, error) + IsTerminal func(T) bool + IsFailed func(T) bool + Delay time.Duration + PollInterval time.Duration + Timeout time.Duration +} + +// PhaseOptions is a declarative form of Options for the common case where +// readiness is determined by a single status-phase string. The pending and +// failed phase sets are typically sourced from the resource's OpenAPI status +// enum (see cli-terraform-generator AsyncConfig). +type PhaseOptions[T any] struct { + Fetch func() (T, error) + Phase func(T) string + PendingPhases []string + FailedPhases []string + Delay time.Duration + PollInterval time.Duration + Timeout time.Duration +} + +var ( + ErrTimeout = errors.New("wait timed out") + ErrFailed = errors.New("resource entered failed state") +) + +// PhaseSet returns a predicate reporting whether its argument is in phases. +// Used to build IsTerminal / IsFailed checks from OpenAPI status enums. +func PhaseSet(phases ...string) func(string) bool { + set := make(map[string]struct{}, len(phases)) + for _, p := range phases { + set[p] = struct{}{} + } + return func(s string) bool { + _, ok := set[s] + return ok + } +} + +// PollPhases polls opts.Fetch until opts.Phase returns a value not in +// opts.PendingPhases. If the resulting phase is in opts.FailedPhases the +// return error is ErrFailed; otherwise it is treated as a successful +// terminal state. Timeout / context-cancellation / fetch-errors behave as in +// Poll. +func PollPhases[T any](ctx context.Context, opts PhaseOptions[T]) (T, error) { + pending := PhaseSet(opts.PendingPhases...) + failed := PhaseSet(opts.FailedPhases...) 
+ return Poll(ctx, Options[T]{ + Fetch: opts.Fetch, + IsTerminal: func(v T) bool { return !pending(opts.Phase(v)) }, + IsFailed: func(v T) bool { return failed(opts.Phase(v)) }, + Delay: opts.Delay, + PollInterval: opts.PollInterval, + Timeout: opts.Timeout, + }) +} + +// Poll sleeps opts.Delay (if non-zero), then calls opts.Fetch, then every +// opts.PollInterval until IsFailed returns true (ErrFailed), IsTerminal +// returns true (success), opts.Timeout elapses, or ctx is cancelled. +// +// Fetch errors are treated as transient and do not abort polling: the loop +// continues, preserving the most recent successfully-fetched value as `last`. +// This matches the historical retry.Retry-based behavior callers relied on, +// where 429/5xx/network blips during polling were retried until timeout. If +// the timeout elapses while the most recent Fetch errored, that error is +// returned in place of ErrTimeout so the user sees the underlying cause. +func Poll[T any](ctx context.Context, opts Options[T]) (T, error) { + var ( + last T + lastErr error + ) + + check := func() (bool, error) { + v, ferr := opts.Fetch() + if ferr != nil { + lastErr = ferr + return false, nil + } + lastErr = nil + last = v + if opts.IsFailed != nil && opts.IsFailed(v) { + return true, ErrFailed + } + if opts.IsTerminal(v) { + return true, nil + } + return false, nil + } + + deadline := time.After(opts.Timeout) + + if opts.Delay > 0 { + select { + case <-time.After(opts.Delay): + case <-deadline: + return last, ErrTimeout + case <-ctx.Done(): + return last, ctx.Err() + } + } + + if done, err := check(); done { + return last, err + } + + ticker := time.NewTicker(opts.PollInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if done, err := check(); done { + return last, err + } + case <-deadline: + if lastErr != nil { + return last, lastErr + } + return last, ErrTimeout + case <-ctx.Done(): + return last, ctx.Err() + } + } +} diff --git a/pkg/wait/wait_test.go 
b/pkg/wait/wait_test.go new file mode 100644 index 0000000000..b0dcd90cc8 --- /dev/null +++ b/pkg/wait/wait_test.go @@ -0,0 +1,300 @@ +package wait + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type fakeResource struct { + phase string +} + +func TestPoll_ImmediateReady(t *testing.T) { + calls := 0 + v, err := Poll(context.Background(), Options[fakeResource]{ + Fetch: func() (fakeResource, error) { + calls++ + return fakeResource{phase: "READY"}, nil + }, + IsTerminal: func(r fakeResource) bool { return r.phase == "READY" }, + PollInterval: time.Millisecond, + Timeout: time.Second, + }) + require.NoError(t, err) + require.Equal(t, "READY", v.phase) + require.Equal(t, 1, calls) +} + +func TestPoll_EventuallyReady(t *testing.T) { + calls := 0 + v, err := Poll(context.Background(), Options[fakeResource]{ + Fetch: func() (fakeResource, error) { + calls++ + if calls < 3 { + return fakeResource{phase: "PENDING"}, nil + } + return fakeResource{phase: "READY"}, nil + }, + IsTerminal: func(r fakeResource) bool { return r.phase != "PENDING" }, + PollInterval: time.Nanosecond, + Timeout: time.Second, + }) + require.NoError(t, err) + require.Equal(t, "READY", v.phase) + require.Equal(t, 3, calls) +} + +func TestPoll_Failed(t *testing.T) { + calls := 0 + v, err := Poll(context.Background(), Options[fakeResource]{ + Fetch: func() (fakeResource, error) { + calls++ + if calls == 1 { + return fakeResource{phase: "PENDING"}, nil + } + return fakeResource{phase: "FAILED"}, nil + }, + IsTerminal: func(r fakeResource) bool { return r.phase == "READY" || r.phase == "FAILED" }, + IsFailed: func(r fakeResource) bool { return r.phase == "FAILED" }, + PollInterval: time.Nanosecond, + Timeout: time.Second, + }) + require.ErrorIs(t, err, ErrFailed) + require.Equal(t, "FAILED", v.phase) +} + +func TestPoll_Timeout(t *testing.T) { + v, err := Poll(context.Background(), Options[fakeResource]{ + Fetch: func() (fakeResource, error) 
{ + return fakeResource{phase: "PENDING"}, nil + }, + IsTerminal: func(r fakeResource) bool { return r.phase != "PENDING" }, + PollInterval: time.Millisecond, + Timeout: 5 * time.Millisecond, + }) + require.ErrorIs(t, err, ErrTimeout) + require.Equal(t, "PENDING", v.phase) +} + +// TestPoll_PersistentFetchErrorReturnsLastErrAtTimeout: fetch errors do not +// abort polling (preserves the historical retry.Retry behavior); when the +// timeout fires while the most recent fetch errored, the underlying error is +// surfaced in place of ErrTimeout so users see the real cause. +func TestPoll_PersistentFetchErrorReturnsLastErrAtTimeout(t *testing.T) { + calls := 0 + fetchErr := fmt.Errorf("transient fetch failure") + v, err := Poll(context.Background(), Options[fakeResource]{ + Fetch: func() (fakeResource, error) { + calls++ + if calls == 1 { + return fakeResource{phase: "PENDING"}, nil + } + return fakeResource{}, fetchErr + }, + IsTerminal: func(r fakeResource) bool { return r.phase != "PENDING" }, + PollInterval: time.Millisecond, + Timeout: 20 * time.Millisecond, + }) + require.ErrorIs(t, err, fetchErr) + require.NotErrorIs(t, err, ErrTimeout) + require.Equal(t, "PENDING", v.phase) // last good value preserved + require.GreaterOrEqual(t, calls, 2) +} + +func TestPoll_FetchErrorOnlyOnFirstCallReturnsAtTimeout(t *testing.T) { + fetchErr := fmt.Errorf("initial fetch failure") + v, err := Poll(context.Background(), Options[fakeResource]{ + Fetch: func() (fakeResource, error) { + return fakeResource{}, fetchErr + }, + IsTerminal: func(r fakeResource) bool { return true }, + PollInterval: time.Millisecond, + Timeout: 20 * time.Millisecond, + }) + require.ErrorIs(t, err, fetchErr) + require.Equal(t, fakeResource{}, v) +} + +// TestPoll_FetchErrorThenSuccess: a transient fetch error mid-polling should +// not abort. Once a subsequent fetch returns successfully and reaches a +// terminal state, the poll returns success. 
+func TestPoll_FetchErrorThenSuccess(t *testing.T) { + calls := 0 + fetchErr := fmt.Errorf("transient 502") + v, err := Poll(context.Background(), Options[fakeResource]{ + Fetch: func() (fakeResource, error) { + calls++ + switch calls { + case 1: + return fakeResource{phase: "PENDING"}, nil + case 2, 3: + return fakeResource{}, fetchErr + default: + return fakeResource{phase: "READY"}, nil + } + }, + IsTerminal: func(r fakeResource) bool { return r.phase == "READY" }, + PollInterval: time.Millisecond, + Timeout: time.Second, + }) + require.NoError(t, err) + require.Equal(t, "READY", v.phase) + require.GreaterOrEqual(t, calls, 4) +} + +func TestPoll_ContextCancelled(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(5 * time.Millisecond) + cancel() + }() + _, err := Poll(ctx, Options[fakeResource]{ + Fetch: func() (fakeResource, error) { + return fakeResource{phase: "PENDING"}, nil + }, + IsTerminal: func(r fakeResource) bool { return r.phase != "PENDING" }, + PollInterval: time.Millisecond, + Timeout: time.Second, + }) + require.True(t, errors.Is(err, context.Canceled)) +} + +// TestPoll_DelayPostponesFirstFetch: Delay sleeps before the first Fetch so +// the resource has a moment to materialize on the server. Mirrors +// retry.StateChangeConf.Delay. 
+func TestPoll_DelayPostponesFirstFetch(t *testing.T) { + start := time.Now() + calls := 0 + v, err := Poll(context.Background(), Options[fakeResource]{ + Fetch: func() (fakeResource, error) { + calls++ + return fakeResource{phase: "READY"}, nil + }, + IsTerminal: func(r fakeResource) bool { return r.phase == "READY" }, + Delay: 15 * time.Millisecond, + PollInterval: time.Millisecond, + Timeout: time.Second, + }) + require.NoError(t, err) + require.Equal(t, "READY", v.phase) + require.Equal(t, 1, calls) + require.GreaterOrEqual(t, time.Since(start), 15*time.Millisecond) +} + +// TestPoll_DelayRespectsCtxCancellation: cancelling ctx while waiting on Delay +// returns ctx.Err() immediately instead of sleeping out the full Delay. +func TestPoll_DelayRespectsCtxCancellation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(5 * time.Millisecond) + cancel() + }() + calls := 0 + _, err := Poll(ctx, Options[fakeResource]{ + Fetch: func() (fakeResource, error) { + calls++ + return fakeResource{phase: "READY"}, nil + }, + IsTerminal: func(r fakeResource) bool { return r.phase == "READY" }, + Delay: time.Second, + PollInterval: time.Millisecond, + Timeout: time.Second, + }) + require.ErrorIs(t, err, context.Canceled) + require.Equal(t, 0, calls) // never made it past Delay +} + +func TestPhaseSet(t *testing.T) { + pending := PhaseSet("PENDING", "FAILING", "STOPPING", "DELETING") + require.True(t, pending("PENDING")) + require.True(t, pending("FAILING")) + require.True(t, pending("STOPPING")) + require.True(t, pending("DELETING")) + require.False(t, pending("RUNNING")) + require.False(t, pending("FAILED")) + require.False(t, pending("")) + require.False(t, pending("pending")) // case-sensitive + + // Empty set never matches. 
+ none := PhaseSet() + require.False(t, none("")) + require.False(t, none("ANYTHING")) +} + +func TestPollPhases_TerminalSuccess(t *testing.T) { + calls := 0 + v, err := PollPhases(context.Background(), PhaseOptions[fakeResource]{ + Fetch: func() (fakeResource, error) { + calls++ + if calls < 3 { + return fakeResource{phase: "PENDING"}, nil + } + return fakeResource{phase: "RUNNING"}, nil + }, + Phase: func(r fakeResource) string { return r.phase }, + PendingPhases: []string{"PENDING", "FAILING", "STOPPING", "DELETING"}, + FailedPhases: []string{"FAILED"}, + PollInterval: time.Nanosecond, + Timeout: time.Second, + }) + require.NoError(t, err) + require.Equal(t, "RUNNING", v.phase) + require.Equal(t, 3, calls) +} + +func TestPollPhases_FailedPhase(t *testing.T) { + v, err := PollPhases(context.Background(), PhaseOptions[fakeResource]{ + Fetch: func() (fakeResource, error) { return fakeResource{phase: "FAILED"}, nil }, + Phase: func(r fakeResource) string { return r.phase }, + PendingPhases: []string{"PENDING"}, + FailedPhases: []string{"FAILED"}, + PollInterval: time.Nanosecond, + Timeout: time.Second, + }) + require.ErrorIs(t, err, ErrFailed) + require.Equal(t, "FAILED", v.phase) +} + +// Verifies the bug Channing flagged: PENDING is not the only intermediate +// state — FAILING, STOPPING, DELETING are also transitioning. They must keep +// polling, not terminate. 
+func TestPollPhases_AllPendingPhasesContinuePolling(t *testing.T) { + cases := []struct { + name string + sequence []string + wantFinal string + }{ + {name: "failing_then_failed", sequence: []string{"PENDING", "FAILING", "FAILED"}, wantFinal: "FAILED"}, + {name: "stopping_then_stopped", sequence: []string{"PENDING", "STOPPING", "STOPPED"}, wantFinal: "STOPPED"}, + {name: "deleting_then_running", sequence: []string{"PENDING", "DELETING", "RUNNING"}, wantFinal: "RUNNING"}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + calls := 0 + v, err := PollPhases(context.Background(), PhaseOptions[fakeResource]{ + Fetch: func() (fakeResource, error) { + p := tc.sequence[calls] + calls++ + return fakeResource{phase: p}, nil + }, + Phase: func(r fakeResource) string { return r.phase }, + PendingPhases: []string{"PENDING", "FAILING", "STOPPING", "DELETING"}, + FailedPhases: []string{"FAILED"}, + PollInterval: time.Nanosecond, + Timeout: time.Second, + }) + require.Equal(t, tc.wantFinal, v.phase) + if tc.wantFinal == "FAILED" { + require.ErrorIs(t, err, ErrFailed) + } else { + require.NoError(t, err) + } + require.Equal(t, len(tc.sequence), calls) + }) + } +} diff --git a/test/fixtures/output/flink/statement/create-help-onprem.golden b/test/fixtures/output/flink/statement/create-help-onprem.golden index 8755604c1c..de2ed822e9 100644 --- a/test/fixtures/output/flink/statement/create-help-onprem.golden +++ b/test/fixtures/output/flink/statement/create-help-onprem.golden @@ -11,7 +11,8 @@ Flags: --catalog string The name of the default catalog. --database string The name of the default database. --flink-configuration string The file path to hold the Flink configuration for the statement. - --wait Boolean flag to block until the statement is running or has failed. + --wait Block until the statement reaches a terminal state. + --wait-timeout duration Maximum time to wait when --wait is set. 
(default 6h0m0s) --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. diff --git a/test/fixtures/output/flink/statement/create-help.golden b/test/fixtures/output/flink/statement/create-help.golden index da60ffb363..f3c983c9ce 100644 --- a/test/fixtures/output/flink/statement/create-help.golden +++ b/test/fixtures/output/flink/statement/create-help.golden @@ -17,7 +17,8 @@ Flags: --compute-pool string Flink compute pool ID. --service-account string Service account ID. --database string The database which will be used as the default database. When using Kafka, this is the cluster ID. - --wait Block until the statement is running or has failed. + --wait Block until the statement reaches a terminal state. + --wait-timeout duration Maximum time to wait when --wait is set. (default 6h0m0s) --property strings A mechanism to pass properties in the form key=value when creating a Flink statement. --environment string Environment ID. --context string CLI context name. diff --git a/test/fixtures/output/flink/statement/create-missing-compute-pool-failure.golden b/test/fixtures/output/flink/statement/create-missing-compute-pool-failure.golden index 637ec67841..e330fbfa86 100644 --- a/test/fixtures/output/flink/statement/create-missing-compute-pool-failure.golden +++ b/test/fixtures/output/flink/statement/create-missing-compute-pool-failure.golden @@ -10,7 +10,8 @@ Flags: --catalog string The name of the default catalog. --database string The name of the default database. 
--flink-configuration string The file path to hold the Flink configuration for the statement. - --wait Boolean flag to block until the statement is running or has failed. + --wait Block until the statement reaches a terminal state. + --wait-timeout duration Maximum time to wait when --wait is set. (default 6h0m0s) --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. diff --git a/test/fixtures/output/flink/statement/create-missing-sql-failure.golden b/test/fixtures/output/flink/statement/create-missing-sql-failure.golden index 6fa346c882..51aae81f04 100644 --- a/test/fixtures/output/flink/statement/create-missing-sql-failure.golden +++ b/test/fixtures/output/flink/statement/create-missing-sql-failure.golden @@ -10,7 +10,8 @@ Flags: --catalog string The name of the default catalog. --database string The name of the default database. --flink-configuration string The file path to hold the Flink configuration for the statement. - --wait Boolean flag to block until the statement is running or has failed. + --wait Block until the statement reaches a terminal state. + --wait-timeout duration Maximum time to wait when --wait is set. (default 6h0m0s) --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. 
--client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. diff --git a/test/fixtures/output/flink/statement/create-wait-failed.golden b/test/fixtures/output/flink/statement/create-wait-failed.golden new file mode 100644 index 0000000000..e1a1051cf3 --- /dev/null +++ b/test/fixtures/output/flink/statement/create-wait-failed.golden @@ -0,0 +1,4 @@ +Error: statement "failed-statement" entered failed phase "FAILED": SQL statement execution has failed + +Suggestions: + Inspect the statement with `confluent flink statement describe failed-statement`. diff --git a/test/fixtures/output/flink/statement/create-wait-onprem.golden b/test/fixtures/output/flink/statement/create-wait-onprem.golden new file mode 100644 index 0000000000..cfcad605b3 --- /dev/null +++ b/test/fixtures/output/flink/statement/create-wait-onprem.golden @@ -0,0 +1,11 @@ ++---------------+-------------------------------+ +| Creation Date | 2025-08-05 12:00:00 +0000 UTC | +| Name | running-wait-stmt | +| Statement | SELECT * FROM test_table | +| Compute Pool | test-pool | +| Status | RUNNING | +| Status Detail | Statement is running. | +| Parallelism | 1 | +| Stopped | false | +| SQL Kind | SELECT | ++---------------+-------------------------------+ diff --git a/test/fixtures/output/flink/statement/create-wait-timeout.golden b/test/fixtures/output/flink/statement/create-wait-timeout.golden new file mode 100644 index 0000000000..165457494a --- /dev/null +++ b/test/fixtures/output/flink/statement/create-wait-timeout.golden @@ -0,0 +1,4 @@ +Error: wait timed out: statement "pending-statement" is still in phase "PENDING" + +Suggestions: + Increase `--wait-timeout` or omit `--wait`. 
diff --git a/test/flink_onprem_test.go b/test/flink_onprem_test.go index 8c0756e30e..2c8c3dce21 100644 --- a/test/flink_onprem_test.go +++ b/test/flink_onprem_test.go @@ -571,6 +571,10 @@ func (s *CLITestSuite) TestFlinkStatementCreateOnPrem() { {args: `flink statement create test-stmt --environment default --sql "SELECT * FROM test_table" --compute-pool test-pool -o yaml`, fixture: "flink/statement/create-success-yaml.golden"}, {args: `flink statement create test-stmt --environment default --sql "SELECT * FROM test_table" --compute-pool test-pool --flink-configuration test/fixtures/input/flink/statement/flink-configuration.json`, fixture: "flink/statement/create-success.golden"}, {args: `flink statement create test-stmt --environment default --sql "SELECT * FROM test_table" --compute-pool test-pool --flink-configuration test/fixtures/input/flink/statement/flink-configuration.yaml`, fixture: "flink/statement/create-success.golden"}, + // --wait: POST returns PENDING; first poll observes RUNNING; framework + // returns the post-transition object so the output shows the terminal + // phase. Mirrors the cloud --wait coverage in flink_test.go. 
+ {args: `flink statement create running-wait-stmt --environment default --sql "SELECT * FROM test_table" --compute-pool test-pool --wait`, fixture: "flink/statement/create-wait-onprem.golden"}, // failure {args: `flink statement create test-stmt --environment default --sql "SELECT * FROM test_table" --compute-pool test-pool --flink-configuration test/fixtures/input/flink/statement/flink-configuration.properties`, fixture: "flink/statement/create-failure-invalid-configuration-file-format.golden", exitCode: 1}, {args: `flink statement create test-stmt --environment default --sql "SELECT * FROM test_table" --compute-pool test-pool --flink-configuration test/fixtures/input/flink/statement/flink-configuration.csv`, fixture: "flink/statement/create-failure-configuration-file-dne.golden", regex: true, exitCode: 1}, diff --git a/test/flink_test.go b/test/flink_test.go index c07ef11c7f..6ad78de2f1 100644 --- a/test/flink_test.go +++ b/test/flink_test.go @@ -478,6 +478,8 @@ func (s *CLITestSuite) TestFlinkStatementCreate() { {args: `flink statement create my-statement-2 --sql "INSERT * INTO table;" --cloud aws --region eu-west-1 --service-account sa-123456`, fixture: "flink/statement/create-without-compute-pool.golden"}, {args: `flink statement create my-statement --sql "INSERT * INTO table;" --compute-pool lfcp-123456`, fixture: "flink/statement/create-service-account-warning.golden"}, {args: `flink statement create my-statement --sql "INSERT * INTO table;" --compute-pool lfcp-123456 --service-account sa-123456 --wait`, fixture: "flink/statement/create-wait.golden"}, + {args: `flink statement create pending-statement --sql "INSERT * INTO table;" --compute-pool lfcp-123456 --service-account sa-123456 --wait --wait-timeout 100ms`, fixture: "flink/statement/create-wait-timeout.golden", exitCode: 1}, + {args: `flink statement create failed-statement --sql "INSERT * INTO table;" --compute-pool lfcp-123456 --service-account sa-123456 --wait`, fixture: 
"flink/statement/create-wait-failed.golden", exitCode: 1}, {args: `flink statement create --sql "INSERT * INTO table;" --compute-pool lfcp-123456 --service-account sa-123456 -o yaml`, fixture: "flink/statement/create-no-name-yaml.golden", regex: true}, {args: `flink statement create my-statement --sql "INSERT * INTO table;" --compute-pool lfcp-123456 --service-account sa-123456 --property property1=value1,property2=value2`, fixture: "flink/statement/create-with-properties.golden"}, {args: `flink statement create my-statement --sql "INSERT * INTO table;" --compute-pool lfcp-123456 --service-account sa-123456 --property invalid-format,property1=value1`, fixture: "flink/statement/create-invalid-property.golden", exitCode: 1}, diff --git a/test/test-server/flink_gateway_router.go b/test/test-server/flink_gateway_router.go index affabe13a4..705abae2d7 100644 --- a/test/test-server/flink_gateway_router.go +++ b/test/test-server/flink_gateway_router.go @@ -226,6 +226,16 @@ func handleSqlEnvironmentsEnvironmentStatementsStatement(t *testing.T) http.Hand func handleStatementGet(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + phase := "COMPLETED" + detail := "SQL statement is completed" + switch mux.Vars(r)["statement"] { + case "pending-statement": + phase = "PENDING" + detail = "SQL statement is pending" + case "failed-statement": + phase = "FAILED" + detail = "SQL statement execution has failed" + } statement := flinkgatewayv1.SqlV1Statement{ Name: flinkgatewayv1.PtrString(mux.Vars(r)["statement"]), Spec: &flinkgatewayv1.SqlV1StatementSpec{ @@ -238,8 +248,8 @@ func handleStatementGet(t *testing.T) http.HandlerFunc { Principal: flinkgatewayv1.PtrString(validFlinkStatementPrincipalId), }, Status: &flinkgatewayv1.SqlV1StatementStatus{ - Phase: "COMPLETED", - Detail: flinkgatewayv1.PtrString("SQL statement is completed"), + Phase: phase, + Detail: flinkgatewayv1.PtrString(detail), LatestOffsets: &map[string]string{ "customers_source": 
"partition:0,offset:9223372036854775808", }, diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index 199f16791a..557f50b28d 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -1328,6 +1328,29 @@ func handleCmfStatement(t *testing.T) http.HandlerFunc { return } + // running-wait-stmt drives the on-prem --wait success path: POST + // returns PENDING (default), and the first poll observes RUNNING. + if stmtName == "running-wait-stmt" { + timeStamp := time.Date(2025, time.August, 5, 12, 0, 0, 0, time.UTC).String() + stmt := cmfsdk.Statement{ + Metadata: cmfsdk.StatementMetadata{Name: stmtName, CreationTimestamp: &timeStamp}, + Spec: cmfsdk.StatementSpec{ + Statement: "SELECT * FROM test_table", + ComputePoolName: "test-pool", + Parallelism: cmfsdk.PtrInt32(1), + Stopped: cmfsdk.PtrBool(false), + }, + Status: &cmfsdk.StatementStatus{ + Phase: "RUNNING", + Detail: cmfsdk.PtrString("Statement is running."), + Traits: &cmfsdk.StatementTraits{SqlKind: cmfsdk.PtrString("SELECT"), IsAppendOnly: cmfsdk.PtrBool(false), IsBounded: cmfsdk.PtrBool(false)}, + }, + } + err := json.NewEncoder(w).Encode(stmt) + require.NoError(t, err) + return + } + if stmtName == "shell-test-stmt" { stmt := cmfsdk.Statement{ Metadata: cmfsdk.StatementMetadata{