perf: reduce runner-to-server connection load with adaptive reporting and polling (#819)
## Summary

Many teams self-host Gitea + Act Runner at scale. The current runner design causes excessive HTTP requests to the Gitea server, leading to high server load. This PR addresses three root causes: aggressive fixed-interval polling, per-task status reporting every 1 second regardless of activity, and unoptimized HTTP client configuration.

## Problem

The original architecture has these issues:

**1. Fixed 1-second reporting interval (RunDaemon)**

- Every running task calls ReportLog + ReportState every 1 second (2 HTTP requests/sec/task)
- These requests are sent even when there are no new log rows or state changes
- With 200 runners × 3 tasks each = **1,200 req/sec just for status reporting**

**2. Fixed 2-second polling interval (no backoff)**

- Idle runners poll FetchTask every 2 seconds forever, even when no jobs are queued
- No exponential backoff or jitter, so all runners can synchronize after network recovery (thundering herd)
- 200 idle runners = **100 req/sec doing nothing useful**

**3. HTTP client not tuned**

- Uses http.DefaultClient with MaxIdleConnsPerHost=2, causing frequent TCP/TLS reconnects
- Creates two separate http.Client instances (one for Ping, one for the Runner service) instead of sharing one

**Total: ~1,300 req/sec for 200 runners with 3 tasks each**

## Solution

### Adaptive Event-Driven Log Reporting

Replace the recursive `time.AfterFunc(1s)` pattern in RunDaemon with a goroutine-based select event loop driven by three trigger mechanisms:

| Trigger | Default | Purpose |
|---------|---------|---------|
| `log_report_max_latency` | 3s | Guarantee even a single log line is delivered within this time |
| `log_report_interval` | 5s | Periodic sweep (steady-state cadence) |
| `log_report_batch_size` | 100 rows | Immediate flush during bursty output (e.g., npm install) |

**Key design**: `log_report_max_latency` (3s) must be less than `log_report_interval` (5s) so the max-latency timer fires before the periodic ticker in single-line scenarios.

State reporting is decoupled onto its own `state_report_interval` (default 5s), with an immediate flush on step transitions (start/stop) via a stateNotify channel for responsive frontend UX. (A minimal sketch of this event loop follows the list below.)

Additionally:

- Skip ReportLog when `len(rows) == 0` (no pending log rows)
- Skip ReportState when `stateChanged == false && len(outputs) == 0` (nothing changed)
- Move the expensive `proto.Clone` after the early-return check to avoid deep copies on no-op paths
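The reporter implementation itself is not part of the diff below (only its tests are), so the following is a minimal, hypothetical sketch of the three-trigger select loop described above. The `reporter` struct, the `logNotify` channel, and the `flushLogs`/`flushState` helpers are illustrative stand-ins rather than act_runner's actual types; only the `stateNotify` name and the interval defaults come from the PR description.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// reporter is an illustrative stand-in for the real Reporter; only what the
// event loop needs is sketched here.
type reporter struct {
	logNotify   chan struct{} // assumed: signalled by Fire() when the buffer reaches log_report_batch_size
	stateNotify chan struct{} // signalled by Fire() on step start/stop (named in the PR description)
}

// runDaemon sketches the goroutine-based select loop that replaces the
// recursive time.AfterFunc(1s) pattern. Intervals mirror the defaults above:
// log_report_interval=5s, log_report_max_latency=3s, state_report_interval=5s.
func (r *reporter) runDaemon(ctx context.Context) {
	logTicker := time.NewTicker(5 * time.Second)   // periodic log sweep
	stateTicker := time.NewTicker(5 * time.Second) // periodic state report
	defer logTicker.Stop()
	defer stateTicker.Stop()

	// In the real design Fire() would (re)arm this timer when the first row
	// lands in an empty buffer; the sketch simply arms it once.
	maxLatencyTimer := time.NewTimer(3 * time.Second)
	defer maxLatencyTimer.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-logTicker.C: // steady-state cadence
			r.flushLogs("ticker")
		case <-maxLatencyTimer.C: // a lone line is still delivered within 3s
			r.flushLogs("max-latency")
		case <-r.logNotify: // batch-size threshold reached (bursty output)
			r.flushLogs("batch")
		case <-stateTicker.C:
			r.flushState("ticker")
		case <-r.stateNotify: // step started/stopped: flush immediately
			r.flushState("step-transition")
		}
	}
}

// flushLogs/flushState stand in for ReportLog/ReportState (which in the PR
// skip the HTTP request when there is nothing to send); here they just print.
func (r *reporter) flushLogs(reason string)  { fmt.Println("ReportLog:", reason) }
func (r *reporter) flushState(reason string) { fmt.Println("ReportState:", reason) }

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second)
	defer cancel()
	r := &reporter{
		logNotify:   make(chan struct{}, 1),
		stateNotify: make(chan struct{}, 1),
	}
	r.runDaemon(ctx)
}
```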
### Polling Backoff with Jitter

Replace the fixed `rate.Limiter` with adaptive exponential backoff:

- Track `consecutiveEmpty` and `consecutiveErrors` counters
- The interval doubles with each empty/error response: `base × 2^(n-1)`, capped at `fetch_interval_max` (default 60s)
- Add ±20% random jitter to prevent a thundering herd
- Fetch first, sleep after: this preserves burst=1 behavior, i.e. an immediate first fetch on startup and after task completion
- (A sketch of this backoff computation follows the configuration section below.)

### HTTP Client Tuning

- Configure a custom `http.Transport` with `MaxIdleConnsPerHost=10` (was 2)
- Share a single `http.Client` between PingService and RunnerService
- Add `IdleConnTimeout=90s` for a clean connection lifecycle

## Load Reduction

For 200 runners × 3 tasks (70% with active log output):

| Component | Before | After | Reduction |
|-----------|--------|-------|-----------|
| Polling (idle) | 100 req/s | ~3.4 req/s | 97% |
| Log reporting | 420 req/s | ~84 req/s | 80% |
| State reporting | 126 req/s | ~25 req/s | 80% |
| **Total** | **~1,300 req/s** | **~113 req/s** | **~91%** |

## Frontend UX Impact

| Scenario | Before | After | Notes |
|----------|--------|-------|-------|
| Continuous output (npm install) | ~1s | ~5s | Periodic ticker sweep |
| Single line then silence | ~1s | ≤3s | maxLatencyTimer guarantee |
| Bursty output (100+ lines) | ~1s | <1s | Batch size immediate flush |
| Step start/stop | ~1s | <1s | stateNotify immediate flush |
| Job completion | ~1s | ~1s | Close() retry unchanged |

## New Configuration Options

All have safe defaults; existing config files need no changes:

```yaml
runner:
  fetch_interval_max: 60s      # Max backoff interval when idle
  log_report_interval: 5s      # Periodic log flush interval
  log_report_max_latency: 3s   # Max time a log row waits (must be < log_report_interval)
  log_report_batch_size: 100   # Immediate flush threshold
  state_report_interval: 5s    # State flush interval (step transitions are always immediate)
```

Config validation warns on invalid combinations:

- `fetch_interval_max < fetch_interval` → auto-corrected
- `log_report_max_latency >= log_report_interval` → warning (the max-latency timer would be redundant)
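As referenced in the backoff section above, here is a minimal sketch of how the fetch backoff and jitter could be computed from these settings, together with the tuned shared HTTP client. The function names (`nextFetchInterval`, `newSharedHTTPClient`) and the doubling loop are illustrative assumptions, not act_runner's actual code; the constants (2s base, 60s cap, ±20% jitter, `MaxIdleConnsPerHost=10`, `IdleConnTimeout=90s`) come from the PR description.

```go
package main

import (
	"fmt"
	"math/rand"
	"net/http"
	"time"
)

// nextFetchInterval is an illustrative version of the adaptive backoff:
// base × 2^(n-1) for n consecutive empty (or failed) FetchTask responses,
// capped at maxInterval, with ±20% jitter so recovered runners do not poll
// in lockstep.
func nextFetchInterval(base, maxInterval time.Duration, consecutiveEmpty int) time.Duration {
	interval := base
	for i := 1; i < consecutiveEmpty && interval < maxInterval; i++ {
		interval *= 2
	}
	if interval > maxInterval {
		interval = maxInterval
	}
	jitter := 1 + (rand.Float64()*0.4 - 0.2) // uniform in [0.8, 1.2)
	return time.Duration(float64(interval) * jitter)
}

// newSharedHTTPClient builds one client to share between the Ping and Runner
// services, instead of two clients on http.DefaultTransport (which keeps only
// 2 idle connections per host).
func newSharedHTTPClient() *http.Client {
	return &http.Client{
		Transport: &http.Transport{
			MaxIdleConnsPerHost: 10,
			IdleConnTimeout:     90 * time.Second,
		},
	}
}

func main() {
	base, maxInterval := 2*time.Second, 60*time.Second // fetch_interval, fetch_interval_max
	for n := 1; n <= 8; n++ {
		fmt.Printf("consecutive empty fetches: %d -> wait ~%v\n", n, nextFetchInterval(base, maxInterval, n).Round(time.Millisecond))
	}
	_ = newSharedHTTPClient()
}
```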
## Test Plan

- [x] `go build ./...` passes
- [x] `go test ./...` passes (all existing tests plus the new reporter tests)
- [x] `golangci-lint run`: 0 issues
- [x] TestReporter_MaxLatencyTimer: verifies a single log line is flushed by maxLatencyTimer before logTicker
- [x] TestReporter_BatchSizeFlush: verifies the batch size threshold triggers an immediate flush
- [x] TestReporter_StateNotifyFlush: verifies a step transition triggers an immediate state flush
- [x] TestReporter_EphemeralRunnerDeletion: verifies Close/RunDaemon race safety
- [x] TestReporter_RunDaemonClose_Race: verifies concurrent Close safety

Reviewed-on: https://gitea.com/gitea/act_runner/pulls/819
Reviewed-by: Nicolas <173651+bircni@noreply.gitea.com>
Co-authored-by: Bo-Yi Wu <appleboy.tw@gmail.com>
Co-committed-by: Bo-Yi Wu <appleboy.tw@gmail.com>
```diff
@@ -6,8 +6,9 @@ package report
 import (
 	"context"
 	"errors"
 	"fmt"
 	"strings"
 	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -21,6 +22,7 @@ import (
 	"google.golang.org/protobuf/types/known/timestamppb"
 
 	"gitea.com/gitea/act_runner/internal/pkg/client/mocks"
+	"gitea.com/gitea/act_runner/internal/pkg/config"
 )
 
 func TestReporter_parseLogRow(t *testing.T) {
@@ -175,9 +177,10 @@ func TestReporter_Fire(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	taskCtx, err := structpb.NewStruct(map[string]any{})
 	require.NoError(t, err)
+	cfg, _ := config.LoadDefault("")
 	reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{
 		Context: taskCtx,
-	})
+	}, cfg)
 	reporter.RunDaemon()
 	defer func() {
 		require.NoError(t, reporter.Close(""))
@@ -252,7 +255,8 @@ func TestReporter_EphemeralRunnerDeletion(t *testing.T) {
 	defer cancel()
 	taskCtx, err := structpb.NewStruct(map[string]any{})
 	require.NoError(t, err)
-	reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx})
+	cfg, _ := config.LoadDefault("")
+	reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
 	reporter.ResetSteps(1)
 
 	// Fire a log entry to create pending data
@@ -315,23 +319,281 @@ func TestReporter_RunDaemonClose_Race(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	taskCtx, err := structpb.NewStruct(map[string]any{})
 	require.NoError(t, err)
+	cfg, _ := config.LoadDefault("")
 	reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{
 		Context: taskCtx,
-	})
+	}, cfg)
 	reporter.ResetSteps(1)
 
-	// Start the daemon loop in a separate goroutine.
-	// RunDaemon reads r.closed and reschedules itself via time.AfterFunc.
-	var wg sync.WaitGroup
-	wg.Go(func() {
-		reporter.RunDaemon()
-	})
+	// Start the daemon loop — RunDaemon spawns a goroutine internally.
+	reporter.RunDaemon()
 
-	// Close concurrently — this races with RunDaemon on r.closed.
+	// Close concurrently — this races with the daemon goroutine on r.closed.
 	require.NoError(t, reporter.Close(""))
 
-	// Cancel context so pending AfterFunc callbacks exit quickly.
+	// Cancel context so the daemon goroutine exits cleanly.
 	cancel()
-	wg.Wait()
+	time.Sleep(2 * time.Second)
 }
 
+// TestReporter_MaxLatencyTimer verifies that the maxLatencyTimer flushes a
+// single buffered log row before the periodic logTicker fires.
+//
+// Setup: logReportInterval=10s (effectively never), maxLatency=100ms.
+// Fire one log line, then assert UpdateLog is called within 500ms.
+func TestReporter_MaxLatencyTimer(t *testing.T) {
+	var updateLogCalls atomic.Int64
+
+	client := mocks.NewClient(t)
+	client.On("UpdateLog", mock.Anything, mock.Anything).Return(
+		func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) {
+			updateLogCalls.Add(1)
+			return connect_go.NewResponse(&runnerv1.UpdateLogResponse{
+				AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)),
+			}), nil
+		},
+	)
+	client.On("UpdateTask", mock.Anything, mock.Anything).Maybe().Return(
+		func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
+			return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
+		},
+	)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	taskCtx, err := structpb.NewStruct(map[string]any{})
+	require.NoError(t, err)
+
+	// Custom config: logTicker=10s (won't fire during test), maxLatency=100ms
+	cfg, _ := config.LoadDefault("")
+	cfg.Runner.LogReportInterval = 10 * time.Second
+	cfg.Runner.LogReportMaxLatency = 100 * time.Millisecond
+	cfg.Runner.LogReportBatchSize = 1000 // won't trigger batch flush
+
+	reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
+	reporter.ResetSteps(1)
+	reporter.RunDaemon()
+	defer func() {
+		_ = reporter.Close("")
+	}()
+
+	// Fire a single log line — not enough to trigger batch flush
+	require.NoError(t, reporter.Fire(&log.Entry{
+		Message: "single log line",
+		Data:    log.Fields{"stage": "Main", "stepNumber": 0, "raw_output": true},
+	}))
+
+	// maxLatencyTimer should flush within ~100ms. Wait up to 500ms.
+	assert.Eventually(t, func() bool {
+		return updateLogCalls.Load() > 0
+	}, 500*time.Millisecond, 10*time.Millisecond,
+		"maxLatencyTimer should have flushed the log before logTicker (10s)")
+}
+
+// TestReporter_BatchSizeFlush verifies that reaching logBatchSize triggers
+// an immediate log flush without waiting for any timer.
+func TestReporter_BatchSizeFlush(t *testing.T) {
+	var updateLogCalls atomic.Int64
+
+	client := mocks.NewClient(t)
+	client.On("UpdateLog", mock.Anything, mock.Anything).Return(
+		func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) {
+			updateLogCalls.Add(1)
+			return connect_go.NewResponse(&runnerv1.UpdateLogResponse{
+				AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)),
+			}), nil
+		},
+	)
+	client.On("UpdateTask", mock.Anything, mock.Anything).Maybe().Return(
+		func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
+			return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
+		},
+	)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	taskCtx, err := structpb.NewStruct(map[string]any{})
+	require.NoError(t, err)
+
+	// Custom config: large timers, small batch size
+	cfg, _ := config.LoadDefault("")
+	cfg.Runner.LogReportInterval = 10 * time.Second
+	cfg.Runner.LogReportMaxLatency = 10 * time.Second
+	cfg.Runner.LogReportBatchSize = 5
+
+	reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
+	reporter.ResetSteps(1)
+	reporter.RunDaemon()
+	defer func() {
+		_ = reporter.Close("")
+	}()
+
+	// Fire exactly batchSize log lines
+	for i := range 5 {
+		require.NoError(t, reporter.Fire(&log.Entry{
+			Message: fmt.Sprintf("log line %d", i),
+			Data:    log.Fields{"stage": "Main", "stepNumber": 0, "raw_output": true},
+		}))
+	}
+
+	// Batch threshold should trigger immediate flush
+	assert.Eventually(t, func() bool {
+		return updateLogCalls.Load() > 0
+	}, 500*time.Millisecond, 10*time.Millisecond,
+		"batch size threshold should have triggered immediate flush")
+}
+
+// TestReporter_StateChangedNotLostDuringReport asserts that a Fire() arriving
+// mid-UpdateTask re-dirties the flag so the change is picked up by the next report.
+func TestReporter_StateChangedNotLostDuringReport(t *testing.T) {
+	var updateTaskCalls atomic.Int64
+	inFlight := make(chan struct{})
+	release := make(chan struct{})
+
+	client := mocks.NewClient(t)
+	client.On("UpdateTask", mock.Anything, mock.Anything).Return(
+		func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
+			n := updateTaskCalls.Add(1)
+			if n == 1 {
+				// Signal that the first UpdateTask is in flight, then block until released.
+				close(inFlight)
+				<-release
+			}
+			return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
+		},
+	)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	taskCtx, err := structpb.NewStruct(map[string]any{})
+	require.NoError(t, err)
+	cfg, _ := config.LoadDefault("")
+	reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
+	reporter.ResetSteps(2)
+
+	// Mark stateChanged=true so the first ReportState proceeds to UpdateTask.
+	reporter.stateMu.Lock()
+	reporter.stateChanged = true
+	reporter.stateMu.Unlock()
+
+	// Kick off the first ReportState in a goroutine — it will block in UpdateTask.
+	done := make(chan error, 1)
+	go func() {
+		done <- reporter.ReportState(false)
+	}()
+
+	// Wait until UpdateTask is in flight (snapshot taken, flag consumed).
+	<-inFlight
+
+	// Concurrent Fire() modifies state — must re-flip stateChanged so the
+	// change is not lost when the in-flight ReportState finishes.
+	require.NoError(t, reporter.Fire(&log.Entry{
+		Message: "step starts",
+		Data:    log.Fields{"stage": "Main", "stepNumber": 1, "raw_output": true},
+	}))
+
+	// Release the in-flight UpdateTask and wait for it to return.
+	close(release)
+	require.NoError(t, <-done)
+
+	// stateChanged must still be true so the next ReportState picks up the
+	// concurrent Fire()'s change instead of skipping via the early-return path.
+	reporter.stateMu.RLock()
+	changed := reporter.stateChanged
+	reporter.stateMu.RUnlock()
+	assert.True(t, changed, "stateChanged must remain true after a concurrent Fire() during in-flight ReportState")
+
+	// And the next ReportState must actually send a second UpdateTask.
+	require.NoError(t, reporter.ReportState(false))
+	assert.Equal(t, int64(2), updateTaskCalls.Load(), "concurrent Fire() change must trigger a second UpdateTask, not be silently lost")
+}
+
+// TestReporter_StateChangedRestoredOnError verifies that when UpdateTask fails,
+// the dirty flag is restored so the snapshotted change isn't silently lost.
+func TestReporter_StateChangedRestoredOnError(t *testing.T) {
+	var updateTaskCalls atomic.Int64
+
+	client := mocks.NewClient(t)
+	client.On("UpdateTask", mock.Anything, mock.Anything).Return(
+		func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
+			n := updateTaskCalls.Add(1)
+			if n == 1 {
+				return nil, errors.New("transient network error")
+			}
+			return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
+		},
+	)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	taskCtx, err := structpb.NewStruct(map[string]any{})
+	require.NoError(t, err)
+	cfg, _ := config.LoadDefault("")
+	reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
+	reporter.ResetSteps(1)
+
+	reporter.stateMu.Lock()
+	reporter.stateChanged = true
+	reporter.stateMu.Unlock()
+
+	// First ReportState fails — flag must be restored to true.
+	require.Error(t, reporter.ReportState(false))
+
+	reporter.stateMu.RLock()
+	changed := reporter.stateChanged
+	reporter.stateMu.RUnlock()
+	assert.True(t, changed, "stateChanged must be restored to true after UpdateTask error so the change is retried")
+
+	// The next ReportState should still issue a request because the flag was restored.
+	require.NoError(t, reporter.ReportState(false))
+	assert.Equal(t, int64(2), updateTaskCalls.Load())
+}
+
+// TestReporter_StateNotifyFlush verifies that step transitions trigger
+// an immediate state flush via the stateNotify channel.
+func TestReporter_StateNotifyFlush(t *testing.T) {
+	var updateTaskCalls atomic.Int64
+
+	client := mocks.NewClient(t)
+	client.On("UpdateLog", mock.Anything, mock.Anything).Maybe().Return(
+		func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) {
+			return connect_go.NewResponse(&runnerv1.UpdateLogResponse{
+				AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)),
+			}), nil
+		},
+	)
+	client.On("UpdateTask", mock.Anything, mock.Anything).Return(
+		func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
+			updateTaskCalls.Add(1)
+			return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
+		},
+	)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	taskCtx, err := structpb.NewStruct(map[string]any{})
+	require.NoError(t, err)
+
+	// Custom config: large state interval so only stateNotify can trigger
+	cfg, _ := config.LoadDefault("")
+	cfg.Runner.StateReportInterval = 10 * time.Second
+	cfg.Runner.LogReportInterval = 10 * time.Second
+
+	reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
+	reporter.ResetSteps(1)
+	reporter.RunDaemon()
+	defer func() {
+		_ = reporter.Close("")
+	}()
+
+	// Fire a log entry that starts a step — this triggers stateNotify
+	require.NoError(t, reporter.Fire(&log.Entry{
+		Message: "step starting",
+		Data:    log.Fields{"stage": "Main", "stepNumber": 0, "raw_output": true},
+	}))
+
+	// stateNotify should trigger immediate UpdateTask call
+	assert.Eventually(t, func() bool {
+		return updateTaskCalls.Load() > 0
+	}, 500*time.Millisecond, 10*time.Millisecond,
+		"step transition should have triggered immediate state flush via stateNotify")
+}
```