From dc8b763b144c5b8e93d7334ec36f8837855c8e4f Mon Sep 17 00:00:00 2001 From: JeanCarlos_MartinsDa Date: Thu, 18 Jun 2026 21:09:33 -0300 Subject: [PATCH 1/6] added limit to 1000 in GetOrchestrationWorkItem --- backend/postgres/postgres.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index 10d8336..af85c0e 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -140,7 +140,7 @@ func (be *postgresBackend) AbandonOrchestrationWorkItem(ctx context.Context, wi } defer tx.Rollback(ctx) //nolint:errcheck // rollback after commit is a no-op - var visibleTime*time.Time = nil + var visibleTime *time.Time = nil if delay := wi.GetAbandonDelay(); delay > 0 { t := time.Now().UTC().Add(delay) visibleTime = &t @@ -769,7 +769,7 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe defer tx.Rollback(ctx) //nolint:errcheck // rollback after commit is a no-op now := time.Now().UTC() - newLockExpiration:= now.Add(be.options.OrchestrationLockTimeout) + newLockExpiration := now.Add(be.options.OrchestrationLockTimeout) // Place a lock on an orchestration instance that has new events that are ready to be executed. row := tx.QueryRow( @@ -782,7 +782,7 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe WHERE E.InstanceID = I.InstanceID AND (E.VisibleTime IS NULL OR E.VisibleTime < $4) ) ORDER BY I.SequenceNumber ASC - LIMIT 1 + LIMIT 1000 FOR UPDATE SKIP LOCKED ) RETURNING InstanceID`, be.workerName, // LockedBy for Instances table From bdaa47467b318822bcf2db1280d192f5aa3e3ea5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean=20Carlos=20Magalh=C3=A3es?= Date: Thu, 18 Jun 2026 21:12:31 -0300 Subject: [PATCH 2/6] increase limit to 100 in GetOrchestrationWorkItem --- backend/postgres/postgres.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index 7d03000..740ca63 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -790,7 +790,7 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe WHERE E.InstanceID = I.InstanceID AND (E.VisibleTime IS NULL OR E.VisibleTime < $4) ) ORDER BY I.SequenceNumber ASC - LIMIT 1 + LIMIT 1000 FOR UPDATE SKIP LOCKED ) RETURNING InstanceID`, be.workerName, // LockedBy for Instances table From 6f216ca672c2e74b928ab7b48c8bd639e30ead6f Mon Sep 17 00:00:00 2001 From: JeanCarlos_MartinsDa Date: Thu, 18 Jun 2026 22:04:19 -0300 Subject: [PATCH 3/6] early commit approach --- backend/postgres/postgres.go | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index 740ca63..069add9 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -790,7 +790,7 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe WHERE E.InstanceID = I.InstanceID AND (E.VisibleTime IS NULL OR E.VisibleTime < $4) ) ORDER BY I.SequenceNumber ASC - LIMIT 1000 + LIMIT 1 FOR UPDATE SKIP LOCKED ) RETURNING InstanceID`, be.workerName, // LockedBy for Instances table @@ -827,30 +827,42 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe } defer events.Close() - maxDequeueCount := int32(0) + type rawEvent struct { + payload []byte + dequeue int32 + } - newEvents := make([]*protos.HistoryEvent, 0, 10) + rawEvents := []rawEvent{} for events.Next() { var eventPayload []byte var dequeueCount int32 if err := events.Scan(&eventPayload, &dequeueCount); err != nil { return nil, fmt.Errorf("failed to read history event: %w", err) } + rawEvents = append(rawEvents, rawEvent{ + payload: eventPayload, + dequeue: dequeueCount, + }) + } + events.Close() - if dequeueCount > maxDequeueCount { - maxDequeueCount = dequeueCount + if err = tx.Commit(ctx); err != nil { + return nil, fmt.Errorf("failed to update orchestration work-item: %w", err) + } + + maxDequeueCount := int32(0) + newEvents := make([]*protos.HistoryEvent, 0, len(rawEvents)) + for _, e := range rawEvents { + if e.dequeue > maxDequeueCount { + maxDequeueCount = e.dequeue } - e, err := backend.UnmarshalHistoryEvent(eventPayload) + evt, err := backend.UnmarshalHistoryEvent(e.payload) if err != nil { return nil, err } - newEvents = append(newEvents, e) - } - - if err = tx.Commit(ctx); err != nil { - return nil, fmt.Errorf("failed to update orchestration work-item: %w", err) + newEvents = append(newEvents, evt) } wi := &backend.OrchestrationWorkItem{ From 1f8e9b66bf1c760e01614735b92cd3090a0946d8 Mon Sep 17 00:00:00 2001 From: JeanCarlos_MartinsDa Date: Fri, 19 Jun 2026 11:59:30 -0300 Subject: [PATCH 4/6] use uuidv7 and ordering by instanceID and sequenceNumber --- backend/client.go | 2 +- backend/postgres/postgres.go | 10 +++++++--- client/client_grpc.go | 7 ++++++- go.mod | 2 +- go.sum | 2 ++ internal/helpers/history.go | 6 +++++- internal/helpers/worker.go | 3 ++- samples/parallel/parallel.go | 3 ++- 8 files changed, 26 insertions(+), 9 deletions(-) diff --git a/backend/client.go b/backend/client.go index aff3918..51f27d9 100644 --- a/backend/client.go +++ b/backend/client.go @@ -46,7 +46,7 @@ func (c *backendClient) ScheduleNewOrchestration(ctx context.Context, orchestrat } } if req.InstanceId == "" { - u, err := uuid.NewRandom() + u, err := uuid.NewV7() if err != nil { return api.EmptyInstanceID, fmt.Errorf("failed to generate instance ID: %w", err) } diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index 069add9..1c65483 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -64,7 +64,11 @@ func NewPostgresBackend(opts *PostgresOptions, logger backend.Logger) backend.Ba } pid := os.Getpid() - uuidStr := uuid.NewString() + u, err := uuid.NewV7() + if err != nil { + return nil + } + uuidStr := u.String() if opts == nil { opts = NewPostgresOptions("localhost", 5432, "postgres", "postgres", "postgres") @@ -789,7 +793,7 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe SELECT 1 FROM NewEvents E WHERE E.InstanceID = I.InstanceID AND (E.VisibleTime IS NULL OR E.VisibleTime < $4) ) - ORDER BY I.SequenceNumber ASC + ORDER BY I.InstanceID, I.SequenceNumber ASC LIMIT 1 FOR UPDATE SKIP LOCKED ) RETURNING InstanceID`, @@ -889,7 +893,7 @@ func (be *postgresBackend) GetActivityWorkItem(ctx context.Context) (*backend.Ac WHERE SequenceNumber = ( SELECT SequenceNumber FROM NewTasks T WHERE T.LockExpiration IS NULL OR T.LockExpiration < $3 - ORDER BY T.SequenceNumber ASC + ORDER BY T.InstanceID, T.SequenceNumber ASC LIMIT 1 FOR UPDATE SKIP LOCKED ) RETURNING SequenceNumber, InstanceID, EventPayload`, diff --git a/client/client_grpc.go b/client/client_grpc.go index 996fd98..ef172f2 100644 --- a/client/client_grpc.go +++ b/client/client_grpc.go @@ -39,7 +39,12 @@ func (c *TaskHubGrpcClient) ScheduleNewOrchestration(ctx context.Context, orches } } if req.InstanceId == "" { - req.InstanceId = uuid.NewString() + u, err := uuid.NewV7() + if err != nil { + return api.EmptyInstanceID, fmt.Errorf("failed to create uuid: %w", err) + } + + req.InstanceId = u.String() } resp, err := c.client.StartInstance(ctx, req) diff --git a/go.mod b/go.mod index ed00710..1d8b7a3 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.23.0 require ( github.com/cenkalti/backoff/v4 v4.1.3 - github.com/google/uuid v1.3.0 + github.com/google/uuid v1.6.0 github.com/jackc/pgx/v5 v5.7.1 github.com/marusama/semaphore/v2 v2.5.0 github.com/stretchr/testify v1.8.4 diff --git a/go.sum b/go.sum index 050e23d..aae8ee0 100644 --- a/go.sum +++ b/go.sum @@ -22,6 +22,8 @@ github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbu github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= diff --git a/internal/helpers/history.go b/internal/helpers/history.go index e7d3aa5..6ca9cfe 100644 --- a/internal/helpers/history.go +++ b/internal/helpers/history.go @@ -21,6 +21,10 @@ func NewExecutionStartedEvent( parentTraceContext *protos.TraceContext, scheduledStartTimeStamp *timestamppb.Timestamp, ) *protos.HistoryEvent { + u, err := uuid.NewV7() + if err != nil { + return nil + } return &protos.HistoryEvent{ EventId: -1, Timestamp: timestamppb.New(time.Now()), @@ -31,7 +35,7 @@ func NewExecutionStartedEvent( Input: input, OrchestrationInstance: &protos.OrchestrationInstance{ InstanceId: instanceId, - ExecutionId: wrapperspb.String(uuid.New().String()), + ExecutionId: wrapperspb.String(u.String()), }, ParentTraceContext: parentTraceContext, ScheduledStartTimestamp: scheduledStartTimeStamp, diff --git a/internal/helpers/worker.go b/internal/helpers/worker.go index 4d507a0..aa2a567 100644 --- a/internal/helpers/worker.go +++ b/internal/helpers/worker.go @@ -14,6 +14,7 @@ func GetDefaultWorkerName() string { } pid := os.Getpid() - uuidStr := uuid.NewString() + u, _ := uuid.NewV7() + uuidStr := u.String() return fmt.Sprintf("%v,%d,%v", hostname, pid, uuidStr) } diff --git a/samples/parallel/parallel.go b/samples/parallel/parallel.go index 7465f37..50df044 100644 --- a/samples/parallel/parallel.go +++ b/samples/parallel/parallel.go @@ -117,7 +117,8 @@ func GetDevicesToUpdate(task.ActivityContext) (any, error) { const deviceCount = 10 deviceIDs := make([]string, deviceCount) for i := 0; i < deviceCount; i++ { - deviceIDs[i] = uuid.NewString() + u, _ := uuid.NewV7() + deviceIDs[i] = u.String() } return deviceIDs, nil } From b1c36f706675689244714d84a5dc877cb2bfadbb Mon Sep 17 00:00:00 2001 From: "Americas\\JeanCarlos_MartinsDa" Date: Mon, 22 Jun 2026 10:47:03 -0300 Subject: [PATCH 5/6] fix suggested changes --- backend/postgres/postgres.go | 2 +- client/client_grpc.go | 2 +- internal/helpers/worker.go | 7 +++++-- samples/parallel/parallel.go | 5 ++++- 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index 1c65483..e0066dd 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -66,7 +66,7 @@ func NewPostgresBackend(opts *PostgresOptions, logger backend.Logger) backend.Ba pid := os.Getpid() u, err := uuid.NewV7() if err != nil { - return nil + u = uuid.New() } uuidStr := u.String() diff --git a/client/client_grpc.go b/client/client_grpc.go index ef172f2..7475c40 100644 --- a/client/client_grpc.go +++ b/client/client_grpc.go @@ -41,7 +41,7 @@ func (c *TaskHubGrpcClient) ScheduleNewOrchestration(ctx context.Context, orches if req.InstanceId == "" { u, err := uuid.NewV7() if err != nil { - return api.EmptyInstanceID, fmt.Errorf("failed to create uuid: %w", err) + return api.EmptyInstanceID, fmt.Errorf("failed to generate instance ID: %w", err) } req.InstanceId = u.String() diff --git a/internal/helpers/worker.go b/internal/helpers/worker.go index aa2a567..2290fa3 100644 --- a/internal/helpers/worker.go +++ b/internal/helpers/worker.go @@ -14,7 +14,10 @@ func GetDefaultWorkerName() string { } pid := os.Getpid() - u, _ := uuid.NewV7() - uuidStr := u.String() + uuidStr := uuid.NewString() + u, err := uuid.NewV7() + if err == nil { + uuidStr = u.String() + } return fmt.Sprintf("%v,%d,%v", hostname, pid, uuidStr) } diff --git a/samples/parallel/parallel.go b/samples/parallel/parallel.go index 50df044..4df03d9 100644 --- a/samples/parallel/parallel.go +++ b/samples/parallel/parallel.go @@ -117,7 +117,10 @@ func GetDevicesToUpdate(task.ActivityContext) (any, error) { const deviceCount = 10 deviceIDs := make([]string, deviceCount) for i := 0; i < deviceCount; i++ { - u, _ := uuid.NewV7() + u, err := uuid.NewV7() + if err != nil { + return nil, err + } deviceIDs[i] = u.String() } return deviceIDs, nil From 2f7d9c5a3445f859bdfc2a7d5dd5867fac6fe252 Mon Sep 17 00:00:00 2001 From: "Americas\\JeanCarlos_MartinsDa" Date: Mon, 22 Jun 2026 11:00:49 -0300 Subject: [PATCH 6/6] fallback to uuid --- internal/helpers/history.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/helpers/history.go b/internal/helpers/history.go index 6ca9cfe..0bf7dbf 100644 --- a/internal/helpers/history.go +++ b/internal/helpers/history.go @@ -23,7 +23,7 @@ func NewExecutionStartedEvent( ) *protos.HistoryEvent { u, err := uuid.NewV7() if err != nil { - return nil + u = uuid.New() } return &protos.HistoryEvent{ EventId: -1,