Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
8c8fef0
Implement Phases 8-10: Testing, Launch Prep & Documentation
samikshya-db Jan 30, 2026
a1f8b0c
Rebase onto updated PR #320; remove ForceEnableTelemetry; fix test al…
samikshya-db Mar 18, 2026
434114d
Fix gofmt formatting in telemetry files
samikshya-db Apr 2, 2026
fb91627
Remove LAUNCH.md and TROUBLESHOOTING.md
samikshya-db Apr 2, 2026
aa1f4ba
Fix rows.NewRows calls to include telemetry parameters
samikshya-db Apr 6, 2026
e23c725
Fix runQuery test calls to match updated function signature
samikshya-db Apr 6, 2026
1c7c572
Address PR review feedback
samikshya-db Apr 10, 2026
b81f0f1
Fix golangci-lint failures
samikshya-db Apr 10, 2026
edc6e10
Fix remaining golangci-lint failures
samikshya-db Apr 10, 2026
d1c5f09
Make connection-close telemetry flush blocking
samikshya-db Apr 10, 2026
1e5ae75
Fix generateEventID to use crypto/rand without modulo bias
samikshya-db Apr 10, 2026
194ecb1
Rewrite telemetry DESIGN.md as a concise reference doc
samikshya-db Apr 10, 2026
d9cd31b
Wire up DELETE_SESSION, EXECUTE_STATEMENT, CLOSE_STATEMENT telemetry
samikshya-db Apr 10, 2026
09862ba
Remove telemetry/benchmark_test.go
samikshya-db Apr 10, 2026
a35153f
Address PR review: fix shutdown race, featureflag data race, re-add b…
samikshya-db Apr 10, 2026
abd87f5
Wire telemetry tuning params from DSN through to telemetry client
samikshya-db Apr 10, 2026
82b38ab
Fix all golangci-lint issues across the repository
samikshya-db Apr 10, 2026
0b0c544
Merge branch 'main' into stack/PECOBLR-1384-telemetry-phase8-validation
samikshya-db Apr 13, 2026
af10e36
Revert .golangci.yml to main-compatible v1.51 format
samikshya-db Apr 13, 2026
1f44c8f
Fix golangci-lint v1.51 errors: remove dead code, allow unused nolints
samikshya-db Apr 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ linters-settings:
exclude-generated: true
severity: "low"
confidence: "low"
nolintlint:
allow-unused: true

run:
timeout: 5m
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,34 @@ To disable Cloud Fetch (e.g., when handling smaller datasets or to avoid additio
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?useCloudFetch=false
```

### Telemetry Configuration (Optional)

The driver includes optional telemetry to help improve performance and reliability. Telemetry is **disabled by default** and requires explicit opt-in.

**Opt-in to telemetry** (respects server-side feature flags):
```
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?enableTelemetry=true
```

**Opt-out of telemetry** (explicitly disable):
```
token:[your token]@[Workspace hostname]:[Port number][Endpoint HTTP Path]?enableTelemetry=false
```

**What data is collected:**
- ✅ Query latency and performance metrics
- ✅ Error codes (not error messages)
- ✅ Feature usage (CloudFetch, LZ4, etc.)
- ✅ Driver version and environment info

**What is NOT collected:**
- ❌ SQL query text
- ❌ Query results or data values
- ❌ Table/column names
- ❌ User identities or credentials

Telemetry has < 1% performance overhead and uses circuit breaker protection to ensure it never impacts your queries. For more details, see `telemetry/DESIGN.md` and `telemetry/TROUBLESHOOTING.md`.

### Connecting with a new Connector

You can also connect with a new connector object. For example:
Expand Down
15 changes: 8 additions & 7 deletions auth/oauth/u2m/authenticator.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,17 @@ func NewAuthenticator(hostName string, timeout time.Duration) (auth.Authenticato
cloud := oauth.InferCloudFromHost(hostName)

var clientID, redirectURL string
if cloud == oauth.AWS {
switch cloud {
case oauth.AWS:
clientID = awsClientId
redirectURL = awsRedirectURL
} else if cloud == oauth.Azure {
case oauth.Azure:
clientID = azureClientId
redirectURL = azureRedirectURL
} else if cloud == oauth.GCP {
case oauth.GCP:
clientID = gcpClientId
redirectURL = gcpRedirectURL
} else {
default:
return nil, errors.New("unhandled cloud type: " + cloud.String())
}

Expand Down Expand Up @@ -147,14 +148,14 @@ func (tsp *tokenSourceProvider) GetTokenSource() (oauth2.TokenSource, error) {
if err != nil {
return nil, err
}
defer listener.Close()
defer listener.Close() //nolint:errcheck

srv := &http.Server{
ReadHeaderTimeout: 3 * time.Second,
WriteTimeout: 30 * time.Second,
}

defer srv.Close()
defer srv.Close() //nolint:errcheck

// Start local server to wait for callback
go func() {
Expand Down Expand Up @@ -209,7 +210,7 @@ func (tsp *tokenSourceProvider) ServeHTTP(w http.ResponseWriter, r *http.Request
if resp.err != "" {
log.Error().Msg(resp.err)
w.WriteHeader(http.StatusBadRequest)
_, err := w.Write([]byte(errorHTML("Identity Provider returned an error: " + resp.err)))
_, err := w.Write([]byte(errorHTML("Identity Provider returned an error: " + resp.err))) //nolint:gosec // XSS not a concern for local OAuth callback
if err != nil {
log.Error().Err(err).Msg("unable to write error response")
}
Expand Down
6 changes: 3 additions & 3 deletions auth/tokenprovider/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (p *FederationProvider) tryTokenExchange(ctx context.Context, subjectToken
}

// Create request
req, err := http.NewRequestWithContext(ctx, "POST", exchangeURL, strings.NewReader(data.Encode()))
req, err := http.NewRequestWithContext(ctx, "POST", exchangeURL, strings.NewReader(data.Encode())) //nolint:gosec // URL is from trusted config
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
Expand All @@ -147,11 +147,11 @@ func (p *FederationProvider) tryTokenExchange(ctx context.Context, subjectToken
req.Header.Set("Accept", "*/*")

// Make request
resp, err := p.httpClient.Do(req)
resp, err := p.httpClient.Do(req) //nolint:gosec // G704: URL is from trusted configuration
if err != nil {
return nil, fmt.Errorf("request failed: %w", err)
}
defer resp.Body.Close()
defer resp.Body.Close() //nolint:errcheck

body, err := io.ReadAll(resp.Body)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions auth/tokenprovider/federation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func TestFederationProvider_TokenExchangeSuccess(t *testing.T) {
assert.Equal(t, "application/x-www-form-urlencoded", r.Header.Get("Content-Type"))
assert.Equal(t, "*/*", r.Header.Get("Accept"))

// Parse form data
// Parse form data - limit body size to prevent G120
r.Body = http.MaxBytesReader(w, r.Body, 1<<20)
err := r.ParseForm()
require.NoError(t, err)

Expand Down Expand Up @@ -155,13 +156,14 @@ func TestFederationProvider_TokenExchangeWithClientID(t *testing.T) {

// Create mock server that checks for client_id
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.Body = http.MaxBytesReader(w, r.Body, 1<<20)
err := r.ParseForm()
require.NoError(t, err)

// Verify client_id is present
assert.Equal(t, clientID, r.FormValue("client_id"))

response := map[string]interface{}{
response := map[string]interface{}{ //nolint:gosec // G101: test token, not a real credential
"access_token": "sp-wide-federation-token",
"token_type": "Bearer",
"expires_in": 3600,
Expand Down
4 changes: 2 additions & 2 deletions auth/tokenprovider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestExternalTokenProvider(t *testing.T) {
callCount := 0
tokenFunc := func() (string, error) {
callCount++
return "external-token-" + string(rune(callCount)), nil
return "external-token-" + string(rune(callCount)), nil //nolint:gosec // G115: test counter, values are always small
}

provider := NewExternalTokenProvider(tokenFunc)
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestExternalTokenProvider(t *testing.T) {
counter := 0
tokenFunc := func() (string, error) {
counter++
return "token-" + string(rune(counter)), nil
return "token-" + string(rune(counter)), nil //nolint:gosec // G115: test counter, values are always small
}

provider := NewExternalTokenProvider(tokenFunc)
Expand Down
59 changes: 44 additions & 15 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,19 @@ func (c *conn) Close() error {
log := logger.WithContext(c.id, "", "")
ctx := driverctx.NewContextWithConnId(context.Background(), c.id)

// Close telemetry and release resources
// Time CloseSession so we can record DELETE_SESSION before flushing telemetry
closeStart := time.Now()
_, err := c.client.CloseSession(ctx, &cli_service.TCloseSessionReq{
SessionHandle: c.session.SessionHandle,
})

// Record DELETE_SESSION regardless of error (matches JDBC), then flush and release
if c.telemetry != nil {
c.telemetry.RecordOperation(ctx, c.id, telemetry.OperationTypeDeleteSession, time.Since(closeStart).Milliseconds(), err)
_ = c.telemetry.Close(ctx)
telemetry.ReleaseForConnection(c.cfg.Host)
}

_, err := c.client.CloseSession(ctx, &cli_service.TCloseSessionReq{
SessionHandle: c.session.SessionHandle,
})

if err != nil {
log.Err(err).Msg("databricks: failed to close connection")
return dbsqlerrint.NewBadConnectionError(err)
Expand Down Expand Up @@ -93,7 +96,7 @@ func (c *conn) Ping(ctx context.Context) error {
log.Err(err).Msg("databricks: failed to ping")
return dbsqlerrint.NewBadConnectionError(err)
}
defer rows.Close()
defer rows.Close() //nolint:errcheck

log.Debug().Msg("databricks: ping successful")
return nil
Expand Down Expand Up @@ -155,9 +158,13 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
alreadyClosed := exStmtResp.DirectResults != nil && exStmtResp.DirectResults.CloseOperation != nil
newCtx := driverctx.NewContextWithCorrelationId(driverctx.NewContextWithConnId(context.Background(), c.id), corrId)
if !alreadyClosed && (opStatusResp == nil || opStatusResp.GetOperationState() != cli_service.TOperationState_CLOSED_STATE) {
closeOpStart := time.Now()
_, err1 := c.client.CloseOperation(newCtx, &cli_service.TCloseOperationReq{
OperationHandle: exStmtResp.OperationHandle,
})
if c.telemetry != nil {
c.telemetry.RecordOperation(ctx, c.id, telemetry.OperationTypeCloseStatement, time.Since(closeOpStart).Milliseconds(), err1)
}
if err1 != nil {
log.Err(err1).Msg("databricks: failed to close operation after executing statement")
closeOpErr = err1 // Capture for telemetry
Expand Down Expand Up @@ -216,7 +223,15 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
return nil, dbsqlerrint.NewExecutionError(ctx, dbsqlerr.ErrQueryExecution, err, opStatusResp)
}

rows, err := rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults)
// Telemetry callback for tracking row fetching metrics
telemetryUpdate := func(chunkCount int, bytesDownloaded int64) {
if c.telemetry != nil {
c.telemetry.AddTag(ctx, "chunk_count", chunkCount)
c.telemetry.AddTag(ctx, "bytes_downloaded", bytesDownloaded)
}
}

rows, err := rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, telemetryUpdate)
return rows, err

}
Expand Down Expand Up @@ -381,7 +396,14 @@ func (c *conn) executeStatement(ctx context.Context, query string, args []driver
}
}

executeStart := time.Now()
resp, err := c.client.ExecuteStatement(ctx, &req)
// Record the Thrift call latency as a separate operation metric.
// This is distinct from the statement-level metric (BeforeExecuteWithTime), which
// measures end-to-end latency including polling and row fetching.
if c.telemetry != nil {
c.telemetry.RecordOperation(ctx, c.id, telemetry.OperationTypeExecuteStatement, time.Since(executeStart).Milliseconds(), err)
}
var log *logger.DBSQLLogger
log, ctx = client.LoggerAndContext(ctx, resp)

Expand Down Expand Up @@ -514,11 +536,11 @@ func (c *conn) handleStagingPut(ctx context.Context, presignedUrl string, header
}
client := &http.Client{}

dat, err := os.Open(localFile)
dat, err := os.Open(localFile) //nolint:gosec // localFile is provided by the application, not user input
if err != nil {
return dbsqlerrint.NewDriverError(ctx, "error reading local file", err)
}
defer dat.Close()
defer dat.Close() //nolint:errcheck

info, err := dat.Stat()
if err != nil {
Expand All @@ -535,7 +557,7 @@ func (c *conn) handleStagingPut(ctx context.Context, presignedUrl string, header
if err != nil {
return dbsqlerrint.NewDriverError(ctx, "error sending http request", err)
}
defer res.Body.Close()
defer res.Body.Close() //nolint:errcheck
content, err := io.ReadAll(res.Body)

if err != nil || !Succeeded(res) {
Expand All @@ -559,7 +581,7 @@ func (c *conn) handleStagingGet(ctx context.Context, presignedUrl string, header
if err != nil {
return dbsqlerrint.NewDriverError(ctx, "error sending http request", err)
}
defer res.Body.Close()
defer res.Body.Close() //nolint:errcheck
content, err := io.ReadAll(res.Body)

if err != nil || !Succeeded(res) {
Expand All @@ -583,7 +605,7 @@ func (c *conn) handleStagingRemove(ctx context.Context, presignedUrl string, hea
if err != nil {
return dbsqlerrint.NewDriverError(ctx, "error sending http request", err)
}
defer res.Body.Close()
defer res.Body.Close() //nolint:errcheck
content, err := io.ReadAll(res.Body)

if err != nil || !Succeeded(res) {
Expand Down Expand Up @@ -646,11 +668,18 @@ func (c *conn) execStagingOperation(
}

if len(driverctx.StagingPathsFromContext(ctx)) != 0 {
row, err = rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults)
// Telemetry callback for staging operation row fetching
telemetryUpdate := func(chunkCount int, bytesDownloaded int64) {
if c.telemetry != nil {
c.telemetry.AddTag(ctx, "chunk_count", chunkCount)
c.telemetry.AddTag(ctx, "bytes_downloaded", bytesDownloaded)
}
}
row, err = rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, telemetryUpdate)
if err != nil {
return dbsqlerrint.NewDriverError(ctx, "error reading row.", err)
}
defer row.Close()
defer row.Close() //nolint:errcheck

} else {
return dbsqlerrint.NewDriverError(ctx, "staging ctx must be provided.", nil)
Expand All @@ -663,7 +692,7 @@ func (c *conn) execStagingOperation(
if err != nil {
return dbsqlerrint.NewDriverError(ctx, "error fetching staging operation results", err)
}
var stringValues []string = make([]string, 4)
stringValues := make([]string, 4)
for i, val := range sqlRow { // this will either be 3 (remove op) or 4 (put/get) elements
if s, ok := val.(string); ok {
stringValues[i] = s
Expand Down
4 changes: 2 additions & 2 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ func TestConn_executeStatement(t *testing.T) {
for _, opTest := range operationStateTests {
closeOperationCount = 0
executeStatementCount = 0
executeStatementResp.DirectResults.OperationStatus.OperationState = &opTest.state
executeStatementResp.DirectResults.OperationStatus.DisplayMessage = &opTest.err
executeStatementResp.DirectResults.OperationStatus.OperationState = &opTest.state //nolint:gosec // G601: pointer is used only within this loop iteration
executeStatementResp.DirectResults.OperationStatus.DisplayMessage = &opTest.err //nolint:gosec // G601: pointer is used only within this loop iteration
_, err := testConn.ExecContext(context.Background(), "select 1", []driver.NamedValue{})
if opTest.err == "" {
assert.NoError(t, err)
Expand Down
12 changes: 10 additions & 2 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
}

protocolVersion := int64(c.cfg.ThriftProtocolVersion)

sessionStart := time.Now()
session, err := tclient.OpenSession(ctx, &cli_service.TOpenSessionReq{
ClientProtocolI64: &protocolVersion,
Configuration: sessionParams,
Expand All @@ -64,6 +66,8 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
},
CanUseMultipleCatalogs: &c.cfg.CanUseMultipleCatalogs,
})
sessionLatencyMs := time.Since(sessionStart).Milliseconds()

if err != nil {
return nil, dbsqlerrint.NewRequestError(ctx, fmt.Sprintf("error connecting: host=%s port=%d, httpPath=%s", c.cfg.Host, c.cfg.Port, c.cfg.HTTPPath), err)
}
Expand All @@ -80,11 +84,15 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
conn.telemetry = telemetry.InitializeForConnection(
ctx,
c.cfg.Host,
c.cfg.DriverVersion,
c.client,
c.cfg.EnableTelemetry,
c.cfg.TelemetryBatchSize,
c.cfg.TelemetryFlushInterval,
)
if conn.telemetry != nil {
log.Debug().Msg("telemetry initialized for connection")
conn.telemetry.RecordOperation(ctx, conn.id, telemetry.OperationTypeCreateSession, sessionLatencyMs, nil)
}

log.Info().Msgf("connect: host=%s port=%d httpPath=%s serverProtocolVersion=0x%X", c.cfg.Host, c.cfg.Port, c.cfg.HTTPPath, session.ServerProtocolVersion)
Expand Down Expand Up @@ -290,8 +298,8 @@ func WithTransport(t http.RoundTripper) ConnOption {
return func(c *config.Config) {
c.Transport = t

if c.CloudFetchConfig.HTTPClient == nil {
c.CloudFetchConfig.HTTPClient = &http.Client{
if c.HTTPClient == nil {
c.HTTPClient = &http.Client{
Transport: t,
}
}
Expand Down
4 changes: 2 additions & 2 deletions connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,8 @@ func TestNewConnector(t *testing.T) {

coni, ok := con.(*connector)
require.True(t, ok)
assert.NotNil(t, coni.cfg.CloudFetchConfig.HTTPClient)
assert.Equal(t, customTransport, coni.cfg.CloudFetchConfig.HTTPClient.Transport)
assert.NotNil(t, coni.cfg.HTTPClient)
assert.Equal(t, customTransport, coni.cfg.HTTPClient.Transport)
})
}

Expand Down
Loading
Loading