From 53052a6987691cf9d08241a343ddcf6ae31e9745 Mon Sep 17 00:00:00 2001 From: Bitalizer <23104115+bitalizer@users.noreply.github.com> Date: Wed, 29 Apr 2026 11:31:12 +0300 Subject: [PATCH 1/3] fix: skip CountWorkflow in batch operations when --yes is set The visibility CountWorkflowExecutions request was issued unconditionally before every batch terminate / signal / cancel / reset. The count is only used to populate the "Start batch against approximately N workflow(s)?" confirmation prompt. When --yes bypasses the prompt entirely, the count result is never read. In clusters whose visibility API is overloaded (e.g. Postgres-backed clusters with many workflows), this CountWorkflow call can time out and prevent batch jobs from being started at all, even though the batch operation itself uses the same query and would succeed. Skipping the count when --yes is set lets these batch jobs proceed unconditionally. Both batch entry points are updated: - commands.workflow.go (terminate / signal / cancel) - commands.workflow_reset.go (reset) When the count is skipped, the prompt text shown by --yes changes from "Start batch against approximately N workflow(s)? y/N" to "Start batch against workflows matching query "<query>"? y/N" so the output remains informative. Adds TestWorkflow_Terminate_BatchWorkflow_SkipsCountWhenYes which uses a gRPC unary interceptor to assert that CountWorkflowExecutionsRequest is *not* sent when --yes is passed, while StartBatchOperationRequest still is. The existing without-yes tests are unaffected. 
Closes #838 --- internal/temporalcli/commands.workflow.go | 18 +++-- .../temporalcli/commands.workflow_reset.go | 17 ++-- .../temporalcli/commands.workflow_test.go | 79 +++++++++++++++++++ 3 files changed, 103 insertions(+), 11 deletions(-) diff --git a/internal/temporalcli/commands.workflow.go b/internal/temporalcli/commands.workflow.go index 5a98d6ffe..99bd953b6 100644 --- a/internal/temporalcli/commands.workflow.go +++ b/internal/temporalcli/commands.workflow.go @@ -542,13 +542,19 @@ func (s *SingleWorkflowOrBatchOptions) workflowExecOrBatch( return nil, nil, fmt.Errorf("cannot set run ID when query is set") } - // Count the workflows that will be affected - count, err := cl.CountWorkflow(cctx, &workflowservice.CountWorkflowExecutionsRequest{Query: s.Query}) - if err != nil { - return nil, nil, fmt.Errorf("failed counting workflows from query: %w", err) + // The count is only used in the confirmation prompt; skip the request when --yes + // bypasses it, so batch jobs can still proceed if the visibility API is timing out. + var promptMessage string + if s.Yes { + promptMessage = fmt.Sprintf("Start batch against workflows matching query %q? y/N", s.Query) + } else { + count, err := cl.CountWorkflow(cctx, &workflowservice.CountWorkflowExecutionsRequest{Query: s.Query}) + if err != nil { + return nil, nil, fmt.Errorf("failed counting workflows from query: %w", err) + } + promptMessage = fmt.Sprintf("Start batch against approximately %v workflow(s)? y/N", count.Count) } - yes, err := cctx.promptYes( - fmt.Sprintf("Start batch against approximately %v workflow(s)? 
y/N", count.Count), s.Yes) + yes, err := cctx.promptYes(promptMessage, s.Yes) if err != nil { return nil, nil, err } else if !yes { diff --git a/internal/temporalcli/commands.workflow_reset.go b/internal/temporalcli/commands.workflow_reset.go index 4c33065ce..aee402891 100644 --- a/internal/temporalcli/commands.workflow_reset.go +++ b/internal/temporalcli/commands.workflow_reset.go @@ -134,12 +134,19 @@ func (c *TemporalWorkflowResetCommand) runBatchResetWithPostOps(cctx *CommandCon PostResetOperations: postOps, }, } - count, err := cl.CountWorkflow(cctx, &workflowservice.CountWorkflowExecutionsRequest{Query: c.Query}) - if err != nil { - return fmt.Errorf("failed counting workflows from query: %w", err) + // The count is only used in the confirmation prompt; skip the request when --yes + // bypasses it, so batch jobs can still proceed if the visibility API is timing out. + var promptMessage string + if c.Yes { + promptMessage = fmt.Sprintf("Start batch against workflows matching query %q? y/N", c.Query) + } else { + count, err := cl.CountWorkflow(cctx, &workflowservice.CountWorkflowExecutionsRequest{Query: c.Query}) + if err != nil { + return fmt.Errorf("failed counting workflows from query: %w", err) + } + promptMessage = fmt.Sprintf("Start batch against approximately %v workflow(s)? y/N", count.Count) } - yes, err := cctx.promptYes( - fmt.Sprintf("Start batch against approximately %v workflow(s)? 
y/N", count.Count), c.Yes) + yes, err := cctx.promptYes(promptMessage, c.Yes) if err != nil { return err } diff --git a/internal/temporalcli/commands.workflow_test.go b/internal/temporalcli/commands.workflow_test.go index 5a11f3627..da603f749 100644 --- a/internal/temporalcli/commands.workflow_test.go +++ b/internal/temporalcli/commands.workflow_test.go @@ -346,6 +346,85 @@ func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflowSuccess_JSON() { s.NotEmpty(jsonRes["batchJobId"]) } +// TestWorkflow_Terminate_BatchWorkflow_SkipsCountWhenYes verifies that --yes causes +// the batch terminate command to skip the CountWorkflowExecutions call. The count +// is only used for the "Start batch against approximately N workflow(s)?" prompt; +// when --yes bypasses the prompt, issuing it adds latency and prevents batch jobs +// from running on clusters whose visibility API is timing out. +func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflow_SkipsCountWhenYes() { + s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + ctx.Done().Receive(ctx, nil) + return nil, ctx.Err() + }) + + var callLock sync.Mutex + var countCalls, startBatchCalls int + s.CommandHarness.Options.AdditionalClientGRPCDialOptions = append( + s.CommandHarness.Options.AdditionalClientGRPCDialOptions, + grpc.WithChainUnaryInterceptor(func( + ctx context.Context, + method string, req, reply any, + cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption, + ) error { + callLock.Lock() + switch req.(type) { + case *workflowservice.CountWorkflowExecutionsRequest: + countCalls++ + case *workflowservice.StartBatchOperationRequest: + startBatchCalls++ + } + callLock.Unlock() + return invoker(ctx, method, req, reply, cc, opts...) + }), + ) + + // Start a workflow so the batch query has at least one match. 
The count assertion + // is independent of the match count (it asserts zero CountWorkflow calls regardless), + // but executing the batch against a real workflow keeps the test path realistic. + searchAttr := "keyword-" + uuid.NewString() + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + SearchAttributes: map[string]any{"CustomKeywordField": searchAttr}, + }, + DevWorkflow, + "ignored", + ) + s.NoError(err) + s.Eventually(func() bool { + resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{ + Query: "CustomKeywordField = '" + searchAttr + "'", + }) + s.NoError(err) + return len(resp.Executions) == 1 + }, 3*time.Second, 100*time.Millisecond) + + res := s.Execute( + "workflow", "terminate", + "--address", s.Address(), + "--query", "CustomKeywordField = '"+searchAttr+"'", + "--reason", "skip-count-test", + "--yes", + ) + s.NoError(res.Err) + + callLock.Lock() + defer callLock.Unlock() + s.Equal(0, countCalls, "CountWorkflowExecutions must not be called when --yes is set") + s.Equal(1, startBatchCalls, "StartBatchOperation must still be called") + + // Sanity-check: the prompt text should reflect the missing count. + s.NotContains(res.Stdout.String(), "approximately") + s.Contains(res.Stdout.String(), "matching query") + + // Drain the workflow so the test fixture cleans up. + s.Eventually(func() bool { + err := run.Get(s.Context, nil) + return err != nil + }, 5*time.Second, 100*time.Millisecond) +} + func (s *SharedServerSuite) testTerminateBatchWorkflow( total int, rps float32, From 389facf1e2e9f544e1b84069f8973889c21bbbcd Mon Sep 17 00:00:00 2001 From: "alex.stanfield" <13949480+chaptersix@users.noreply.github.com> Date: Sat, 30 May 2026 06:34:50 -0500 Subject: [PATCH 2/3] test: simplify skip-count tests and add reset coverage Remove gRPC interceptor from terminate test -- asserting on prompt text is sufficient to verify the count is skipped. 
Add reset batch test for the same --yes skip-count behavior. Assert terminated error specifically in drain check. --- .../commands.workflow_reset_test.go | 39 ++++++++++++++++++ .../temporalcli/commands.workflow_test.go | 41 ++----------------- 2 files changed, 43 insertions(+), 37 deletions(-) diff --git a/internal/temporalcli/commands.workflow_reset_test.go b/internal/temporalcli/commands.workflow_reset_test.go index 33b3cc525..5dc87604c 100644 --- a/internal/temporalcli/commands.workflow_reset_test.go +++ b/internal/temporalcli/commands.workflow_reset_test.go @@ -927,3 +927,42 @@ func (sut *batchResetTestData) getWorkflowHistory() ([]*history.HistoryEvent, er return events, nil } + +func (s *SharedServerSuite) TestWorkflow_ResetBatch_SkipsCountWhenYes() { + s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + ctx.Done().Receive(ctx, nil) + return nil, ctx.Err() + }) + + searchAttr := "keyword-" + uuid.NewString() + _, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + SearchAttributes: map[string]any{"CustomKeywordField": searchAttr}, + }, + DevWorkflow, + "ignored", + ) + s.NoError(err) + s.Eventually(func() bool { + resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{ + Query: "CustomKeywordField = '" + searchAttr + "'", + }) + s.NoError(err) + return len(resp.Executions) == 1 + }, 3*time.Second, 100*time.Millisecond) + + res := s.Execute( + "workflow", "reset", + "--address", s.Address(), + "--query", "CustomKeywordField = '"+searchAttr+"'", + "--type", "FirstWorkflowTask", + "--reason", "skip-count-test", + "--yes", + ) + s.NoError(res.Err) + + s.NotContains(res.Stdout.String(), "approximately") + s.Contains(res.Stdout.String(), "matching query") +} diff --git a/internal/temporalcli/commands.workflow_test.go b/internal/temporalcli/commands.workflow_test.go index 9b1aad117..a194436c7 100644 --- 
a/internal/temporalcli/commands.workflow_test.go +++ b/internal/temporalcli/commands.workflow_test.go @@ -7,6 +7,7 @@ import ( "math/rand" "regexp" "strconv" + "strings" "sync" "time" @@ -387,41 +388,12 @@ func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflowSuccess_JSON() { s.NotEmpty(jsonRes["batchJobId"]) } -// TestWorkflow_Terminate_BatchWorkflow_SkipsCountWhenYes verifies that --yes causes -// the batch terminate command to skip the CountWorkflowExecutions call. The count -// is only used for the "Start batch against approximately N workflow(s)?" prompt; -// when --yes bypasses the prompt, issuing it adds latency and prevents batch jobs -// from running on clusters whose visibility API is timing out. func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflow_SkipsCountWhenYes() { s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { ctx.Done().Receive(ctx, nil) return nil, ctx.Err() }) - var callLock sync.Mutex - var countCalls, startBatchCalls int - s.CommandHarness.Options.AdditionalClientGRPCDialOptions = append( - s.CommandHarness.Options.AdditionalClientGRPCDialOptions, - grpc.WithChainUnaryInterceptor(func( - ctx context.Context, - method string, req, reply any, - cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption, - ) error { - callLock.Lock() - switch req.(type) { - case *workflowservice.CountWorkflowExecutionsRequest: - countCalls++ - case *workflowservice.StartBatchOperationRequest: - startBatchCalls++ - } - callLock.Unlock() - return invoker(ctx, method, req, reply, cc, opts...) - }), - ) - - // Start a workflow so the batch query has at least one match. The count assertion - // is independent of the match count (it asserts zero CountWorkflow calls regardless), - // but executing the batch against a real workflow keeps the test path realistic. 
searchAttr := "keyword-" + uuid.NewString() run, err := s.Client.ExecuteWorkflow( s.Context, @@ -450,19 +422,14 @@ func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflow_SkipsCountWhenY ) s.NoError(res.Err) - callLock.Lock() - defer callLock.Unlock() - s.Equal(0, countCalls, "CountWorkflowExecutions must not be called when --yes is set") - s.Equal(1, startBatchCalls, "StartBatchOperation must still be called") - - // Sanity-check: the prompt text should reflect the missing count. + // Prompt text should show the query, not the count s.NotContains(res.Stdout.String(), "approximately") s.Contains(res.Stdout.String(), "matching query") - // Drain the workflow so the test fixture cleans up. + // Confirm workflow was terminated s.Eventually(func() bool { err := run.Get(s.Context, nil) - return err != nil + return err != nil && strings.Contains(err.Error(), "terminated") }, 5*time.Second, 100*time.Millisecond) } From f9f2004c63a263370818925a2b7145ba0571b016 Mon Sep 17 00:00:00 2001 From: "alex.stanfield" <13949480+chaptersix@users.noreply.github.com> Date: Sat, 30 May 2026 07:23:58 -0500 Subject: [PATCH 3/3] fix: update delete batch test assertion for --yes skip-count behavior TestWorkflow_Delete_BatchWorkflowSuccess uses -y, which now skips the count call. Update assertion from "approximately 2 workflow(s)" to "matching query" to match the new prompt text. 
--- internal/temporalcli/commands.workflow_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/temporalcli/commands.workflow_test.go b/internal/temporalcli/commands.workflow_test.go index a194436c7..aedf331f7 100644 --- a/internal/temporalcli/commands.workflow_test.go +++ b/internal/temporalcli/commands.workflow_test.go @@ -251,7 +251,7 @@ func (s *SharedServerSuite) TestWorkflow_Delete_BatchWorkflowSuccess() { "-y", ) s.NoError(res.Err) - s.Contains(res.Stdout.String(), "Start batch against approximately 2 workflow(s)") + s.Contains(res.Stdout.String(), "Start batch against workflows matching query") s.NotContains(res.Stdout.String(), "Deleting Workflow Executions in a global Namespace") s.NotContains(res.Stderr.String(), "Deleting Workflow Executions in a global Namespace")