From ec07b8c00b788e43b5a02a0bbe508c3dcb3a5901 Mon Sep 17 00:00:00 2001
From: Bo-Yi Wu
Date: Fri, 10 Apr 2026 22:41:38 +0800
Subject: [PATCH] perf: reduce runner-to-server connection load with adaptive
 reporting and polling

- Replace fixed 1s RunDaemon timer with event-driven select loop using
  separate log (3s) and state (5s) tickers for periodic flush
- Add batch-size threshold (default 100 rows) to flush logs immediately
  during bursty output like npm install
- Add max-latency timer (default 5s) to guarantee single log lines are
  delivered within a bounded time
- Trigger immediate flush on step transitions (start/stop) and job result
  for responsive frontend UX
- Skip ReportLog when no pending rows and ReportState when state is
  unchanged to eliminate no-op HTTP requests
- Replace fixed-rate polling with exponential backoff and jitter to prevent
  thundering herd on idle runners
- Tune HTTP client with MaxIdleConnsPerHost=10 and share a single
  http.Client between Ping and Runner service clients
- Add configurable options: log_report_interval, log_report_max_latency,
  log_report_batch_size, state_report_interval, fetch_interval_max

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 go.mod                                  |   2 +-
 internal/app/poll/poller.go             |  72 +++++++--
 internal/app/run/runner.go              |   2 +-
 internal/pkg/client/http.go             |  21 +--
 internal/pkg/config/config.example.yaml |  16 ++
 internal/pkg/config/config.go           |  43 +++--
 internal/pkg/report/reporter.go         | 205 ++++++++++++++++++++----
 internal/pkg/report/reporter_test.go    |  26 ++-
 8 files changed, 304 insertions(+), 83 deletions(-)
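
Note on the backoff math: calculateInterval() drives how long an idle runner
waits between fetches, and the arithmetic is easier to eyeball outside the
diff. The standalone sketch below reproduces the same shape. It is an
illustration only: the name nextInterval, the main() driver, and the exact
+/-10% jitter factor are assumptions, not the patched code (the jitter step
falls in the truncated part of the hunk).

package main

import (
	"fmt"
	"math/rand/v2"
	"time"
)

// nextInterval mirrors the shape of Poller.calculateInterval: exponential
// backoff base * 2^(n-1) with the multiplier capped at 32, the result capped
// at maxInterval, plus random jitter so idle runners drift out of lockstep.
func nextInterval(base, maxInterval time.Duration, n int64) time.Duration {
	if n <= 1 {
		return base
	}
	shift := min(n-1, 5) // cap the multiplier at 2^5 = 32
	interval := base * time.Duration(int64(1)<<shift)
	if interval > maxInterval {
		interval = maxInterval
	}
	// Assumed +/-10% jitter; the patch's exact factor is not visible here.
	return time.Duration(float64(interval) * (0.9 + 0.2*rand.Float64()))
}

func main() {
	// Before jitter, base=2s / max=60s yields: 2s 2s 4s 8s 16s 32s 60s 60s ...
	for n := int64(0); n <= 8; n++ {
		fmt.Printf("n=%d -> %v\n", n, nextInterval(2*time.Second, time.Minute, n))
	}
}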
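
Note on the flush policy: the rewritten RunDaemon (its opening lines are also
truncated in this excerpt) combines three triggers: a full batch flushes at
once, a partial batch flushes when the max-latency timer fires, and a ticker
flushes whatever is left. The sketch below shows that policy in isolation,
under stated assumptions: a single notify channel carrying row counts and a
plain flush callback stand in for the Reporter's notify channels and
ReportLog; the state path and the closed check are omitted.

package main

import (
	"context"
	"fmt"
	"time"
)

// flushLoop sketches the log side of the RunDaemon select loop: batch-size
// flush for bursty output, max-latency flush for lone lines, and a ticker
// flush as the steady-state fallback.
func flushLoop(ctx context.Context, notify <-chan int, flush func(),
	batchSize int, interval, maxLatency time.Duration,
) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	latency := time.NewTimer(maxLatency)
	latency.Stop() // start disarmed; stale fires are harmless (guarded below)
	armed := false

	pending := 0
	for {
		select {
		case n := <-notify: // new rows were buffered
			pending += n
			if pending >= batchSize {
				flush() // bursty output: ship a full batch immediately
				pending = 0
				latency.Stop()
				armed = false
			} else if !armed && pending > 0 {
				latency.Reset(maxLatency) // bound the wait for a lone line
				armed = true
			}
		case <-ticker.C: // periodic flush of whatever accumulated
			if pending > 0 {
				flush()
				pending = 0
			}
		case <-latency.C: // a partial batch waited long enough
			armed = false
			if pending > 0 {
				flush()
				pending = 0
			}
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	notify := make(chan int, 8)
	go func() {
		for range 5 {
			notify <- 1
			time.Sleep(50 * time.Millisecond)
		}
	}()
	flushLoop(ctx, notify, func() { fmt.Println("flush") },
		100, 300*time.Millisecond, 150*time.Millisecond)
}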
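
Note on connection reuse: the http.go change (truncated in this excerpt)
boils down to building one tuned http.Client and passing it to both generated
connect clients. Below is a minimal sketch of that wiring, assuming the
standard generated constructors from code.gitea.io/actions-proto-go; the real
file's option handling (auth headers, TLS, timeouts) is not shown and may
differ.

package client

import (
	"net/http"

	"code.gitea.io/actions-proto-go/ping/v1/pingv1connect"
	"code.gitea.io/actions-proto-go/runner/v1/runnerv1connect"
)

func newClients(endpoint string) (pingv1connect.PingServiceClient, runnerv1connect.RunnerServiceClient) {
	// Clone the default transport so proxy, TLS, and timeout defaults are
	// kept, then raise the per-host idle pool: the runner talks to a single
	// Gitea host, so the net/http default of 2 idle connections per host is
	// the bottleneck under concurrent reporting.
	transport := http.DefaultTransport.(*http.Transport).Clone()
	transport.MaxIdleConnsPerHost = 10

	// Sharing one client means Ping and Runner calls draw from the same
	// keep-alive pool instead of each opening fresh TCP connections.
	httpClient := &http.Client{Transport: transport}
	return pingv1connect.NewPingServiceClient(httpClient, endpoint),
		runnerv1connect.NewRunnerServiceClient(httpClient, endpoint)
}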
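
Note on the new options: the config.example.yaml hunk is truncated in this
excerpt, so here is an assumed sketch of the runner section it adds. The
values restate the defaults named in the commit message; fetch_interval_max
has no stated default, so the 60s below is only a placeholder, and the exact
key placement in the real example file may differ.

runner:
  # Periodic log flush interval (3s per the commit message).
  log_report_interval: 3s
  # Upper bound on how long a buffered log line may wait (default 5s).
  log_report_max_latency: 5s
  # Flush immediately once this many rows are buffered (default 100).
  log_report_batch_size: 100
  # Periodic state report interval (5s per the commit message).
  state_report_interval: 5s
  # Ceiling for the exponential poll backoff (placeholder value).
  fetch_interval_max: 60s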

diff --git a/go.mod b/go.mod
index f4d2b071..6256b528 100644
--- a/go.mod
+++ b/go.mod
@@ -16,7 +16,7 @@ require (
 	github.com/stretchr/testify v1.11.1
 	go.yaml.in/yaml/v4 v4.0.0-rc.3
 	golang.org/x/term v0.40.0
-	golang.org/x/time v0.14.0
+	golang.org/x/time v0.14.0 // indirect
 	google.golang.org/protobuf v1.36.11
 	gopkg.in/yaml.v3 v3.0.1
 	gotest.tools/v3 v3.5.2
diff --git a/internal/app/poll/poller.go b/internal/app/poll/poller.go
index e5f29e47..c06c143f 100644
--- a/internal/app/poll/poller.go
+++ b/internal/app/poll/poller.go
@@ -7,13 +7,14 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"math/rand/v2"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
 	"connectrpc.com/connect"
 	log "github.com/sirupsen/logrus"
-	"golang.org/x/time/rate"
 
 	"gitea.com/gitea/act_runner/internal/app/run"
 	"gitea.com/gitea/act_runner/internal/pkg/client"
@@ -33,6 +34,9 @@ type Poller struct {
 	shutdownJobs context.CancelFunc
 
 	done chan struct{}
+
+	consecutiveEmpty  atomic.Int64 // count of consecutive polls with no task available
+	consecutiveErrors atomic.Int64 // count of consecutive fetch errors
 }
 
 func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller {
@@ -58,11 +62,10 @@
 }
 
 func (p *Poller) Poll() {
-	limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1)
 	wg := &sync.WaitGroup{}
 	for i := 0; i < p.cfg.Runner.Capacity; i++ {
 		wg.Add(1)
-		go p.poll(wg, limiter)
+		go p.poll(wg)
 	}
 	wg.Wait()
 
@@ -71,9 +74,7 @@
 }
 
 func (p *Poller) PollOnce() {
-	limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1)
-
-	p.pollOnce(limiter)
+	p.pollOnce()
 
 	// signal that we're done
 	close(p.done)
@@ -108,10 +109,10 @@
 	}
 }
 
-func (p *Poller) poll(wg *sync.WaitGroup, limiter *rate.Limiter) {
+func (p *Poller) poll(wg *sync.WaitGroup) {
 	defer wg.Done()
 	for {
-		p.pollOnce(limiter)
+		p.pollOnce()
 
 		select {
 		case <-p.pollingCtx.Done():
@@ -122,19 +123,58 @@
 	}
 }
 
-func (p *Poller) pollOnce(limiter *rate.Limiter) {
+// calculateInterval returns the polling interval with exponential backoff based on
+// consecutive empty or error responses. The interval starts at FetchInterval and
+// doubles with each consecutive empty/error, capped at FetchIntervalMax.
+func (p *Poller) calculateInterval() time.Duration {
+	base := p.cfg.Runner.FetchInterval
+	maxInterval := p.cfg.Runner.FetchIntervalMax
+
+	n := max(p.consecutiveEmpty.Load(), p.consecutiveErrors.Load())
+	if n <= 1 {
+		return base
+	}
+
+	// Capped exponential backoff: base * 2^(n-1), max shift=5 so multiplier <= 32
+	shift := min(n-1, 5)
+	interval := base * time.Duration(int64(1)<<shift)

[... diff truncated in this excerpt: the rest of this hunk, the hunks for
internal/app/run/runner.go, internal/pkg/client/http.go,
internal/pkg/config/config.example.yaml, and internal/pkg/config/config.go,
and the opening hunks of internal/pkg/report/reporter.go are missing; the
patch resumes below inside the new RunDaemon select loop in reporter.go ...]

+			if n >= r.logBatchSize {
+				_ = r.ReportLog(false)
+				r.stopLatencyTimer(&maxLatencyActive, maxLatencyTimer)
+			} else if !maxLatencyActive && n > 0 {
+				maxLatencyTimer.Reset(r.logReportMaxLatency)
+				maxLatencyActive = true
+			}
+
+		case <-r.stateNotify:
+			// Step transition or job result — flush both immediately for frontend UX.
+			_ = r.ReportLog(false)
+			_ = r.ReportState(false)
+			r.stopLatencyTimer(&maxLatencyActive, maxLatencyTimer)
+
+		case <-maxLatencyTimer.C:
+			maxLatencyActive = false
+			_ = r.ReportLog(false)
+
+		case <-r.ctx.Done():
+			close(r.daemon)
+			return
+		}
+
+		r.stateMu.RLock()
+		closed := r.closed
+		r.stateMu.RUnlock()
+		if closed {
+			close(r.daemon)
+			return
+		}
+	}
 }
 
 func (r *Reporter) Logf(format string, a ...any) {
@@ -268,6 +386,10 @@
 		})
 	}
 	r.stateMu.Unlock()
+
+	// Wake up the daemon loop so it detects closed promptly.
+	r.notifyLog()
+
 	// Wait for Acknowledge
 	select {
 	case <-r.daemon:
@@ -295,6 +417,10 @@
 	rows := r.logRows
 	r.stateMu.RUnlock()
 
+	if !noMore && len(rows) == 0 {
+		return nil
+	}
+
 	resp, err := r.client.UpdateLog(r.ctx, connect.NewRequest(&runnerv1.UpdateLogRequest{
 		TaskId: r.state.Id,
 		Index:  int64(r.logOffset),
@@ -329,15 +455,7 @@
 	r.clientM.Lock()
 	defer r.clientM.Unlock()
 
-	r.stateMu.RLock()
-	state := proto.Clone(r.state).(*runnerv1.TaskState)
-	r.stateMu.RUnlock()
-
-	// Only report result from Close to reliable sent logs
-	if !reportResult {
-		state.Result = runnerv1.Result_RESULT_UNSPECIFIED
-	}
-
+	// Build the outputs map first (single Range pass instead of two).
 	outputs := make(map[string]string)
 	r.outputs.Range(func(k, v any) bool {
 		if val, ok := v.(string); ok {
@@ -346,6 +464,23 @@
 		return true
 	})
 
+	r.stateMu.RLock()
+	changed := r.stateChanged
+	r.stateMu.RUnlock()
+
+	// Early return avoids the expensive proto.Clone on the common no-op path.
+	if !reportResult && !changed && len(outputs) == 0 {
+		return nil
+	}
+
+	r.stateMu.RLock()
+	state := proto.Clone(r.state).(*runnerv1.TaskState)
+	r.stateMu.RUnlock()
+
+	if !reportResult {
+		state.Result = runnerv1.Result_RESULT_UNSPECIFIED
+	}
+
 	resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{
 		State:   state,
 		Outputs: outputs,
@@ -354,6 +489,10 @@
 		return err
 	}
 
+	r.stateMu.Lock()
+	r.stateChanged = false
+	r.stateMu.Unlock()
+
 	for _, k := range resp.Msg.SentOutputs {
 		r.outputs.Store(k, struct{}{})
 	}
diff --git a/internal/pkg/report/reporter_test.go b/internal/pkg/report/reporter_test.go
index f617e296..030d3c9a 100644
--- a/internal/pkg/report/reporter_test.go
+++ b/internal/pkg/report/reporter_test.go
@@ -7,9 +7,7 @@ import (
 	"context"
 	"errors"
 	"strings"
-	"sync"
 	"testing"
-	"time"
 
 	runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
 	connect_go "connectrpc.com/connect"
@@ -21,6 +19,7 @@ import (
 	"google.golang.org/protobuf/types/known/timestamppb"
 
 	"gitea.com/gitea/act_runner/internal/pkg/client/mocks"
+	"gitea.com/gitea/act_runner/internal/pkg/config"
 )
 
 func TestReporter_parseLogRow(t *testing.T) {
@@ -175,9 +174,10 @@
 	ctx, cancel := context.WithCancel(context.Background())
 	taskCtx, err := structpb.NewStruct(map[string]any{})
 	require.NoError(t, err)
+	cfg, _ := config.LoadDefault("")
 	reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{
 		Context: taskCtx,
-	})
+	}, cfg)
 	reporter.RunDaemon()
 	defer func() {
 		require.NoError(t, reporter.Close(""))
@@ -252,7 +252,8 @@
 	defer cancel()
 	taskCtx, err := structpb.NewStruct(map[string]any{})
 	require.NoError(t, err)
-	reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx})
+	cfg, _ := config.LoadDefault("")
+	reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
 	reporter.ResetSteps(1)
 
 	// Fire a log entry to create pending data
@@ -315,23 +316,18 @@
 	ctx, cancel := context.WithCancel(context.Background())
 	taskCtx, err := structpb.NewStruct(map[string]any{})
 	require.NoError(t, err)
+	cfg, _ := config.LoadDefault("")
 	reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{
 		Context: taskCtx,
-	})
+	}, cfg)
 	reporter.ResetSteps(1)
 
-	// Start the daemon loop in a separate goroutine.
-	// RunDaemon reads r.closed and reschedules itself via time.AfterFunc.
-	var wg sync.WaitGroup
-	wg.Go(func() {
-		reporter.RunDaemon()
-	})
+	// Start the daemon loop — RunDaemon spawns a goroutine internally.
+	reporter.RunDaemon()
 
-	// Close concurrently — this races with RunDaemon on r.closed.
+	// Close concurrently — this races with the daemon goroutine on r.closed.
	require.NoError(t, reporter.Close(""))
 
-	// Cancel context so pending AfterFunc callbacks exit quickly.
+	// Cancel context so the daemon goroutine exits cleanly.
 	cancel()
-	wg.Wait()
-	time.Sleep(2 * time.Second)
 }