diff --git a/backend/client.go b/backend/client.go index aff3918..51f27d9 100644 --- a/backend/client.go +++ b/backend/client.go @@ -46,7 +46,7 @@ func (c *backendClient) ScheduleNewOrchestration(ctx context.Context, orchestrat } } if req.InstanceId == "" { - u, err := uuid.NewRandom() + u, err := uuid.NewV7() if err != nil { return api.EmptyInstanceID, fmt.Errorf("failed to generate instance ID: %w", err) } diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index 7d03000..e0066dd 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -64,7 +64,11 @@ func NewPostgresBackend(opts *PostgresOptions, logger backend.Logger) backend.Ba } pid := os.Getpid() - uuidStr := uuid.NewString() + u, err := uuid.NewV7() + if err != nil { + u = uuid.New() + } + uuidStr := u.String() if opts == nil { opts = NewPostgresOptions("localhost", 5432, "postgres", "postgres", "postgres") @@ -789,7 +793,7 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe SELECT 1 FROM NewEvents E WHERE E.InstanceID = I.InstanceID AND (E.VisibleTime IS NULL OR E.VisibleTime < $4) ) - ORDER BY I.SequenceNumber ASC + ORDER BY I.InstanceID, I.SequenceNumber ASC LIMIT 1 FOR UPDATE SKIP LOCKED ) RETURNING InstanceID`, @@ -827,30 +831,42 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe } defer events.Close() - maxDequeueCount := int32(0) + type rawEvent struct { + payload []byte + dequeue int32 + } - newEvents := make([]*protos.HistoryEvent, 0, 10) + rawEvents := []rawEvent{} for events.Next() { var eventPayload []byte var dequeueCount int32 if err := events.Scan(&eventPayload, &dequeueCount); err != nil { return nil, fmt.Errorf("failed to read history event: %w", err) } + rawEvents = append(rawEvents, rawEvent{ + payload: eventPayload, + dequeue: dequeueCount, + }) + } + events.Close() - if dequeueCount > maxDequeueCount { - maxDequeueCount = dequeueCount + if err = tx.Commit(ctx); err != nil { + return nil, fmt.Errorf("failed to update orchestration work-item: %w", err) + } + + maxDequeueCount := int32(0) + newEvents := make([]*protos.HistoryEvent, 0, len(rawEvents)) + for _, e := range rawEvents { + if e.dequeue > maxDequeueCount { + maxDequeueCount = e.dequeue } - e, err := backend.UnmarshalHistoryEvent(eventPayload) + evt, err := backend.UnmarshalHistoryEvent(e.payload) if err != nil { return nil, err } - newEvents = append(newEvents, e) - } - - if err = tx.Commit(ctx); err != nil { - return nil, fmt.Errorf("failed to update orchestration work-item: %w", err) + newEvents = append(newEvents, evt) } wi := &backend.OrchestrationWorkItem{ @@ -877,7 +893,7 @@ func (be *postgresBackend) GetActivityWorkItem(ctx context.Context) (*backend.Ac WHERE SequenceNumber = ( SELECT SequenceNumber FROM NewTasks T WHERE T.LockExpiration IS NULL OR T.LockExpiration < $3 - ORDER BY T.SequenceNumber ASC + ORDER BY T.InstanceID, T.SequenceNumber ASC LIMIT 1 FOR UPDATE SKIP LOCKED ) RETURNING SequenceNumber, InstanceID, EventPayload`, diff --git a/backend/postgres/schema.sql b/backend/postgres/schema.sql index 435fc3a..65ac01b 100644 --- a/backend/postgres/schema.sql +++ b/backend/postgres/schema.sql @@ -27,6 +27,9 @@ CREATE INDEX IF NOT EXISTS IX_Instances_RuntimeStatus ON Instances(RuntimeStatus -- This index is intended to help the performance of multi-instance query CREATE INDEX IF NOT EXISTS IX_Instances_CreatedTime ON Instances(CreatedTime); +-- This index is used to improve queries that use Instances.ParentInstanceID +CREATE INDEX IF NOT EXISTS IX_Instances_ParentInstanceID ON Instances(ParentInstanceID); + CREATE TABLE IF NOT EXISTS History ( InstanceID TEXT NOT NULL, SequenceNumber SERIAL NOT NULL, diff --git a/client/client_grpc.go b/client/client_grpc.go index 996fd98..8063153 100644 --- a/client/client_grpc.go +++ b/client/client_grpc.go @@ -39,7 +39,12 @@ func (c *TaskHubGrpcClient) ScheduleNewOrchestration(ctx context.Context, orches } } if req.InstanceId == "" { - req.InstanceId = uuid.NewString() + u, err := uuid.NewV7() + if err == nil { + req.InstanceId = u.String() + } else { + req.InstanceId = uuid.NewString() + } } resp, err := c.client.StartInstance(ctx, req) diff --git a/go.mod b/go.mod index ed00710..1d8b7a3 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.23.0 require ( github.com/cenkalti/backoff/v4 v4.1.3 - github.com/google/uuid v1.3.0 + github.com/google/uuid v1.6.0 github.com/jackc/pgx/v5 v5.7.1 github.com/marusama/semaphore/v2 v2.5.0 github.com/stretchr/testify v1.8.4 diff --git a/go.sum b/go.sum index 050e23d..aae8ee0 100644 --- a/go.sum +++ b/go.sum @@ -22,6 +22,8 @@ github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbu github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= diff --git a/internal/helpers/history.go b/internal/helpers/history.go index e7d3aa5..0bf7dbf 100644 --- a/internal/helpers/history.go +++ b/internal/helpers/history.go @@ -21,6 +21,10 @@ func NewExecutionStartedEvent( parentTraceContext *protos.TraceContext, scheduledStartTimeStamp *timestamppb.Timestamp, ) *protos.HistoryEvent { + u, err := uuid.NewV7() + if err != nil { + u = uuid.New() + } return &protos.HistoryEvent{ EventId: -1, Timestamp: timestamppb.New(time.Now()), @@ -31,7 +35,7 @@ func NewExecutionStartedEvent( Input: input, OrchestrationInstance: &protos.OrchestrationInstance{ InstanceId: instanceId, - ExecutionId: wrapperspb.String(uuid.New().String()), + ExecutionId: wrapperspb.String(u.String()), }, ParentTraceContext: parentTraceContext, ScheduledStartTimestamp: scheduledStartTimeStamp, diff --git a/internal/helpers/worker.go b/internal/helpers/worker.go index 4d507a0..a17005a 100644 --- a/internal/helpers/worker.go +++ b/internal/helpers/worker.go @@ -14,6 +14,12 @@ func GetDefaultWorkerName() string { } pid := os.Getpid() - uuidStr := uuid.NewString() + u, err := uuid.NewV7() + var uuidStr string + if err == nil { + uuidStr = u.String() + } else { + uuidStr = uuid.NewString() + } return fmt.Sprintf("%v,%d,%v", hostname, pid, uuidStr) } diff --git a/samples/parallel/parallel.go b/samples/parallel/parallel.go index 7465f37..180eb0d 100644 --- a/samples/parallel/parallel.go +++ b/samples/parallel/parallel.go @@ -117,7 +117,12 @@ func GetDevicesToUpdate(task.ActivityContext) (any, error) { const deviceCount = 10 deviceIDs := make([]string, deviceCount) for i := 0; i < deviceCount; i++ { - deviceIDs[i] = uuid.NewString() + u, err := uuid.NewV7() + if err != nil { + deviceIDs[i] = uuid.NewString() + continue + } + deviceIDs[i] = u.String() } return deviceIDs, nil }