From d31a393ae680f69c9d2e2486a41b0b98399cf2da Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues de Jesus Date: Fri, 23 May 2025 15:45:54 -0300 Subject: [PATCH 01/19] Fixed issue that causes error on parallel task execution on Postgres backend. --- backend/postgres/postgres.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index cf119de..e8c9b6f 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -311,7 +311,7 @@ func (be *postgresBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi builder := strings.Builder{} builder.WriteString("INSERT INTO NewTasks (InstanceID, EventPayload) VALUES ") for i := 0; i < newActivityCount; i++ { - builder.WriteString(fmt.Sprintf("($%d, $%d)", 3*i+1, 3*i+2)) + builder.WriteString(fmt.Sprintf("($%d, $%d)", 2*i+1, 2*i+2)) if i < newActivityCount-1 { builder.WriteString(", ") } From 0ffee530d6fecb63dbb8299e1e2722ce7da68f27 Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues Date: Wed, 12 Nov 2025 19:03:11 -0300 Subject: [PATCH 02/19] Added order by over sequence number to prioritize older instances and tasks --- backend/postgres/postgres.go | 2 ++ backend/sqlite/sqlite.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index a2c2332..bb7232c 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -778,6 +778,7 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe SELECT 1 FROM NewEvents E WHERE E.InstanceID = I.InstanceID AND (E.VisibleTime IS NULL OR E.VisibleTime < $4) ) + ORDER BY SequenceNumber ASC LIMIT 1 ) RETURNING InstanceID`, be.workerName, // LockedBy for Instances table @@ -864,6 +865,7 @@ func (be *postgresBackend) GetActivityWorkItem(ctx context.Context) (*backend.Ac WHERE SequenceNumber = ( SELECT SequenceNumber FROM NewTasks T WHERE T.LockExpiration IS NULL OR T.LockExpiration < $3 + ORDER BY SequenceNumber ASC LIMIT 1 ) RETURNING SequenceNumber, InstanceID, EventPayload`, be.workerName, diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index 8343342..b3013d4 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -763,6 +763,7 @@ func (be *sqliteBackend) GetOrchestrationWorkItem(ctx context.Context) (*backend SELECT 1 FROM NewEvents E WHERE E.[InstanceID] = I.[InstanceID] AND (E.[VisibleTime] IS NULL OR E.[VisibleTime] < ?) ) + ORDER BY [SequenceNumber] ASC LIMIT 1 ) RETURNING [InstanceID]`, be.workerName, // LockedBy for Instances table @@ -852,6 +853,7 @@ func (be *sqliteBackend) GetActivityWorkItem(ctx context.Context) (*backend.Acti WHERE [SequenceNumber] = ( SELECT [SequenceNumber] FROM NewTasks T WHERE T.[LockExpiration] IS NULL OR T.[LockExpiration] < ? + ORDER BY [SequenceNumber] ASC LIMIT 1 ) RETURNING [SequenceNumber], [InstanceID], [EventPayload]`, be.workerName, From b443bea7dd4562d01a7fe3a3049104fb99ef78f7 Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues Date: Fri, 14 Nov 2025 12:32:52 -0300 Subject: [PATCH 03/19] Fixed table alias issue. Applied resource to avoid race conditions. --- backend/postgres/postgres.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index bb7232c..7de7890 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -778,8 +778,9 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe SELECT 1 FROM NewEvents E WHERE E.InstanceID = I.InstanceID AND (E.VisibleTime IS NULL OR E.VisibleTime < $4) ) - ORDER BY SequenceNumber ASC + ORDER BY I.SequenceNumber ASC LIMIT 1 + FOR UPDATE SKIP LOCKED ) RETURNING InstanceID`, be.workerName, // LockedBy for Instances table newLockExpiration, // Updated LockExpiration for Instances table @@ -865,8 +866,9 @@ func (be *postgresBackend) GetActivityWorkItem(ctx context.Context) (*backend.Ac WHERE SequenceNumber = ( SELECT SequenceNumber FROM NewTasks T WHERE T.LockExpiration IS NULL OR T.LockExpiration < $3 - ORDER BY SequenceNumber ASC + ORDER BY T.SequenceNumber ASC LIMIT 1 + FOR UPDATE SKIP LOCKED ) RETURNING SequenceNumber, InstanceID, EventPayload`, be.workerName, newLockExpiration, From 48ba21fcf42665dfc389de7dd89505ffd3f38813 Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues Date: Fri, 14 Nov 2025 13:26:01 -0300 Subject: [PATCH 04/19] Fixed table alias issue for sqlite besides postgres --- backend/sqlite/sqlite.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index b3013d4..0a15b71 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -763,7 +763,7 @@ func (be *sqliteBackend) GetOrchestrationWorkItem(ctx context.Context) (*backend SELECT 1 FROM NewEvents E WHERE E.[InstanceID] = I.[InstanceID] AND (E.[VisibleTime] IS NULL OR E.[VisibleTime] < ?) ) - ORDER BY [SequenceNumber] ASC + ORDER BY I.[SequenceNumber] ASC LIMIT 1 ) RETURNING [InstanceID]`, be.workerName, // LockedBy for Instances table @@ -853,7 +853,7 @@ func (be *sqliteBackend) GetActivityWorkItem(ctx context.Context) (*backend.Acti WHERE [SequenceNumber] = ( SELECT [SequenceNumber] FROM NewTasks T WHERE T.[LockExpiration] IS NULL OR T.[LockExpiration] < ? - ORDER BY [SequenceNumber] ASC + ORDER BY T.[SequenceNumber] ASC LIMIT 1 ) RETURNING [SequenceNumber], [InstanceID], [EventPayload]`, be.workerName, From 0f7ec06c2ea20bc7db98243c0c264032a410a496 Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues Date: Fri, 14 Nov 2025 17:36:28 -0300 Subject: [PATCH 05/19] Fixed wrong order by over nonexistent column Instances.SequenceNumber for sqlite. --- backend/sqlite/sqlite.go | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index 0a15b71..c39b14f 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -763,7 +763,6 @@ func (be *sqliteBackend) GetOrchestrationWorkItem(ctx context.Context) (*backend SELECT 1 FROM NewEvents E WHERE E.[InstanceID] = I.[InstanceID] AND (E.[VisibleTime] IS NULL OR E.[VisibleTime] < ?) ) - ORDER BY I.[SequenceNumber] ASC LIMIT 1 ) RETURNING [InstanceID]`, be.workerName, // LockedBy for Instances table From 1405b00656b56a8d637a10056f7258ff7880a906 Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues Date: Fri, 14 Nov 2025 17:52:54 -0300 Subject: [PATCH 06/19] Created index to improve queries with ORDER BY Instances.SequenceNumber --- backend/postgres/schema.sql | 3 +++ 1 file changed, 3 insertions(+) diff --git a/backend/postgres/schema.sql b/backend/postgres/schema.sql index 153a5ff..435fc3a 100644 --- a/backend/postgres/schema.sql +++ b/backend/postgres/schema.sql @@ -18,6 +18,9 @@ CREATE TABLE IF NOT EXISTS Instances ( ParentInstanceID TEXT NULL ); +-- This index is used to improve queries with ORDER BY Instances.SequenceNumber +CREATE INDEX IF NOT EXISTS IX_Instances_SequenceNumber ON Instances(SequenceNumber); + -- This index is used by LockNext and Purge logic CREATE INDEX IF NOT EXISTS IX_Instances_RuntimeStatus ON Instances(RuntimeStatus); From e0762cf003e2516394211b7e8ef45d2ca39c7406 Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues de Jesus Date: Fri, 13 Feb 2026 16:39:51 -0300 Subject: [PATCH 07/19] - implemented Backend Start and Stop methods - adapted Backend CreateTaskHub and DeleteTaskHub to call Start and Stop methods instead of directly manage backend database --- backend/postgres/postgres.go | 40 ++++++++++++++++++++++---------- backend/sqlite/sqlite.go | 44 ++++++++++++++++++++++-------------- 2 files changed, 55 insertions(+), 29 deletions(-) diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index 7de7890..4b23b1e 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -79,16 +79,16 @@ func NewPostgresBackend(opts *PostgresOptions, logger backend.Logger) backend.Ba } // CreateTaskHub creates the postgres database and applies the schema -func (be *postgresBackend) CreateTaskHub(context.Context) error { - pool, err := pgxpool.NewWithConfig(context.Background(), be.options.PgOptions) - if err != nil { - be.logger.Error("CreateTaskHub", "failed to create a new postgres pool", err) - return err +func (be *postgresBackend) CreateTaskHub(ctx context.Context) error { + if err := be.Start(ctx); err != nil { + be.logger.Error("CreateTaskHub", "failed to start the backend", err) + return fmt.Errorf("failed to start the backend: %w", err) } - be.db = pool + // Initialize database - if _, err := be.db.Exec(context.Background(), schema); err != nil { - panic(fmt.Errorf("failed to initialize the database: %w", err)) + if _, err := be.db.Exec(ctx, schema); err != nil { + be.logger.Error("CreateTaskHub", "failed to initialize the database", err) + return fmt.Errorf("failed to initialize the database: %w", err) } return nil @@ -120,8 +120,10 @@ func (be *postgresBackend) DeleteTaskHub(ctx context.Context) error { return fmt.Errorf("failed to drop NewTasks table: %w", err) } - be.db.Close() - be.db = nil + if err := be.Stop(ctx); err != nil { + be.logger.Error("DeleteTaskHub", "failed to stop the backend", err) + return fmt.Errorf("failed to stop the backend: %w", err) + } return nil } @@ -989,12 +991,26 @@ func (be *postgresBackend) PurgeOrchestrationState(ctx context.Context, id api.I } // Start implements backend.Backend -func (*postgresBackend) Start(context.Context) error { +func (be *postgresBackend) Start(ctx context.Context) error { + if be.db == nil { + pool, err := pgxpool.NewWithConfig(ctx, be.options.PgOptions) + if err != nil { + be.logger.Error("Start", "failed to create a new postgres pool", err) + return fmt.Errorf("failed to create a new postgres pool %w", err) + } + be.db = pool + } + return nil } // Stop implements backend.Backend -func (*postgresBackend) Stop(context.Context) error { +func (be *postgresBackend) Stop(context.Context) error { + if be.db != nil { + be.db.Close() + be.db = nil + } + return nil } diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index c39b14f..fb09705 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -86,30 +86,21 @@ func NewSqliteBackend(opts *SqliteOptions, logger backend.Logger) backend.Backen } // CreateTaskHub creates the sqlite database and applies the schema -func (be *sqliteBackend) CreateTaskHub(context.Context) error { - db, err := sql.Open("sqlite", be.dsn) - if err != nil { - panic(fmt.Errorf("failed to open the database: %w", err)) +func (be *sqliteBackend) CreateTaskHub(ctx context.Context) error { + if err := be.Start(ctx); err != nil { + return fmt.Errorf("failed to start the backend: %w", err) } // Initialize database - if _, err := db.Exec(schema); err != nil { - panic(fmt.Errorf("failed to initialize the database: %w", err)) + if _, err := be.db.Exec(schema); err != nil { + return fmt.Errorf("failed to initialize the database: %w", err) } - // TODO: This is to avoid SQLITE_BUSY errors when there are concurrent - // operations on the database. However, it can hurt performance. - // We should consider removing this and looking for alternate - // solutions if sqlite performance becomes a problem for users. - db.SetMaxOpenConns(1) - - be.db = db - return nil } func (be *sqliteBackend) DeleteTaskHub(ctx context.Context) error { - be.db = nil + be.Stop(ctx) if be.options.FilePath == "" { // In-memory DB @@ -978,12 +969,31 @@ func (be *sqliteBackend) PurgeOrchestrationState(ctx context.Context, id api.Ins } // Start implements backend.Backend -func (*sqliteBackend) Start(context.Context) error { +func (be *sqliteBackend) Start(context.Context) error { + if be.db == nil { + db, err := sql.Open("sqlite", be.dsn) + if err != nil { + return fmt.Errorf("failed to open the database: %w", err) + } + + // TODO: This is to avoid SQLITE_BUSY errors when there are concurrent + // operations on the database. However, it can hurt performance. + // We should consider removing this and looking for alternate + // solutions if sqlite performance becomes a problem for users. + db.SetMaxOpenConns(1) + + be.db = db + } + return nil } // Stop implements backend.Backend -func (*sqliteBackend) Stop(context.Context) error { +func (be *sqliteBackend) Stop(context.Context) error { + if be.db != nil { + be.db = nil + } + return nil } From 860c38209fc981e1b7b350fee8e3d43302ccbd86 Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues de Jesus Date: Tue, 17 Feb 2026 13:27:01 -0300 Subject: [PATCH 08/19] refactoring for sqliteBackend.DeleteTaskHub just to delete db if db is not nil and handle error from Stop method if it happens --- backend/sqlite/sqlite.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index fb09705..7837ec1 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -100,7 +100,13 @@ func (be *sqliteBackend) CreateTaskHub(ctx context.Context) error { } func (be *sqliteBackend) DeleteTaskHub(ctx context.Context) error { - be.Stop(ctx) + if be.db == nil { + return nil + } + + if err := be.Stop(ctx); err != nil { + return fmt.Errorf("failed to stop the backend: %w", err) + } if be.options.FilePath == "" { // In-memory DB From fcf5205774f9569a9c1da18ff0c5c0a1fa87773d Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues de Jesus Date: Wed, 18 Feb 2026 10:55:36 -0300 Subject: [PATCH 09/19] refactored message for database deletion failure error in delete task hub method for sqlite backend --- backend/sqlite/sqlite.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index 7837ec1..271da9a 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -119,7 +119,7 @@ func (be *sqliteBackend) DeleteTaskHub(ctx context.Context) error { } else if os.IsNotExist(err) { return backend.ErrTaskHubNotFound } else { - return err + return fmt.Errorf("failed to delete the database: %w", err) } } } From 668a6f6b58d8ce9f7305ceca4a905c54b64556d4 Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues Date: Tue, 9 Jun 2026 16:55:52 -0300 Subject: [PATCH 10/19] add index for instances.parent instance id --- backend/postgres/schema.sql | 3 +++ 1 file changed, 3 insertions(+) diff --git a/backend/postgres/schema.sql b/backend/postgres/schema.sql index 435fc3a..65ac01b 100644 --- a/backend/postgres/schema.sql +++ b/backend/postgres/schema.sql @@ -27,6 +27,9 @@ CREATE INDEX IF NOT EXISTS IX_Instances_RuntimeStatus ON Instances(RuntimeStatus -- This index is intended to help the performance of multi-instance query CREATE INDEX IF NOT EXISTS IX_Instances_CreatedTime ON Instances(CreatedTime); +-- This index is used to improve queries that use Instances.ParentInstanceID +CREATE INDEX IF NOT EXISTS IX_Instances_ParentInstanceID ON Instances(ParentInstanceID); + CREATE TABLE IF NOT EXISTS History ( InstanceID TEXT NOT NULL, SequenceNumber SERIAL NOT NULL, From dc8b763b144c5b8e93d7334ec36f8837855c8e4f Mon Sep 17 00:00:00 2001 From: JeanCarlos_MartinsDa Date: Thu, 18 Jun 2026 21:09:33 -0300 Subject: [PATCH 11/19] added limit to 1000 in GetOrchestrationWorkItem --- backend/postgres/postgres.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index 10d8336..af85c0e 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -140,7 +140,7 @@ func (be *postgresBackend) AbandonOrchestrationWorkItem(ctx context.Context, wi } defer tx.Rollback(ctx) //nolint:errcheck // rollback after commit is a no-op - var visibleTime*time.Time = nil + var visibleTime *time.Time = nil if delay := wi.GetAbandonDelay(); delay > 0 { t := time.Now().UTC().Add(delay) visibleTime = &t @@ -769,7 +769,7 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe defer tx.Rollback(ctx) //nolint:errcheck // rollback after commit is a no-op now := time.Now().UTC() - newLockExpiration:= now.Add(be.options.OrchestrationLockTimeout) + newLockExpiration := now.Add(be.options.OrchestrationLockTimeout) // Place a lock on an orchestration instance that has new events that are ready to be executed. row := tx.QueryRow( @@ -782,7 +782,7 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe WHERE E.InstanceID = I.InstanceID AND (E.VisibleTime IS NULL OR E.VisibleTime < $4) ) ORDER BY I.SequenceNumber ASC - LIMIT 1 + LIMIT 1000 FOR UPDATE SKIP LOCKED ) RETURNING InstanceID`, be.workerName, // LockedBy for Instances table From bdaa47467b318822bcf2db1280d192f5aa3e3ea5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean=20Carlos=20Magalh=C3=A3es?= Date: Thu, 18 Jun 2026 21:12:31 -0300 Subject: [PATCH 12/19] increase limit to 100 in GetOrchestrationWorkItem --- backend/postgres/postgres.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index 7d03000..740ca63 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -790,7 +790,7 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe WHERE E.InstanceID = I.InstanceID AND (E.VisibleTime IS NULL OR E.VisibleTime < $4) ) ORDER BY I.SequenceNumber ASC - LIMIT 1 + LIMIT 1000 FOR UPDATE SKIP LOCKED ) RETURNING InstanceID`, be.workerName, // LockedBy for Instances table From 6f216ca672c2e74b928ab7b48c8bd639e30ead6f Mon Sep 17 00:00:00 2001 From: JeanCarlos_MartinsDa Date: Thu, 18 Jun 2026 22:04:19 -0300 Subject: [PATCH 13/19] early commit approach --- backend/postgres/postgres.go | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index 740ca63..069add9 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -790,7 +790,7 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe WHERE E.InstanceID = I.InstanceID AND (E.VisibleTime IS NULL OR E.VisibleTime < $4) ) ORDER BY I.SequenceNumber ASC - LIMIT 1000 + LIMIT 1 FOR UPDATE SKIP LOCKED ) RETURNING InstanceID`, be.workerName, // LockedBy for Instances table @@ -827,30 +827,42 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe } defer events.Close() - maxDequeueCount := int32(0) + type rawEvent struct { + payload []byte + dequeue int32 + } - newEvents := make([]*protos.HistoryEvent, 0, 10) + rawEvents := []rawEvent{} for events.Next() { var eventPayload []byte var dequeueCount int32 if err := events.Scan(&eventPayload, &dequeueCount); err != nil { return nil, fmt.Errorf("failed to read history event: %w", err) } + rawEvents = append(rawEvents, rawEvent{ + payload: eventPayload, + dequeue: dequeueCount, + }) + } + events.Close() - if dequeueCount > maxDequeueCount { - maxDequeueCount = dequeueCount + if err = tx.Commit(ctx); err != nil { + return nil, fmt.Errorf("failed to update orchestration work-item: %w", err) + } + + maxDequeueCount := int32(0) + newEvents := make([]*protos.HistoryEvent, 0, len(rawEvents)) + for _, e := range rawEvents { + if e.dequeue > maxDequeueCount { + maxDequeueCount = e.dequeue } - e, err := backend.UnmarshalHistoryEvent(eventPayload) + evt, err := backend.UnmarshalHistoryEvent(e.payload) if err != nil { return nil, err } - newEvents = append(newEvents, e) - } - - if err = tx.Commit(ctx); err != nil { - return nil, fmt.Errorf("failed to update orchestration work-item: %w", err) + newEvents = append(newEvents, evt) } wi := &backend.OrchestrationWorkItem{ From 1f8e9b66bf1c760e01614735b92cd3090a0946d8 Mon Sep 17 00:00:00 2001 From: JeanCarlos_MartinsDa Date: Fri, 19 Jun 2026 11:59:30 -0300 Subject: [PATCH 14/19] use uuidv7 and ordering by instanceID and sequenceNumber --- backend/client.go | 2 +- backend/postgres/postgres.go | 10 +++++++--- client/client_grpc.go | 7 ++++++- go.mod | 2 +- go.sum | 2 ++ internal/helpers/history.go | 6 +++++- internal/helpers/worker.go | 3 ++- samples/parallel/parallel.go | 3 ++- 8 files changed, 26 insertions(+), 9 deletions(-) diff --git a/backend/client.go b/backend/client.go index aff3918..51f27d9 100644 --- a/backend/client.go +++ b/backend/client.go @@ -46,7 +46,7 @@ func (c *backendClient) ScheduleNewOrchestration(ctx context.Context, orchestrat } } if req.InstanceId == "" { - u, err := uuid.NewRandom() + u, err := uuid.NewV7() if err != nil { return api.EmptyInstanceID, fmt.Errorf("failed to generate instance ID: %w", err) } diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index 069add9..1c65483 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -64,7 +64,11 @@ func NewPostgresBackend(opts *PostgresOptions, logger backend.Logger) backend.Ba } pid := os.Getpid() - uuidStr := uuid.NewString() + u, err := uuid.NewV7() + if err != nil { + return nil + } + uuidStr := u.String() if opts == nil { opts = NewPostgresOptions("localhost", 5432, "postgres", "postgres", "postgres") @@ -789,7 +793,7 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe SELECT 1 FROM NewEvents E WHERE E.InstanceID = I.InstanceID AND (E.VisibleTime IS NULL OR E.VisibleTime < $4) ) - ORDER BY I.SequenceNumber ASC + ORDER BY I.InstanceID, I.SequenceNumber ASC LIMIT 1 FOR UPDATE SKIP LOCKED ) RETURNING InstanceID`, @@ -889,7 +893,7 @@ func (be *postgresBackend) GetActivityWorkItem(ctx context.Context) (*backend.Ac WHERE SequenceNumber = ( SELECT SequenceNumber FROM NewTasks T WHERE T.LockExpiration IS NULL OR T.LockExpiration < $3 - ORDER BY T.SequenceNumber ASC + ORDER BY T.InstanceID, T.SequenceNumber ASC LIMIT 1 FOR UPDATE SKIP LOCKED ) RETURNING SequenceNumber, InstanceID, EventPayload`, diff --git a/client/client_grpc.go b/client/client_grpc.go index 996fd98..ef172f2 100644 --- a/client/client_grpc.go +++ b/client/client_grpc.go @@ -39,7 +39,12 @@ func (c *TaskHubGrpcClient) ScheduleNewOrchestration(ctx context.Context, orches } } if req.InstanceId == "" { - req.InstanceId = uuid.NewString() + u, err := uuid.NewV7() + if err != nil { + return api.EmptyInstanceID, fmt.Errorf("failed to create uuid: %w", err) + } + + req.InstanceId = u.String() } resp, err := c.client.StartInstance(ctx, req) diff --git a/go.mod b/go.mod index ed00710..1d8b7a3 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.23.0 require ( github.com/cenkalti/backoff/v4 v4.1.3 - github.com/google/uuid v1.3.0 + github.com/google/uuid v1.6.0 github.com/jackc/pgx/v5 v5.7.1 github.com/marusama/semaphore/v2 v2.5.0 github.com/stretchr/testify v1.8.4 diff --git a/go.sum b/go.sum index 050e23d..aae8ee0 100644 --- a/go.sum +++ b/go.sum @@ -22,6 +22,8 @@ github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbu github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= diff --git a/internal/helpers/history.go b/internal/helpers/history.go index e7d3aa5..6ca9cfe 100644 --- a/internal/helpers/history.go +++ b/internal/helpers/history.go @@ -21,6 +21,10 @@ func NewExecutionStartedEvent( parentTraceContext *protos.TraceContext, scheduledStartTimeStamp *timestamppb.Timestamp, ) *protos.HistoryEvent { + u, err := uuid.NewV7() + if err != nil { + return nil + } return &protos.HistoryEvent{ EventId: -1, Timestamp: timestamppb.New(time.Now()), @@ -31,7 +35,7 @@ func NewExecutionStartedEvent( Input: input, OrchestrationInstance: &protos.OrchestrationInstance{ InstanceId: instanceId, - ExecutionId: wrapperspb.String(uuid.New().String()), + ExecutionId: wrapperspb.String(u.String()), }, ParentTraceContext: parentTraceContext, ScheduledStartTimestamp: scheduledStartTimeStamp, diff --git a/internal/helpers/worker.go b/internal/helpers/worker.go index 4d507a0..aa2a567 100644 --- a/internal/helpers/worker.go +++ b/internal/helpers/worker.go @@ -14,6 +14,7 @@ func GetDefaultWorkerName() string { } pid := os.Getpid() - uuidStr := uuid.NewString() + u, _ := uuid.NewV7() + uuidStr := u.String() return fmt.Sprintf("%v,%d,%v", hostname, pid, uuidStr) } diff --git a/samples/parallel/parallel.go b/samples/parallel/parallel.go index 7465f37..50df044 100644 --- a/samples/parallel/parallel.go +++ b/samples/parallel/parallel.go @@ -117,7 +117,8 @@ func GetDevicesToUpdate(task.ActivityContext) (any, error) { const deviceCount = 10 deviceIDs := make([]string, deviceCount) for i := 0; i < deviceCount; i++ { - deviceIDs[i] = uuid.NewString() + u, _ := uuid.NewV7() + deviceIDs[i] = u.String() } return deviceIDs, nil } From b1c36f706675689244714d84a5dc877cb2bfadbb Mon Sep 17 00:00:00 2001 From: "Americas\\JeanCarlos_MartinsDa" Date: Mon, 22 Jun 2026 10:47:03 -0300 Subject: [PATCH 15/19] fix suggested changes --- backend/postgres/postgres.go | 2 +- client/client_grpc.go | 2 +- internal/helpers/worker.go | 7 +++++-- samples/parallel/parallel.go | 5 ++++- 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index 1c65483..e0066dd 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -66,7 +66,7 @@ func NewPostgresBackend(opts *PostgresOptions, logger backend.Logger) backend.Ba pid := os.Getpid() u, err := uuid.NewV7() if err != nil { - return nil + u = uuid.New() } uuidStr := u.String() diff --git a/client/client_grpc.go b/client/client_grpc.go index ef172f2..7475c40 100644 --- a/client/client_grpc.go +++ b/client/client_grpc.go @@ -41,7 +41,7 @@ func (c *TaskHubGrpcClient) ScheduleNewOrchestration(ctx context.Context, orches if req.InstanceId == "" { u, err := uuid.NewV7() if err != nil { - return api.EmptyInstanceID, fmt.Errorf("failed to create uuid: %w", err) + return api.EmptyInstanceID, fmt.Errorf("failed to generate instance ID: %w", err) } req.InstanceId = u.String() diff --git a/internal/helpers/worker.go b/internal/helpers/worker.go index aa2a567..2290fa3 100644 --- a/internal/helpers/worker.go +++ b/internal/helpers/worker.go @@ -14,7 +14,10 @@ func GetDefaultWorkerName() string { } pid := os.Getpid() - u, _ := uuid.NewV7() - uuidStr := u.String() + uuidStr := uuid.NewString() + u, err := uuid.NewV7() + if err == nil { + uuidStr = u.String() + } return fmt.Sprintf("%v,%d,%v", hostname, pid, uuidStr) } diff --git a/samples/parallel/parallel.go b/samples/parallel/parallel.go index 50df044..4df03d9 100644 --- a/samples/parallel/parallel.go +++ b/samples/parallel/parallel.go @@ -117,7 +117,10 @@ func GetDevicesToUpdate(task.ActivityContext) (any, error) { const deviceCount = 10 deviceIDs := make([]string, deviceCount) for i := 0; i < deviceCount; i++ { - u, _ := uuid.NewV7() + u, err := uuid.NewV7() + if err != nil { + return nil, err + } deviceIDs[i] = u.String() } return deviceIDs, nil From 2f7d9c5a3445f859bdfc2a7d5dd5867fac6fe252 Mon Sep 17 00:00:00 2001 From: "Americas\\JeanCarlos_MartinsDa" Date: Mon, 22 Jun 2026 11:00:49 -0300 Subject: [PATCH 16/19] fallback to uuid --- internal/helpers/history.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/helpers/history.go b/internal/helpers/history.go index 6ca9cfe..0bf7dbf 100644 --- a/internal/helpers/history.go +++ b/internal/helpers/history.go @@ -23,7 +23,7 @@ func NewExecutionStartedEvent( ) *protos.HistoryEvent { u, err := uuid.NewV7() if err != nil { - return nil + u = uuid.New() } return &protos.HistoryEvent{ EventId: -1, From 203b0c06adcccd20999fce58ce612f3221644787 Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues <49560330+mrjmarcelo@users.noreply.github.com> Date: Wed, 1 Jul 2026 13:31:44 -0300 Subject: [PATCH 17/19] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- client/client_grpc.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/client/client_grpc.go b/client/client_grpc.go index 7475c40..8063153 100644 --- a/client/client_grpc.go +++ b/client/client_grpc.go @@ -40,11 +40,11 @@ func (c *TaskHubGrpcClient) ScheduleNewOrchestration(ctx context.Context, orches } if req.InstanceId == "" { u, err := uuid.NewV7() - if err != nil { - return api.EmptyInstanceID, fmt.Errorf("failed to generate instance ID: %w", err) + if err == nil { + req.InstanceId = u.String() + } else { + req.InstanceId = uuid.NewString() } - - req.InstanceId = u.String() } resp, err := c.client.StartInstance(ctx, req) From 8166ff9990158c8968a091d0bb11fcfe08eb0460 Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues <49560330+mrjmarcelo@users.noreply.github.com> Date: Wed, 1 Jul 2026 15:21:42 -0300 Subject: [PATCH 18/19] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- internal/helpers/worker.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/internal/helpers/worker.go b/internal/helpers/worker.go index 2290fa3..a17005a 100644 --- a/internal/helpers/worker.go +++ b/internal/helpers/worker.go @@ -14,10 +14,12 @@ func GetDefaultWorkerName() string { } pid := os.Getpid() - uuidStr := uuid.NewString() - u, err := uuid.NewV7() - if err == nil { - uuidStr = u.String() - } + u, err := uuid.NewV7() + var uuidStr string + if err == nil { + uuidStr = u.String() + } else { + uuidStr = uuid.NewString() + } return fmt.Sprintf("%v,%d,%v", hostname, pid, uuidStr) } From abf38bab9128104cd6f2cc61e0d054b5a881ef6c Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues <49560330+mrjmarcelo@users.noreply.github.com> Date: Wed, 1 Jul 2026 16:55:23 -0300 Subject: [PATCH 19/19] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- samples/parallel/parallel.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/samples/parallel/parallel.go b/samples/parallel/parallel.go index 4df03d9..180eb0d 100644 --- a/samples/parallel/parallel.go +++ b/samples/parallel/parallel.go @@ -118,9 +118,10 @@ func GetDevicesToUpdate(task.ActivityContext) (any, error) { deviceIDs := make([]string, deviceCount) for i := 0; i < deviceCount; i++ { u, err := uuid.NewV7() - if err != nil { - return nil, err - } + if err != nil { + deviceIDs[i] = uuid.NewString() + continue + } deviceIDs[i] = u.String() } return deviceIDs, nil