Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions client/client_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (

"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"go.opentelemetry.io/otel/codes"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/microsoft/durabletask-go/api"
"github.com/microsoft/durabletask-go/backend"
"github.com/microsoft/durabletask-go/internal/helpers"
"github.com/microsoft/durabletask-go/internal/protos"
)

Expand Down Expand Up @@ -42,8 +44,20 @@ func (c *TaskHubGrpcClient) ScheduleNewOrchestration(ctx context.Context, orches
req.InstanceId = uuid.NewString()
}

// Start a client span for the schedule operation and propagate its trace
// context to the server via ParentTraceContext. The server (a task hub
// worker or the Azure Functions Durable Task extension) reads it to parent
// the orchestration span, linking the caller's trace to the orchestration
// it starts. This mirrors the in-process backend client, which stamps the
// same span onto the ExecutionStarted event.
ctx, span := helpers.StartNewCreateOrchestrationSpan(ctx, req.Name, req.Version.GetValue(), req.InstanceId)
defer span.End()
req.ParentTraceContext = helpers.TraceContextFromSpan(span)

resp, err := c.client.StartInstance(ctx, req)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
if ctx.Err() != nil {
return api.EmptyInstanceID, ctx.Err()
}
Expand Down
78 changes: 78 additions & 0 deletions client/client_grpc_trace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package client

import (
"context"
"net"
"testing"

"github.com/microsoft/durabletask-go/backend"
"github.com/microsoft/durabletask-go/internal/protos"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
)

// captureSidecar is a minimal TaskHubSidecarService implementation that records
// the CreateInstanceRequest it receives so a test can assert on the propagated
// parent trace context.
type captureSidecar struct {
protos.UnimplementedTaskHubSidecarServiceServer
lastStart *protos.CreateInstanceRequest
}

func (s *captureSidecar) StartInstance(_ context.Context, req *protos.CreateInstanceRequest) (*protos.CreateInstanceResponse, error) {
s.lastStart = req
return &protos.CreateInstanceResponse{InstanceId: req.InstanceId}, nil
}

// Test_ScheduleNewOrchestration_PropagatesParentTraceContext verifies that the
// gRPC client attaches the caller's trace context to CreateInstanceRequest.
// ParentTraceContext, which the server (the Azure Functions Durable Task
// extension, or a durabletask-go task hub) uses to parent the orchestration
// span — linking the caller's trace to the orchestration it starts.
func Test_ScheduleNewOrchestration_PropagatesParentTraceContext(t *testing.T) {
// A sampled provider makes the client emit a sampled span whose context is
// propagated; an unsampled span would intentionally carry no trace context.
tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample()))
otel.SetTracerProvider(tp)
t.Cleanup(func() { _ = tp.Shutdown(context.Background()) })

srv := &captureSidecar{}
lis := bufconn.Listen(1 << 20)
gs := grpc.NewServer()
protos.RegisterTaskHubSidecarServiceServer(gs, srv)
go func() { _ = gs.Serve(lis) }()
t.Cleanup(gs.Stop)

conn, err := grpc.Dial(
"bufnet",
grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
return lis.DialContext(ctx)
}),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
require.NoError(t, err)
t.Cleanup(func() { _ = conn.Close() })

c := NewTaskHubGrpcClient(conn, backend.DefaultLogger())

// Start an outer "caller" span; the propagated trace context must carry its
// trace ID, proving the schedule operation joins the caller's trace.
ctx, caller := otel.Tracer("test").Start(context.Background(), "caller")
defer caller.End()
wantTraceID := caller.SpanContext().TraceID().String()

if _, err := c.ScheduleNewOrchestration(ctx, "MyOrchestrator"); err != nil {
t.Fatalf("schedule: %v", err)
}

require.NotNil(t, srv.lastStart, "server should have received a StartInstance request")
tc := srv.lastStart.GetParentTraceContext()
require.NotNil(t, tc, "client should propagate a parent trace context")
assert.Contains(t, tc.GetTraceParent(), wantTraceID,
"propagated traceparent %q should carry the caller's trace ID %q", tc.GetTraceParent(), wantTraceID)
}
Loading
Loading