Mirror of https://gitea.com/gitea/act_runner.git (synced 2026-05-08 00:03:24 +02:00)

Compare commits: 7031b3507d...1b9633ab2f (2 commits)

| Author | SHA1 | Date |
|---|---|---|
| | 1b9633ab2f | |
| | 2931fe9e48 | |
@@ -34,9 +34,15 @@ type Poller struct {
 	shutdownJobs context.CancelFunc

 	done chan struct{}
+}

-	consecutiveEmpty  atomic.Int64 // count of consecutive polls with no task available
-	consecutiveErrors atomic.Int64 // count of consecutive fetch errors
-}
+// workerState holds per-goroutine polling state. Backoff counters are
+// per-worker so that with Capacity > 1, N workers each seeing one empty
+// response don't combine into a "consecutive N empty" reading on a shared
+// counter and trigger an unnecessarily long backoff.
+type workerState struct {
+	consecutiveEmpty  int64
+	consecutiveErrors int64
+}

 func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller {
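The comment above explains why the counters moved into a per-worker struct. As a concrete illustration, the sketch below uses a stand-in backoff helper (not the actual calculateInterval, whose body is outside this diff) that follows the documented behaviour of doubling from FetchInterval up to FetchIntervalMax, the same expectations encoded in TestPoller_CalculateInterval further down. With Capacity = 4 and one shared counter, four workers that each see a single empty poll push the count to 4 and all of them back off to 16s; with per-worker counters each keeps polling at the 2s base.

```go
package main

import (
	"fmt"
	"time"
)

// backoff is an illustrative stand-in for calculateInterval (whose body is not
// part of this hunk): base for n <= 1, then base * 2^(n-1), capped at limit.
func backoff(base, limit time.Duration, n int64) time.Duration {
	if n <= 1 {
		return base
	}
	d := base << (n - 1)
	if d <= 0 || d > limit { // d <= 0 guards against shift overflow for very large n
		return limit
	}
	return d
}

func main() {
	base, limit := 2*time.Second, 60*time.Second

	// Shared counter: with Capacity = 4, four workers each record one empty
	// poll, the shared count reaches 4, and every worker backs off to 16s.
	fmt.Println("shared counter reading of 4:", backoff(base, limit, 4))

	// Per-worker counters: each worker saw only its own single empty poll,
	// so each keeps polling at the 2s base interval.
	fmt.Println("per-worker reading of 1:    ", backoff(base, limit, 1))
}
```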
@@ -74,7 +80,7 @@ func (p *Poller) Poll() {
 }

 func (p *Poller) PollOnce() {
-	p.pollOnce()
+	p.pollOnce(&workerState{})

 	// signal that we're done
 	close(p.done)
@@ -111,8 +117,9 @@ func (p *Poller) Shutdown(ctx context.Context) error {

 func (p *Poller) poll(wg *sync.WaitGroup) {
 	defer wg.Done()
+	s := &workerState{}
 	for {
-		p.pollOnce()
+		p.pollOnce(s)

 		select {
 		case <-p.pollingCtx.Done():
@@ -126,11 +133,11 @@ func (p *Poller) poll(wg *sync.WaitGroup) {
 // calculateInterval returns the polling interval with exponential backoff based on
 // consecutive empty or error responses. The interval starts at FetchInterval and
 // doubles with each consecutive empty/error, capped at FetchIntervalMax.
-func (p *Poller) calculateInterval() time.Duration {
+func (p *Poller) calculateInterval(s *workerState) time.Duration {
 	base := p.cfg.Runner.FetchInterval
 	maxInterval := p.cfg.Runner.FetchIntervalMax

-	n := max(p.consecutiveEmpty.Load(), p.consecutiveErrors.Load())
+	n := max(s.consecutiveEmpty, s.consecutiveErrors)
 	if n <= 1 {
 		return base
 	}
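Read together with the test table in poller_test.go below, the documented behaviour amounts to interval(n) = FetchInterval for n <= 1 and min(FetchInterval * 2^(n-1), FetchIntervalMax) for n >= 2, where n is max(consecutiveEmpty, consecutiveErrors) of the calling worker. With the values used in TestPoller_CalculateInterval (2s base, 60s cap) that gives 2s, 2s, 4s, 8s, 16s, 32s for n = 0..5, and the 60s cap applies from n = 6 onward, before addJitter adds its random offset.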
@@ -155,11 +162,11 @@ func addJitter(d time.Duration) time.Duration {
 	return d + time.Duration(jitter)
 }

-func (p *Poller) pollOnce() {
+func (p *Poller) pollOnce(s *workerState) {
 	for {
-		task, ok := p.fetchTask(p.pollingCtx)
+		task, ok := p.fetchTask(p.pollingCtx, s)
 		if !ok {
-			interval := addJitter(p.calculateInterval())
+			interval := addJitter(p.calculateInterval(s))
 			timer := time.NewTimer(interval)
 			select {
 			case <-timer.C:
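Only the tail of addJitter is visible in this hunk, so the jitter computation itself is not shown. Purely as an assumption, a minimal sketch of the usual approach (a small random fraction added to the interval so many runners do not re-poll in lockstep) could look like this; the fraction and random source below are illustrative, not act_runner's code:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// addJitterSketch adds up to 10% of the interval. The 10% fraction and the use
// of math/rand are assumptions for illustration only; the real addJitter body
// is not part of this diff.
func addJitterSketch(d time.Duration) time.Duration {
	if d <= 0 {
		return d
	}
	return d + time.Duration(rand.Int63n(int64(d)/10+1))
}

func main() {
	fmt.Println(addJitterSketch(4 * time.Second)) // e.g. 4.271s
}
```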
@@ -171,8 +178,8 @@ func (p *Poller) pollOnce() {
 		}

 		// Got a task — reset backoff counters for fast subsequent polling.
-		p.consecutiveEmpty.Store(0)
-		p.consecutiveErrors.Store(0)
+		s.consecutiveEmpty = 0
+		s.consecutiveErrors = 0

 		p.runTaskWithRecover(p.jobsCtx, task)
 		return
@@ -192,7 +199,7 @@ func (p *Poller) runTaskWithRecover(ctx context.Context, task *runnerv1.Task) {
 	}
 }

-func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
+func (p *Poller) fetchTask(ctx context.Context, s *workerState) (*runnerv1.Task, bool) {
 	reqCtx, cancel := context.WithTimeout(ctx, p.cfg.Runner.FetchTimeout)
 	defer cancel()

@@ -206,15 +213,15 @@ func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
 	}
 	if err != nil {
 		log.WithError(err).Error("failed to fetch task")
-		p.consecutiveErrors.Add(1)
+		s.consecutiveErrors++
 		return nil, false
 	}

 	// Successful response — reset error counter.
-	p.consecutiveErrors.Store(0)
+	s.consecutiveErrors = 0

 	if resp == nil || resp.Msg == nil {
-		p.consecutiveEmpty.Add(1)
+		s.consecutiveEmpty++
 		return nil, false
 	}

@@ -223,7 +230,7 @@ func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
 	}

 	if resp.Msg.Task == nil {
-		p.consecutiveEmpty.Add(1)
+		s.consecutiveEmpty++
 		return nil, false
 	}

internal/app/poll/poller_test.go (new file, 108 lines)
@@ -0,0 +1,108 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package poll

import (
	"context"
	"errors"
	"testing"
	"time"

	runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
	connect_go "connectrpc.com/connect"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"

	"gitea.com/gitea/act_runner/internal/pkg/client/mocks"
	"gitea.com/gitea/act_runner/internal/pkg/config"
)

// TestPoller_PerWorkerCounters verifies that each worker maintains its own
// backoff counters. With a shared counter, N workers each seeing one empty
// response would inflate the counter to N and trigger an unnecessarily long
// backoff. With per-worker state, each worker only sees its own count.
func TestPoller_PerWorkerCounters(t *testing.T) {
	client := mocks.NewClient(t)
	client.On("FetchTask", mock.Anything, mock.Anything).Return(
		func(_ context.Context, _ *connect_go.Request[runnerv1.FetchTaskRequest]) (*connect_go.Response[runnerv1.FetchTaskResponse], error) {
			// Always return an empty response.
			return connect_go.NewResponse(&runnerv1.FetchTaskResponse{}), nil
		},
	)

	cfg, err := config.LoadDefault("")
	require.NoError(t, err)
	p := &Poller{client: client, cfg: cfg}

	ctx := context.Background()
	s1 := &workerState{}
	s2 := &workerState{}

	// Each worker independently observes one empty response.
	_, ok := p.fetchTask(ctx, s1)
	require.False(t, ok)
	_, ok = p.fetchTask(ctx, s2)
	require.False(t, ok)

	assert.Equal(t, int64(1), s1.consecutiveEmpty, "worker 1 should only count its own empty response")
	assert.Equal(t, int64(1), s2.consecutiveEmpty, "worker 2 should only count its own empty response")

	// Worker 1 sees a second empty; worker 2 stays at 1.
	_, ok = p.fetchTask(ctx, s1)
	require.False(t, ok)
	assert.Equal(t, int64(2), s1.consecutiveEmpty)
	assert.Equal(t, int64(1), s2.consecutiveEmpty, "worker 2's counter must not be affected by worker 1's empty fetches")
}

// TestPoller_FetchErrorIncrementsErrorsOnly verifies that a fetch error
// increments only the per-worker error counter, not the empty counter.
func TestPoller_FetchErrorIncrementsErrorsOnly(t *testing.T) {
	client := mocks.NewClient(t)
	client.On("FetchTask", mock.Anything, mock.Anything).Return(
		func(_ context.Context, _ *connect_go.Request[runnerv1.FetchTaskRequest]) (*connect_go.Response[runnerv1.FetchTaskResponse], error) {
			return nil, errors.New("network unreachable")
		},
	)

	cfg, err := config.LoadDefault("")
	require.NoError(t, err)
	p := &Poller{client: client, cfg: cfg}

	s := &workerState{}
	_, ok := p.fetchTask(context.Background(), s)
	require.False(t, ok)
	assert.Equal(t, int64(1), s.consecutiveErrors)
	assert.Equal(t, int64(0), s.consecutiveEmpty)
}

// TestPoller_CalculateInterval verifies the per-worker exponential backoff
// math is correctly driven by the worker's own counters.
func TestPoller_CalculateInterval(t *testing.T) {
	cfg, err := config.LoadDefault("")
	require.NoError(t, err)
	cfg.Runner.FetchInterval = 2 * time.Second
	cfg.Runner.FetchIntervalMax = 60 * time.Second
	p := &Poller{cfg: cfg}

	cases := []struct {
		name         string
		empty, errs  int64
		wantInterval time.Duration
	}{
		{"first poll, no backoff", 0, 0, 2 * time.Second},
		{"single empty, still base", 1, 0, 2 * time.Second},
		{"two empties, doubled", 2, 0, 4 * time.Second},
		{"five empties, capped path", 5, 0, 32 * time.Second},
		{"many empties, capped at max", 20, 0, 60 * time.Second},
		{"errors drive backoff too", 0, 3, 8 * time.Second},
		{"max(empty, errors) wins", 2, 4, 16 * time.Second},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			s := &workerState{consecutiveEmpty: tc.empty, consecutiveErrors: tc.errs}
			assert.Equal(t, tc.wantInterval, p.calculateInterval(s))
		})
	}
}
@@ -464,18 +464,16 @@ func (r *Reporter) ReportState(reportResult bool) error {
 		return true
 	})

-	r.stateMu.RLock()
-	changed := r.stateChanged
-	r.stateMu.RUnlock()
-
-	// Early return avoids the expensive proto.Clone on the common no-op path.
-	if !reportResult && !changed && len(outputs) == 0 {
+	// Consume stateChanged atomically with the snapshot; restored on error
+	// below so a concurrent Fire() during UpdateTask isn't silently lost.
+	r.stateMu.Lock()
+	if !reportResult && !r.stateChanged && len(outputs) == 0 {
+		r.stateMu.Unlock()
 		return nil
 	}
-
-	r.stateMu.RLock()
 	state := proto.Clone(r.state).(*runnerv1.TaskState)
-	r.stateMu.RUnlock()
+	r.stateChanged = false
+	r.stateMu.Unlock()

 	if !reportResult {
 		state.Result = runnerv1.Result_RESULT_UNSPECIFIED
@@ -486,13 +484,12 @@ func (r *Reporter) ReportState(reportResult bool) error {
 		Outputs: outputs,
 	}))
 	if err != nil {
+		r.stateMu.Lock()
+		r.stateChanged = true
+		r.stateMu.Unlock()
 		return err
 	}

-	r.stateMu.Lock()
-	r.stateChanged = false
-	r.stateMu.Unlock()
-
 	for _, k := range resp.Msg.SentOutputs {
 		r.outputs.Store(k, struct{}{})
 	}
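The two ReportState hunks above implement a consume-and-restore dirty flag: the flag is cleared in the same critical section that takes the state snapshot, so a Fire() that dirties the state after the snapshot simply sets it again, and the flag is put back to true when UpdateTask fails so the snapshotted change is retried. A minimal self-contained sketch of that pattern, with illustrative names rather than the Reporter's actual fields:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// dirtyState is an illustrative stand-in for the Reporter's state/stateChanged
// pair; the names and types are not act_runner's.
type dirtyState struct {
	mu      sync.Mutex
	dirty   bool
	payload string
}

func (d *dirtyState) set(p string) {
	d.mu.Lock()
	d.payload = p
	d.dirty = true
	d.mu.Unlock()
}

// flush mirrors the ReportState flow: snapshot the state and consume the flag
// in one critical section, and restore the flag if the send fails.
func (d *dirtyState) flush(send func(string) error) error {
	d.mu.Lock()
	if !d.dirty {
		d.mu.Unlock()
		return nil // nothing changed since the last successful send
	}
	snapshot := d.payload
	d.dirty = false // consumed together with the snapshot
	d.mu.Unlock()

	if err := send(snapshot); err != nil {
		// Restore the flag so the snapshotted change is retried later.
		d.mu.Lock()
		d.dirty = true
		d.mu.Unlock()
		return err
	}
	return nil
}

func main() {
	d := &dirtyState{}
	d.set("step 1 started")

	_ = d.flush(func(string) error { return errors.New("transient error") })
	fmt.Println("dirty after failed send:", d.dirty) // true: change will be retried

	_ = d.flush(func(string) error { return nil })
	fmt.Println("dirty after good send:", d.dirty) // false
}
```

A writer racing with flush() after the snapshot is taken just flips dirty back to true, which is exactly what TestReporter_StateChangedNotLostDuringReport below exercises against the real Reporter.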
@@ -442,6 +442,112 @@ func TestReporter_BatchSizeFlush(t *testing.T) {
 		"batch size threshold should have triggered immediate flush")
 }

+// TestReporter_StateChangedNotLostDuringReport asserts that a Fire() arriving
+// mid-UpdateTask re-dirties the flag so the change is picked up by the next report.
+func TestReporter_StateChangedNotLostDuringReport(t *testing.T) {
+	var updateTaskCalls atomic.Int64
+	inFlight := make(chan struct{})
+	release := make(chan struct{})
+
+	client := mocks.NewClient(t)
+	client.On("UpdateTask", mock.Anything, mock.Anything).Return(
+		func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
+			n := updateTaskCalls.Add(1)
+			if n == 1 {
+				// Signal that the first UpdateTask is in flight, then block until released.
+				close(inFlight)
+				<-release
+			}
+			return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
+		},
+	)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	taskCtx, err := structpb.NewStruct(map[string]any{})
+	require.NoError(t, err)
+	cfg, _ := config.LoadDefault("")
+	reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
+	reporter.ResetSteps(2)
+
+	// Mark stateChanged=true so the first ReportState proceeds to UpdateTask.
+	reporter.stateMu.Lock()
+	reporter.stateChanged = true
+	reporter.stateMu.Unlock()
+
+	// Kick off the first ReportState in a goroutine — it will block in UpdateTask.
+	done := make(chan error, 1)
+	go func() {
+		done <- reporter.ReportState(false)
+	}()
+
+	// Wait until UpdateTask is in flight (snapshot taken, flag consumed).
+	<-inFlight
+
+	// Concurrent Fire() modifies state — must re-flip stateChanged so the
+	// change is not lost when the in-flight ReportState finishes.
+	require.NoError(t, reporter.Fire(&log.Entry{
+		Message: "step starts",
+		Data:    log.Fields{"stage": "Main", "stepNumber": 1, "raw_output": true},
+	}))
+
+	// Release the in-flight UpdateTask and wait for it to return.
+	close(release)
+	require.NoError(t, <-done)
+
+	// stateChanged must still be true so the next ReportState picks up the
+	// concurrent Fire()'s change instead of skipping via the early-return path.
+	reporter.stateMu.RLock()
+	changed := reporter.stateChanged
+	reporter.stateMu.RUnlock()
+	assert.True(t, changed, "stateChanged must remain true after a concurrent Fire() during in-flight ReportState")
+
+	// And the next ReportState must actually send a second UpdateTask.
+	require.NoError(t, reporter.ReportState(false))
+	assert.Equal(t, int64(2), updateTaskCalls.Load(), "concurrent Fire() change must trigger a second UpdateTask, not be silently lost")
+}
+
+// TestReporter_StateChangedRestoredOnError verifies that when UpdateTask fails,
+// the dirty flag is restored so the snapshotted change isn't silently lost.
+func TestReporter_StateChangedRestoredOnError(t *testing.T) {
+	var updateTaskCalls atomic.Int64
+
+	client := mocks.NewClient(t)
+	client.On("UpdateTask", mock.Anything, mock.Anything).Return(
+		func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
+			n := updateTaskCalls.Add(1)
+			if n == 1 {
+				return nil, errors.New("transient network error")
+			}
+			return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
+		},
+	)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	taskCtx, err := structpb.NewStruct(map[string]any{})
+	require.NoError(t, err)
+	cfg, _ := config.LoadDefault("")
+	reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
+	reporter.ResetSteps(1)
+
+	reporter.stateMu.Lock()
+	reporter.stateChanged = true
+	reporter.stateMu.Unlock()
+
+	// First ReportState fails — flag must be restored to true.
+	require.Error(t, reporter.ReportState(false))
+
+	reporter.stateMu.RLock()
+	changed := reporter.stateChanged
+	reporter.stateMu.RUnlock()
+	assert.True(t, changed, "stateChanged must be restored to true after UpdateTask error so the change is retried")
+
+	// The next ReportState should still issue a request because the flag was restored.
+	require.NoError(t, reporter.ReportState(false))
+	assert.Equal(t, int64(2), updateTaskCalls.Load())
+}
+
 // TestReporter_StateNotifyFlush verifies that step transitions trigger
 // an immediate state flush via the stateNotify channel.
 func TestReporter_StateNotifyFlush(t *testing.T) {
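The two-channel handshake used in TestReporter_StateChangedNotLostDuringReport (close inFlight once the mocked RPC has started, then block on release) is a general way to hold a goroutine inside a call so a test can interleave a concurrent action deterministically. Stripped to its essentials, with illustrative names that are not part of act_runner:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	inFlight := make(chan struct{})
	release := make(chan struct{})

	var mu sync.Mutex
	value := "old"

	// send stands in for the mocked UpdateTask: it announces that it is in
	// flight, then waits until the test releases it.
	send := func() string {
		close(inFlight)
		<-release
		mu.Lock()
		defer mu.Unlock()
		return value
	}

	done := make(chan string, 1)
	go func() { done <- send() }()

	<-inFlight // send is now provably inside the call
	mu.Lock()
	value = "new" // the concurrent action the test wants to interleave
	mu.Unlock()

	close(release) // let send finish
	fmt.Println("send observed:", <-done) // "new": the write happened while send was paused
}
```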