diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 56f3381..e03f7b6 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -10,7 +10,7 @@ jobs: strategy: fail-fast: false matrix: - php: ["8.4"] + php: ["8.5"] go: [stable] os: ["ubuntu-latest"] steps: diff --git a/CLAUDE.md b/CLAUDE.md deleted file mode 100644 index 8d33d81..0000000 --- a/CLAUDE.md +++ /dev/null @@ -1,99 +0,0 @@ -# CLAUDE.md - -This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. - -## Project Overview - -This is the RoadRunner Pool library - a Go package for managing PHP worker processes in the RoadRunner application server. It provides process management, IPC communication, and worker lifecycle control. - -## Key Commands - -### Testing -```bash -# Run all tests with race detection -make test - -# Run tests with coverage -make test_coverage - -# Run specific package tests -go test -v -race ./pool/static_pool -go test -v -race ./worker -go test -v -race ./ipc/pipe -go test -v -race ./ipc/socket -go test -v -race ./worker_watcher - -# Run fuzz tests (30 seconds) -go test -v -race -fuzz=FuzzStaticPoolEcho -fuzztime=30s -tags=debug ./pool/static_pool -``` - -### Linting -```bash -# Run golangci-lint (with race build tags) -golangci-lint run --timeout=10m --build-tags=race -``` - -### Dependencies -```bash -# Update dependencies -go mod tidy -go mod download -``` - -## Architecture Overview - -### Core Components - -1. **Worker Process (`worker/`)**: Manages individual PHP worker processes - - `Process` struct handles process lifecycle, communication via goridge relay - - Uses FSM (Finite State Machine) for state management - - Implements sync.Pool for efficient memory reuse - -2. **Static Pool (`pool/static_pool/`)**: Fixed-size worker pool implementation - - Manages a pool of workers with configurable size - - Handles task distribution and worker lifecycle - - Supports supervised mode with TTL and memory limits - - Dynamic allocation support for scaling workers based on load - -3. **IPC Layer (`ipc/`)**: Inter-process communication implementations - - `pipe/`: Named pipe communication (Unix sockets on Linux/Mac, named pipes on Windows) - - `socket/`: TCP socket communication - - Both implement the `Factory` interface for worker creation - -4. **Worker Watcher (`worker_watcher/`)**: Monitors and manages worker states - - Tracks worker TTLs, idle time, and memory usage - - Handles worker allocation and deallocation - - Implements container-based worker storage - -5. **FSM (`fsm/`)**: Finite State Machine for worker state tracking - - States: Inactive, Ready, Working, Stopped, Errored, Invalid - - Thread-safe state transitions with atomic operations - -6. **Payload (`payload/`)**: Message payload handling between workers and pool - - Manages request/response serialization - - Handles context propagation - -### Key Interfaces - -- `pool.Pool`: Main pool interface for worker management -- `pool.Factory`: Interface for creating worker connections -- `pool.Command`: Interface for creating worker commands -- `relay.Relay`: Communication interface (goridge) - -### Configuration - -Pool configuration supports: -- Number of workers (`NumWorkers`) -- Max jobs per worker (`MaxJobs`) -- Queue size limits (`MaxQueueSize`) -- Various timeouts (allocate, destroy, reset, stream) -- Supervisor settings (TTL, memory limits, watch intervals) -- Dynamic allocation options (max workers, spawn rate, idle timeout) - -### Testing Approach - -- Uses PHP test workers in `tests/` directory -- Extensive unit tests with race detection -- Fuzz testing for reliability -- Coverage reporting via codecov -- Tests run on multiple OS platforms (Linux, macOS, Windows) \ No newline at end of file diff --git a/fsm/fsm.go b/fsm/fsm.go index ca740a9..d66a5c3 100644 --- a/fsm/fsm.go +++ b/fsm/fsm.go @@ -9,28 +9,27 @@ import ( // NewFSM returns new FSM implementation based on initial state func NewFSM(initialState int64, log *zap.Logger) *Fsm { - return &Fsm{ - log: log, - currentState: &initialState, - } + f := &Fsm{log: log} + f.currentState.Store(initialState) + return f } // Fsm is general https://en.wikipedia.org/wiki/Finite-state_machine to transition between worker states type Fsm struct { log *zap.Logger - numExecs uint64 + numExecs atomic.Uint64 // to be lightweight, use UnixNano - lastUsed uint64 - currentState *int64 + lastUsed atomic.Uint64 + currentState atomic.Int64 } // CurrentState (see interface) func (s *Fsm) CurrentState() int64 { - return atomic.LoadInt64(s.currentState) + return s.currentState.Load() } func (s *Fsm) Compare(state int64) bool { - return atomic.LoadInt64(s.currentState) == state + return s.currentState.Load() == state } /* @@ -43,12 +42,12 @@ func (s *Fsm) Transition(to int64) { return } - atomic.StoreInt64(s.currentState, to) + s.currentState.Store(to) } // String returns current StateImpl as string. func (s *Fsm) String() string { - switch atomic.LoadInt64(s.currentState) { + switch s.currentState.Load() { case StateInactive: return "inactive" case StateReady: @@ -73,34 +72,34 @@ func (s *Fsm) String() string { return "ttlReached" case StateMaxMemoryReached: return "maxMemoryReached" + default: + return "undefined" } - - return "undefined" } // NumExecs returns number of registered WorkerProcess execs. func (s *Fsm) NumExecs() uint64 { - return atomic.LoadUint64(&s.numExecs) + return s.numExecs.Load() } // IsActive returns true if WorkerProcess not Inactive or Stopped func (s *Fsm) IsActive() bool { - return atomic.LoadInt64(s.currentState) == StateWorking || - atomic.LoadInt64(s.currentState) == StateReady + return s.currentState.Load() == StateWorking || + s.currentState.Load() == StateReady } // RegisterExec register new execution atomically func (s *Fsm) RegisterExec() { - atomic.AddUint64(&s.numExecs, 1) + s.numExecs.Add(1) } // SetLastUsed Update last used time func (s *Fsm) SetLastUsed(lu uint64) { - atomic.StoreUint64(&s.lastUsed, lu) + s.lastUsed.Store(lu) } func (s *Fsm) LastUsed() uint64 { - return atomic.LoadUint64(&s.lastUsed) + return s.lastUsed.Load() } // Acceptors (also called detectors or recognizers) produce binary output, @@ -113,23 +112,24 @@ func (s *Fsm) recognizer(to int64) error { case StateInactive: // from // No-one can transition to Inactive - if atomic.LoadInt64(s.currentState) == StateDestroyed { + if s.currentState.Load() == StateDestroyed { return errors.E(op, errors.Errorf("can't transition from state: %s", s.String())) } // to from StateWorking/StateInactive only case StateReady: // from - switch atomic.LoadInt64(s.currentState) { + switch s.currentState.Load() { case StateWorking, StateInactive: return nil + default: + return errors.E(op, errors.Errorf("can't transition from state: %s", s.String())) } - return errors.E(op, errors.Errorf("can't transition from state: %s", s.String())) // to case StateWorking: // from // StateWorking can be transitioned only from StateReady - if atomic.LoadInt64(s.currentState) == StateReady { + if s.currentState.Load() == StateReady { return nil } @@ -146,7 +146,7 @@ func (s *Fsm) recognizer(to int64) error { StateMaxMemoryReached, StateExecTTLReached: // from - if atomic.LoadInt64(s.currentState) == StateDestroyed { + if s.currentState.Load() == StateDestroyed { return errors.E(op, errors.Errorf("can't transition from state: %s", s.String())) } // to diff --git a/go.mod b/go.mod index 5bbda70..e6d3f79 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/roadrunner-server/pool -go 1.25 +go 1.26 require ( github.com/roadrunner-server/errors v1.4.1 diff --git a/go.sum b/go.sum index 6a1b1a9..a81fe57 100644 --- a/go.sum +++ b/go.sum @@ -28,12 +28,8 @@ github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKl github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= -github.com/tklauser/go-sysconf v0.3.15 h1:VE89k0criAymJ/Os65CSn1IXaol+1wrsFHEB8Ol49K4= -github.com/tklauser/go-sysconf v0.3.15/go.mod h1:Dmjwr6tYFIseJw7a3dRLJfsHAMXZ3nEnL/aZY+0IuI4= github.com/tklauser/go-sysconf v0.3.16 h1:frioLaCQSsF5Cy1jgRBrzr6t502KIIwQ0MArYICU0nA= github.com/tklauser/go-sysconf v0.3.16/go.mod h1:/qNL9xxDhc7tx3HSRsLWNnuzbVfh3e7gh/BmM179nYI= -github.com/tklauser/numcpus v0.10.0 h1:18njr6LDBk1zuna922MgdjQuJFjrdppsZG60sHGfjso= -github.com/tklauser/numcpus v0.10.0/go.mod h1:BiTKazU708GQTYF4mB+cmlpT2Is1gLk7XVuEeem8LsQ= github.com/tklauser/numcpus v0.11.0 h1:nSTwhKH5e1dMNsCdVBukSZrURJRoHbSEQjdEbY+9RXw= github.com/tklauser/numcpus v0.11.0/go.mod h1:z+LwcLq54uWZTX0u/bGobaV34u6V7KNlTZejzM6/3MQ= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= @@ -42,16 +38,12 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= -go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= -go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc= go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= -golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/ipc/pipe/pipe_spawn_test.go b/ipc/pipe/pipe_spawn_test.go index 9508372..6019e3c 100644 --- a/ipc/pipe/pipe_spawn_test.go +++ b/ipc/pipe/pipe_spawn_test.go @@ -1,7 +1,6 @@ package pipe import ( - "context" "os/exec" "sync" "testing" @@ -20,7 +19,7 @@ var log = zap.NewNop() func Test_GetState2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithContext(t.Context(), cmd) go func() { assert.NoError(t, w.Wait()) assert.Equal(t, fsm.StateStopped, w.State().CurrentState()) @@ -36,14 +35,12 @@ func Test_GetState2(t *testing.T) { func Test_Kill2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithContext(t.Context(), cmd) wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { assert.Error(t, w.Wait()) assert.Equal(t, fsm.StateErrored, w.State().CurrentState()) - }() + }) assert.NoError(t, err) assert.NotNil(t, w) @@ -59,7 +56,7 @@ func Test_Kill2(t *testing.T) { func Test_Pipe_Start2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithContext(t.Context(), cmd) assert.NoError(t, err) assert.NotNil(t, w) @@ -77,7 +74,7 @@ func Test_Pipe_StartError2(t *testing.T) { t.Errorf("error running the command: error %v", err) } - w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithContext(t.Context(), cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -89,7 +86,7 @@ func Test_Pipe_PipeError3(t *testing.T) { t.Errorf("error creating the STDIN pipe: error %v", err) } - w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithContext(t.Context(), cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -101,31 +98,31 @@ func Test_Pipe_PipeError4(t *testing.T) { t.Errorf("error creating the STDIN pipe: error %v", err) } - w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithContext(t.Context(), cmd) assert.Error(t, err) assert.Nil(t, w) } func Test_Pipe_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithContext(t.Context(), cmd) assert.Nil(t, w) assert.Error(t, err) } func Test_Pipe_Invalid2(t *testing.T) { cmd := exec.Command("php", "../../tests/invalid.php") - w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithContext(t.Context(), cmd) assert.Error(t, err) assert.Nil(t, w) } func Test_Pipe_Echo2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithContext(t.Context(), cmd) assert.NoError(t, err) - res, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + res, err := w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -145,11 +142,11 @@ func Test_Pipe_Echo2(t *testing.T) { func Test_Pipe_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithContext(t.Context(), cmd) assert.NoError(t, err) require.NotNil(t, w) - res, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + res, err := w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res) @@ -162,7 +159,7 @@ func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) { f := NewPipeFactory(log) for b.Loop() { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := f.SpawnWorkerWithContext(context.Background(), cmd) + w, _ := f.SpawnWorkerWithContext(b.Context(), cmd) go func() { if w.Wait() != nil { b.Fail() @@ -179,7 +176,7 @@ func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) { func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd) + w, _ := NewPipeFactory(log).SpawnWorkerWithContext(b.Context(), cmd) b.ReportAllocs() @@ -197,7 +194,7 @@ func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) { }() for b.Loop() { - if _, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := w.Exec(b.Context(), &payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -205,7 +202,7 @@ func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) { func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithContext(b.Context(), cmd) if err != nil { b.Fatal(err) } @@ -218,7 +215,7 @@ func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) { }() for b.Loop() { - if _, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := w.Exec(b.Context(), &payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -226,7 +223,7 @@ func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) { func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithContext(b.Context(), cmd) if err != nil { b.Fatal(err) } @@ -239,7 +236,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) { }() for b.Loop() { - if _, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := w.Exec(b.Context(), &payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -248,7 +245,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) { func Test_Echo2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithContext(t.Context(), cmd) if err != nil { t.Fatal(err) } @@ -263,7 +260,7 @@ func Test_Echo2(t *testing.T) { } }() - res, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + res, err := w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -276,7 +273,7 @@ func Test_Echo2(t *testing.T) { func Test_BadPayload2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd) + w, _ := NewPipeFactory(log).SpawnWorkerWithContext(t.Context(), cmd) go func() { assert.NoError(t, w.Wait()) @@ -288,7 +285,7 @@ func Test_BadPayload2(t *testing.T) { } }() - res, err := w.Exec(context.Background(), &payload.Payload{}) + res, err := w.Exec(t.Context(), &payload.Payload{}) assert.NoError(t, err) assert.NotNil(t, res) } @@ -296,7 +293,7 @@ func Test_BadPayload2(t *testing.T) { func Test_String2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd) + w, _ := NewPipeFactory(log).SpawnWorkerWithContext(t.Context(), cmd) go func() { assert.NoError(t, w.Wait()) }() @@ -315,7 +312,7 @@ func Test_String2(t *testing.T) { func Test_Echo_Slow2(t *testing.T) { cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10") - w, _ := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd) + w, _ := NewPipeFactory(log).SpawnWorkerWithContext(t.Context(), cmd) go func() { assert.NoError(t, w.Wait()) }() @@ -326,7 +323,7 @@ func Test_Echo_Slow2(t *testing.T) { } }() - res, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + res, err := w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -339,12 +336,12 @@ func Test_Echo_Slow2(t *testing.T) { func Test_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - w, err := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithContext(t.Context(), cmd) if err != nil { t.Fatal(err) } - res, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + res, err := w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res) @@ -355,7 +352,7 @@ func Test_Broken2(t *testing.T) { func Test_Error2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "error", "pipes") - w, _ := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd) + w, _ := NewPipeFactory(log).SpawnWorkerWithContext(t.Context(), cmd) go func() { assert.NoError(t, w.Wait()) }() @@ -367,7 +364,7 @@ func Test_Error2(t *testing.T) { } }() - res, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + res, err := w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res) @@ -380,7 +377,7 @@ func Test_Error2(t *testing.T) { func Test_NumExecs2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd) + w, _ := NewPipeFactory(log).SpawnWorkerWithContext(t.Context(), cmd) go func() { assert.NoError(t, w.Wait()) }() @@ -391,21 +388,21 @@ func Test_NumExecs2(t *testing.T) { } }() - _, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + _, err := w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, uint64(1), w.State().NumExecs()) w.State().Transition(fsm.StateReady) - _, err = w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + _, err = w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, uint64(2), w.State().NumExecs()) w.State().Transition(fsm.StateReady) - _, err = w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + _, err = w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } diff --git a/ipc/pipe/pipe_test.go b/ipc/pipe/pipe_test.go index 1119a80..3f2f14f 100644 --- a/ipc/pipe/pipe_test.go +++ b/ipc/pipe/pipe_test.go @@ -17,7 +17,7 @@ import ( func Test_GetState(t *testing.T) { t.Parallel() - ctx := context.Background() + ctx := t.Context() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory(log).SpawnWorkerWithContext(ctx, cmd) @@ -38,17 +38,15 @@ func Test_GetState(t *testing.T) { func Test_Kill(t *testing.T) { t.Parallel() - ctx := context.Background() + ctx := t.Context() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory(log).SpawnWorkerWithContext(ctx, cmd) wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { assert.Error(t, w.Wait()) assert.Equal(t, fsm.StateErrored, w.State().CurrentState()) - }() + }) assert.NoError(t, err) assert.NotNil(t, w) @@ -63,7 +61,7 @@ func Test_Kill(t *testing.T) { func Test_Pipe_Start(t *testing.T) { t.Parallel() - ctx := context.Background() + ctx := t.Context() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory(log).SpawnWorkerWithContext(ctx, cmd) @@ -85,7 +83,7 @@ func Test_Pipe_StartError(t *testing.T) { t.Errorf("error running the command: error %v", err) } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) defer cancel() w, err := NewPipeFactory(log).SpawnWorkerWithContext(ctx, cmd) assert.Error(t, err) @@ -100,7 +98,7 @@ func Test_Pipe_PipeError(t *testing.T) { t.Errorf("error creating the STDIN pipe: error %v", err) } - ctx := context.Background() + ctx := t.Context() w, err := NewPipeFactory(log).SpawnWorkerWithContext(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) @@ -115,7 +113,7 @@ func Test_Pipe_PipeError2(t *testing.T) { t.Errorf("error creating the STDIN pipe: error %v", err) } - ctx := context.Background() + ctx := t.Context() w, err := NewPipeFactory(log).SpawnWorkerWithContext(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) @@ -123,7 +121,7 @@ func Test_Pipe_PipeError2(t *testing.T) { func Test_Pipe_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - ctx := context.Background() + ctx := t.Context() w, err := NewPipeFactory(log).SpawnWorkerWithContext(ctx, cmd) @@ -134,7 +132,7 @@ func Test_Pipe_Failboot(t *testing.T) { func Test_Pipe_Invalid(t *testing.T) { t.Parallel() cmd := exec.Command("php", "../../tests/invalid.php") - ctx := context.Background() + ctx := t.Context() w, err := NewPipeFactory(log).SpawnWorkerWithContext(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) @@ -143,7 +141,7 @@ func Test_Pipe_Invalid(t *testing.T) { func Test_Pipe_Echo(t *testing.T) { t.Parallel() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - ctx := context.Background() + ctx := t.Context() w, err := NewPipeFactory(log).SpawnWorkerWithContext(ctx, cmd) if err != nil { t.Fatal(err) @@ -156,7 +154,7 @@ func Test_Pipe_Echo(t *testing.T) { } }() - res, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + res, err := w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) assert.NotNil(t, res.Body) @@ -174,7 +172,7 @@ func Test_Pipe_Echo(t *testing.T) { func Test_Pipe_Broken(t *testing.T) { t.Parallel() cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - ctx := context.Background() + ctx := t.Context() w, err := NewPipeFactory(log).SpawnWorkerWithContext(ctx, cmd) require.NoError(t, err) require.NotNil(t, w) @@ -184,7 +182,7 @@ func Test_Pipe_Broken(t *testing.T) { require.Error(t, errW) }() - res, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + res, err := w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res) @@ -197,7 +195,7 @@ func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { f := NewPipeFactory(log) for b.Loop() { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := f.SpawnWorkerWithContext(context.Background(), cmd) + w, _ := f.SpawnWorkerWithContext(b.Context(), cmd) go func() { if w.Wait() != nil { b.Fail() @@ -214,7 +212,7 @@ func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := NewPipeFactory(log).SpawnWorkerWithContext(context.Background(), cmd) + w, _ := NewPipeFactory(log).SpawnWorkerWithContext(b.Context(), cmd) b.ReportAllocs() @@ -232,7 +230,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { }() for b.Loop() { - if _, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := w.Exec(b.Context(), &payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -240,7 +238,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - ctx := context.Background() + ctx := b.Context() w, err := NewPipeFactory(log).SpawnWorkerWithContext(ctx, cmd) if err != nil { b.Fatal(err) @@ -254,7 +252,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { }() for b.Loop() { - if _, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := w.Exec(b.Context(), &payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -262,7 +260,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { func Test_Echo(t *testing.T) { t.Parallel() - ctx := context.Background() + ctx := t.Context() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") w, err := NewPipeFactory(log).SpawnWorkerWithContext(ctx, cmd) @@ -280,7 +278,7 @@ func Test_Echo(t *testing.T) { } }() - res, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + res, err := w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -292,7 +290,7 @@ func Test_Echo(t *testing.T) { func Test_BadPayload(t *testing.T) { t.Parallel() - ctx := context.Background() + ctx := t.Context() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") w, _ := NewPipeFactory(log).SpawnWorkerWithContext(ctx, cmd) @@ -307,14 +305,14 @@ func Test_BadPayload(t *testing.T) { } }() - res, err := w.Exec(context.Background(), &payload.Payload{}) + res, err := w.Exec(t.Context(), &payload.Payload{}) assert.NoError(t, err) assert.NotNil(t, res) } func Test_String(t *testing.T) { t.Parallel() - ctx := context.Background() + ctx := t.Context() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") w, _ := NewPipeFactory(log).SpawnWorkerWithContext(ctx, cmd) @@ -335,7 +333,7 @@ func Test_String(t *testing.T) { func Test_Echo_Slow(t *testing.T) { t.Parallel() - ctx := context.Background() + ctx := t.Context() cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10") w, _ := NewPipeFactory(log).SpawnWorkerWithContext(ctx, cmd) @@ -349,7 +347,7 @@ func Test_Echo_Slow(t *testing.T) { } }() - res, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + res, err := w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -360,7 +358,7 @@ func Test_Echo_Slow(t *testing.T) { } func Test_Broken(t *testing.T) { - ctx := context.Background() + ctx := t.Context() cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") w, err := NewPipeFactory(log).SpawnWorkerWithContext(ctx, cmd) @@ -368,7 +366,7 @@ func Test_Broken(t *testing.T) { t.Fatal(err) } - res, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + res, err := w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res) @@ -378,7 +376,7 @@ func Test_Broken(t *testing.T) { func Test_Error(t *testing.T) { t.Parallel() - ctx := context.Background() + ctx := t.Context() cmd := exec.Command("php", "../../tests/client.php", "error", "pipes") w, _ := NewPipeFactory(log).SpawnWorkerWithContext(ctx, cmd) @@ -393,7 +391,7 @@ func Test_Error(t *testing.T) { } }() - res, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + res, err := w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res) @@ -405,7 +403,7 @@ func Test_Error(t *testing.T) { func Test_NumExecs(t *testing.T) { t.Parallel() - ctx := context.Background() + ctx := t.Context() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") w, _ := NewPipeFactory(log).SpawnWorkerWithContext(ctx, cmd) @@ -419,21 +417,21 @@ func Test_NumExecs(t *testing.T) { } }() - _, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + _, err := w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } w.State().Transition(fsm.StateReady) assert.Equal(t, uint64(1), w.State().NumExecs()) - _, err = w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + _, err = w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, uint64(2), w.State().NumExecs()) w.State().Transition(fsm.StateReady) - _, err = w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + _, err = w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } @@ -442,7 +440,7 @@ func Test_NumExecs(t *testing.T) { } func Benchmark_WorkerPipeTTL(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - ctx := context.Background() + ctx := b.Context() log, _ = zap.NewDevelopment() w, err := NewPipeFactory(log).SpawnWorkerWithContext(ctx, cmd) diff --git a/ipc/socket/socket_spawn_test.go b/ipc/socket/socket_spawn_test.go index 803c5b0..1d511f6 100644 --- a/ipc/socket/socket_spawn_test.go +++ b/ipc/socket/socket_spawn_test.go @@ -29,7 +29,7 @@ func Test_Tcp_Start2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) defer cancel() w, err := NewSocketServer(ls, log).SpawnWorkerWithContext(ctx, cmd) @@ -62,7 +62,7 @@ func Test_Tcp_StartCloseFactory2(t *testing.T) { } }() - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) defer cancel() w, err := f.SpawnWorkerWithContext(ctx, cmd) @@ -96,7 +96,7 @@ func Test_Tcp_StartError2(t *testing.T) { t.Errorf("error executing the command: error %v", err) } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) defer cancel() w, err := NewSocketServer(ls, log).SpawnWorkerWithContext(ctx, cmd) @@ -119,7 +119,7 @@ func Test_Tcp_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) defer cancel() w, err2 := NewSocketServer(ls, log).SpawnWorkerWithContext(ctx, cmd) @@ -140,7 +140,7 @@ func Test_Tcp_Invalid2(t *testing.T) { cmd := exec.Command("php", "../../tests/invalid.php") - ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*15) defer cancel() w, err := NewSocketServer(ls, log).SpawnWorkerWithContext(ctx, cmd) @@ -161,7 +161,7 @@ func Test_Tcp_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") - ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*15) defer cancel() w, err := NewSocketServer(ls, log).SpawnWorkerWithContext(ctx, cmd) @@ -169,14 +169,12 @@ func Test_Tcp_Broken2(t *testing.T) { t.Fatal(err) } wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { errW := w.Wait() assert.Error(t, errW) - }() + }) - res, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + res, err := w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res) wg.Wait() @@ -201,7 +199,7 @@ func Test_Tcp_Echo2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithTimeout(t.Context(), time.Minute) defer cancel() w, _ := NewSocketServer(ls, log).SpawnWorkerWithContext(ctx, cmd) @@ -215,7 +213,7 @@ func Test_Tcp_Echo2(t *testing.T) { } }() - res, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + res, err := w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -238,7 +236,7 @@ func Test_Unix_Start2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - w, err := NewSocketServer(ls, log).SpawnWorkerWithContext(context.Background(), cmd) + w, err := NewSocketServer(ls, log).SpawnWorkerWithContext(t.Context(), cmd) assert.NoError(t, err) assert.NotNil(t, w) @@ -265,7 +263,7 @@ func Test_Unix_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*15) defer cancel() w, err := NewSocketServer(ls, log).SpawnWorkerWithContext(ctx, cmd) @@ -286,7 +284,7 @@ func Test_Unix_Timeout2(t *testing.T) { cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0") - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) + ctx, cancel := context.WithTimeout(t.Context(), time.Millisecond*100) defer cancel() w, err := NewSocketServer(ls, log).SpawnWorkerWithContext(ctx, cmd) @@ -309,7 +307,7 @@ func Test_Unix_Invalid2(t *testing.T) { cmd := exec.Command("php", "../../tests/invalid.php") - ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*15) defer cancel() w, err := NewSocketServer(ls, log).SpawnWorkerWithContext(ctx, cmd) @@ -330,7 +328,7 @@ func Test_Unix_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") - ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*15) defer cancel() w, err := NewSocketServer(ls, log).SpawnWorkerWithContext(ctx, cmd) @@ -338,14 +336,12 @@ func Test_Unix_Broken2(t *testing.T) { t.Fatal(err) } wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { errW := w.Wait() assert.Error(t, errW) - }() + }) - res, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + res, err := w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res) @@ -369,7 +365,7 @@ func Test_Unix_Echo2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithTimeout(t.Context(), time.Minute) defer cancel() w, err := NewSocketServer(ls, log).SpawnWorkerWithContext(ctx, cmd) @@ -386,7 +382,7 @@ func Test_Unix_Echo2(t *testing.T) { } }() - res, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + res, err := w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -411,7 +407,7 @@ func Benchmark_Tcp_SpawnWorker_Stop2(b *testing.B) { for b.Loop() { cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - w, err := f.SpawnWorkerWithContext(context.Background(), cmd) + w, err := f.SpawnWorkerWithContext(b.Context(), cmd) if err != nil { b.Fatal(err) } @@ -439,7 +435,7 @@ func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + ctx, cancel := context.WithTimeout(b.Context(), time.Second*15) defer cancel() w, err := NewSocketServer(ls, log).SpawnWorkerWithContext(ctx, cmd) @@ -454,7 +450,7 @@ func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) { }() for b.Loop() { - if _, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := w.Exec(b.Context(), &payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -475,7 +471,7 @@ func Benchmark_Unix_SpawnWorker_Stop2(b *testing.B) { for b.Loop() { cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - w, err := f.SpawnWorkerWithContext(context.Background(), cmd) + w, err := f.SpawnWorkerWithContext(b.Context(), cmd) if err != nil { b.Fatal(err) } @@ -499,7 +495,7 @@ func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + ctx, cancel := context.WithTimeout(b.Context(), time.Second*15) defer cancel() w, err := NewSocketServer(ls, log).SpawnWorkerWithContext(ctx, cmd) @@ -514,7 +510,7 @@ func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) { }() for b.Loop() { - if _, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := w.Exec(b.Context(), &payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } diff --git a/ipc/socket/socket_test.go b/ipc/socket/socket_test.go index 5af839e..b666b5b 100755 --- a/ipc/socket/socket_test.go +++ b/ipc/socket/socket_test.go @@ -15,7 +15,7 @@ import ( ) func Test_Tcp_Start(t *testing.T) { - ctx := context.Background() + ctx := t.Context() time.Sleep(time.Millisecond * 10) // to ensure free socket ls, err := net.Listen("tcp", "127.0.0.1:9007") @@ -48,7 +48,7 @@ func Test_Tcp_Start(t *testing.T) { func Test_Tcp_StartCloseFactory(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx := context.Background() + ctx := t.Context() ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { } else { @@ -80,7 +80,7 @@ func Test_Tcp_StartCloseFactory(t *testing.T) { func Test_Tcp_StartError(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*15) defer cancel() ls, err := net.Listen("tcp", "127.0.0.1:9007") @@ -110,7 +110,7 @@ func Test_Tcp_StartError(t *testing.T) { func Test_Tcp_Failboot(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*20) defer cancel() ls, err := net.Listen("tcp", "127.0.0.1:9007") @@ -134,7 +134,7 @@ func Test_Tcp_Failboot(t *testing.T) { func Test_Tcp_Timeout(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx, cancel := context.WithTimeout(context.Background(), time.Microsecond) + ctx, cancel := context.WithTimeout(t.Context(), time.Microsecond) defer cancel() ls, err := net.Listen("tcp", "127.0.0.1:9007") @@ -159,7 +159,7 @@ func Test_Tcp_Timeout(t *testing.T) { func Test_Tcp_Invalid(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(t.Context(), time.Second) defer cancel() ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { @@ -182,7 +182,7 @@ func Test_Tcp_Invalid(t *testing.T) { func Test_Tcp_Broken(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx := context.Background() + ctx := t.Context() ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { @@ -202,14 +202,12 @@ func Test_Tcp_Broken(t *testing.T) { t.Fatal(err) } wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { errW := w.Wait() assert.Error(t, errW) - }() + }) - res, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + res, err := w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res) wg.Wait() @@ -223,7 +221,7 @@ func Test_Tcp_Broken(t *testing.T) { func Test_Tcp_Echo(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx := context.Background() + ctx := t.Context() ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { defer func() { @@ -249,7 +247,7 @@ func Test_Tcp_Echo(t *testing.T) { } }() - res, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + res, err := w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -260,7 +258,7 @@ func Test_Tcp_Echo(t *testing.T) { } func Test_Unix_Start(t *testing.T) { - ctx := context.Background() + ctx := t.Context() ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { @@ -302,7 +300,7 @@ func Test_Unix_Failboot(t *testing.T) { t.Skip("socket is busy") } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) defer cancel() cmd := exec.Command("php", "../../tests/failboot.php") @@ -326,7 +324,7 @@ func Test_Unix_Timeout(t *testing.T) { } cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0") - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) + ctx, cancel := context.WithTimeout(t.Context(), time.Millisecond*100) defer cancel() w, err := NewSocketServer(ls, log).SpawnWorkerWithContext(ctx, cmd) @@ -349,7 +347,7 @@ func Test_Unix_Invalid(t *testing.T) { } cmd := exec.Command("php", "../../tests/invalid.php") - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*10) defer cancel() w, err := NewSocketServer(ls, log).SpawnWorkerWithContext(ctx, cmd) @@ -358,7 +356,7 @@ func Test_Unix_Invalid(t *testing.T) { } func Test_Unix_Broken(t *testing.T) { - ctx := context.Background() + ctx := t.Context() ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { @@ -377,14 +375,12 @@ func Test_Unix_Broken(t *testing.T) { t.Fatal(err) } wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() + wg.Go(func() { errW := w.Wait() assert.Error(t, errW) - }() + }) - res, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + res, err := w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res) @@ -397,7 +393,7 @@ func Test_Unix_Broken(t *testing.T) { } func Test_Unix_Echo(t *testing.T) { - ctx := context.Background() + ctx := t.Context() ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { @@ -426,7 +422,7 @@ func Test_Unix_Echo(t *testing.T) { } }() - res, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}) + res, err := w.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -437,7 +433,7 @@ func Test_Unix_Echo(t *testing.T) { } func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { - ctx := context.Background() + ctx := b.Context() ls, err := net.Listen("tcp", "127.0.0.1:9007") if err == nil { defer func() { @@ -470,7 +466,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { } func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { - ctx := context.Background() + ctx := b.Context() ls, err := net.Listen("tcp", "127.0.0.1:9007") if err == nil { defer func() { @@ -497,14 +493,14 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { }() for b.Loop() { - if _, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := w.Exec(b.Context(), &payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } } func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { - ctx := context.Background() + ctx := b.Context() ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { @@ -533,7 +529,7 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { } func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { - ctx := context.Background() + ctx := b.Context() ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { @@ -560,7 +556,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { }() for b.Loop() { - if _, err := w.Exec(context.Background(), &payload.Payload{Body: []byte("hello")}); err != nil { + if _, err := w.Exec(b.Context(), &payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() } } diff --git a/pool/allocator.go b/pool/allocator.go index 4ec9b10..f9b6049 100644 --- a/pool/allocator.go +++ b/pool/allocator.go @@ -51,14 +51,13 @@ func AllocateParallel(numWorkers uint64, allocator func() (*worker.Process, erro // constant number of stack simplify logic for i := range numWorkers { - ii := i eg.Go(func() error { w, err := allocator() if err != nil { return errors.E(op, errors.WorkerAllocate, err) } - workers[ii] = w + workers[i] = w return nil }) } @@ -66,13 +65,12 @@ func AllocateParallel(numWorkers uint64, allocator func() (*worker.Process, erro err := eg.Wait() if err != nil { for j := range workers { - jj := j - if workers[jj] != nil { + if workers[j] != nil { go func() { - _ = workers[jj].Wait() + _ = workers[j].Wait() }() - _ = workers[jj].Kill() + _ = workers[j].Kill() } } return nil, err diff --git a/pool/ratelimiter/ratelimiter_test.go b/pool/ratelimiter/ratelimiter_test.go index ab919e8..36cbc3d 100644 --- a/pool/ratelimiter/ratelimiter_test.go +++ b/pool/ratelimiter/ratelimiter_test.go @@ -96,17 +96,15 @@ func TestRateLimiter_RaceCondition(t *testing.T) { var successCount atomic.Int64 var wg sync.WaitGroup - wg.Add(numGoroutines) for range numGoroutines { - go func() { - defer wg.Done() + wg.Go(func() { for range iterations { if rl.TryAcquire() { successCount.Add(1) rl.Release() } } - }() + }) } wg.Wait() diff --git a/pool/static_pool/debug.go b/pool/static_pool/debug.go index 4a423ab..8fe8981 100644 --- a/pool/static_pool/debug.go +++ b/pool/static_pool/debug.go @@ -38,7 +38,7 @@ func (sp *Pool) execDebug(ctx context.Context, p *payload.Payload, stopCh chan s resp <- newPExec(rsp, nil) // in case of stream, we should not return worker immediately - go func() { + go func() { //nolint:gosec // G118 - intentional: per-iteration exec timeout must be independent of request context // would be called on Goexit defer func() { sp.log.Debug("stopping [stream] worker", zap.Int("pid", int(w.Pid())), zap.String("state", w.State().String())) diff --git a/pool/static_pool/dyn_allocator.go b/pool/static_pool/dyn_allocator.go index d344541..276cda0 100644 --- a/pool/static_pool/dyn_allocator.go +++ b/pool/static_pool/dyn_allocator.go @@ -64,7 +64,7 @@ func newDynAllocator( func (da *dynAllocator) addMoreWorkers() { // set the last allocation try time // we need to store this to prevent immediate deallocation in the TTL listener - da.lastAllocTry.Store(p(time.Now().UTC())) + da.lastAllocTry.Store(new(time.Now().UTC())) if !da.rateLimit.TryAcquire() { da.log.Warn("rate limit exceeded for dynamic allocation, skipping") @@ -98,7 +98,7 @@ func (da *dynAllocator) addMoreWorkers() { // we're starting from the 1 because we already allocated one worker which would be released in the Exec function // i < da.spawnRate - we can't allocate more workers than the spawn rate - for i := uint64(0); i < da.spawnRate; i++ { + for range da.spawnRate { // spawn as many workers as the user specified in the spawn rate configuration, but not more than max workers if da.currAllocated.Load() >= da.maxWorkers { break @@ -205,7 +205,3 @@ func (da *dynAllocator) startIdleTTLListener() { } }() } - -func p[T any](v T) *T { - return &v -} diff --git a/pool/static_pool/dyn_allocator_test.go b/pool/static_pool/dyn_allocator_test.go index c07a74e..74fb633 100644 --- a/pool/static_pool/dyn_allocator_test.go +++ b/pool/static_pool/dyn_allocator_test.go @@ -1,7 +1,6 @@ package static_pool import ( - "context" "os/exec" "sync" "testing" @@ -32,9 +31,8 @@ var testDynCfg = &pool.Config{ } func Test_DynAllocator(t *testing.T) { - ctx := context.Background() np, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(dynlog()), testDynCfg, @@ -43,14 +41,13 @@ func Test_DynAllocator(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, np) - r, err := np.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + r, err := np.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) require.NoError(t, err) resp := <-r assert.Equal(t, []byte("hello"), resp.Body()) assert.NoError(t, err) - - np.Destroy(ctx) + t.Cleanup(func() { np.Destroy(t.Context()) }) } func Test_DynAllocatorManyReq(t *testing.T) { @@ -67,9 +64,8 @@ func Test_DynAllocatorManyReq(t *testing.T) { }, } - ctx := context.Background() np, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "slow_req", "pipes") }, @@ -81,26 +77,24 @@ func Test_DynAllocatorManyReq(t *testing.T) { assert.NotNil(t, np) wg := &sync.WaitGroup{} - wg.Add(1000) go func() { for range 1000 { - go func() { - defer wg.Done() - r, erre := np.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + wg.Go(func() { + r, erre := np.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) if erre != nil { t.Log("failed request: ", erre.Error()) return } resp := <-r assert.Equal(t, []byte("hello"), resp.Body()) - }() + }) } }() go func() { for range 10 { time.Sleep(time.Second) - _ = np.Reset(context.Background()) + _ = np.Reset(t.Context()) } }() @@ -109,8 +103,7 @@ func Test_DynAllocatorManyReq(t *testing.T) { time.Sleep(time.Second * 30) assert.Equal(t, 5, len(np.Workers())) - - np.Destroy(ctx) + t.Cleanup(func() { np.Destroy(t.Context()) }) } func Test_DynamicPool_OverMax(t *testing.T) { @@ -125,9 +118,8 @@ func Test_DynamicPool_OverMax(t *testing.T) { }, } - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/worker-slow-dyn.php") }, @@ -139,11 +131,10 @@ func Test_DynamicPool_OverMax(t *testing.T) { assert.NotNil(t, p) wg := &sync.WaitGroup{} - wg.Add(2) - go func() { + wg.Go(func() { t.Log("sending request 1") - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + r, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) require.NoError(t, err) select { case resp := <-r: @@ -153,31 +144,27 @@ func Test_DynamicPool_OverMax(t *testing.T) { case <-time.After(time.Second * 10): assert.Fail(t, "timeout") } - - wg.Done() - }() - go func() { + }) + wg.Go(func() { // sleep to ensure the first request is being processed first // this request should trigger dynamic allocation attempt and return an error time.Sleep(time.Second) t.Log("sending request 2") - _, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + _, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) require.Error(t, err) - wg.Done() - }() + }) t.Log("waiting for the requests 1 and 2") wg.Wait() t.Log("wait 1 and 2 finished") - wg.Add(2) // request 3 and 4 should be processed normally, since we have 2 workers now (1 initial + 1 dynamic) t.Log("starting requests 3 and 4") require.Len(t, p.Workers(), 2) - go func() { + wg.Go(func() { t.Log("request 3") - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + r, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) require.NoError(t, err) select { case resp := <-r: @@ -187,12 +174,10 @@ func Test_DynamicPool_OverMax(t *testing.T) { case <-time.After(time.Second * 10): assert.Fail(t, "timeout") } - - wg.Done() - }() - go func() { + }) + wg.Go(func() { t.Log("request 4") - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + r, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) require.NoError(t, err) select { case resp := <-r: @@ -202,15 +187,15 @@ func Test_DynamicPool_OverMax(t *testing.T) { case <-time.After(time.Second * 10): assert.Fail(t, "timeout") } - - wg.Done() - }() + }) t.Log("waiting for the requests 3 and 4") wg.Wait() time.Sleep(time.Second * 20) assert.Len(t, p.Workers(), 1) - p.Destroy(ctx) + t.Cleanup(func() { + p.Destroy(t.Context()) + }) } func Test_DynamicPool(t *testing.T) { @@ -225,9 +210,8 @@ func Test_DynamicPool(t *testing.T) { }, } - ctx := context.Background() - p, err := NewPool( - ctx, + p, errp := NewPool( + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/worker-slow-dyn.php") }, @@ -235,15 +219,12 @@ func Test_DynamicPool(t *testing.T) { dynAllCfg, log(), ) - assert.NoError(t, err) + assert.NoError(t, errp) assert.NotNil(t, p) wg := &sync.WaitGroup{} - wg.Add(2) - - go func() { - defer wg.Done() - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + wg.Go(func() { + r, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) require.NoError(t, err) select { case resp := <-r: @@ -252,21 +233,21 @@ func Test_DynamicPool(t *testing.T) { case <-time.After(time.Second * 10): assert.Fail(t, "timeout") } - }() + }) - go func() { + wg.Go(func() { time.Sleep(time.Second) - _, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + _, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) require.Error(t, err) - wg.Done() - }() + }) wg.Wait() time.Sleep(time.Second * 20) require.Len(t, p.Workers(), 1) - - p.Destroy(ctx) + t.Cleanup(func() { + p.Destroy(t.Context()) + }) } func Test_DynamicPool_500W(t *testing.T) { @@ -282,9 +263,8 @@ func Test_DynamicPool_500W(t *testing.T) { }, } - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/worker-slow-dyn.php") }, @@ -297,10 +277,9 @@ func Test_DynamicPool_500W(t *testing.T) { require.Len(t, p.Workers(), 1) wg := &sync.WaitGroup{} - wg.Add(2) - go func() { - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + wg.Go(func() { + r, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) assert.NoError(t, err) select { case resp := <-r: @@ -309,21 +288,157 @@ func Test_DynamicPool_500W(t *testing.T) { case <-time.After(time.Second * 10): assert.Fail(t, "timeout") } + }) - wg.Done() - }() - - go func() { + wg.Go(func() { time.Sleep(time.Second * 1) - _, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + _, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) require.Error(t, err) - wg.Done() - }() + }) wg.Wait() time.Sleep(time.Second * 30) require.Len(t, p.Workers(), 1) - p.Destroy(ctx) + t.Cleanup(func() { p.Destroy(t.Context()) }) +} + +// Test_DynAllocator_100Workers verifies that the dynamic allocator can scale up to 100 dynamic workers +// in batches of 20 (spawnRate) and then properly deallocate them all after the idle timeout. +func Test_DynAllocator_100Workers(t *testing.T) { + cfg := &pool.Config{ + NumWorkers: 5, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second * 30, + DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ + MaxWorkers: 100, + SpawnRate: 20, + IdleTimeout: time.Second * 3, + }, + } + + p, err := NewPool( + t.Context(), + func(cmd []string) *exec.Cmd { + return exec.Command("php", "../../tests/client.php", "delay", "pipes") + }, + pipe.NewPipeFactory(dynlog()), + cfg, + dynlog(), + ) + require.NoError(t, err) + require.NotNil(t, p) + t.Cleanup(func() { p.Destroy(t.Context()) }) + require.Len(t, p.Workers(), 5) + + wg := &sync.WaitGroup{} + // Fire requests in waves to sustain pressure across rate limiter windows. + // Each wave saturates current workers; failed requests trigger addMoreWorkers (20 per call, rate-limited to 1/sec). + // Over 8 waves ≈ 8 seconds: up to 5 batches * 20 = 100 dynamic workers. + for wave := range 8 { + for range 60 { + wg.Go(func() { + // 5-second delay keeps workers busy long enough to create sustained pressure + r, erre := p.Exec(t.Context(), &payload.Payload{Body: []byte("5000"), Context: nil}, make(chan struct{})) + if erre != nil { + return + } + <-r + }) + } + if wave < 7 { + time.Sleep(time.Second) + } + } + + wg.Wait() + + totalAfterLoad := len(p.Workers()) + dynAfterLoad := p.NumDynamic() + t.Log("workers after load:", totalAfterLoad, "dynamic:", dynAfterLoad) + assert.Greater(t, dynAfterLoad, uint64(0), "dynamic allocation should have occurred") + + // Wait for idle timeout (3s) + deallocation cycles. + // With 100 dynamic workers and SpawnRate=20: 5 deallocation batches, each triggered at IdleTimeout interval. + // 5 batches * 3s = 15s + extra buffer. + time.Sleep(time.Second * 30) + + assert.Equal(t, uint64(0), p.NumDynamic(), "all dynamic workers should be deallocated") + assert.Len(t, p.Workers(), 5, "should return to base worker count") +} + +// Test_DynAllocator_ReallocationCycle verifies that after all dynamic workers are deallocated +// (idle TTL listener exits, started=false), a new pressure spike correctly restarts the listener +// and allocates workers again. This tests the full listener lifecycle: start → deallocate → stop → restart. +func Test_DynAllocator_ReallocationCycle(t *testing.T) { + cfg := &pool.Config{ + NumWorkers: 2, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second * 20, + DynamicAllocatorOpts: &pool.DynamicAllocationOpts{ + MaxWorkers: 10, + SpawnRate: 5, + IdleTimeout: time.Second * 2, + }, + } + + p, err := NewPool( + t.Context(), + func(cmd []string) *exec.Cmd { + return exec.Command("php", "../../tests/client.php", "delay", "pipes") + }, + pipe.NewPipeFactory(dynlog()), + cfg, + dynlog(), + ) + require.NoError(t, err) + require.NotNil(t, p) + t.Cleanup(func() { p.Destroy(t.Context()) }) + require.Len(t, p.Workers(), 2) + + // === Cycle 1: Trigger allocation, then wait for full deallocation === + wg := &sync.WaitGroup{} + for range 30 { + wg.Go(func() { + r, erre := p.Exec(t.Context(), &payload.Payload{Body: []byte("3000"), Context: nil}, make(chan struct{})) + if erre != nil { + return + } + <-r + }) + } + wg.Wait() + + dyn1 := p.NumDynamic() + t.Log("cycle 1 - dynamic workers allocated:", dyn1) + assert.Greater(t, dyn1, uint64(0), "cycle 1: dynamic allocation should have occurred") + + // Wait for full deallocation: idle timeout (2s) + deallocation batches (2 batches * 2s = 4s) + buffer + time.Sleep(time.Second * 10) + + assert.Equal(t, uint64(0), p.NumDynamic(), "cycle 1: all dynamic workers should be deallocated") + assert.Len(t, p.Workers(), 2, "cycle 1: should return to base count") + + // === Cycle 2: Re-trigger allocation (listener must restart from started=false) === + for range 30 { + wg.Go(func() { + r, erre := p.Exec(t.Context(), &payload.Payload{Body: []byte("3000"), Context: nil}, make(chan struct{})) + if erre != nil { + return + } + <-r + }) + } + wg.Wait() + + dyn2 := p.NumDynamic() + t.Log("cycle 2 - dynamic workers allocated:", dyn2) + assert.Greater(t, dyn2, uint64(0), "cycle 2: dynamic allocation should have occurred (listener restarted)") + + // Wait for full deallocation again + time.Sleep(time.Second * 10) + + assert.Equal(t, uint64(0), p.NumDynamic(), "cycle 2: all dynamic workers should be deallocated") + assert.Len(t, p.Workers(), 2, "cycle 2: should return to base count after re-allocation") } diff --git a/pool/static_pool/fuzz_test.go b/pool/static_pool/fuzz_test.go index 408efe3..5fdd0cd 100644 --- a/pool/static_pool/fuzz_test.go +++ b/pool/static_pool/fuzz_test.go @@ -1,7 +1,6 @@ package static_pool import ( - "context" "os/exec" "testing" @@ -13,9 +12,8 @@ import ( func FuzzStaticPoolEcho(f *testing.F) { f.Add([]byte("hello")) - ctx := context.Background() p, err := NewPool( - ctx, + f.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(log()), testCfg, @@ -31,7 +29,7 @@ func FuzzStaticPoolEcho(f *testing.F) { data = []byte("1") } - respCh, err := p.Exec(ctx, &payload.Payload{Body: data}, sc) + respCh, err := p.Exec(t.Context(), &payload.Payload{Body: data}, sc) assert.NoError(t, err) res := <-respCh assert.NotNil(t, res) @@ -40,6 +38,5 @@ func FuzzStaticPoolEcho(f *testing.F) { assert.Equal(t, data, res.Body()) }) - - p.Destroy(ctx) + f.Cleanup(func() { p.Destroy(f.Context()) }) } diff --git a/pool/static_pool/options.go b/pool/static_pool/options.go index f715982..42a9a8a 100644 --- a/pool/static_pool/options.go +++ b/pool/static_pool/options.go @@ -14,7 +14,7 @@ func WithLogger(z *zap.Logger) Options { func WithQueueSize(l uint64) Options { return func(p *Pool) { - p.maxQueueSize = l + p.maxQueueSize.Store(l) } } diff --git a/pool/static_pool/pool.go b/pool/static_pool/pool.go index be47a16..e5a894f 100644 --- a/pool/static_pool/pool.go +++ b/pool/static_pool/pool.go @@ -40,8 +40,8 @@ type Pool struct { // allocate new worker allocator func() (*worker.Process, error) // exec queue size - queue uint64 - maxQueueSize uint64 + queue atomic.Uint64 + maxQueueSize atomic.Uint64 // used in the supervised mode supervisedExec bool stopCh chan struct{} @@ -77,7 +77,6 @@ func NewPool(ctx context.Context, cmd pool.Command, factory pool.Factory, cfg *p cmd: cmd, factory: factory, log: log, - queue: 0, stopCh: make(chan struct{}), } @@ -170,7 +169,7 @@ func (sp *Pool) Exec(ctx context.Context, p *payload.Payload, stopCh chan struct } // check if we have space to put the request - if atomic.LoadUint64(&sp.maxQueueSize) != 0 && atomic.LoadUint64(&sp.queue) >= atomic.LoadUint64(&sp.maxQueueSize) { + if sp.maxQueueSize.Load() != 0 && sp.queue.Load() >= sp.maxQueueSize.Load() { return nil, errors.E(op, errors.QueueSize, errors.Str("max queue size reached")) } @@ -188,8 +187,8 @@ func (sp *Pool) Exec(ctx context.Context, p *payload.Payload, stopCh chan struct /* register a request in the QUEUE */ - atomic.AddUint64(&sp.queue, 1) - defer atomic.AddUint64(&sp.queue, ^uint64(0)) + sp.queue.Add(1) + defer sp.queue.Add(^uint64(0)) // see notes at the end of the file begin: @@ -279,7 +278,7 @@ begin: resp <- newPExec(rsp, nil) // in case of stream, we should not return worker back immediately - go func() { + go func() { //nolint:gosec // G118 - intentional: per-iteration exec timeout must be independent of request context // would be called on Goexit defer func() { sp.log.Debug("release [stream] worker", zap.Int("pid", int(w.Pid())), zap.String("state", w.State().String())) @@ -373,7 +372,7 @@ begin: } func (sp *Pool) QueueSize() uint64 { - return atomic.LoadUint64(&sp.queue) + return sp.queue.Load() } func (sp *Pool) NumDynamic() uint64 { @@ -394,7 +393,7 @@ func (sp *Pool) Destroy(ctx context.Context) { defer cancel() } sp.ww.Destroy(ctx) - atomic.StoreUint64(&sp.queue, 0) + sp.queue.Store(0) close(sp.stopCh) } diff --git a/pool/static_pool/pool_test.go b/pool/static_pool/pool_test.go index 477115a..6142392 100644 --- a/pool/static_pool/pool_test.go +++ b/pool/static_pool/pool_test.go @@ -34,9 +34,8 @@ var log = func() *zap.Logger { } func Test_NewPool(t *testing.T) { - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(log()), testCfg, @@ -45,14 +44,12 @@ func Test_NewPool(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + r, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) require.NoError(t, err) resp := <-r - assert.Equal(t, []byte("hello"), resp.Body()) - assert.NoError(t, err) - p.Destroy(ctx) + t.Cleanup(func() { p.Destroy(t.Context()) }) } func Test_MaxWorkers(t *testing.T) { @@ -62,9 +59,8 @@ func Test_MaxWorkers(t *testing.T) { DestroyTimeout: time.Second, } - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/worker-slow-dyn.php") }, @@ -83,9 +79,8 @@ func Test_NewPoolAddRemoveWorkers(t *testing.T) { DestroyTimeout: time.Second * 500, } - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(log()), testCfg2, @@ -94,11 +89,11 @@ func Test_NewPoolAddRemoveWorkers(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) - r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - resp := <-r + r, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + assert.NoError(t, err) + resp := <-r assert.Equal(t, []byte("hello"), resp.Body()) - assert.NoError(t, err) for range 100 { err = p.AddWorker() @@ -108,19 +103,17 @@ func Test_NewPoolAddRemoveWorkers(t *testing.T) { err = p.AddWorker() assert.NoError(t, err) - err = p.RemoveWorker(ctx) + err = p.RemoveWorker(t.Context()) assert.NoError(t, err) - err = p.RemoveWorker(ctx) + err = p.RemoveWorker(t.Context()) assert.NoError(t, err) - - p.Destroy(ctx) + t.Cleanup(func() { p.Destroy(t.Context()) }) } func Test_StaticPool_NilFactory(t *testing.T) { - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, nil, testCfg, @@ -131,9 +124,8 @@ func Test_StaticPool_NilFactory(t *testing.T) { } func Test_StaticPool_NilConfig(t *testing.T) { - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(log()), nil, @@ -144,10 +136,8 @@ func Test_StaticPool_NilConfig(t *testing.T) { } func Test_StaticPool_ImmediateDestroy(t *testing.T) { - ctx := context.Background() - p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(log()), testCfg, @@ -155,18 +145,15 @@ func Test_StaticPool_ImmediateDestroy(t *testing.T) { ) assert.NoError(t, err) assert.NotNil(t, p) + _, _ = p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - _, _ = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - - ctx, cancel := context.WithTimeout(ctx, time.Nanosecond) - defer cancel() - - p.Destroy(ctx) + t.Cleanup(func() { + p.Destroy(t.Context()) + p = nil + }) } func Test_StaticPool_RemoveWorker(t *testing.T) { - ctx := context.Background() - testCfg2 := &pool.Config{ NumWorkers: 5, AllocateTimeout: time.Second * 5, @@ -174,7 +161,7 @@ func Test_StaticPool_RemoveWorker(t *testing.T) { } p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(log()), testCfg2, @@ -183,28 +170,30 @@ func Test_StaticPool_RemoveWorker(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) - _, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + _, err = p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) assert.NoError(t, err) wrks := p.Workers() for range wrks { - assert.NoError(t, p.RemoveWorker(ctx)) + assert.NoError(t, p.RemoveWorker(t.Context())) } // 1 worker should be in the pool - _, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + _, err = p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) assert.NoError(t, err) err = p.AddWorker() assert.NoError(t, err) - _, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + _, err = p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) assert.NoError(t, err) // after removing all workers, we should have 1 worker + 1 we added assert.Len(t, p.Workers(), 2) - - p.Destroy(ctx) + t.Cleanup(func() { + p.Destroy(t.Context()) + p = nil + }) } func Test_Pool_Reallocate(t *testing.T) { @@ -214,9 +203,8 @@ func Test_Pool_Reallocate(t *testing.T) { DestroyTimeout: time.Second * 500, } - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(log()), testCfg2, @@ -226,18 +214,15 @@ func Test_Pool_Reallocate(t *testing.T) { require.NotNil(t, p) wg := sync.WaitGroup{} - wg.Add(1) - require.NoError(t, os.Rename("../../tests/client.php", "../../tests/client.bak")) - go func() { + wg.Go(func() { for range 50 { time.Sleep(time.Millisecond * 100) - _, errResp := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + _, errResp := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) require.NoError(t, errResp) } - wg.Done() - }() + }) _ = p.Workers()[0].Kill() @@ -247,13 +232,12 @@ func Test_Pool_Reallocate(t *testing.T) { wg.Wait() t.Cleanup(func() { - p.Destroy(ctx) + p.Destroy(t.Context()) + p = nil }) } func Test_NewPoolReset(t *testing.T) { - ctx := context.Background() - testCfg2 := &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 500, @@ -261,7 +245,7 @@ func Test_NewPoolReset(t *testing.T) { } p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(log()), testCfg2, @@ -276,28 +260,24 @@ func Test_NewPoolReset(t *testing.T) { } pid := w[0].Pid() - pldd, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + pldd, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) require.NoError(t, err) pld := <-pldd require.NotNil(t, pld.Body()) wg := &sync.WaitGroup{} - wg.Add(1) - go func() { + wg.Go(func() { for range 100 { time.Sleep(time.Millisecond * 10) - pldG, errG := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + pldG, errG := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) require.NoError(t, errG) pldGG := <-pldG require.NotNil(t, pldGG.Body()) } + }) - wg.Done() - }() - - require.NoError(t, p.Reset(context.Background())) - - pldd, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + require.NoError(t, p.Reset(t.Context())) + pldd, err = p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) require.NoError(t, err) pld = <-pldd require.NotNil(t, pld.Body()) @@ -309,12 +289,15 @@ func Test_NewPoolReset(t *testing.T) { require.NotEqual(t, pid, w2[0].Pid()) wg.Wait() - p.Destroy(ctx) + t.Cleanup(func() { + p.Destroy(t.Context()) + p = nil + }) } func Test_StaticPool_Invalid(t *testing.T) { p, err := NewPool( - context.Background(), + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/invalid.php") }, pipe.NewPipeFactory(log()), testCfg, @@ -327,7 +310,7 @@ func Test_StaticPool_Invalid(t *testing.T) { func Test_ConfigNoErrorInitDefaults(t *testing.T) { p, err := NewPool( - context.Background(), + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(log()), &pool.Config{ @@ -339,7 +322,10 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) { assert.NotNil(t, p) assert.NoError(t, err) - p.Destroy(context.Background()) + t.Cleanup(func() { + p.Destroy(t.Context()) + p = nil + }) } func Test_StaticPool_QueueSizeLimit(t *testing.T) { @@ -348,7 +334,7 @@ func Test_StaticPool_QueueSizeLimit(t *testing.T) { AllocateTimeout: time.Second * 500, DestroyTimeout: time.Second * 500, } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*500) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*500) defer cancel() p, err := NewPool( @@ -361,41 +347,40 @@ func Test_StaticPool_QueueSizeLimit(t *testing.T) { WithQueueSize(1), ) require.NoError(t, err) - defer p.Destroy(ctx) assert.NotNil(t, p) wg := &sync.WaitGroup{} - wg.Add(2) - go func() { + wg.Go(func() { time.Sleep(time.Second * 2) _, err1 := p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{})) require.Error(t, err1) - wg.Done() - }() - go func() { + }) + wg.Go(func() { time.Sleep(time.Second * 2) _, err2 := p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{})) require.Error(t, err2) - wg.Done() - }() + }) re, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + assert.NoError(t, err) res := <-re - assert.NoError(t, err) assert.NotNil(t, res) assert.NotNil(t, res.Body()) assert.Empty(t, res.Context()) assert.Equal(t, "hello world", res.Payload().String()) wg.Wait() + t.Cleanup(func() { + p.Destroy(t.Context()) + p = nil + }) } func Test_StaticPool_Echo(t *testing.T) { - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(log()), testCfg, @@ -403,26 +388,26 @@ func Test_StaticPool_Echo(t *testing.T) { ) assert.NoError(t, err) - defer p.Destroy(ctx) - assert.NotNil(t, p) - re, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{})) - + re, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + assert.NoError(t, err) res := <-re - assert.NoError(t, err) assert.NotNil(t, res) assert.NotNil(t, res.Body()) assert.Empty(t, res.Context()) assert.Equal(t, "hello", res.Payload().String()) + t.Cleanup(func() { + p.Destroy(t.Context()) + p = nil + }) } func Test_StaticPool_Echo_NilContext(t *testing.T) { - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(log()), testCfg, @@ -430,11 +415,9 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { ) assert.NoError(t, err) - defer p.Destroy(ctx) - assert.NotNil(t, p) - re, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + re, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) assert.NoError(t, err) res := <-re @@ -444,12 +427,15 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { assert.Empty(t, res.Context()) assert.Equal(t, "hello", res.Payload().String()) + t.Cleanup(func() { + p.Destroy(t.Context()) + p = nil + }) } func Test_StaticPool_Echo_Context(t *testing.T) { - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "head", "pipes") }, pipe.NewPipeFactory(log()), testCfg, @@ -457,13 +443,10 @@ func Test_StaticPool_Echo_Context(t *testing.T) { ) assert.NoError(t, err) - defer p.Destroy(ctx) - assert.NotNil(t, p) - re, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: []byte("world")}, make(chan struct{})) + re, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: []byte("world")}, make(chan struct{})) assert.NoError(t, err) - res := <-re assert.NotNil(t, res) @@ -471,12 +454,15 @@ func Test_StaticPool_Echo_Context(t *testing.T) { assert.NotNil(t, res.Context()) assert.Equal(t, "world", string(res.Context())) + t.Cleanup(func() { + p.Destroy(t.Context()) + p = nil + }) } func Test_StaticPool_JobError(t *testing.T) { - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "error", "pipes") }, pipe.NewPipeFactory(log()), testCfg, @@ -487,7 +473,7 @@ func Test_StaticPool_JobError(t *testing.T) { time.Sleep(time.Second * 2) - res, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + res, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) assert.Error(t, err) assert.Nil(t, res) @@ -496,17 +482,18 @@ func Test_StaticPool_JobError(t *testing.T) { } assert.Contains(t, err.Error(), "hello") - p.Destroy(ctx) + t.Cleanup(func() { + p.Destroy(t.Context()) + p = nil + }) } func Test_StaticPool_Broken_Replace(t *testing.T) { - ctx := context.Background() - z, err := zap.NewProduction() require.NoError(t, err) p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "broken", "pipes") }, pipe.NewPipeFactory(log()), testCfg, @@ -516,16 +503,16 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { assert.NotNil(t, p) time.Sleep(time.Second) - res, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + res, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) assert.Error(t, err) assert.Nil(t, res) - - p.Destroy(ctx) + t.Cleanup(func() { + p.Destroy(t.Context()) + p = nil + }) } func Test_StaticPool_Broken_FromOutside(t *testing.T) { - ctx := context.Background() - cfg2 := &pool.Config{ NumWorkers: 1, AllocateTimeout: time.Second * 5, @@ -533,7 +520,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { } p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(log()), cfg2, @@ -543,7 +530,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { assert.NotNil(t, p) time.Sleep(time.Second) - re, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + re, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) assert.NoError(t, err) res := <-re @@ -569,12 +556,16 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { for _, w := range list { assert.Equal(t, fsm.StateReady, w.State().CurrentState()) } - p.Destroy(context.Background()) + + t.Cleanup(func() { + p.Destroy(t.Context()) + p = nil + }) } func Test_StaticPool_AllocateTimeout(t *testing.T) { p, err := NewPool( - context.Background(), + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, pipe.NewPipeFactory(log()), &pool.Config{ @@ -592,9 +583,8 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) { } func Test_StaticPool_Replace_Worker(t *testing.T) { - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, pipe.NewPipeFactory(log()), &pool.Config{ @@ -614,13 +604,14 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - re, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + re, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + require.NoError(t, err) res := <-re + require.Equal(t, lastPID, string(res.Body())) - require.NoError(t, err) for range 10 { - re, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + re, err = p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) require.NoError(t, err) res = <-re @@ -633,13 +624,15 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { lastPID = string(res.Body()) } - p.Destroy(context.Background()) + t.Cleanup(func() { + p.Destroy(t.Context()) + p = nil + }) } func Test_StaticPool_DebugAddRemove(t *testing.T) { - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, pipe.NewPipeFactory(log()), &pool.Config{ @@ -657,7 +650,7 @@ func Test_StaticPool_DebugAddRemove(t *testing.T) { assert.Len(t, p.Workers(), 0) var lastPID string - re, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + re, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) assert.NoError(t, err) res := <-re @@ -671,18 +664,20 @@ func Test_StaticPool_DebugAddRemove(t *testing.T) { assert.Len(t, p.Workers(), 0) - ctxT, cancel := context.WithTimeout(ctx, time.Microsecond) + ctxT, cancel := context.WithTimeout(t.Context(), time.Microsecond) err = p.RemoveWorker(ctxT) - cancel() assert.NoError(t, err) + cancel() - p.Destroy(context.Background()) + t.Cleanup(func() { + p.Destroy(t.Context()) + p = nil + }) } func Test_StaticPool_Debug_Worker(t *testing.T) { - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, pipe.NewPipeFactory(log()), &pool.Config{ @@ -700,7 +695,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { assert.Len(t, p.Workers(), 0) var lastPID string - re, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + re, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) assert.NoError(t, err) res := <-re @@ -711,11 +706,10 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { for range 10 { assert.Len(t, p.Workers(), 0) - re, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{})) - + re, err = p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + assert.NoError(t, err) res = <-re - assert.NoError(t, err) assert.NotNil(t, res) assert.NotNil(t, res.Body()) assert.Empty(t, res.Context()) @@ -724,14 +718,16 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { lastPID = string(res.Body()) } - p.Destroy(context.Background()) + t.Cleanup(func() { + p.Destroy(t.Context()) + p = nil + }) } // identical to replace but controlled on worker side func Test_Static_Pool_Destroy_And_Close(t *testing.T) { - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, pipe.NewPipeFactory(log()), &pool.Config{ @@ -745,16 +741,18 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { assert.NotNil(t, p) assert.NoError(t, err) - p.Destroy(ctx) - _, err = p.Exec(ctx, &payload.Payload{Body: []byte("100")}, make(chan struct{})) - assert.Error(t, err) + t.Cleanup(func() { + p.Destroy(t.Context()) + _, err = p.Exec(t.Context(), &payload.Payload{Body: []byte("100")}, make(chan struct{})) + assert.Error(t, err) + p = nil + }) } // identical to replace but controlled on worker side func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, pipe.NewPipeFactory(log()), &pool.Config{ @@ -769,23 +767,24 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { assert.NoError(t, err) go func() { - _, errP := p.Exec(ctx, &payload.Payload{Body: []byte("100")}, make(chan struct{})) - if errP != nil { - t.Errorf("error executing payload: error %v", err) - } + _, errP := p.Exec(t.Context(), &payload.Payload{Body: []byte("100")}, make(chan struct{})) + assert.NoError(t, errP) }() - time.Sleep(time.Millisecond * 100) - p.Destroy(ctx) - _, err = p.Exec(ctx, &payload.Payload{Body: []byte("100")}, make(chan struct{})) - assert.Error(t, err) + time.Sleep(time.Second * 2) + + t.Cleanup(func() { + p.Destroy(t.Context()) + _, err = p.Exec(t.Context(), &payload.Payload{Body: []byte("100")}, make(chan struct{})) + assert.Error(t, err) + p = nil + }) } // identical to replace but controlled on worker side func Test_Static_Pool_Handle_Dead(t *testing.T) { - ctx := context.Background() p, err := NewPool( - context.Background(), + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, @@ -801,19 +800,20 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { assert.NotNil(t, p) time.Sleep(time.Second) + for i := range p.Workers() { p.Workers()[i].State().Transition(fsm.StateErrored) } - _, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + _, err = p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) assert.NoError(t, err) - p.Destroy(ctx) + p.Destroy(t.Context()) } // identical to replace but controlled on worker side func Test_Static_Pool_Slow_Destroy(t *testing.T) { p, err := NewPool( - context.Background(), + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, @@ -828,15 +828,12 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) - - p.Destroy(context.Background()) + p.Destroy(t.Context()) } func Test_StaticPool_ResetTimeout(t *testing.T) { - ctx := context.Background() - p, err := NewPool( - ctx, + t.Context(), // sleep for the 3 seconds func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, pipe.NewPipeFactory(log()), @@ -853,24 +850,20 @@ func Test_StaticPool_ResetTimeout(t *testing.T) { assert.NotNil(t, p) go func() { - _, _ = p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + _, _ = p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) }() time.Sleep(time.Second) - err = p.Reset(ctx) + err = p.Reset(t.Context()) assert.NoError(t, err) - t.Cleanup(func() { - p.Destroy(ctx) - }) + t.Cleanup(func() { p.Destroy(t.Context()) }) } func Test_StaticPool_NoFreeWorkers(t *testing.T) { - ctx := context.Background() - p, err := NewPool( - ctx, + t.Context(), // sleep for the 3 seconds func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, pipe.NewPipeFactory(log()), @@ -887,24 +880,22 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { assert.NotNil(t, p) go func() { - _, _ = p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + _, _ = p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) }() time.Sleep(time.Second) - res, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + res, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) assert.Error(t, err) assert.Nil(t, res) time.Sleep(time.Second) - - p.Destroy(ctx) + t.Cleanup(func() { p.Destroy(t.Context()) }) } // identical to replace but controlled on worker side func Test_StaticPool_Stop_Worker(t *testing.T) { - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") }, pipe.NewPipeFactory(log()), &pool.Config{ @@ -917,13 +908,17 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) - defer p.Destroy(ctx) + t.Cleanup(func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + p.Destroy(ctx) + }) time.Sleep(time.Second) var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - re, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + re, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) assert.NoError(t, err) res := <-re @@ -931,7 +926,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { assert.Equal(t, lastPID, string(res.Body())) for range 10 { - re, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + re, err := p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) res := <-re @@ -946,10 +941,8 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { } func Test_StaticPool_QueueSize(t *testing.T) { - ctx := context.Background() - p, err := NewPool( - ctx, + t.Context(), // sleep for the 3 seconds func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/sleep_short.php", "pipes") }, pipe.NewPipeFactory(log()), @@ -967,7 +960,7 @@ func Test_StaticPool_QueueSize(t *testing.T) { for range 10 { go func() { - _, _ = p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + _, _ = p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) }() } @@ -975,14 +968,13 @@ func Test_StaticPool_QueueSize(t *testing.T) { require.LessOrEqual(t, p.QueueSize(), uint64(10)) time.Sleep(time.Second * 20) require.Less(t, p.QueueSize(), uint64(10)) - - p.Destroy(ctx) + t.Cleanup(func() { p.Destroy(t.Context()) }) } // identical to replace but controlled on worker side func Test_Static_Pool_WrongCommand1(t *testing.T) { p, err := NewPool( - context.Background(), + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("phg", "../../tests/slow-destroy.php", "echo", "pipes") }, @@ -1002,7 +994,7 @@ func Test_Static_Pool_WrongCommand1(t *testing.T) { // identical to replace but controlled on worker side func Test_Static_Pool_WrongCommand2(t *testing.T) { p, err := NewPool( - context.Background(), + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "", "echo", "pipes") }, pipe.NewPipeFactory(log()), &pool.Config{ @@ -1018,9 +1010,8 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) { } func Test_CRC_WithPayload(t *testing.T) { - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/crc_error.php") }, pipe.NewPipeFactory(log()), testCfg, @@ -1053,9 +1044,8 @@ Benchmark_Pool_Echo-32 52705 29809 ns/op 7979 B/op 19 allo Benchmark_Pool_Echo-32 54374 27776 ns/op 7947 B/op 19 allocs/op */ func Benchmark_Pool_Echo(b *testing.B) { - ctx := context.Background() p, err := NewPool( - ctx, + b.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(log()), testCfg, @@ -1076,7 +1066,7 @@ func Benchmark_Pool_Echo(b *testing.B) { b.ReportAllocs() sc := make(chan struct{}) for b.Loop() { - _, err = p.Exec(ctx, pld, sc) + _, err = p.Exec(b.Context(), pld, sc) assert.NoError(b, err) } } @@ -1085,9 +1075,8 @@ func Benchmark_Pool_Echo(b *testing.B) { // PTR -> Benchmark_Pool_Echo_Batched-32 406839 2900 ns/op 1059 B/op 23 allocs/op // PTR -> Benchmark_Pool_Echo_Batched-32 413312 2904 ns/op 1067 B/op 23 allocs/op func Benchmark_Pool_Echo_Batched(b *testing.B) { - ctx := context.Background() p, err := NewPool( - ctx, + b.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(log()), &pool.Config{ @@ -1098,7 +1087,11 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { log(), ) assert.NoError(b, err) - defer p.Destroy(ctx) + b.Cleanup(func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + p.Destroy(ctx) + }) bd := make([]byte, 1024) c := make([]byte, 1024) @@ -1113,14 +1106,12 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { var wg sync.WaitGroup sc := make(chan struct{}) for b.Loop() { - wg.Add(1) - go func() { - defer wg.Done() - if _, err := p.Exec(ctx, pld, sc); err != nil { + wg.Go(func() { + if _, err := p.Exec(b.Context(), pld, sc); err != nil { b.Fail() l.Println(err) } - }() + }) } wg.Wait() @@ -1128,9 +1119,8 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { // Benchmark_Pool_Echo_Replaced-32 104/100 10900218 ns/op 52365 B/op 125 allocs/op func Benchmark_Pool_Echo_Replaced(b *testing.B) { - ctx := context.Background() p, err := NewPool( - ctx, + b.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(log()), &pool.Config{ @@ -1142,13 +1132,17 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { log(), ) assert.NoError(b, err) - defer p.Destroy(ctx) + b.Cleanup(func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + p.Destroy(ctx) + }) b.ReportAllocs() sc := make(chan struct{}) for b.Loop() { - if _, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, sc); err != nil { + if _, err := p.Exec(b.Context(), &payload.Payload{Body: []byte("hello")}, sc); err != nil { b.Fail() l.Println(err) } diff --git a/pool/static_pool/supervisor_test.go b/pool/static_pool/supervisor_test.go index 865917d..de77ef4 100644 --- a/pool/static_pool/supervisor_test.go +++ b/pool/static_pool/supervisor_test.go @@ -29,9 +29,8 @@ var cfgSupervised = &pool.Config{ } func Test_SupervisedPool_Exec(t *testing.T) { - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") }, pipe.NewPipeFactory(log()), cfgSupervised, @@ -47,7 +46,7 @@ func Test_SupervisedPool_Exec(t *testing.T) { for range 10 { time.Sleep(time.Second) - _, err = p.Exec(ctx, &payload.Payload{ + _, err = p.Exec(t.Context(), &payload.Payload{ Context: []byte(""), Body: []byte("foo"), }, make(chan struct{})) @@ -56,16 +55,12 @@ func Test_SupervisedPool_Exec(t *testing.T) { time.Sleep(time.Second) require.NotEqual(t, pidBefore, p.Workers()[0].Pid()) - - ctxNew, cancel := context.WithTimeout(ctx, time.Second) - p.Destroy(ctxNew) - cancel() + t.Cleanup(func() { p.Destroy(t.Context()) }) } func Test_SupervisedPool_AddRemoveWorkers(t *testing.T) { - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") }, pipe.NewPipeFactory(log()), cfgSupervised, @@ -81,7 +76,7 @@ func Test_SupervisedPool_AddRemoveWorkers(t *testing.T) { for range 10 { time.Sleep(time.Second) - _, err = p.Exec(ctx, &payload.Payload{ + _, err = p.Exec(t.Context(), &payload.Payload{ Context: []byte(""), Body: []byte("foo"), }, make(chan struct{})) @@ -90,17 +85,12 @@ func Test_SupervisedPool_AddRemoveWorkers(t *testing.T) { time.Sleep(time.Second) require.NotEqual(t, pidBefore, p.Workers()[0].Pid()) - - ctxNew, cancel := context.WithTimeout(ctx, time.Second) - p.Destroy(ctxNew) - cancel() + t.Cleanup(func() { p.Destroy(t.Context()) }) } func Test_SupervisedPool_ImmediateDestroy(t *testing.T) { - ctx := context.Background() - p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(log()), &pool.Config{ @@ -119,18 +109,17 @@ func Test_SupervisedPool_ImmediateDestroy(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) - _, _ = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + _, _ = p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) - ctx, cancel := context.WithTimeout(ctx, time.Nanosecond) + ctx, cancel := context.WithTimeout(t.Context(), time.Nanosecond) defer cancel() p.Destroy(ctx) } func Test_SupervisedPool_NilFactory(t *testing.T) { - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(log()), nil, @@ -141,9 +130,8 @@ func Test_SupervisedPool_NilFactory(t *testing.T) { } func Test_SupervisedPool_NilConfig(t *testing.T) { - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, nil, cfgSupervised, @@ -154,10 +142,8 @@ func Test_SupervisedPool_NilConfig(t *testing.T) { } func Test_SupervisedPool_RemoveNoWorkers(t *testing.T) { - ctx := context.Background() - p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(log()), cfgSupervised, @@ -166,23 +152,21 @@ func Test_SupervisedPool_RemoveNoWorkers(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) - _, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + _, err = p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) assert.NoError(t, err) wrks := p.Workers() for range wrks { - assert.NoError(t, p.RemoveWorker(ctx)) + assert.NoError(t, p.RemoveWorker(t.Context())) } assert.Len(t, p.Workers(), 1) - p.Destroy(ctx) + t.Cleanup(func() { p.Destroy(t.Context()) }) } func Test_SupervisedPool_RemoveWorker(t *testing.T) { - ctx := context.Background() - p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(log()), cfgSupervised, @@ -191,33 +175,31 @@ func Test_SupervisedPool_RemoveWorker(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) - _, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + _, err = p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) assert.NoError(t, err) wrks := p.Workers() for range wrks { - assert.NoError(t, p.RemoveWorker(ctx)) + assert.NoError(t, p.RemoveWorker(t.Context())) } // should not be error, 1 worker should be in the pool - _, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + _, err = p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) assert.NoError(t, err) err = p.AddWorker() assert.NoError(t, err) - _, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) + _, err = p.Exec(t.Context(), &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{})) assert.NoError(t, err) assert.Len(t, p.Workers(), 2) - - p.Destroy(ctx) + t.Cleanup(func() { p.Destroy(t.Context()) }) } func Test_SupervisedPoolReset(t *testing.T) { - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(log()), cfgSupervised, @@ -232,7 +214,7 @@ func Test_SupervisedPoolReset(t *testing.T) { } pid := w[0].Pid() - require.NoError(t, p.Reset(context.Background())) + require.NoError(t, p.Reset(t.Context())) w2 := p.Workers() if len(w2) == 0 { @@ -240,14 +222,13 @@ func Test_SupervisedPoolReset(t *testing.T) { } require.NotEqual(t, pid, w2[0].Pid()) - p.Destroy(ctx) + t.Cleanup(func() { p.Destroy(t.Context()) }) } // This test should finish without freezes func TestSupervisedPool_ExecWithDebugMode(t *testing.T) { - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/supervised.php") }, pipe.NewPipeFactory(log()), &pool.Config{ @@ -273,14 +254,13 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) { for range 10 { time.Sleep(time.Second) - _, err = p.Exec(ctx, &payload.Payload{ + _, err = p.Exec(t.Context(), &payload.Payload{ Context: []byte(""), Body: []byte("foo"), }, make(chan struct{})) assert.NoError(t, err) } - - p.Destroy(context.Background()) + t.Cleanup(func() { p.Destroy(t.Context()) }) } func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { @@ -296,9 +276,8 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { MaxWorkerMemory: 100, }, } - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, pipe.NewPipeFactory(log()), cfgExecTTL, @@ -307,11 +286,10 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) - defer p.Destroy(context.Background()) pid := p.Workers()[0].Pid() - _, err = p.Exec(ctx, &payload.Payload{ + _, err = p.Exec(t.Context(), &payload.Payload{ Context: []byte(""), Body: []byte("foo"), }, make(chan struct{})) @@ -320,6 +298,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { time.Sleep(time.Second * 1) // should be new worker with new pid assert.NotEqual(t, pid, p.Workers()[0].Pid()) + t.Cleanup(func() { p.Destroy(t.Context()) }) } func TestSupervisedPool_TTL_WorkerRestarted(t *testing.T) { @@ -330,9 +309,8 @@ func TestSupervisedPool_TTL_WorkerRestarted(t *testing.T) { TTL: 5 * time.Second, }, } - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/sleep-ttl.php") }, pipe.NewPipeFactory(log()), cfgExecTTL, @@ -344,7 +322,7 @@ func TestSupervisedPool_TTL_WorkerRestarted(t *testing.T) { pid := p.Workers()[0].Pid() - respCh, err := p.Exec(ctx, &payload.Payload{ + respCh, err := p.Exec(t.Context(), &payload.Payload{ Context: []byte(""), Body: []byte("foo"), }, make(chan struct{})) @@ -360,7 +338,7 @@ func TestSupervisedPool_TTL_WorkerRestarted(t *testing.T) { require.Equal(t, p.Workers()[0].State().CurrentState(), fsm.StateReady) pid = p.Workers()[0].Pid() - respCh, err = p.Exec(ctx, &payload.Payload{ + respCh, err = p.Exec(t.Context(), &payload.Payload{ Context: []byte(""), Body: []byte("foo"), }, make(chan struct{})) @@ -375,8 +353,7 @@ func TestSupervisedPool_TTL_WorkerRestarted(t *testing.T) { // should be new worker with new pid assert.NotEqual(t, pid, p.Workers()[0].Pid()) require.Equal(t, p.Workers()[0].State().CurrentState(), fsm.StateReady) - - p.Destroy(context.Background()) + t.Cleanup(func() { p.Destroy(t.Context()) }) } func TestSupervisedPool_Idle(t *testing.T) { @@ -392,9 +369,8 @@ func TestSupervisedPool_Idle(t *testing.T) { MaxWorkerMemory: 100, }, } - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/idle.php", "pipes") }, pipe.NewPipeFactory(log()), cfgExecTTL, @@ -406,7 +382,7 @@ func TestSupervisedPool_Idle(t *testing.T) { pid := p.Workers()[0].Pid() - respCh, err := p.Exec(ctx, &payload.Payload{ + respCh, err := p.Exec(t.Context(), &payload.Payload{ Context: []byte(""), Body: []byte("foo"), }, make(chan struct{})) @@ -420,7 +396,7 @@ func TestSupervisedPool_Idle(t *testing.T) { time.Sleep(time.Second * 5) // worker should be marked as invalid and reallocated - _, err = p.Exec(ctx, &payload.Payload{ + _, err = p.Exec(t.Context(), &payload.Payload{ Context: []byte(""), Body: []byte("foo"), }, make(chan struct{})) @@ -429,7 +405,7 @@ func TestSupervisedPool_Idle(t *testing.T) { require.Len(t, p.Workers(), 1) // should be new worker with new pid assert.NotEqual(t, pid, p.Workers()[0].Pid()) - p.Destroy(context.Background()) + t.Cleanup(func() { p.Destroy(t.Context()) }) } func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) { @@ -443,9 +419,8 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) { MaxWorkerMemory: 100, }, } - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/exec_ttl.php", "pipes") }, pipe.NewPipeFactory(log()), cfgExecTTL, @@ -457,8 +432,8 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) { pid := p.Workers()[0].Pid() - time.Sleep(time.Millisecond * 100) - respCh, err := p.Exec(ctx, &payload.Payload{ + time.Sleep(time.Second) + respCh, err := p.Exec(t.Context(), &payload.Payload{ Context: []byte(""), Body: []byte("foo"), }, make(chan struct{})) @@ -479,7 +454,7 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) { // should be destroyed, state should be Ready, not Invalid assert.NotEqual(t, pid, p.Workers()[0].Pid()) assert.Equal(t, fsm.StateReady, p.Workers()[0].State().CurrentState()) - p.Destroy(context.Background()) + t.Cleanup(func() { p.Destroy(t.Context()) }) } func TestSupervisedPool_ExecTTL_OK(t *testing.T) { @@ -495,9 +470,8 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) { MaxWorkerMemory: 100, }, } - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/exec_ttl.php", "pipes") }, pipe.NewPipeFactory(log()), cfgExecTTL, @@ -506,12 +480,11 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) - defer p.Destroy(context.Background()) pid := p.Workers()[0].Pid() time.Sleep(time.Millisecond * 100) - respCh, err := p.Exec(ctx, &payload.Payload{ + respCh, err := p.Exec(t.Context(), &payload.Payload{ Context: []byte(""), Body: []byte("foo"), }, make(chan struct{})) @@ -525,6 +498,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) { time.Sleep(time.Second * 1) // should be the same pid assert.Equal(t, pid, p.Workers()[0].Pid()) + t.Cleanup(func() { p.Destroy(t.Context()) }) } func TestSupervisedPool_ShouldRespond(t *testing.T) { @@ -541,9 +515,8 @@ func TestSupervisedPool_ShouldRespond(t *testing.T) { // constructed // max memory // constructed - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/should-not-be-killed.php", "pipes") }, @@ -555,7 +528,7 @@ func TestSupervisedPool_ShouldRespond(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) - respCh, err := p.Exec(ctx, &payload.Payload{ + respCh, err := p.Exec(t.Context(), &payload.Payload{ Context: []byte(""), Body: []byte("foo"), }, make(chan struct{})) @@ -567,7 +540,7 @@ func TestSupervisedPool_ShouldRespond(t *testing.T) { assert.Empty(t, resp.Context()) time.Sleep(time.Second) - p.Destroy(context.Background()) + t.Cleanup(func() { p.Destroy(t.Context()) }) } func TestSupervisedPool_MaxMemoryReached(t *testing.T) { @@ -587,9 +560,8 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { // constructed // max memory // constructed - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") }, pipe.NewPipeFactory(log()), cfgExecTTL, @@ -599,7 +571,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) - respCh, err := p.Exec(ctx, &payload.Payload{ + respCh, err := p.Exec(t.Context(), &payload.Payload{ Context: []byte(""), Body: []byte("foo"), }, make(chan struct{})) @@ -611,28 +583,27 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { assert.Empty(t, resp.Context()) time.Sleep(time.Second) - p.Destroy(context.Background()) + t.Cleanup(func() { p.Destroy(t.Context()) }) } func Test_SupervisedPool_FastCancel(t *testing.T) { - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/sleep.php") }, pipe.NewPipeFactory(log()), cfgSupervised, log(), ) assert.NoError(t, err) - defer p.Destroy(ctx) assert.NotNil(t, p) - newCtx, cancel := context.WithTimeout(ctx, time.Second) + newCtx, cancel := context.WithTimeout(t.Context(), time.Second) defer cancel() _, err = p.Exec(newCtx, &payload.Payload{Body: []byte("hello")}, make(chan struct{})) assert.Error(t, err) assert.Contains(t, err.Error(), "context deadline exceeded") + t.Cleanup(func() { p.Destroy(t.Context()) }) } func Test_SupervisedPool_AllocateFailedOK(t *testing.T) { @@ -646,9 +617,8 @@ func Test_SupervisedPool_AllocateFailedOK(t *testing.T) { }, } - ctx := context.Background() p, err := NewPool( - ctx, + t.Context(), func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/allocate-failed.php") }, pipe.NewPipeFactory(log()), cfgExecTTL, @@ -661,7 +631,7 @@ func Test_SupervisedPool_AllocateFailedOK(t *testing.T) { time.Sleep(time.Second) // should be ok - _, err = p.Exec(ctx, &payload.Payload{ + _, err = p.Exec(t.Context(), &payload.Payload{ Context: []byte(""), Body: []byte("foo"), }, make(chan struct{})) @@ -679,16 +649,14 @@ func Test_SupervisedPool_AllocateFailedOK(t *testing.T) { if r := recover(); r != nil { assert.Fail(t, "panic should not be fired!") } else { - p.Destroy(context.Background()) + p.Destroy(t.Context()) } }() } func Test_SupervisedPool_NoFreeWorkers(t *testing.T) { - ctx := context.Background() - p, err := NewPool( - ctx, + t.Context(), // sleep for the 3 seconds func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, pipe.NewPipeFactory(log()), @@ -705,16 +673,15 @@ func Test_SupervisedPool_NoFreeWorkers(t *testing.T) { assert.NotNil(t, p) go func() { - ctxNew, cancel := context.WithTimeout(ctx, time.Second*5) + ctxNew, cancel := context.WithTimeout(t.Context(), time.Second*5) defer cancel() _, _ = p.Exec(ctxNew, &payload.Payload{Body: []byte("hello")}, make(chan struct{})) }() time.Sleep(time.Second) - _, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{})) + _, err = p.Exec(t.Context(), &payload.Payload{Body: []byte("hello")}, make(chan struct{})) assert.Error(t, err) time.Sleep(time.Second) - - p.Destroy(ctx) + t.Cleanup(func() { p.Destroy(t.Context()) }) } diff --git a/worker/worker.go b/worker/worker.go index d08ddaa..a1916eb 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -381,7 +381,7 @@ func (w *Process) Exec(ctx context.Context, p *payload.Payload) (*payload.Payloa c := w.getCh() // set last used time - w.State().SetLastUsed(uint64(time.Now().UnixNano())) //nolint:gosec + w.State().SetLastUsed(uint64(time.Now().UnixNano())) w.State().Transition(fsm.StateWorking) go func() { diff --git a/worker/worker_test.go b/worker/worker_test.go index 593f9e2..d7da6bc 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -1,7 +1,6 @@ package worker import ( - "context" "os/exec" "testing" @@ -36,7 +35,7 @@ func Test_NotStarted_Exec(t *testing.T) { w, err := InitBaseWorker(cmd) require.NoError(t, err) - _, err = w.Exec(context.Background(), &payload.Payload{ + _, err = w.Exec(t.Context(), &payload.Payload{ Body: []byte("hello"), }) diff --git a/worker_watcher/container/channel/vec.go b/worker_watcher/container/channel/vec.go index 5475c8b..29cd707 100644 --- a/worker_watcher/container/channel/vec.go +++ b/worker_watcher/container/channel/vec.go @@ -14,17 +14,15 @@ import ( type Vec struct { rwm sync.RWMutex // destroy signal - destroy uint64 + destroy atomic.Bool // reset signal - reset uint64 + reset atomic.Bool // channel with the workers workers chan *worker.Process } func NewVector() *Vec { vec := &Vec{ - destroy: 0, - reset: 0, // currently, we can have up to 2048 workers in the pool workers: make(chan *worker.Process, 2048), } @@ -53,12 +51,12 @@ func (v *Vec) Len() int { func (v *Vec) Pop(ctx context.Context) (*worker.Process, error) { // remove all workers and return - if atomic.LoadUint64(&v.destroy) == 1 { + if v.destroy.Load() { return nil, errors.E(errors.WatcherStopped) } // wait for the reset to complete - for atomic.CompareAndSwapUint64(&v.reset, 1, 1) { + for v.reset.Load() { select { case <-ctx.Done(): default: @@ -79,15 +77,15 @@ func (v *Vec) Pop(ctx context.Context) (*worker.Process, error) { } func (v *Vec) ResetDone() { - atomic.StoreUint64(&v.reset, 0) + v.reset.Store(false) } func (v *Vec) Reset() { - atomic.StoreUint64(&v.reset, 1) + v.reset.Store(true) } func (v *Vec) Destroy() { - atomic.StoreUint64(&v.destroy, 1) + v.destroy.Store(true) } func (v *Vec) Remove() { diff --git a/worker_watcher/container/channel/vec_test.go b/worker_watcher/container/channel/vec_test.go index d4856c9..62045e6 100644 --- a/worker_watcher/container/channel/vec_test.go +++ b/worker_watcher/container/channel/vec_test.go @@ -33,7 +33,7 @@ func Test_Vec_PushPop(t *testing.T) { assert.Equal(t, 1, vec.Len()) // Pop worker - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(t.Context(), time.Second) defer cancel() popped, err := vec.Pop(ctx) @@ -57,7 +57,7 @@ func Test_Vec_Len(t *testing.T) { assert.Equal(t, 5, vec.Len()) // Pop one - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(t.Context(), time.Second) defer cancel() _, err := vec.Pop(ctx) @@ -78,7 +78,7 @@ func Test_Vec_PopAfterDestroy(t *testing.T) { vec.Destroy() // Pop should fail immediately with WatcherStopped - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(t.Context(), time.Second) defer cancel() popped, err := vec.Pop(ctx) @@ -93,7 +93,7 @@ func Test_Vec_PopWithCanceledContext(t *testing.T) { // Don't push any workers - channel is empty // Create an already canceled context - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(t.Context()) cancel() // Cancel immediately popped, err := vec.Pop(ctx) @@ -107,7 +107,7 @@ func Test_Vec_PopContextTimeout(t *testing.T) { vec := NewVector() // Empty channel - no workers to pop - ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + ctx, cancel := context.WithTimeout(t.Context(), 50*time.Millisecond) defer cancel() start := time.Now() diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go index adfba4c..cd0b2ff 100644 --- a/worker_watcher/worker_watcher.go +++ b/worker_watcher/worker_watcher.go @@ -36,6 +36,7 @@ type WorkerWatcher struct { allocator Allocator allocateTimeout time.Duration + stopCh chan struct{} } // NewSyncWorkerWatcher is a constructor for the Watcher @@ -48,6 +49,7 @@ func NewSyncWorkerWatcher(allocator Allocator, log *zap.Logger, numWorkers uint6 allocateTimeout: allocateTimeout, workers: sync.Map{}, allocator: allocator, + stopCh: make(chan struct{}, 1), } // pass a ptr to the number of workers to avoid blocking in the TTL loop @@ -182,8 +184,6 @@ func (ww *WorkerWatcher) Allocate() error { for { select { case <-tt: - // reduce the number of workers - ww.numWorkers.Add(^uint64(0)) allocateFreq.Stop() // timeout exceeds, worker can't be allocated return errors.E(op, errors.WorkerAllocate, err) @@ -199,6 +199,10 @@ func (ww *WorkerWatcher) Allocate() error { // reallocated allocateFreq.Stop() goto done + + case <-ww.stopCh: + allocateFreq.Stop() + return errors.E(op, errors.WatcherStopped) } } } @@ -215,6 +219,7 @@ done: ww.workers.Store(sw.Pid(), sw) // push the worker to the container ww.Release(sw) + return nil } @@ -323,6 +328,11 @@ func (ww *WorkerWatcher) Destroy(ctx context.Context) { ww.mu.Lock() // do not release new workers ww.container.Destroy() + // stop allocation of new workers if any (idempotent — safe on repeated Destroy calls) + select { + case ww.stopCh <- struct{}{}: + default: + } ww.mu.Unlock() tt := time.NewTicker(time.Second * 1) @@ -366,6 +376,7 @@ func (ww *WorkerWatcher) Destroy(ctx context.Context) { return case <-ctx.Done(): // kill workers + ww.log.Debug("destroy: context canceled", zap.Error(ctx.Err())) ww.mu.Lock() wg := &sync.WaitGroup{} ww.workers.Range(func(key, value any) bool { @@ -422,6 +433,12 @@ func (ww *WorkerWatcher) wait(w *worker.Process) { err = ww.Allocate() if err != nil { + // watcher is shutting down, no need to track the counter + if errors.Is(errors.WatcherStopped, err) { + return + } + + ww.numWorkers.Add(^uint64(0)) // dead worker was not replaced ww.log.Error("failed to allocate the worker", zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err)) if ww.numWorkers.Load() == 0 { panic("no workers available, can't run the application")