128 lines
2.4 KiB
Go
128 lines
2.4 KiB
Go
// Package taskpool provides a simple bounded worker pool with safe shutdown and fast dispatch.
|
|
package taskpool
|
|
|
|
// About 80% of this code is AI-generated;
// it was designed and reviewed by a human.
|
|
|
|
import (
|
|
"runtime"
|
|
"sync"
|
|
)
|
|
|
|
// Task pairs a value with the function that processes it.
type Task[T any] struct {
	// Value is the payload handed to Handle.
	Value T
	// Handle is called with Value, either by a pool worker or inline by
	// Dispatch when the task cannot be queued.
	Handle func(T)
}
|
|
|
|
// Pool manages worker goroutines processing tasks from a queue.
|
|
type Pool[T any] struct {
|
|
workers int // number of worker goroutines
|
|
queueSize int // channel capacity
|
|
|
|
mu sync.RWMutex // RLock for dispatch, Lock for open/close
|
|
tasks chan Task[T] // task queue (nil when closed)
|
|
wg sync.WaitGroup
|
|
closed bool // prevents sends after close
|
|
}
|
|
|
|
// New creates a pool.
|
|
// nWorkers <= 0 → NumCPU
|
|
// queueSize <= 0 → 256
|
|
func New[T any](nWorkers, queueSize int) *Pool[T] {
|
|
if nWorkers <= 0 {
|
|
nWorkers = runtime.NumCPU()
|
|
}
|
|
if queueSize <= 0 {
|
|
queueSize = 256
|
|
}
|
|
return &Pool[T]{workers: nWorkers, queueSize: queueSize}
|
|
}
|
|
|
|
// Open starts workers (idempotent, restartable after Close).
|
|
func (p *Pool[T]) Open() {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
if !p.closed && p.tasks != nil { // already running
|
|
return
|
|
}
|
|
|
|
p.tasks = make(chan Task[T], p.queueSize)
|
|
p.closed = false
|
|
|
|
for i := 0; i < p.workers; i++ {
|
|
p.wg.Add(1)
|
|
go func() {
|
|
defer p.wg.Done()
|
|
for t := range p.tasks { // exits on close
|
|
t.Handle(t.Value)
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
// Dispatch: try queue → fallback inline.
|
|
// Non-blocking, safe under concurrent Close.
|
|
func (p *Pool[T]) Dispatch(t Task[T]) {
|
|
p.mu.RLock()
|
|
tasks := p.tasks
|
|
closed := p.closed
|
|
|
|
if closed || tasks == nil {
|
|
p.mu.RUnlock()
|
|
t.Handle(t.Value) // pool unavailable → run inline
|
|
return
|
|
}
|
|
|
|
select {
|
|
case tasks <- t: // fast path
|
|
p.mu.RUnlock()
|
|
default:
|
|
p.mu.RUnlock()
|
|
t.Handle(t.Value) // queue full → backpressure
|
|
}
|
|
}
|
|
|
|
// TryDispatch: enqueue only, no fallback.
|
|
func (p *Pool[T]) TryDispatch(t Task[T]) bool {
|
|
p.mu.RLock()
|
|
tasks := p.tasks
|
|
closed := p.closed
|
|
|
|
if closed || tasks == nil {
|
|
p.mu.RUnlock()
|
|
return false
|
|
}
|
|
|
|
select {
|
|
case tasks <- t:
|
|
p.mu.RUnlock()
|
|
return true
|
|
default:
|
|
p.mu.RUnlock()
|
|
return false
|
|
}
|
|
}
|
|
|
|
// Close stops workers and waits for completion.
|
|
// Safe to call multiple times.
|
|
func (p *Pool[T]) Close() {
|
|
p.mu.Lock()
|
|
if p.closed {
|
|
p.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
p.closed = true
|
|
tasks := p.tasks
|
|
p.tasks = nil // prevent future sends
|
|
p.mu.Unlock()
|
|
|
|
if tasks != nil {
|
|
close(tasks) // safe: no send can happen (RLock prevents race)
|
|
}
|
|
|
|
p.wg.Wait() // wait for workers to drain
|
|
}
|