diff --git a/internal/app/poll/poller.go b/internal/app/poll/poller.go index eaf6dac1..8b063831 100644 --- a/internal/app/poll/poller.go +++ b/internal/app/poll/poller.go @@ -37,16 +37,15 @@ type Poller struct { done chan struct{} } -// workerState holds per-goroutine polling state. Backoff counters are -// per-worker so that with Capacity > 1, N workers each seeing one empty -// response don't combine into a "consecutive N empty" reading on a shared -// counter and trigger an unnecessarily long backoff. +// workerState holds the single poller's backoff state. Consecutive empty or +// error responses drive exponential backoff; a successful task fetch resets +// both counters so the next poll fires immediately. type workerState struct { consecutiveEmpty int64 consecutiveErrors int64 - // lastBackoff is the last interval reported to the PollBackoffSeconds gauge - // from this worker; used to suppress redundant no-op Set calls when the - // backoff plateaus (e.g. at FetchIntervalMax). + // lastBackoff is the last interval reported to the PollBackoffSeconds gauge; + // used to suppress redundant no-op Set calls when the backoff plateaus + // (e.g. at FetchIntervalMax). lastBackoff time.Duration } @@ -73,22 +72,57 @@ func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller { } func (p *Poller) Poll() { + sem := make(chan struct{}, p.cfg.Runner.Capacity) wg := &sync.WaitGroup{} - for i := 0; i < p.cfg.Runner.Capacity; i++ { - wg.Add(1) - go p.poll(wg) - } - wg.Wait() + s := &workerState{} - // signal that we shutdown - close(p.done) + defer func() { + wg.Wait() + close(p.done) + }() + + for { + select { + case sem <- struct{}{}: + case <-p.pollingCtx.Done(): + return + } + + task, ok := p.fetchTask(p.pollingCtx, s) + if !ok { + <-sem + if !p.waitBackoff(s) { + return + } + continue + } + + s.resetBackoff() + + wg.Add(1) + go func(t *runnerv1.Task) { + defer wg.Done() + defer func() { <-sem }() + p.runTaskWithRecover(p.jobsCtx, t) + }(task) + } } func (p *Poller) PollOnce() { - p.pollOnce(&workerState{}) - - // signal that we're done - close(p.done) + defer close(p.done) + s := &workerState{} + for { + task, ok := p.fetchTask(p.pollingCtx, s) + if !ok { + if !p.waitBackoff(s) { + return + } + continue + } + s.resetBackoff() + p.runTaskWithRecover(p.jobsCtx, task) + return + } } func (p *Poller) Shutdown(ctx context.Context) error { @@ -101,13 +135,13 @@ func (p *Poller) Shutdown(ctx context.Context) error { // our timeout for shutting down ran out case <-ctx.Done(): - // when both the timeout fires and the graceful shutdown - // completed succsfully, this branch of the select may - // fire. Do a non-blocking check here against the graceful - // shutdown status to avoid sending an error if we don't need to. - _, ok := <-p.done - if !ok { + // Both the timeout and the graceful shutdown may fire + // simultaneously. Do a non-blocking check to avoid forcing + // a shutdown when graceful already completed. + select { + case <-p.done: return nil + default: } // force a shutdown of all running jobs @@ -120,18 +154,27 @@ func (p *Poller) Shutdown(ctx context.Context) error { } } -func (p *Poller) poll(wg *sync.WaitGroup) { - defer wg.Done() - s := &workerState{} - for { - p.pollOnce(s) +func (s *workerState) resetBackoff() { + s.consecutiveEmpty = 0 + s.consecutiveErrors = 0 + s.lastBackoff = 0 +} - select { - case <-p.pollingCtx.Done(): - return - default: - continue - } +// waitBackoff sleeps for the current backoff interval (with jitter). +// Returns false if the polling context was cancelled during the wait. +func (p *Poller) waitBackoff(s *workerState) bool { + base := p.calculateInterval(s) + if base != s.lastBackoff { + metrics.PollBackoffSeconds.Set(base.Seconds()) + s.lastBackoff = base + } + timer := time.NewTimer(addJitter(base)) + select { + case <-timer.C: + return true + case <-p.pollingCtx.Done(): + timer.Stop() + return false } } @@ -167,34 +210,6 @@ func addJitter(d time.Duration) time.Duration { return d + time.Duration(jitter) } -func (p *Poller) pollOnce(s *workerState) { - for { - task, ok := p.fetchTask(p.pollingCtx, s) - if !ok { - base := p.calculateInterval(s) - if base != s.lastBackoff { - metrics.PollBackoffSeconds.Set(base.Seconds()) - s.lastBackoff = base - } - timer := time.NewTimer(addJitter(base)) - select { - case <-timer.C: - case <-p.pollingCtx.Done(): - timer.Stop() - return - } - continue - } - - // Got a task — reset backoff counters for fast subsequent polling. - s.consecutiveEmpty = 0 - s.consecutiveErrors = 0 - - p.runTaskWithRecover(p.jobsCtx, task) - return - } -} - func (p *Poller) runTaskWithRecover(ctx context.Context, task *runnerv1.Task) { defer func() { if r := recover(); r != nil { diff --git a/internal/app/poll/poller_test.go b/internal/app/poll/poller_test.go index bf9a25ae..98c0a846 100644 --- a/internal/app/poll/poller_test.go +++ b/internal/app/poll/poller_test.go @@ -19,11 +19,10 @@ import ( "gitea.com/gitea/act_runner/internal/pkg/config" ) -// TestPoller_PerWorkerCounters verifies that each worker maintains its own -// backoff counters. With a shared counter, N workers each seeing one empty -// response would inflate the counter to N and trigger an unnecessarily long -// backoff. With per-worker state, each worker only sees its own count. -func TestPoller_PerWorkerCounters(t *testing.T) { +// TestPoller_WorkerStateCounters verifies that workerState correctly tracks +// consecutive empty responses independently per state instance, and that +// fetchTask increments only the relevant counter. +func TestPoller_WorkerStateCounters(t *testing.T) { client := mocks.NewClient(t) client.On("FetchTask", mock.Anything, mock.Anything).Return( func(_ context.Context, _ *connect_go.Request[runnerv1.FetchTaskRequest]) (*connect_go.Response[runnerv1.FetchTaskResponse], error) { @@ -77,8 +76,8 @@ func TestPoller_FetchErrorIncrementsErrorsOnly(t *testing.T) { assert.Equal(t, int64(0), s.consecutiveEmpty) } -// TestPoller_CalculateInterval verifies the per-worker exponential backoff -// math is correctly driven by the worker's own counters. +// TestPoller_CalculateInterval verifies the exponential backoff math is +// correctly driven by the workerState counters. func TestPoller_CalculateInterval(t *testing.T) { cfg, err := config.LoadDefault("") require.NoError(t, err) diff --git a/internal/pkg/metrics/metrics.go b/internal/pkg/metrics/metrics.go index 968ed9b8..5ce08382 100644 --- a/internal/pkg/metrics/metrics.go +++ b/internal/pkg/metrics/metrics.go @@ -89,7 +89,7 @@ var ( Namespace: Namespace, Subsystem: "poll", Name: "backoff_seconds", - Help: "Last observed polling backoff interval. With Capacity > 1, reflects whichever worker wrote last.", + Help: "Last observed polling backoff interval in seconds.", }) JobsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{