2 Commits

Author SHA1 Message Date
Bo-Yi Wu
1b9633ab2f refactor(poll): use per-worker backoff counters
- Introduce workerState holding consecutiveEmpty and consecutiveErrors
- Plumb workerState through pollOnce, fetchTask and calculateInterval
- Drop the shared atomic.Int64 counters from Poller

With Capacity > 1, the previous shared counters inflated whenever multiple
workers each saw a single empty response, triggering an unnecessarily long
backoff. Per-worker state keeps each goroutine's backoff independent.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 11:24:33 +08:00
Bo-Yi Wu
2931fe9e48 fix(report): prevent state change loss during in-flight ReportState
- Consume stateChanged atomically with the state snapshot under a single Lock
- Restore stateChanged on UpdateTask error so the change is not silently lost
- Collapse the early-return check into the same Lock to avoid triple locking
- Add tests covering the in-flight Fire race and the error-restore path

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 11:24:22 +08:00
4 changed files with 247 additions and 29 deletions

View File

@@ -34,9 +34,15 @@ type Poller struct {
shutdownJobs context.CancelFunc
done chan struct{}
}
consecutiveEmpty atomic.Int64 // count of consecutive polls with no task available
consecutiveErrors atomic.Int64 // count of consecutive fetch errors
// workerState holds per-goroutine polling state. Backoff counters are
// per-worker so that with Capacity > 1, N workers each seeing one empty
// response don't combine into a "consecutive N empty" reading on a shared
// counter and trigger an unnecessarily long backoff.
type workerState struct {
consecutiveEmpty int64
consecutiveErrors int64
}
func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller {
@@ -74,7 +80,7 @@ func (p *Poller) Poll() {
}
func (p *Poller) PollOnce() {
p.pollOnce()
p.pollOnce(&workerState{})
// signal that we're done
close(p.done)
@@ -111,8 +117,9 @@ func (p *Poller) Shutdown(ctx context.Context) error {
func (p *Poller) poll(wg *sync.WaitGroup) {
defer wg.Done()
s := &workerState{}
for {
p.pollOnce()
p.pollOnce(s)
select {
case <-p.pollingCtx.Done():
@@ -126,11 +133,11 @@ func (p *Poller) poll(wg *sync.WaitGroup) {
// calculateInterval returns the polling interval with exponential backoff based on
// consecutive empty or error responses. The interval starts at FetchInterval and
// doubles with each consecutive empty/error, capped at FetchIntervalMax.
func (p *Poller) calculateInterval() time.Duration {
func (p *Poller) calculateInterval(s *workerState) time.Duration {
base := p.cfg.Runner.FetchInterval
maxInterval := p.cfg.Runner.FetchIntervalMax
n := max(p.consecutiveEmpty.Load(), p.consecutiveErrors.Load())
n := max(s.consecutiveEmpty, s.consecutiveErrors)
if n <= 1 {
return base
}
@@ -155,11 +162,11 @@ func addJitter(d time.Duration) time.Duration {
return d + time.Duration(jitter)
}
func (p *Poller) pollOnce() {
func (p *Poller) pollOnce(s *workerState) {
for {
task, ok := p.fetchTask(p.pollingCtx)
task, ok := p.fetchTask(p.pollingCtx, s)
if !ok {
interval := addJitter(p.calculateInterval())
interval := addJitter(p.calculateInterval(s))
timer := time.NewTimer(interval)
select {
case <-timer.C:
@@ -171,8 +178,8 @@ func (p *Poller) pollOnce() {
}
// Got a task — reset backoff counters for fast subsequent polling.
p.consecutiveEmpty.Store(0)
p.consecutiveErrors.Store(0)
s.consecutiveEmpty = 0
s.consecutiveErrors = 0
p.runTaskWithRecover(p.jobsCtx, task)
return
@@ -192,7 +199,7 @@ func (p *Poller) runTaskWithRecover(ctx context.Context, task *runnerv1.Task) {
}
}
func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
func (p *Poller) fetchTask(ctx context.Context, s *workerState) (*runnerv1.Task, bool) {
reqCtx, cancel := context.WithTimeout(ctx, p.cfg.Runner.FetchTimeout)
defer cancel()
@@ -206,15 +213,15 @@ func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
}
if err != nil {
log.WithError(err).Error("failed to fetch task")
p.consecutiveErrors.Add(1)
s.consecutiveErrors++
return nil, false
}
// Successful response — reset error counter.
p.consecutiveErrors.Store(0)
s.consecutiveErrors = 0
if resp == nil || resp.Msg == nil {
p.consecutiveEmpty.Add(1)
s.consecutiveEmpty++
return nil, false
}
@@ -223,7 +230,7 @@ func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
}
if resp.Msg.Task == nil {
p.consecutiveEmpty.Add(1)
s.consecutiveEmpty++
return nil, false
}

View File

@@ -0,0 +1,108 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package poll
import (
"context"
"errors"
"testing"
"time"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
connect_go "connectrpc.com/connect"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"gitea.com/gitea/act_runner/internal/pkg/client/mocks"
"gitea.com/gitea/act_runner/internal/pkg/config"
)
// TestPoller_PerWorkerCounters verifies that each worker maintains its own
// backoff counters. With a shared counter, N workers each seeing one empty
// response would inflate the counter to N and trigger an unnecessarily long
// backoff. With per-worker state, each worker only sees its own count.
func TestPoller_PerWorkerCounters(t *testing.T) {
client := mocks.NewClient(t)
client.On("FetchTask", mock.Anything, mock.Anything).Return(
func(_ context.Context, _ *connect_go.Request[runnerv1.FetchTaskRequest]) (*connect_go.Response[runnerv1.FetchTaskResponse], error) {
// Always return an empty response.
return connect_go.NewResponse(&runnerv1.FetchTaskResponse{}), nil
},
)
cfg, err := config.LoadDefault("")
require.NoError(t, err)
p := &Poller{client: client, cfg: cfg}
ctx := context.Background()
s1 := &workerState{}
s2 := &workerState{}
// Each worker independently observes one empty response.
_, ok := p.fetchTask(ctx, s1)
require.False(t, ok)
_, ok = p.fetchTask(ctx, s2)
require.False(t, ok)
assert.Equal(t, int64(1), s1.consecutiveEmpty, "worker 1 should only count its own empty response")
assert.Equal(t, int64(1), s2.consecutiveEmpty, "worker 2 should only count its own empty response")
// Worker 1 sees a second empty; worker 2 stays at 1.
_, ok = p.fetchTask(ctx, s1)
require.False(t, ok)
assert.Equal(t, int64(2), s1.consecutiveEmpty)
assert.Equal(t, int64(1), s2.consecutiveEmpty, "worker 2's counter must not be affected by worker 1's empty fetches")
}
// TestPoller_FetchErrorIncrementsErrorsOnly verifies that a fetch error
// increments only the per-worker error counter, not the empty counter.
func TestPoller_FetchErrorIncrementsErrorsOnly(t *testing.T) {
client := mocks.NewClient(t)
client.On("FetchTask", mock.Anything, mock.Anything).Return(
func(_ context.Context, _ *connect_go.Request[runnerv1.FetchTaskRequest]) (*connect_go.Response[runnerv1.FetchTaskResponse], error) {
return nil, errors.New("network unreachable")
},
)
cfg, err := config.LoadDefault("")
require.NoError(t, err)
p := &Poller{client: client, cfg: cfg}
s := &workerState{}
_, ok := p.fetchTask(context.Background(), s)
require.False(t, ok)
assert.Equal(t, int64(1), s.consecutiveErrors)
assert.Equal(t, int64(0), s.consecutiveEmpty)
}
// TestPoller_CalculateInterval verifies the per-worker exponential backoff
// math is correctly driven by the worker's own counters.
func TestPoller_CalculateInterval(t *testing.T) {
cfg, err := config.LoadDefault("")
require.NoError(t, err)
cfg.Runner.FetchInterval = 2 * time.Second
cfg.Runner.FetchIntervalMax = 60 * time.Second
p := &Poller{cfg: cfg}
cases := []struct {
name string
empty, errs int64
wantInterval time.Duration
}{
{"first poll, no backoff", 0, 0, 2 * time.Second},
{"single empty, still base", 1, 0, 2 * time.Second},
{"two empties, doubled", 2, 0, 4 * time.Second},
{"five empties, capped path", 5, 0, 32 * time.Second},
{"many empties, capped at max", 20, 0, 60 * time.Second},
{"errors drive backoff too", 0, 3, 8 * time.Second},
{"max(empty, errors) wins", 2, 4, 16 * time.Second},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
s := &workerState{consecutiveEmpty: tc.empty, consecutiveErrors: tc.errs}
assert.Equal(t, tc.wantInterval, p.calculateInterval(s))
})
}
}

View File

@@ -464,18 +464,16 @@ func (r *Reporter) ReportState(reportResult bool) error {
return true
})
r.stateMu.RLock()
changed := r.stateChanged
r.stateMu.RUnlock()
// Early return avoids the expensive proto.Clone on the common no-op path.
if !reportResult && !changed && len(outputs) == 0 {
// Consume stateChanged atomically with the snapshot; restored on error
// below so a concurrent Fire() during UpdateTask isn't silently lost.
r.stateMu.Lock()
if !reportResult && !r.stateChanged && len(outputs) == 0 {
r.stateMu.Unlock()
return nil
}
r.stateMu.RLock()
state := proto.Clone(r.state).(*runnerv1.TaskState)
r.stateMu.RUnlock()
r.stateChanged = false
r.stateMu.Unlock()
if !reportResult {
state.Result = runnerv1.Result_RESULT_UNSPECIFIED
@@ -486,13 +484,12 @@ func (r *Reporter) ReportState(reportResult bool) error {
Outputs: outputs,
}))
if err != nil {
r.stateMu.Lock()
r.stateChanged = true
r.stateMu.Unlock()
return err
}
r.stateMu.Lock()
r.stateChanged = false
r.stateMu.Unlock()
for _, k := range resp.Msg.SentOutputs {
r.outputs.Store(k, struct{}{})
}

View File

@@ -442,6 +442,112 @@ func TestReporter_BatchSizeFlush(t *testing.T) {
"batch size threshold should have triggered immediate flush")
}
// TestReporter_StateChangedNotLostDuringReport asserts that a Fire() arriving
// mid-UpdateTask re-dirties the flag so the change is picked up by the next report.
func TestReporter_StateChangedNotLostDuringReport(t *testing.T) {
var updateTaskCalls atomic.Int64
inFlight := make(chan struct{})
release := make(chan struct{})
client := mocks.NewClient(t)
client.On("UpdateTask", mock.Anything, mock.Anything).Return(
func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
n := updateTaskCalls.Add(1)
if n == 1 {
// Signal that the first UpdateTask is in flight, then block until released.
close(inFlight)
<-release
}
return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
},
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
taskCtx, err := structpb.NewStruct(map[string]any{})
require.NoError(t, err)
cfg, _ := config.LoadDefault("")
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
reporter.ResetSteps(2)
// Mark stateChanged=true so the first ReportState proceeds to UpdateTask.
reporter.stateMu.Lock()
reporter.stateChanged = true
reporter.stateMu.Unlock()
// Kick off the first ReportState in a goroutine — it will block in UpdateTask.
done := make(chan error, 1)
go func() {
done <- reporter.ReportState(false)
}()
// Wait until UpdateTask is in flight (snapshot taken, flag consumed).
<-inFlight
// Concurrent Fire() modifies state — must re-flip stateChanged so the
// change is not lost when the in-flight ReportState finishes.
require.NoError(t, reporter.Fire(&log.Entry{
Message: "step starts",
Data: log.Fields{"stage": "Main", "stepNumber": 1, "raw_output": true},
}))
// Release the in-flight UpdateTask and wait for it to return.
close(release)
require.NoError(t, <-done)
// stateChanged must still be true so the next ReportState picks up the
// concurrent Fire()'s change instead of skipping via the early-return path.
reporter.stateMu.RLock()
changed := reporter.stateChanged
reporter.stateMu.RUnlock()
assert.True(t, changed, "stateChanged must remain true after a concurrent Fire() during in-flight ReportState")
// And the next ReportState must actually send a second UpdateTask.
require.NoError(t, reporter.ReportState(false))
assert.Equal(t, int64(2), updateTaskCalls.Load(), "concurrent Fire() change must trigger a second UpdateTask, not be silently lost")
}
// TestReporter_StateChangedRestoredOnError verifies that when UpdateTask fails,
// the dirty flag is restored so the snapshotted change isn't silently lost.
func TestReporter_StateChangedRestoredOnError(t *testing.T) {
var updateTaskCalls atomic.Int64
client := mocks.NewClient(t)
client.On("UpdateTask", mock.Anything, mock.Anything).Return(
func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
n := updateTaskCalls.Add(1)
if n == 1 {
return nil, errors.New("transient network error")
}
return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
},
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
taskCtx, err := structpb.NewStruct(map[string]any{})
require.NoError(t, err)
cfg, _ := config.LoadDefault("")
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
reporter.ResetSteps(1)
reporter.stateMu.Lock()
reporter.stateChanged = true
reporter.stateMu.Unlock()
// First ReportState fails — flag must be restored to true.
require.Error(t, reporter.ReportState(false))
reporter.stateMu.RLock()
changed := reporter.stateChanged
reporter.stateMu.RUnlock()
assert.True(t, changed, "stateChanged must be restored to true after UpdateTask error so the change is retried")
// The next ReportState should still issue a request because the flag was restored.
require.NoError(t, reporter.ReportState(false))
assert.Equal(t, int64(2), updateTaskCalls.Load())
}
// TestReporter_StateNotifyFlush verifies that step transitions trigger
// an immediate state flush via the stateNotify channel.
func TestReporter_StateNotifyFlush(t *testing.T) {