Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ linters:
msg: Use errors.Is(err, fs.ErrExist) instead.
- pattern: 'os\.IsPermission'
msg: Use errors.Is(err, fs.ErrPermission) instead.
- pattern: 'sync\.Once\b($|[^FV])'
msg: Use sync.OnceFunc, sync.OnceValue, or sync.OnceValues instead.
analyze-types: true
copyloopvar:
check-alias: true
Expand Down
42 changes: 21 additions & 21 deletions bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package bundle

import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -121,11 +120,8 @@ type Bundle struct {
// in the WSFS location containing the bundle state.
Metadata metadata.Metadata

// Store a pointer to the workspace client.
// It can be initialized on demand after loading the configuration.
clientOnce sync.Once
client *databricks.WorkspaceClient
clientErr error
// Returns the workspace client, initializing it on first call.
getClient func() (*databricks.WorkspaceClient, error)

// Files that are synced to the workspace.file_path
Files []fileset.File
Expand Down Expand Up @@ -225,16 +221,21 @@ func TryLoad(ctx context.Context) *Bundle {
return b
}

func (b *Bundle) WorkspaceClientE() (*databricks.WorkspaceClient, error) {
b.clientOnce.Do(func() {
var err error
b.client, err = b.Config.Workspace.Client()
func (b *Bundle) initClientOnce() {
b.getClient = sync.OnceValues(func() (*databricks.WorkspaceClient, error) {
w, err := b.Config.Workspace.Client()
if err != nil {
b.clientErr = fmt.Errorf("cannot resolve bundle auth configuration: %w", err)
return nil, fmt.Errorf("cannot resolve bundle auth configuration: %w", err)
}
return w, nil
})
}

return b.client, b.clientErr
func (b *Bundle) WorkspaceClientE() (*databricks.WorkspaceClient, error) {
if b.getClient == nil {
b.initClientOnce()
}
return b.getClient()
}

func (b *Bundle) WorkspaceClient() *databricks.WorkspaceClient {
Expand All @@ -249,16 +250,15 @@ func (b *Bundle) WorkspaceClient() *databricks.WorkspaceClient {
// SetWorkpaceClient sets the workspace client for this bundle.
// This is used to inject a mock client for testing.
func (b *Bundle) SetWorkpaceClient(w *databricks.WorkspaceClient) {
b.clientOnce.Do(func() {})
b.client = w
b.getClient = func() (*databricks.WorkspaceClient, error) {
return w, nil
}
}

// ClearWorkspaceClient resets the workspace client cache, allowing
// WorkspaceClientE() to attempt client creation again on the next call.
func (b *Bundle) ClearWorkspaceClient() {
b.clientOnce = sync.Once{}
b.client = nil
b.clientErr = nil
b.initClientOnce()
}

// LocalStateDir returns directory to use for temporary files for this bundle without creating
Expand Down Expand Up @@ -347,12 +347,12 @@ func (b *Bundle) GetSyncIncludePatterns(ctx context.Context) ([]string, error) {
// This map can be used to configure authentication for tools that
// we call into from this bundle context.
func (b *Bundle) AuthEnv() (map[string]string, error) {
if b.client == nil {
return nil, errors.New("workspace client not initialized yet")
w, err := b.WorkspaceClientE()
if err != nil {
return nil, err
}

cfg := b.client.Config
return auth.Env(cfg), nil
return auth.Env(w.Config), nil
}

// StateFilenameDirect returns (relative remote path, relative local path) for direct engine resource state
Expand Down
58 changes: 23 additions & 35 deletions bundle/direct/dresources/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,48 +75,36 @@ var resourcesYAML []byte
//go:embed resources.generated.yml
var resourcesGeneratedYAML []byte

var (
configOnce sync.Once
globalConfig *Config
generatedConfigOnce sync.Once
generatedConfig *Config
empty = ResourceLifecycleConfig{
IgnoreRemoteChanges: nil,
IgnoreLocalChanges: nil,
RecreateOnChanges: nil,
UpdateIDOnChanges: nil,
BackendDefaults: nil,
}
)
var empty = ResourceLifecycleConfig{
IgnoreRemoteChanges: nil,
IgnoreLocalChanges: nil,
RecreateOnChanges: nil,
UpdateIDOnChanges: nil,
BackendDefaults: nil,
}

// MustLoadConfig loads and parses the embedded resources.yml configuration.
// The config is loaded once and cached for subsequent calls.
// Panics if the embedded YAML is invalid.
func MustLoadConfig() *Config {
configOnce.Do(func() {
globalConfig = &Config{
Resources: nil,
}
if err := yaml.Unmarshal(resourcesYAML, globalConfig); err != nil {
func mustParseConfig(data []byte) func() *Config {
return sync.OnceValue(func() *Config {
c := &Config{Resources: nil}
if err := yaml.Unmarshal(data, c); err != nil {
panic(err)
}
return c
})
return globalConfig
}

// MustLoadGeneratedConfig loads and parses the embedded resources.generated.yml configuration.
// The config is loaded once and cached for subsequent calls.
// Panics if the embedded YAML is invalid.
var loadConfig = mustParseConfig(resourcesYAML)

var loadGeneratedConfig = mustParseConfig(resourcesGeneratedYAML)

// MustLoadConfig returns the parsed resources.yml configuration.
func MustLoadConfig() *Config {
return loadConfig()
}

// MustLoadGeneratedConfig returns the parsed resources.generated.yml configuration.
func MustLoadGeneratedConfig() *Config {
generatedConfigOnce.Do(func() {
generatedConfig = &Config{
Resources: nil,
}
if err := yaml.Unmarshal(resourcesGeneratedYAML, generatedConfig); err != nil {
panic(err)
}
})
return generatedConfig
return loadGeneratedConfig()
}

// GetResourceConfig returns the lifecycle config for a given resource type.
Expand Down
13 changes: 5 additions & 8 deletions internal/build/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ func (i Info) GetSanitizedVersion() string {
return version
}

var info Info

var once sync.Once

const DefaultSemver = "0.0.0-dev"

// getDefaultBuildVersion uses build information stored by Go itself
Expand Down Expand Up @@ -73,7 +69,7 @@ func getDefaultBuildVersion() string {
return out
}

func initialize() {
func initialize() Info {
// If buildVersion is empty it means the binary was NOT built through goreleaser.
// We try to pull version information from debug.BuildInfo().
if buildVersion == "" {
Expand All @@ -86,7 +82,7 @@ func initialize() {
panic(fmt.Sprintf(`version is not a valid semver string: "%s"`, buildVersion))
}

info = Info{
return Info{
ProjectName: buildProjectName,
Version: buildVersion,

Expand All @@ -106,9 +102,10 @@ func initialize() {
}
}

var getInfo = sync.OnceValue(initialize)

func GetInfo() Info {
once.Do(initialize)
return info
return getInfo()
}

func parseInt(s string) int64 {
Expand Down
6 changes: 4 additions & 2 deletions internal/build/variables.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package build

import "sync"

var (
buildProjectName string = "cli"
buildVersion string = ""
Expand All @@ -23,8 +25,8 @@ var (
buildTimestamp string = "0"
)

// This function is used to set the build version for testing purposes.
// SetBuildVersion sets the build version for testing purposes.
func SetBuildVersion(version string) {
buildVersion = version
info.Version = version
getInfo = sync.OnceValue(initialize)
}
11 changes: 4 additions & 7 deletions libs/apps/logstream/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,15 +347,12 @@ func handleCloseError(err error) (bool, error) {
}

func watchContext(ctx context.Context, conn *websocket.Conn) func() {
var once sync.Once
closeCh := make(chan struct{})

closeConn := func() {
once.Do(func() {
_ = conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "context canceled"), time.Now().Add(time.Second))
_ = conn.Close()
})
}
closeConn := sync.OnceFunc(func() {
_ = conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "context canceled"), time.Now().Add(time.Second))
_ = conn.Close()
})

go func() {
select {
Expand Down
34 changes: 19 additions & 15 deletions libs/apps/vite/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type Bridge struct {
tunnelID string
tunnelWriteChan chan prioritizedMessage
stopChan chan struct{}
stopOnce sync.Once
stop func()
httpClient *http.Client
connectionRequests chan *BridgeMessage
port int
Expand All @@ -101,7 +101,7 @@ func NewBridge(ctx context.Context, w *databricks.WorkspaceClient, appName strin
DisableCompression: false,
}

return &Bridge{
b := &Bridge{
ctx: ctx,
w: w,
appName: appName,
Expand All @@ -114,6 +114,22 @@ func NewBridge(ctx context.Context, w *databricks.WorkspaceClient, appName strin
connectionRequests: make(chan *BridgeMessage, 10),
port: port,
}

b.stop = sync.OnceFunc(func() {
close(b.stopChan)

if b.tunnelConn != nil {
_ = b.tunnelConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
b.tunnelConn.Close()
}

if b.hmrConn != nil {
_ = b.hmrConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
b.hmrConn.Close()
}
})

return b
}

func (vb *Bridge) getAuthHeaders(wsURL string) (http.Header, error) {
Expand Down Expand Up @@ -965,17 +981,5 @@ func (vb *Bridge) Start() error {
}

func (vb *Bridge) Stop() {
vb.stopOnce.Do(func() {
close(vb.stopChan)

if vb.tunnelConn != nil {
_ = vb.tunnelConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
vb.tunnelConn.Close()
}

if vb.hmrConn != nil {
_ = vb.hmrConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
vb.hmrConn.Close()
}
})
vb.stop()
}
27 changes: 12 additions & 15 deletions libs/cmdio/spinner.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ func (m spinnerModel) View() string {
// The spinner automatically degrades in non-interactive terminals.
// Context cancellation will automatically close the spinner.
type spinner struct {
p *tea.Program // nil in non-interactive mode
c *cmdIO
ctx context.Context
once sync.Once
done chan struct{} // Closed when tea.Program finishes
p *tea.Program // nil in non-interactive mode
c *cmdIO
ctx context.Context
sendQuit func()
done chan struct{} // Closed when tea.Program finishes
}

// Update sends a status message to the spinner.
Expand All @@ -121,11 +121,7 @@ func (sp *spinner) Update(msg string) {
// It waits for the spinner to fully terminate before returning.
// It is safe to call Close multiple times and from multiple goroutines.
func (sp *spinner) Close() {
sp.once.Do(func() {
if sp.p != nil {
sp.p.Send(quitMsg{})
}
})
sp.sendQuit()
// Always wait for termination, even if we weren't the first caller
if sp.p != nil {
<-sp.done
Expand All @@ -147,7 +143,7 @@ func (sp *spinner) Close() {
func (c *cmdIO) NewSpinner(ctx context.Context, opts ...SpinnerOption) *spinner {
// Don't show spinner if not interactive
if !c.capabilities.SupportsInteractive() {
return &spinner{p: nil, c: c, ctx: ctx}
return &spinner{p: nil, c: c, ctx: ctx, sendQuit: func() {}}
}

// Create model and program
Expand All @@ -167,10 +163,11 @@ func (c *cmdIO) NewSpinner(ctx context.Context, opts ...SpinnerOption) *spinner

done := make(chan struct{})
sp := &spinner{
p: p,
c: c,
ctx: ctx,
done: done,
p: p,
c: c,
ctx: ctx,
sendQuit: sync.OnceFunc(func() { p.Send(quitMsg{}) }),
done: done,
}

// Start program in background
Expand Down
Loading