refactor(poll): use per-worker backoff counters

- Introduce workerState holding consecutiveEmpty and consecutiveErrors
- Plumb workerState through pollOnce, fetchTask and calculateInterval
- Drop the shared atomic.Int64 counters from Poller

With Capacity > 1, the previous shared counters inflated whenever multiple
workers each saw a single empty response, triggering an unnecessarily long
backoff. Per-worker state keeps each goroutine's backoff independent.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Author: Bo-Yi Wu
Date:   2026-04-12 11:24:33 +08:00
Parent: 2931fe9e48
Commit: 1b9633ab2f

2 changed files with 131 additions and 16 deletions
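
The rationale above can be made concrete with a standalone sketch of the failure mode. This is not code from this repository; the backoff helper below assumes the doubling rule documented on calculateInterval (base doubles per consecutive miss beyond the first, capped at a maximum), and every name in it is illustrative.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// backoff mirrors the documented rule: the base interval doubles per
// consecutive miss beyond the first, capped at maxInterval (seconds).
func backoff(n, base, maxInterval int64) int64 {
	if n <= 1 {
		return base
	}
	d := base << (n - 1)
	if d <= 0 || d > maxInterval {
		return maxInterval
	}
	return d
}

func main() {
	const workers = 4
	var shared atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			shared.Add(1) // each worker saw exactly one empty poll
		}()
	}
	wg.Wait()

	// Shared counter: four single empties read as "4 consecutive",
	// so every worker backs off as if it alone had missed 4 times.
	fmt.Println("shared counter backoff:", backoff(shared.Load(), 2, 60), "s") // 16 s
	// Per-worker counter: each worker saw 1 empty and stays at base.
	fmt.Println("per-worker backoff:", backoff(1, 2, 60), "s") // 2 s
}

With Capacity = 4 this is exactly the inflation the commit message describes: the shared reading jumps from the 2 s base to 16 s even though no worker actually experienced more than one empty poll.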


@@ -34,9 +34,15 @@ type Poller struct {
 	shutdownJobs context.CancelFunc
 	done         chan struct{}
-
-	consecutiveEmpty  atomic.Int64 // count of consecutive polls with no task available
-	consecutiveErrors atomic.Int64 // count of consecutive fetch errors
 }
+
+// workerState holds per-goroutine polling state. Backoff counters are
+// per-worker so that with Capacity > 1, N workers each seeing one empty
+// response don't combine into a "consecutive N empty" reading on a shared
+// counter and trigger an unnecessarily long backoff.
+type workerState struct {
+	consecutiveEmpty  int64
+	consecutiveErrors int64
+}
 
 func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller {
@@ -74,7 +80,7 @@ func (p *Poller) Poll() {
 }
 
 func (p *Poller) PollOnce() {
-	p.pollOnce()
+	p.pollOnce(&workerState{})
 
 	// signal that we're done
 	close(p.done)
@@ -111,8 +117,9 @@ func (p *Poller) Shutdown(ctx context.Context) error {
 func (p *Poller) poll(wg *sync.WaitGroup) {
 	defer wg.Done()
+	s := &workerState{}
 	for {
-		p.pollOnce()
+		p.pollOnce(s)
 
 		select {
 		case <-p.pollingCtx.Done():
@@ -126,11 +133,11 @@ func (p *Poller) poll(wg *sync.WaitGroup) {
 // calculateInterval returns the polling interval with exponential backoff based on
 // consecutive empty or error responses. The interval starts at FetchInterval and
 // doubles with each consecutive empty/error, capped at FetchIntervalMax.
-func (p *Poller) calculateInterval() time.Duration {
+func (p *Poller) calculateInterval(s *workerState) time.Duration {
 	base := p.cfg.Runner.FetchInterval
 	maxInterval := p.cfg.Runner.FetchIntervalMax
-	n := max(p.consecutiveEmpty.Load(), p.consecutiveErrors.Load())
+	n := max(s.consecutiveEmpty, s.consecutiveErrors)
 	if n <= 1 {
 		return base
 	}
@@ -155,11 +162,11 @@ func addJitter(d time.Duration) time.Duration {
 	return d + time.Duration(jitter)
 }
 
-func (p *Poller) pollOnce() {
+func (p *Poller) pollOnce(s *workerState) {
 	for {
-		task, ok := p.fetchTask(p.pollingCtx)
+		task, ok := p.fetchTask(p.pollingCtx, s)
 		if !ok {
-			interval := addJitter(p.calculateInterval())
+			interval := addJitter(p.calculateInterval(s))
 			timer := time.NewTimer(interval)
 			select {
 			case <-timer.C:
@@ -171,8 +178,8 @@ func (p *Poller) pollOnce() {
 		}
 
 		// Got a task — reset backoff counters for fast subsequent polling.
-		p.consecutiveEmpty.Store(0)
-		p.consecutiveErrors.Store(0)
+		s.consecutiveEmpty = 0
+		s.consecutiveErrors = 0
 
 		p.runTaskWithRecover(p.jobsCtx, task)
 		return
@@ -192,7 +199,7 @@ func (p *Poller) runTaskWithRecover(ctx context.Context, task *runnerv1.Task) {
 	}
 }
 
-func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
+func (p *Poller) fetchTask(ctx context.Context, s *workerState) (*runnerv1.Task, bool) {
 	reqCtx, cancel := context.WithTimeout(ctx, p.cfg.Runner.FetchTimeout)
 	defer cancel()
@@ -206,15 +213,15 @@ func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
 	}
 
 	if err != nil {
 		log.WithError(err).Error("failed to fetch task")
-		p.consecutiveErrors.Add(1)
+		s.consecutiveErrors++
 		return nil, false
 	}
 
 	// Successful response — reset error counter.
-	p.consecutiveErrors.Store(0)
+	s.consecutiveErrors = 0
 
 	if resp == nil || resp.Msg == nil {
-		p.consecutiveEmpty.Add(1)
+		s.consecutiveEmpty++
 		return nil, false
 	}
@@ -223,7 +230,7 @@ func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
 	}
 
 	if resp.Msg.Task == nil {
-		p.consecutiveEmpty.Add(1)
+		s.consecutiveEmpty++
 		return nil, false
 	}


@@ -0,0 +1,108 @@
+// Copyright 2026 The Gitea Authors. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+package poll
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
+	connect_go "connectrpc.com/connect"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
+
+	"gitea.com/gitea/act_runner/internal/pkg/client/mocks"
+	"gitea.com/gitea/act_runner/internal/pkg/config"
+)
+
+// TestPoller_PerWorkerCounters verifies that each worker maintains its own
+// backoff counters. With a shared counter, N workers each seeing one empty
+// response would inflate the counter to N and trigger an unnecessarily long
+// backoff. With per-worker state, each worker only sees its own count.
+func TestPoller_PerWorkerCounters(t *testing.T) {
+	client := mocks.NewClient(t)
+	client.On("FetchTask", mock.Anything, mock.Anything).Return(
+		func(_ context.Context, _ *connect_go.Request[runnerv1.FetchTaskRequest]) (*connect_go.Response[runnerv1.FetchTaskResponse], error) {
+			// Always return an empty response.
+			return connect_go.NewResponse(&runnerv1.FetchTaskResponse{}), nil
+		},
+	)
+
+	cfg, err := config.LoadDefault("")
+	require.NoError(t, err)
+	p := &Poller{client: client, cfg: cfg}
+
+	ctx := context.Background()
+	s1 := &workerState{}
+	s2 := &workerState{}
+
+	// Each worker independently observes one empty response.
+	_, ok := p.fetchTask(ctx, s1)
+	require.False(t, ok)
+	_, ok = p.fetchTask(ctx, s2)
+	require.False(t, ok)
+
+	assert.Equal(t, int64(1), s1.consecutiveEmpty, "worker 1 should only count its own empty response")
+	assert.Equal(t, int64(1), s2.consecutiveEmpty, "worker 2 should only count its own empty response")
+
+	// Worker 1 sees a second empty; worker 2 stays at 1.
+	_, ok = p.fetchTask(ctx, s1)
+	require.False(t, ok)
+	assert.Equal(t, int64(2), s1.consecutiveEmpty)
+	assert.Equal(t, int64(1), s2.consecutiveEmpty, "worker 2's counter must not be affected by worker 1's empty fetches")
+}
+
+// TestPoller_FetchErrorIncrementsErrorsOnly verifies that a fetch error
+// increments only the per-worker error counter, not the empty counter.
+func TestPoller_FetchErrorIncrementsErrorsOnly(t *testing.T) {
+	client := mocks.NewClient(t)
+	client.On("FetchTask", mock.Anything, mock.Anything).Return(
+		func(_ context.Context, _ *connect_go.Request[runnerv1.FetchTaskRequest]) (*connect_go.Response[runnerv1.FetchTaskResponse], error) {
+			return nil, errors.New("network unreachable")
+		},
+	)
+
+	cfg, err := config.LoadDefault("")
+	require.NoError(t, err)
+	p := &Poller{client: client, cfg: cfg}
+
+	s := &workerState{}
+	_, ok := p.fetchTask(context.Background(), s)
+	require.False(t, ok)
+	assert.Equal(t, int64(1), s.consecutiveErrors)
+	assert.Equal(t, int64(0), s.consecutiveEmpty)
+}
+
+// TestPoller_CalculateInterval verifies the per-worker exponential backoff
+// math is correctly driven by the worker's own counters.
+func TestPoller_CalculateInterval(t *testing.T) {
+	cfg, err := config.LoadDefault("")
+	require.NoError(t, err)
+	cfg.Runner.FetchInterval = 2 * time.Second
+	cfg.Runner.FetchIntervalMax = 60 * time.Second
+	p := &Poller{cfg: cfg}
+
+	cases := []struct {
+		name         string
+		empty, errs  int64
+		wantInterval time.Duration
+	}{
+		{"first poll, no backoff", 0, 0, 2 * time.Second},
+		{"single empty, still base", 1, 0, 2 * time.Second},
+		{"two empties, doubled", 2, 0, 4 * time.Second},
+		{"five empties, capped path", 5, 0, 32 * time.Second},
+		{"many empties, capped at max", 20, 0, 60 * time.Second},
+		{"errors drive backoff too", 0, 3, 8 * time.Second},
+		{"max(empty, errors) wins", 2, 4, 16 * time.Second},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			s := &workerState{consecutiveEmpty: tc.empty, consecutiveErrors: tc.errs}
+			assert.Equal(t, tc.wantInterval, p.calculateInterval(s))
+		})
+	}
+}
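
One design note the tests rely on implicitly: the new counters can be plain int64, with no atomics, because each workerState is created inside a single goroutine and never escapes it, so the increments are race-free by confinement. A hypothetical illustration of that ownership rule (not repository code):

package main

import (
	"fmt"
	"sync"
)

type workerState struct {
	consecutiveEmpty  int64 // plain int64 is safe here because only
	consecutiveErrors int64 // the owning goroutine ever touches it
}

func main() {
	var wg sync.WaitGroup
	results := make(chan int64, 3)
	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s := &workerState{} // created in the goroutine, never shared
			for i := 0; i < 2; i++ {
				s.consecutiveEmpty++ // no data race: s is confined
			}
			results <- s.consecutiveEmpty
		}()
	}
	wg.Wait()
	close(results)
	for n := range results {
		fmt.Println("worker observed", n, "consecutive empties") // always 2
	}
}

Under go test -race this pattern stays clean, whereas incrementing a shared non-atomic counter from multiple goroutines would be flagged immediately.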