Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 52 additions & 16 deletions internal/flink/command_statement_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,23 @@
"github.com/confluentinc/cli/v4/pkg/flink/types"
"github.com/confluentinc/cli/v4/pkg/output"
"github.com/confluentinc/cli/v4/pkg/properties"
"github.com/confluentinc/cli/v4/pkg/retry"
"github.com/confluentinc/cli/v4/pkg/wait"
)

// Phase enum from flink-gateway/v1 SqlV1StatementStatus.Phase. PENDING /
// FAILING / STOPPING / DELETING are transitioning states; FAILED is the only
// terminal failure; RUNNING / COMPLETED / STOPPED / DEGRADED are terminal
// success. The generator will source the same sets from AsyncConfig.
var (
flinkStatementPendingPhases = []string{"PENDING", "FAILING", "STOPPING", "DELETING"}
flinkStatementFailedPhases = []string{"FAILED"}
)

// statementsAPICreateTimeout aligns with terraform-provider-confluent's
// statementsAPICreateTimeout in internal/provider/constants.go. CLI users can
// shorten it with --wait-timeout; TF has no equivalent override.
const flinkStatementCreateWaitTimeout = 6 * time.Hour

func (c *command) newStatementCreateCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "create [name]",
Expand All @@ -41,7 +55,8 @@
c.addComputePoolFlag(cmd)
pcmd.AddServiceAccountFlag(cmd, c.AuthenticatedCLICommand)
c.addDatabaseFlag(cmd)
cmd.Flags().Bool("wait", false, "Block until the statement is running or has failed.")
cmd.Flags().Bool("wait", false, "Block until the statement reaches a terminal state.")
cmd.Flags().Duration("wait-timeout", flinkStatementCreateWaitTimeout, "Maximum time to wait when --wait is set.")
cmd.Flags().StringSlice("property", []string{}, "A mechanism to pass properties in the form key=value when creating a Flink statement.")
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddContextFlag(cmd, c.CLICommand)
Expand All @@ -54,7 +69,7 @@
return cmd
}

func (c *command) statementCreate(cmd *cobra.Command, args []string) error {

Check failure on line 72 in internal/flink/command_statement_create.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this method to reduce its Cognitive Complexity from 36 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3389&issues=d0c95dde-e2cd-4f98-8433-8f4e97ccfb21&open=d0c95dde-e2cd-4f98-8433-8f4e97ccfb21
environmentId, err := c.Context.EnvironmentId()
if err != nil {
return err
Expand Down Expand Up @@ -151,26 +166,47 @@
if err != nil {
return err
}
wait, err := cmd.Flags().GetBool("wait")
shouldWait, err := cmd.Flags().GetBool("wait")
if err != nil {
return err
}
if wait {
err := retry.Retry(time.Second, time.Minute, func() error {
statement, err = client.GetStatement(environmentId, name, c.Context.LastOrgId)
if err != nil {
return err
}

if statement.Status.GetPhase() == "PENDING" {
return fmt.Errorf(`statement phase is "%s"`, statement.Status.GetPhase())
}

return nil
})
if shouldWait {
timeout, err := cmd.Flags().GetDuration("wait-timeout")
if err != nil {
return err
}
statement, err = wait.PollPhases(cmd.Context(), wait.PhaseOptions[flinkgatewayv1.SqlV1Statement]{
Fetch: func() (flinkgatewayv1.SqlV1Statement, error) {
return client.GetStatement(environmentId, name, c.Context.LastOrgId)
},
Phase: func(s flinkgatewayv1.SqlV1Statement) string { return s.Status.GetPhase() },
PendingPhases: flinkStatementPendingPhases,
FailedPhases: flinkStatementFailedPhases,
PollInterval: time.Second,
Timeout: timeout,
})
if err != nil {
// wait.ErrFailed and wait.ErrTimeout are package-level sentinels
// returned directly from Poll (never wrapped); a direct == compare
// is correct and avoids importing stdlib errors alongside the CLI
// errors package.
switch err {
case wait.ErrFailed:
return errors.NewErrorWithSuggestions(
fmt.Sprintf(`statement "%s" entered failed phase %q: %s`, name, statement.Status.GetPhase(), statement.Status.GetDetail()),
fmt.Sprintf("Inspect the statement with `confluent flink statement describe %s`.", name),
)
case wait.ErrTimeout:
return errors.NewErrorWithSuggestions(
fmt.Sprintf(`wait timed out: statement "%s" is still in phase %q`, name, statement.Status.GetPhase()),
"Increase `--wait-timeout` or omit `--wait`.",
)
default:
// Fetch error or context cancellation — surface the underlying
// cause unmodified; suggesting a longer timeout would mislead.
return err
}
}
}

table := output.NewTable(cmd)
Expand Down
49 changes: 33 additions & 16 deletions internal/flink/command_statement_create_onprem.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"github.com/confluentinc/cli/v4/pkg/errors"
"github.com/confluentinc/cli/v4/pkg/flink/types"
"github.com/confluentinc/cli/v4/pkg/output"
"github.com/confluentinc/cli/v4/pkg/retry"
"github.com/confluentinc/cli/v4/pkg/wait"
)

func (c *command) newStatementCreateCommandOnPrem() *cobra.Command {
Expand All @@ -36,7 +36,8 @@
cmd.Flags().String("catalog", "", "The name of the default catalog.")
cmd.Flags().String("database", "", "The name of the default database.")
cmd.Flags().String("flink-configuration", "", "The file path to hold the Flink configuration for the statement.")
cmd.Flags().Bool("wait", false, "Boolean flag to block until the statement is running or has failed.")
cmd.Flags().Bool("wait", false, "Block until the statement reaches a terminal state.")
cmd.Flags().Duration("wait-timeout", flinkStatementCreateWaitTimeout, "Maximum time to wait when --wait is set.")
addCmfFlagSet(cmd)
pcmd.AddOutputFlag(cmd)

Expand All @@ -47,7 +48,7 @@
return cmd
}

func (c *command) statementCreateOnPrem(cmd *cobra.Command, args []string) error {

Check failure on line 51 in internal/flink/command_statement_create_onprem.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this method to reduce its Cognitive Complexity from 22 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3389&issues=063534a4-a8fd-42ec-bb31-3e7f3116e3f2&open=063534a4-a8fd-42ec-bb31-3e7f3116e3f2
// Flink statement name can be automatically generated or provided by the user
name := types.GenerateStatementNameForOnPrem()
if len(args) == 1 {
Expand Down Expand Up @@ -114,7 +115,7 @@
Stopped: cmfsdk.PtrBool(false),
},
}
wait, err := cmd.Flags().GetBool("wait")
shouldWait, err := cmd.Flags().GetBool("wait")
if err != nil {
return err
}
Expand All @@ -124,22 +125,38 @@
return err
}

if wait {
err := retry.Retry(time.Second*2, time.Minute, func() error {
polledStatement, err := client.GetStatement(c.createContext(), environment, name)
if err != nil {
return err
}
if polledStatement.GetStatus().Phase == "PENDING" {
return fmt.Errorf(`statement phase is "%s"`, polledStatement.GetStatus().Phase)
}
// Update the finalStatement with the completed state
finalStatement = polledStatement
return nil
})
if shouldWait {
timeout, err := cmd.Flags().GetDuration("wait-timeout")
if err != nil {
return err
}
finalStatement, err = wait.PollPhases(cmd.Context(), wait.PhaseOptions[cmfsdk.Statement]{
Fetch: func() (cmfsdk.Statement, error) {
return client.GetStatement(c.createContext(), environment, name)
},
Phase: func(s cmfsdk.Statement) string { return s.GetStatus().Phase },
PendingPhases: flinkStatementPendingPhases,
FailedPhases: flinkStatementFailedPhases,
PollInterval: 2 * time.Second,
Timeout: timeout,
})
if err != nil {
status := finalStatement.GetStatus()
switch err {
case wait.ErrFailed:
return errors.NewErrorWithSuggestions(
fmt.Sprintf(`statement "%s" entered failed phase %q: %s`, name, status.Phase, status.GetDetail()),
fmt.Sprintf("Inspect the statement with `confluent flink statement describe %s`.", name),
)
case wait.ErrTimeout:
return errors.NewErrorWithSuggestions(
fmt.Sprintf(`wait timed out: statement "%s" is still in phase %q`, name, status.Phase),
"Increase `--wait-timeout` or omit `--wait`.",
)
default:
return err
}
}
}

if output.GetFormat(cmd) == output.Human {
Expand Down
142 changes: 142 additions & 0 deletions pkg/wait/wait.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package wait

import (
"context"
"errors"
"time"
)

// Options describes a single Poll invocation. T is the polled resource type.
//
// Delay (if non-zero) sleeps before the first Fetch. Use it to give the
// resource a moment to materialize on the server after a POST; mirrors
// retry.StateChangeConf.Delay in terraform-provider-confluent.
//
// PollInterval is the gap between successive Fetch calls after the first.
type Options[T any] struct {
Fetch func() (T, error)
IsTerminal func(T) bool
IsFailed func(T) bool
Delay time.Duration
PollInterval time.Duration
Timeout time.Duration
}

// PhaseOptions is a declarative form of Options for the common case where
// readiness is determined by a single status-phase string. The pending and
// failed phase sets are typically sourced from the resource's OpenAPI status
// enum (see cli-terraform-generator AsyncConfig).
type PhaseOptions[T any] struct {
Fetch func() (T, error)
Phase func(T) string
PendingPhases []string
FailedPhases []string
Delay time.Duration
PollInterval time.Duration
Timeout time.Duration
}

var (
ErrTimeout = errors.New("wait timed out")
ErrFailed = errors.New("resource entered failed state")
)

// PhaseSet returns a predicate reporting whether its argument is in phases.
// Used to build IsTerminal / IsFailed checks from OpenAPI status enums.
func PhaseSet(phases ...string) func(string) bool {
set := make(map[string]struct{}, len(phases))
for _, p := range phases {
set[p] = struct{}{}
}
return func(s string) bool {
_, ok := set[s]
return ok
}
}

// PollPhases polls opts.Fetch until opts.Phase returns a value not in
// opts.PendingPhases. If the resulting phase is in opts.FailedPhases the
// return error is ErrFailed; otherwise it is treated as a successful
// terminal state. Timeout / context-cancellation / fetch-errors behave as in
// Poll.
func PollPhases[T any](ctx context.Context, opts PhaseOptions[T]) (T, error) {
pending := PhaseSet(opts.PendingPhases...)
failed := PhaseSet(opts.FailedPhases...)
return Poll(ctx, Options[T]{
Fetch: opts.Fetch,
IsTerminal: func(v T) bool { return !pending(opts.Phase(v)) },
IsFailed: func(v T) bool { return failed(opts.Phase(v)) },
Delay: opts.Delay,
PollInterval: opts.PollInterval,
Timeout: opts.Timeout,
})
}

// Poll sleeps opts.Delay (if non-zero), then calls opts.Fetch, then every
// opts.PollInterval until IsFailed returns true (ErrFailed), IsTerminal
// returns true (success), opts.Timeout elapses, or ctx is cancelled.
//
// Fetch errors are treated as transient and do not abort polling: the loop
// continues, preserving the most recent successfully-fetched value as `last`.
// This matches the historical retry.Retry-based behavior callers relied on,
// where 429/5xx/network blips during polling were retried until timeout. If
// the timeout elapses while the most recent Fetch errored, that error is
// returned in place of ErrTimeout so the user sees the underlying cause.
func Poll[T any](ctx context.Context, opts Options[T]) (T, error) {
var (
last T
lastErr error
)

check := func() (bool, error) {
v, ferr := opts.Fetch()
if ferr != nil {
lastErr = ferr
return false, nil
}
lastErr = nil
last = v
if opts.IsFailed != nil && opts.IsFailed(v) {
return true, ErrFailed
}
if opts.IsTerminal(v) {
return true, nil
}
return false, nil
}

deadline := time.After(opts.Timeout)

if opts.Delay > 0 {
select {
case <-time.After(opts.Delay):
case <-deadline:
return last, ErrTimeout
case <-ctx.Done():
return last, ctx.Err()
}
}

if done, err := check(); done {
return last, err
}

ticker := time.NewTicker(opts.PollInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if done, err := check(); done {
return last, err
}
case <-deadline:
if lastErr != nil {
return last, lastErr
}
return last, ErrTimeout
case <-ctx.Done():
return last, ctx.Err()
}
}
}
Loading