From a60030390d17ec7dfd6f971a5d9ec97ee39936f9 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Thu, 16 Apr 2026 15:11:50 +0800 Subject: [PATCH] test(poll): add concurrency test for single-poller capacity control - Introduce TaskRunner interface to decouple Poller from concrete run.Runner - Add TestPoller_ConcurrencyLimitedByCapacity verifying max concurrent tasks respects capacity and FetchTask is never called concurrently - Mock runner respects context cancellation for proper shutdown testing --- internal/app/poll/poller.go | 11 +++- internal/app/poll/poller_test.go | 95 ++++++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+), 3 deletions(-) diff --git a/internal/app/poll/poller.go b/internal/app/poll/poller.go index e14a29d1..20d88e1f 100644 --- a/internal/app/poll/poller.go +++ b/internal/app/poll/poller.go @@ -12,7 +12,6 @@ import ( "sync/atomic" "time" - "gitea.com/gitea/act_runner/internal/app/run" "gitea.com/gitea/act_runner/internal/pkg/client" "gitea.com/gitea/act_runner/internal/pkg/config" "gitea.com/gitea/act_runner/internal/pkg/metrics" @@ -22,9 +21,15 @@ import ( log "github.com/sirupsen/logrus" ) +// TaskRunner abstracts task execution so the poller can be tested +// without a real runner. +type TaskRunner interface { + Run(ctx context.Context, task *runnerv1.Task) error +} + type Poller struct { client client.Client - runner *run.Runner + runner TaskRunner cfg *config.Config tasksVersion atomic.Int64 // tasksVersion used to store the version of the last task fetched from the Gitea. @@ -49,7 +54,7 @@ type workerState struct { lastBackoff time.Duration } -func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller { +func New(cfg *config.Config, client client.Client, runner TaskRunner) *Poller { pollingCtx, shutdownPolling := context.WithCancel(context.Background()) jobsCtx, shutdownJobs := context.WithCancel(context.Background()) diff --git a/internal/app/poll/poller_test.go b/internal/app/poll/poller_test.go index 5c450a0b..ce8be585 100644 --- a/internal/app/poll/poller_test.go +++ b/internal/app/poll/poller_test.go @@ -6,6 +6,8 @@ package poll import ( "context" "errors" + "sync" + "sync/atomic" "testing" "time" @@ -105,3 +107,96 @@ func TestPoller_CalculateInterval(t *testing.T) { }) } } + +// atomicMax atomically updates target to max(target, val). +func atomicMax(target *atomic.Int64, val int64) { + for { + old := target.Load() + if val <= old || target.CompareAndSwap(old, val) { + break + } + } +} + +type mockRunner struct { + delay time.Duration + running atomic.Int64 + maxConcurrent atomic.Int64 + totalCompleted atomic.Int64 +} + +func (m *mockRunner) Run(ctx context.Context, _ *runnerv1.Task) error { + atomicMax(&m.maxConcurrent, m.running.Add(1)) + select { + case <-time.After(m.delay): + case <-ctx.Done(): + } + m.running.Add(-1) + m.totalCompleted.Add(1) + return nil +} + +// TestPoller_ConcurrencyLimitedByCapacity verifies that with capacity=3 and +// 6 available tasks, at most 3 tasks run concurrently, and FetchTask is +// never called concurrently (single poller). +func TestPoller_ConcurrencyLimitedByCapacity(t *testing.T) { + const ( + capacity = 3 + totalTasks = 6 + taskDelay = 50 * time.Millisecond + ) + + var ( + tasksReturned atomic.Int64 + fetchConcur atomic.Int64 + maxFetchConcur atomic.Int64 + ) + + cli := mocks.NewClient(t) + cli.On("FetchTask", mock.Anything, mock.Anything).Return( + func(_ context.Context, _ *connect_go.Request[runnerv1.FetchTaskRequest]) (*connect_go.Response[runnerv1.FetchTaskResponse], error) { + atomicMax(&maxFetchConcur, fetchConcur.Add(1)) + defer fetchConcur.Add(-1) + + n := tasksReturned.Add(1) + if n <= totalTasks { + return connect_go.NewResponse(&runnerv1.FetchTaskResponse{ + Task: &runnerv1.Task{Id: n}, + }), nil + } + return connect_go.NewResponse(&runnerv1.FetchTaskResponse{}), nil + }, + ) + + runner := &mockRunner{delay: taskDelay} + + cfg, err := config.LoadDefault("") + require.NoError(t, err) + cfg.Runner.Capacity = capacity + cfg.Runner.FetchInterval = 10 * time.Millisecond + cfg.Runner.FetchIntervalMax = 10 * time.Millisecond + + poller := New(cfg, cli, runner) + + var wg sync.WaitGroup + wg.Go(poller.Poll) + + require.Eventually(t, func() bool { + return runner.totalCompleted.Load() >= totalTasks + }, 2*time.Second, 10*time.Millisecond, "all tasks should complete") + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + err = poller.Shutdown(ctx) + require.NoError(t, err) + wg.Wait() + + assert.LessOrEqual(t, runner.maxConcurrent.Load(), int64(capacity), + "concurrent running tasks must not exceed capacity") + assert.GreaterOrEqual(t, runner.maxConcurrent.Load(), int64(2), + "with 6 tasks and capacity 3, at least 2 should overlap") + assert.Equal(t, int64(1), maxFetchConcur.Load(), + "FetchTask must never be called concurrently (single poller)") + assert.Equal(t, int64(totalTasks), runner.totalCompleted.Load(), + "all tasks should have been executed") +}