diff --git a/internal/app/poll/poller.go b/internal/app/poll/poller.go
index 0cb45fe9..0dc8a4c3 100644
--- a/internal/app/poll/poller.go
+++ b/internal/app/poll/poller.go
@@ -34,9 +34,15 @@ type Poller struct {
 
 	shutdownJobs context.CancelFunc
 	done         chan struct{}
+}
 
-	consecutiveEmpty  atomic.Int64 // count of consecutive polls with no task available
-	consecutiveErrors atomic.Int64 // count of consecutive fetch errors
+// workerState holds per-goroutine polling state. Backoff counters are
+// per-worker so that with Capacity > 1, N workers each seeing one empty
+// response don't combine into a "consecutive N empty" reading on a shared
+// counter and trigger an unnecessarily long backoff.
+type workerState struct {
+	consecutiveEmpty  int64
+	consecutiveErrors int64
 }
 
 func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller {
@@ -74,7 +80,7 @@ func (p *Poller) Poll() {
 }
 
 func (p *Poller) PollOnce() {
-	p.pollOnce()
+	p.pollOnce(&workerState{})
 
 	// signal that we're done
 	close(p.done)
@@ -111,8 +117,9 @@ func (p *Poller) Shutdown(ctx context.Context) error {
 
 func (p *Poller) poll(wg *sync.WaitGroup) {
 	defer wg.Done()
+	s := &workerState{}
 	for {
-		p.pollOnce()
+		p.pollOnce(s)
 
 		select {
 		case <-p.pollingCtx.Done():
@@ -126,11 +133,11 @@ func (p *Poller) poll(wg *sync.WaitGroup) {
 // calculateInterval returns the polling interval with exponential backoff based on
 // consecutive empty or error responses. The interval starts at FetchInterval and
 // doubles with each consecutive empty/error, capped at FetchIntervalMax.
-func (p *Poller) calculateInterval() time.Duration {
+func (p *Poller) calculateInterval(s *workerState) time.Duration {
 	base := p.cfg.Runner.FetchInterval
 	maxInterval := p.cfg.Runner.FetchIntervalMax
 
-	n := max(p.consecutiveEmpty.Load(), p.consecutiveErrors.Load())
+	n := max(s.consecutiveEmpty, s.consecutiveErrors)
 	if n <= 1 {
 		return base
 	}
@@ -155,11 +162,11 @@ func addJitter(d time.Duration) time.Duration {
 	return d + time.Duration(jitter)
 }
 
-func (p *Poller) pollOnce() {
+func (p *Poller) pollOnce(s *workerState) {
 	for {
-		task, ok := p.fetchTask(p.pollingCtx)
+		task, ok := p.fetchTask(p.pollingCtx, s)
 		if !ok {
-			interval := addJitter(p.calculateInterval())
+			interval := addJitter(p.calculateInterval(s))
 			timer := time.NewTimer(interval)
 			select {
 			case <-timer.C:
@@ -171,8 +178,8 @@ func (p *Poller) pollOnce() {
 		}
 
 		// Got a task — reset backoff counters for fast subsequent polling.
-		p.consecutiveEmpty.Store(0)
-		p.consecutiveErrors.Store(0)
+		s.consecutiveEmpty = 0
+		s.consecutiveErrors = 0
 
 		p.runTaskWithRecover(p.jobsCtx, task)
 		return
@@ -192,7 +199,7 @@ func (p *Poller) runTaskWithRecover(ctx context.Context, task *runnerv1.Task) {
 	}
 }
 
-func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
+func (p *Poller) fetchTask(ctx context.Context, s *workerState) (*runnerv1.Task, bool) {
 	reqCtx, cancel := context.WithTimeout(ctx, p.cfg.Runner.FetchTimeout)
 	defer cancel()
 
@@ -206,15 +213,15 @@
 	}
 	if err != nil {
 		log.WithError(err).Error("failed to fetch task")
-		p.consecutiveErrors.Add(1)
+		s.consecutiveErrors++
 		return nil, false
 	}
 
 	// Successful response — reset error counter.
-	p.consecutiveErrors.Store(0)
+	s.consecutiveErrors = 0
 
 	if resp == nil || resp.Msg == nil {
-		p.consecutiveEmpty.Add(1)
+		s.consecutiveEmpty++
 		return nil, false
 	}
 
@@ -223,7 +230,7 @@ func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
 	}
 
 	if resp.Msg.Task == nil {
-		p.consecutiveEmpty.Add(1)
+		s.consecutiveEmpty++
 		return nil, false
 	}
 
diff --git a/internal/app/poll/poller_test.go b/internal/app/poll/poller_test.go
new file mode 100644
index 00000000..bf9a25ae
--- /dev/null
+++ b/internal/app/poll/poller_test.go
@@ -0,0 +1,108 @@
+// Copyright 2026 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package poll
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
+	connect_go "connectrpc.com/connect"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
+
+	"gitea.com/gitea/act_runner/internal/pkg/client/mocks"
+	"gitea.com/gitea/act_runner/internal/pkg/config"
+)
+
+// TestPoller_PerWorkerCounters verifies that each worker maintains its own
+// backoff counters. With a shared counter, N workers each seeing one empty
+// response would inflate the counter to N and trigger an unnecessarily long
+// backoff. With per-worker state, each worker only sees its own count.
+func TestPoller_PerWorkerCounters(t *testing.T) {
+	client := mocks.NewClient(t)
+	client.On("FetchTask", mock.Anything, mock.Anything).Return(
+		func(_ context.Context, _ *connect_go.Request[runnerv1.FetchTaskRequest]) (*connect_go.Response[runnerv1.FetchTaskResponse], error) {
+			// Always return an empty response.
+			return connect_go.NewResponse(&runnerv1.FetchTaskResponse{}), nil
+		},
+	)
+
+	cfg, err := config.LoadDefault("")
+	require.NoError(t, err)
+	p := &Poller{client: client, cfg: cfg}
+
+	ctx := context.Background()
+	s1 := &workerState{}
+	s2 := &workerState{}
+
+	// Each worker independently observes one empty response.
+	_, ok := p.fetchTask(ctx, s1)
+	require.False(t, ok)
+	_, ok = p.fetchTask(ctx, s2)
+	require.False(t, ok)
+
+	assert.Equal(t, int64(1), s1.consecutiveEmpty, "worker 1 should only count its own empty response")
+	assert.Equal(t, int64(1), s2.consecutiveEmpty, "worker 2 should only count its own empty response")
+
+	// Worker 1 sees a second empty; worker 2 stays at 1.
+	_, ok = p.fetchTask(ctx, s1)
+	require.False(t, ok)
+	assert.Equal(t, int64(2), s1.consecutiveEmpty)
+	assert.Equal(t, int64(1), s2.consecutiveEmpty, "worker 2's counter must not be affected by worker 1's empty fetches")
+}
+
+// TestPoller_FetchErrorIncrementsErrorsOnly verifies that a fetch error
+// increments only the per-worker error counter, not the empty counter.
+func TestPoller_FetchErrorIncrementsErrorsOnly(t *testing.T) {
+	client := mocks.NewClient(t)
+	client.On("FetchTask", mock.Anything, mock.Anything).Return(
+		func(_ context.Context, _ *connect_go.Request[runnerv1.FetchTaskRequest]) (*connect_go.Response[runnerv1.FetchTaskResponse], error) {
+			return nil, errors.New("network unreachable")
+		},
+	)
+
+	cfg, err := config.LoadDefault("")
+	require.NoError(t, err)
+	p := &Poller{client: client, cfg: cfg}
+
+	s := &workerState{}
+	_, ok := p.fetchTask(context.Background(), s)
+	require.False(t, ok)
+	assert.Equal(t, int64(1), s.consecutiveErrors)
+	assert.Equal(t, int64(0), s.consecutiveEmpty)
+}
+
+// TestPoller_CalculateInterval verifies the per-worker exponential backoff
+// math is correctly driven by the worker's own counters.
+func TestPoller_CalculateInterval(t *testing.T) {
+	cfg, err := config.LoadDefault("")
+	require.NoError(t, err)
+	cfg.Runner.FetchInterval = 2 * time.Second
+	cfg.Runner.FetchIntervalMax = 60 * time.Second
+	p := &Poller{cfg: cfg}
+
+	cases := []struct {
+		name         string
+		empty, errs  int64
+		wantInterval time.Duration
+	}{
+		{"first poll, no backoff", 0, 0, 2 * time.Second},
+		{"single empty, still base", 1, 0, 2 * time.Second},
+		{"two empties, doubled", 2, 0, 4 * time.Second},
+		{"five empties, below the cap", 5, 0, 32 * time.Second},
+		{"many empties, capped at max", 20, 0, 60 * time.Second},
+		{"errors drive backoff too", 0, 3, 8 * time.Second},
+		{"max(empty, errors) wins", 2, 4, 16 * time.Second},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			s := &workerState{consecutiveEmpty: tc.empty, consecutiveErrors: tc.errs}
+			assert.Equal(t, tc.wantInterval, p.calculateInterval(s))
+		})
+	}
+}
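
Reviewer sketch, not part of the diff: the @@ -126,11 +133,11 @@ hunk cuts off calculateInterval right after the "n <= 1" guard, so the doubling and capping arithmetic itself is not visible above. A minimal body consistent with the doc comment and with TestPoller_CalculateInterval's expectations (2s base: n=2 gives 4s, n=5 gives 32s, n=20 caps at 60s) could be:

	interval := base
	for i := int64(1); i < n; i++ {
		interval *= 2
		if interval >= maxInterval {
			// Capping early also avoids overflow for very large counters.
			return maxInterval
		}
	}
	return interval

Only the visible prelude and the test table constrain this; the actual implementation may differ in detail.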
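
Likewise, the @@ -155,11 +162,11 @@ hunk shows only the tail of addJitter ("return d + time.Duration(jitter)"), so the jitter magnitude here is an assumption. One plausible shape, spreading wakeups so that idle workers do not all fetch in lockstep, assuming up to +10% random slack:

	// Sketch only; the real jitter fraction is not visible in this diff.
	func addJitter(d time.Duration) time.Duration {
		if d <= 0 {
			return d
		}
		jitter := rand.Int63n(int64(d)/10 + 1) // math/rand; assumed +0..10% of d
		return d + time.Duration(jitter)
	}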