diff --git a/go.mod b/go.mod index 25e3cf24..e1d56a7d 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/Azure/go-autorest/autorest v0.11.30 github.com/Azure/go-autorest/autorest/adal v0.9.24 github.com/Azure/go-autorest/tracing v0.6.1 - github.com/alitto/pond/v2 v2.6.2 + github.com/alitto/pond/v2 v2.7.0 github.com/dlclark/regexp2 v1.11.5 github.com/docker/cli v29.3.0+incompatible github.com/golang-jwt/jwt/v4 v4.5.2 diff --git a/go.sum b/go.sum index 6af7a8a0..26727fe9 100644 --- a/go.sum +++ b/go.sum @@ -17,8 +17,8 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= github.com/Azure/go-autorest/tracing v0.6.1 h1:YUMSrC/CeD1ZnnXcNYU4a/fzsO35u2Fsful9L/2nyR0= github.com/Azure/go-autorest/tracing v0.6.1/go.mod h1:/3EgjbsjraOqiicERAeu3m7/z0x1TzjQGAwDrJrXGkc= -github.com/alitto/pond/v2 v2.6.2 h1:Sphe40g0ILeM1pA2c2K+Th0DGU+pt0A/Kprr+WB24Pw= -github.com/alitto/pond/v2 v2.6.2/go.mod h1:xkjYEgQ05RSpWdfSd1nM3OVv7TBhLdy7rMp3+2Nq+yE= +github.com/alitto/pond/v2 v2.7.0 h1:c76L+yN916m/DRXjGCeUBHHu92uWnh/g1bwVk4zyyXg= +github.com/alitto/pond/v2 v2.7.0/go.mod h1:xkjYEgQ05RSpWdfSd1nM3OVv7TBhLdy7rMp3+2Nq+yE= github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= diff --git a/vendor/github.com/alitto/pond/v2/README.md b/vendor/github.com/alitto/pond/v2/README.md index 98fe789e..f5fc9e73 100644 --- a/vendor/github.com/alitto/pond/v2/README.md +++ b/vendor/github.com/alitto/pond/v2/README.md @@ -317,6 +317,8 @@ customCtx, cancel := context.WithCancel(context.Background()) pool := pond.NewPool(10, pond.WithContext(customCtx)) ``` +When a pool-level context is canceled, workers stop accepting new work and any queued tasks are drained so the pool can shut down cleanly without deadlocking. Drained tasks are not executed and resolve with a cancellation error. + ### Stopping a pool You can stop a pool using the `Stop` method. This will stop all workers and prevent new tasks from being submitted. You can also wait for all submitted tasks by calling the `Wait` method. @@ -525,8 +527,9 @@ Each worker pool instance exposes useful metrics that can be queried through the - `pool.SubmittedTasks() uint64`: Total number of tasks submitted since the pool was created and before it was stopped. This includes tasks that were dropped because the queue was full - `pool.WaitingTasks() uint64`: Current number of tasks in the queue that are waiting to be executed - `pool.SuccessfulTasks() uint64`: Total number of tasks that have successfully completed their execution since the pool was created -- `pool.FailedTasks() uint64`: Total number of tasks that completed with panic since the pool was created -- `pool.CompletedTasks() uint64`: Total number of tasks that have completed their execution either successfully or with panic since the pool was created +- `pool.FailedTasks() uint64`: Total number of tasks that completed with a non-cancellation error (including panics) since the pool was created +- `pool.CanceledTasks() uint64`: Total number of tasks accepted by the pool that were canceled before executing user code due to context cancellation +- `pool.CompletedTasks() uint64`: Total number of tasks that have completed their execution either successfully or with a non-cancellation error since the pool was created - `pool.DroppedTasks() uint64`: Total number of tasks that were dropped because the queue was full since the pool was created In our [Prometheus example](./examples/prometheus/main.go) we showcase how to configure collectors for these metrics and expose them to Prometheus. diff --git a/vendor/github.com/alitto/pond/v2/group.go b/vendor/github.com/alitto/pond/v2/group.go index 8bbbba08..f57a5343 100644 --- a/vendor/github.com/alitto/pond/v2/group.go +++ b/vendor/github.com/alitto/pond/v2/group.go @@ -118,9 +118,13 @@ func (g *abstractTaskGroup[T, E, O]) submit(task any) { // Check if the context has been cancelled to prevent running tasks that are not needed if err := g.future.Context().Err(); err != nil { + // Wrap the error with the context canceled error to reflect that the task was canceled. + err = errors.Join(ErrContextCanceled, err) + g.futureResolver(index, &result[O]{ Err: err, }, err) + return err } diff --git a/vendor/github.com/alitto/pond/v2/pool.go b/vendor/github.com/alitto/pond/v2/pool.go index 29cd5505..4fcae8a1 100644 --- a/vendor/github.com/alitto/pond/v2/pool.go +++ b/vendor/github.com/alitto/pond/v2/pool.go @@ -52,11 +52,16 @@ type basePool interface { SuccessfulTasks() uint64 // Returns the total number of tasks that have completed (either successfully or with an error). + // Tasks accepted by the pool but canceled before execution are excluded. CompletedTasks() uint64 // Returns the number of tasks that have been dropped because the queue was full. DroppedTasks() uint64 + // Returns the number of tasks accepted by the pool that were canceled + // before executing user code due to pool context cancellation. + CanceledTasks() uint64 + // Returns the maximum concurrency of the pool. MaxConcurrency() int @@ -148,6 +153,7 @@ type pool struct { successfulTaskCount atomic.Uint64 failedTaskCount atomic.Uint64 droppedTaskCount atomic.Uint64 + canceledTaskCount atomic.Uint64 } func (p *pool) Context() context.Context { @@ -230,6 +236,10 @@ func (p *pool) DroppedTasks() uint64 { return p.droppedTaskCount.Load() } +func (p *pool) CanceledTasks() uint64 { + return p.canceledTaskCount.Load() +} + func (p *pool) worker(task any) { var readTaskErr, err error exitedNormally := false @@ -276,7 +286,26 @@ func (p *pool) subpoolWorker(task any) func() (output any, err error) { // Attempt to submit the next task to the parent pool if task, err := p.readTask(); err == nil { - p.parent.submit(p.subpoolWorker(task), p.nonBlocking) + for { + submitErr := p.parent.submit(p.subpoolWorker(task), p.nonBlocking) + if submitErr == nil { + break + } + + // Wrap the error with the context canceled error to reflect that the task was canceled. + if errors.Is(submitErr, ErrPoolStopped) { + err = errors.Join(ErrContextCanceled, submitErr) + p.updateMetrics(err) + p.parent.updateMetrics(err) + } + + // If the parent pool is stopped/canceled it won't accept submissions. + // Keep draining the subpool queue so workers can exit cleanly. + task, err = p.readTask() + if err != nil { + break + } + } } return @@ -324,7 +353,7 @@ func (p *pool) wrapTask(task any) (Task, func() error, func(error)) { ctx := p.Context() future, resolve := future.NewFuture(ctx) - wrappedTask := wrapTask[struct{}, func(error)](task, resolve, p.panicRecovery) + wrappedTask := wrapTask[struct{}, func(error)](task, resolve, ctx, p.panicRecovery) return future, wrappedTask, resolve } @@ -433,19 +462,6 @@ func (p *pool) launchWorker(task any) { func (p *pool) readTask() (task any, err error) { p.mutex.Lock() - // Check if the pool context has been cancelled - select { - case <-p.ctx.Done(): - // Context cancelled, worker will exit - p.workerCount.Add(-1) - p.workerWaitGroup.Done() - p.mutex.Unlock() - - err = p.ctx.Err() - return - default: - } - if p.tasks.Len() == 0 { // No more tasks in the queue, worker will exit p.workerCount.Add(-1) @@ -490,7 +506,11 @@ func (p *pool) notifySubmitWaiter() { func (p *pool) updateMetrics(err error) { if err != nil { - p.failedTaskCount.Add(1) + if errors.Is(err, ErrContextCanceled) { + p.canceledTaskCount.Add(1) + } else { + p.failedTaskCount.Add(1) + } } else { p.successfulTaskCount.Add(1) } diff --git a/vendor/github.com/alitto/pond/v2/result.go b/vendor/github.com/alitto/pond/v2/result.go index 52debc56..6db87506 100644 --- a/vendor/github.com/alitto/pond/v2/result.go +++ b/vendor/github.com/alitto/pond/v2/result.go @@ -75,7 +75,8 @@ func (p *resultPool[R]) TrySubmitErr(task func() (R, error)) (ResultTask[R], boo } func (p *resultPool[R]) submit(task any, nonBlocking bool) (ResultTask[R], bool) { - future, resolve := future.NewValueFuture[R](p.Context()) + ctx := p.Context() + future, resolve := future.NewValueFuture[R](ctx) if p.Stopped() { var zero R @@ -83,7 +84,7 @@ func (p *resultPool[R]) submit(task any, nonBlocking bool) (ResultTask[R], bool) return future, false } - wrapped := wrapTask[R, func(R, error)](task, resolve, p.pool.panicRecovery) + wrapped := wrapTask[R, func(R, error)](task, resolve, ctx, p.pool.panicRecovery) if err := p.pool.submit(wrapped, nonBlocking); err != nil { var zero R diff --git a/vendor/github.com/alitto/pond/v2/task.go b/vendor/github.com/alitto/pond/v2/task.go index b93b7be4..f30d542f 100644 --- a/vendor/github.com/alitto/pond/v2/task.go +++ b/vendor/github.com/alitto/pond/v2/task.go @@ -1,6 +1,7 @@ package pond import ( + "context" "errors" "fmt" "runtime/debug" @@ -8,14 +9,28 @@ import ( var ErrPanic = errors.New("task panicked") +var ErrContextCanceled = errors.New("context canceled") + type wrappedTask[R any, C func(error) | func(R, error)] struct { task any callback C + ctx context.Context panicRecovery bool } func (t wrappedTask[R, C]) Run() error { - result, err := invokeTask[R](t.task, t.panicRecovery) + var result R + var err error + + if t.ctx != nil { + if ctxErr := t.ctx.Err(); ctxErr != nil { + err = errors.Join(ErrContextCanceled, ctxErr) + } + } + + if err == nil { + result, err = invokeTask[R](t.task, t.panicRecovery) + } switch c := any(t.callback).(type) { case func(error): @@ -29,10 +44,11 @@ func (t wrappedTask[R, C]) Run() error { return err } -func wrapTask[R any, C func(error) | func(R, error)](task any, callback C, panicRecovery bool) func() error { +func wrapTask[R any, C func(error) | func(R, error)](task any, callback C, ctx context.Context, panicRecovery bool) func() error { wrapped := &wrappedTask[R, C]{ task: task, callback: callback, + ctx: ctx, panicRecovery: panicRecovery, } diff --git a/vendor/modules.txt b/vendor/modules.txt index 926698d9..07569a72 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -21,7 +21,7 @@ github.com/Azure/go-autorest/logger # github.com/Azure/go-autorest/tracing v0.6.1 ## explicit; go 1.15 github.com/Azure/go-autorest/tracing -# github.com/alitto/pond/v2 v2.6.2 +# github.com/alitto/pond/v2 v2.7.0 ## explicit; go 1.20 github.com/alitto/pond/v2 github.com/alitto/pond/v2/internal/future