Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
d31a393
Fixed issue that causes error on parallel task execution on Postgres …
mrjmarcelo May 23, 2025
7173e70
Merge branch 'main' of https://github.com/microsoft/durabletask-go
Nov 12, 2025
0ffee53
Added order by over sequence number to prioritize older instances and…
Nov 12, 2025
b443bea
Fixed table alias issue. Applied resource to avoid race conditions.
Nov 14, 2025
48ba21f
Fixed table alias issue for sqlite besides postgres
Nov 14, 2025
0f7ec06
Fixed wrong order by over nonexistent column Instances.SequenceNumber…
Nov 14, 2025
1405b00
Created index to improve queries with ORDER BY Instances.SequenceNumber
Nov 14, 2025
c2f0698
Merge remote-tracking branch 'microsoft/main'
mrjmarcelo Feb 13, 2026
e0762cf
- implemented Backend Start and Stop methods
mrjmarcelo Feb 13, 2026
cb1f11e
Merge pull request #1 from mrjmarcelo/fix/backend-start-and-stop-meth…
mrjmarcelo Feb 13, 2026
860c382
refactoring for sqliteBackend.DeleteTaskHub just to delete db if db i…
mrjmarcelo Feb 17, 2026
de2cf63
Merge pull request #2 from mrjmarcelo/develop
mrjmarcelo Feb 18, 2026
338d340
Merge remote-tracking branch 'microsoft/main'
mrjmarcelo Feb 18, 2026
fcf5205
refactored message for database deletion failure error in delete task…
mrjmarcelo Feb 18, 2026
668a6f6
add index for instances.parent instance id
Jun 9, 2026
9b69789
Merge remote-tracking branch 'microsoft/main'
Jun 15, 2026
dc8b763
added limit to 1000 in GetOrchestrationWorkItem
Jun 19, 2026
bdaa474
increase limit to 100 in GetOrchestrationWorkItem
jeanmartins Jun 19, 2026
2ffcac3
Merge branch 'main' of https://github.com/jeanmartins/durabletask-go
Jun 19, 2026
6f216ca
early commit approach
Jun 19, 2026
1f8e9b6
use uuidv7 and ordering by instanceID and sequenceNumber
Jun 19, 2026
b1c36f7
fix suggested changes
Jun 22, 2026
2f7d9c5
fallback to uuid
Jun 22, 2026
a23efb8
Merge remote-tracking branch 'jeanmartins/main'
Jun 24, 2026
203b0c0
Potential fix for pull request finding
mrjmarcelo Jul 1, 2026
8166ff9
Potential fix for pull request finding
mrjmarcelo Jul 1, 2026
abf38ba
Potential fix for pull request finding
mrjmarcelo Jul 1, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (c *backendClient) ScheduleNewOrchestration(ctx context.Context, orchestrat
}
}
if req.InstanceId == "" {
u, err := uuid.NewRandom()
u, err := uuid.NewV7()
if err != nil {
return api.EmptyInstanceID, fmt.Errorf("failed to generate instance ID: %w", err)
}
Expand Down
42 changes: 29 additions & 13 deletions backend/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ func NewPostgresBackend(opts *PostgresOptions, logger backend.Logger) backend.Ba
}

pid := os.Getpid()
uuidStr := uuid.NewString()
u, err := uuid.NewV7()
if err != nil {
u = uuid.New()
}
uuidStr := u.String()

if opts == nil {
opts = NewPostgresOptions("localhost", 5432, "postgres", "postgres", "postgres")
Expand Down Expand Up @@ -789,7 +793,7 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe
SELECT 1 FROM NewEvents E
WHERE E.InstanceID = I.InstanceID AND (E.VisibleTime IS NULL OR E.VisibleTime < $4)
)
ORDER BY I.SequenceNumber ASC
ORDER BY I.InstanceID, I.SequenceNumber ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
) RETURNING InstanceID`,
Expand Down Expand Up @@ -827,30 +831,42 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe
}
defer events.Close()

Comment thread
mrjmarcelo marked this conversation as resolved.
Comment on lines 832 to 833
maxDequeueCount := int32(0)
type rawEvent struct {
payload []byte
dequeue int32
Comment thread
mrjmarcelo marked this conversation as resolved.
}

newEvents := make([]*protos.HistoryEvent, 0, 10)
rawEvents := []rawEvent{}
for events.Next() {
var eventPayload []byte
var dequeueCount int32
if err := events.Scan(&eventPayload, &dequeueCount); err != nil {
return nil, fmt.Errorf("failed to read history event: %w", err)
}
rawEvents = append(rawEvents, rawEvent{
payload: eventPayload,
dequeue: dequeueCount,
})
}
events.Close()

if dequeueCount > maxDequeueCount {
maxDequeueCount = dequeueCount
if err = tx.Commit(ctx); err != nil {
return nil, fmt.Errorf("failed to update orchestration work-item: %w", err)
}
Comment thread
mrjmarcelo marked this conversation as resolved.

maxDequeueCount := int32(0)
newEvents := make([]*protos.HistoryEvent, 0, len(rawEvents))
for _, e := range rawEvents {
if e.dequeue > maxDequeueCount {
maxDequeueCount = e.dequeue
}

e, err := backend.UnmarshalHistoryEvent(eventPayload)
evt, err := backend.UnmarshalHistoryEvent(e.payload)
if err != nil {
return nil, err
}
Comment thread
mrjmarcelo marked this conversation as resolved.
Comment on lines +864 to 867

newEvents = append(newEvents, e)
}

if err = tx.Commit(ctx); err != nil {
return nil, fmt.Errorf("failed to update orchestration work-item: %w", err)
newEvents = append(newEvents, evt)
}

wi := &backend.OrchestrationWorkItem{
Expand All @@ -877,7 +893,7 @@ func (be *postgresBackend) GetActivityWorkItem(ctx context.Context) (*backend.Ac
WHERE SequenceNumber = (
SELECT SequenceNumber FROM NewTasks T
WHERE T.LockExpiration IS NULL OR T.LockExpiration < $3
ORDER BY T.SequenceNumber ASC
ORDER BY T.InstanceID, T.SequenceNumber ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
) RETURNING SequenceNumber, InstanceID, EventPayload`,
Expand Down
3 changes: 3 additions & 0 deletions backend/postgres/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ CREATE INDEX IF NOT EXISTS IX_Instances_RuntimeStatus ON Instances(RuntimeStatus
-- This index is intended to help the performance of multi-instance query
CREATE INDEX IF NOT EXISTS IX_Instances_CreatedTime ON Instances(CreatedTime);

-- This index is used to improve queries that use Instances.ParentInstanceID
CREATE INDEX IF NOT EXISTS IX_Instances_ParentInstanceID ON Instances(ParentInstanceID);

CREATE TABLE IF NOT EXISTS History (
InstanceID TEXT NOT NULL,
SequenceNumber SERIAL NOT NULL,
Expand Down
7 changes: 6 additions & 1 deletion client/client_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ func (c *TaskHubGrpcClient) ScheduleNewOrchestration(ctx context.Context, orches
}
}
if req.InstanceId == "" {
req.InstanceId = uuid.NewString()
u, err := uuid.NewV7()
if err == nil {
req.InstanceId = u.String()
} else {
req.InstanceId = uuid.NewString()
}
}

resp, err := c.client.StartInstance(ctx, req)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.23.0

require (
github.com/cenkalti/backoff/v4 v4.1.3
github.com/google/uuid v1.3.0
github.com/google/uuid v1.6.0
github.com/jackc/pgx/v5 v5.7.1
github.com/marusama/semaphore/v2 v2.5.0
github.com/stretchr/testify v1.8.4
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbu
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
Expand Down
6 changes: 5 additions & 1 deletion internal/helpers/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ func NewExecutionStartedEvent(
parentTraceContext *protos.TraceContext,
scheduledStartTimeStamp *timestamppb.Timestamp,
) *protos.HistoryEvent {
u, err := uuid.NewV7()
if err != nil {
u = uuid.New()
}
return &protos.HistoryEvent{
EventId: -1,
Timestamp: timestamppb.New(time.Now()),
Expand All @@ -31,7 +35,7 @@ func NewExecutionStartedEvent(
Input: input,
OrchestrationInstance: &protos.OrchestrationInstance{
InstanceId: instanceId,
ExecutionId: wrapperspb.String(uuid.New().String()),
ExecutionId: wrapperspb.String(u.String()),
},
ParentTraceContext: parentTraceContext,
ScheduledStartTimestamp: scheduledStartTimeStamp,
Expand Down
8 changes: 7 additions & 1 deletion internal/helpers/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ func GetDefaultWorkerName() string {
}

pid := os.Getpid()
uuidStr := uuid.NewString()
u, err := uuid.NewV7()
var uuidStr string
if err == nil {
uuidStr = u.String()
} else {
uuidStr = uuid.NewString()
}
return fmt.Sprintf("%v,%d,%v", hostname, pid, uuidStr)
}
7 changes: 6 additions & 1 deletion samples/parallel/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,12 @@ func GetDevicesToUpdate(task.ActivityContext) (any, error) {
const deviceCount = 10
deviceIDs := make([]string, deviceCount)
for i := 0; i < deviceCount; i++ {
deviceIDs[i] = uuid.NewString()
u, err := uuid.NewV7()
if err != nil {
deviceIDs[i] = uuid.NewString()
continue
}
deviceIDs[i] = u.String()
}
return deviceIDs, nil
}
Expand Down