perf: reduce runner-to-server connection load with adaptive reporting and polling
- Replace fixed 1s RunDaemon timer with event-driven select loop using separate log (3s) and state (5s) tickers for periodic flush
- Add batch-size threshold (default 100 rows) to flush logs immediately during bursty output like npm install
- Add max-latency timer (default 5s) to guarantee single log lines are delivered within a bounded time
- Trigger immediate flush on step transitions (start/stop) and job result for responsive frontend UX
- Skip ReportLog when no pending rows and ReportState when state is unchanged to eliminate no-op HTTP requests
- Replace fixed-rate polling with exponential backoff and jitter to prevent thundering herd on idle runners
- Tune HTTP client with MaxIdleConnsPerHost=10 and share a single http.Client between Ping and Runner service clients
- Add configurable options: log_report_interval, log_report_max_latency, log_report_batch_size, state_report_interval, fetch_interval_max

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
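Neither the exponential-backoff poller nor the HTTP client tuning appears in the reporter diff below; those changes live in other files. As a rough sketch of the backoff shape the message describes (hypothetical code: nextFetchInterval, current, and max are illustrative names; only the fetch_interval_max cap comes from the commit message):

import (
	"math/rand"
	"time"
)

// nextFetchInterval doubles the idle polling interval, caps it at the
// configured maximum, and applies roughly +/-10% jitter so that many idle
// runners do not synchronize their polls against the same server.
func nextFetchInterval(current, max time.Duration) time.Duration {
	next := current * 2
	if next > max {
		next = max
	}
	jitter := 0.9 + 0.2*rand.Float64() // random factor in [0.9, 1.1)
	return time.Duration(float64(next) * jitter)
}

A successful task fetch would reset the interval to its configured base, so busy runners keep polling at full rate while idle ones back off toward fetch_interval_max.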
internal/app/run/reporter.go

@@ -20,6 +20,7 @@ import (
 	"google.golang.org/protobuf/types/known/timestamppb"
 
 	"gitea.com/gitea/act_runner/internal/pkg/client"
+	"gitea.com/gitea/act_runner/internal/pkg/config"
 )
 
 type Reporter struct {
@@ -35,16 +36,27 @@ type Reporter struct {
 	logReplacer *strings.Replacer
 	oldnew      []string
 
-	state   *runnerv1.TaskState
-	stateMu sync.RWMutex
-	outputs sync.Map
-	daemon  chan struct{}
+	state        *runnerv1.TaskState
+	stateChanged bool
+	stateMu      sync.RWMutex
+	outputs      sync.Map
+	daemon       chan struct{}
+
+	// Adaptive batching control
+	logReportInterval   time.Duration
+	logReportMaxLatency time.Duration
+	logBatchSize        int
+	stateReportInterval time.Duration
+
+	// Event notification channels (non-blocking, buffered 1)
+	logNotify   chan struct{} // signal: new log rows arrived
+	stateNotify chan struct{} // signal: step transition (start/stop)
 
 	debugOutputEnabled  bool
 	stopCommandEndToken string
 }
 
-func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.Client, task *runnerv1.Task) *Reporter {
+func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.Client, task *runnerv1.Task, cfg *config.Config) *Reporter {
 	var oldnew []string
 	if v := task.Context.Fields["token"].GetStringValue(); v != "" {
 		oldnew = append(oldnew, v, "***")
@@ -57,11 +69,17 @@ func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.C
 	}
 
 	rv := &Reporter{
-		ctx:         ctx,
-		cancel:      cancel,
-		client:      client,
-		oldnew:      oldnew,
-		logReplacer: strings.NewReplacer(oldnew...),
+		ctx:                 ctx,
+		cancel:              cancel,
+		client:              client,
+		oldnew:              oldnew,
+		logReplacer:         strings.NewReplacer(oldnew...),
+		logReportInterval:   cfg.Runner.LogReportInterval,
+		logReportMaxLatency: cfg.Runner.LogReportMaxLatency,
+		logBatchSize:        cfg.Runner.LogReportBatchSize,
+		stateReportInterval: cfg.Runner.StateReportInterval,
+		logNotify:           make(chan struct{}, 1),
+		stateNotify:         make(chan struct{}, 1),
 		state: &runnerv1.TaskState{
 			Id: task.Id,
 		},
@@ -108,11 +126,42 @@ func isJobStepEntry(entry *log.Entry) bool {
 	return true
 }
 
-func (r *Reporter) Fire(entry *log.Entry) error {
-	r.stateMu.Lock()
-	defer r.stateMu.Unlock()
-
-	log.WithFields(entry.Data).Trace(entry.Message)
+// notifyLog sends a non-blocking signal that new log rows are available.
+func (r *Reporter) notifyLog() {
+	select {
+	case r.logNotify <- struct{}{}:
+	default:
+	}
+}
+
+// notifyState sends a non-blocking signal that a UX-critical state change occurred (step start/stop, job result).
+func (r *Reporter) notifyState() {
+	select {
+	case r.stateNotify <- struct{}{}:
+	default:
+	}
+}
+
+// unlockAndNotify releases stateMu and sends channel notifications.
+// Must be called with stateMu held.
+func (r *Reporter) unlockAndNotify(urgentState bool) {
+	r.stateMu.Unlock()
+	r.notifyLog()
+	if urgentState {
+		r.notifyState()
+	}
+}
+
+func (r *Reporter) Fire(entry *log.Entry) error {
+	urgentState := false
+
+	r.stateMu.Lock()
+
+	r.stateChanged = true
+
+	if log.IsLevelEnabled(log.TraceLevel) {
+		log.WithFields(entry.Data).Trace(entry.Message)
+	}
 
 	timestamp := entry.Time
 	if r.state.StartedAt == nil {
@@ -135,11 +184,13 @@ func (r *Reporter) Fire(entry *log.Entry) error {
 					}
 				}
 			}
+			urgentState = true
 		}
 	}
 	if !r.duringSteps() {
 		r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))
 	}
+	r.unlockAndNotify(urgentState)
 	return nil
 }
@@ -153,11 +204,13 @@ func (r *Reporter) Fire(entry *log.Entry) error {
 		if !r.duringSteps() {
 			r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))
 		}
+		r.unlockAndNotify(false)
 		return nil
 	}
 
 	if step.StartedAt == nil {
 		step.StartedAt = timestamppb.New(timestamp)
+		urgentState = true
 	}
 
 	// Force reporting log errors as raw output to prevent silent failures
@@ -185,26 +238,91 @@ func (r *Reporter) Fire(entry *log.Entry) error {
 			}
 			step.Result = stepResult
 			step.StoppedAt = timestamppb.New(timestamp)
+			urgentState = true
 		}
 	}
 
+	r.unlockAndNotify(urgentState)
 	return nil
 }
 
 func (r *Reporter) RunDaemon() {
-	r.stateMu.RLock()
-	closed := r.closed
-	r.stateMu.RUnlock()
-	if closed || r.ctx.Err() != nil {
-		// Acknowledge close
-		close(r.daemon)
-		return
-	}
-
-	_ = r.ReportLog(false)
-	_ = r.ReportState(false)
-
-	time.AfterFunc(time.Second, r.RunDaemon)
+	go r.runDaemonLoop()
 }
+
+func (r *Reporter) stopLatencyTimer(active *bool, timer *time.Timer) {
+	if *active {
+		if !timer.Stop() {
+			select {
+			case <-timer.C:
+			default:
+			}
+		}
+		*active = false
+	}
+}
+
+func (r *Reporter) runDaemonLoop() {
+	logTicker := time.NewTicker(r.logReportInterval)
+	stateTicker := time.NewTicker(r.stateReportInterval)
+
+	// maxLatencyTimer ensures the first buffered log row is sent within logReportMaxLatency.
+	// Start inactive — it is armed when the first log row arrives in an empty buffer.
+	maxLatencyTimer := time.NewTimer(0)
+	if !maxLatencyTimer.Stop() {
+		<-maxLatencyTimer.C
+	}
+	maxLatencyActive := false
+
+	defer logTicker.Stop()
+	defer stateTicker.Stop()
+	defer maxLatencyTimer.Stop()
+
+	for {
+		select {
+		case <-logTicker.C:
+			_ = r.ReportLog(false)
+			r.stopLatencyTimer(&maxLatencyActive, maxLatencyTimer)
+
+		case <-stateTicker.C:
+			_ = r.ReportState(false)
+
+		case <-r.logNotify:
+			r.stateMu.RLock()
+			n := len(r.logRows)
+			r.stateMu.RUnlock()
+
+			if n >= r.logBatchSize {
+				_ = r.ReportLog(false)
+				r.stopLatencyTimer(&maxLatencyActive, maxLatencyTimer)
+			} else if !maxLatencyActive && n > 0 {
+				maxLatencyTimer.Reset(r.logReportMaxLatency)
+				maxLatencyActive = true
+			}
+
+		case <-r.stateNotify:
+			// Step transition or job result — flush both immediately for frontend UX.
+			_ = r.ReportLog(false)
+			_ = r.ReportState(false)
+			r.stopLatencyTimer(&maxLatencyActive, maxLatencyTimer)
+
+		case <-maxLatencyTimer.C:
+			maxLatencyActive = false
+			_ = r.ReportLog(false)
+
+		case <-r.ctx.Done():
+			close(r.daemon)
+			return
+		}
+
+		r.stateMu.RLock()
+		closed := r.closed
+		r.stateMu.RUnlock()
+		if closed {
+			close(r.daemon)
+			return
+		}
+	}
+}
 
 func (r *Reporter) Logf(format string, a ...any) {
@@ -268,6 +386,10 @@ func (r *Reporter) Close(lastWords string) error {
 		})
 	}
 	r.stateMu.Unlock()
+
+	// Wake up the daemon loop so it detects closed promptly.
+	r.notifyLog()
+
 	// Wait for Acknowledge
 	select {
 	case <-r.daemon:
@@ -295,6 +417,10 @@ func (r *Reporter) ReportLog(noMore bool) error {
 	rows := r.logRows
 	r.stateMu.RUnlock()
 
+	if !noMore && len(rows) == 0 {
+		return nil
+	}
+
 	resp, err := r.client.UpdateLog(r.ctx, connect.NewRequest(&runnerv1.UpdateLogRequest{
 		TaskId: r.state.Id,
 		Index:  int64(r.logOffset),
@@ -329,15 +455,7 @@ func (r *Reporter) ReportState(reportResult bool) error {
 	r.clientM.Lock()
 	defer r.clientM.Unlock()
 
-	r.stateMu.RLock()
-	state := proto.Clone(r.state).(*runnerv1.TaskState)
-	r.stateMu.RUnlock()
-
-	// Only report result from Close to reliable sent logs
-	if !reportResult {
-		state.Result = runnerv1.Result_RESULT_UNSPECIFIED
-	}
-
+	// Build the outputs map first (single Range pass instead of two).
 	outputs := make(map[string]string)
 	r.outputs.Range(func(k, v any) bool {
 		if val, ok := v.(string); ok {
@@ -346,6 +464,23 @@ func (r *Reporter) ReportState(reportResult bool) error {
 		return true
 	})
 
+	r.stateMu.RLock()
+	changed := r.stateChanged
+	r.stateMu.RUnlock()
+
+	// Early return avoids the expensive proto.Clone on the common no-op path.
+	if !reportResult && !changed && len(outputs) == 0 {
+		return nil
+	}
+
+	r.stateMu.RLock()
+	state := proto.Clone(r.state).(*runnerv1.TaskState)
+	r.stateMu.RUnlock()
+
+	if !reportResult {
+		state.Result = runnerv1.Result_RESULT_UNSPECIFIED
+	}
+
 	resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{
 		State:   state,
 		Outputs: outputs,
@@ -354,6 +489,10 @@ func (r *Reporter) ReportState(reportResult bool) error {
 		return err
 	}
 
+	r.stateMu.Lock()
+	r.stateChanged = false
+	r.stateMu.Unlock()
+
 	for _, k := range resp.Msg.SentOutputs {
 		r.outputs.Store(k, struct{}{})
 	}
internal/app/run/reporter_test.go

@@ -7,9 +7,7 @@ import (
 	"context"
 	"errors"
 	"strings"
-	"sync"
 	"testing"
-	"time"
 
 	runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
 	connect_go "connectrpc.com/connect"
@@ -21,6 +19,7 @@ import (
 	"google.golang.org/protobuf/types/known/timestamppb"
 
 	"gitea.com/gitea/act_runner/internal/pkg/client/mocks"
+	"gitea.com/gitea/act_runner/internal/pkg/config"
 )
 
 func TestReporter_parseLogRow(t *testing.T) {
@@ -175,9 +174,10 @@ func TestReporter_Fire(t *testing.T) {
 			ctx, cancel := context.WithCancel(context.Background())
 			taskCtx, err := structpb.NewStruct(map[string]any{})
 			require.NoError(t, err)
+			cfg, _ := config.LoadDefault("")
 			reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{
 				Context: taskCtx,
-			})
+			}, cfg)
 			reporter.RunDaemon()
 			defer func() {
 				require.NoError(t, reporter.Close(""))
@@ -252,7 +252,8 @@ func TestReporter_EphemeralRunnerDeletion(t *testing.T) {
 	defer cancel()
 	taskCtx, err := structpb.NewStruct(map[string]any{})
 	require.NoError(t, err)
-	reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx})
+	cfg, _ := config.LoadDefault("")
+	reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
 	reporter.ResetSteps(1)
 
 	// Fire a log entry to create pending data
@@ -315,23 +316,18 @@ func TestReporter_RunDaemonClose_Race(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	taskCtx, err := structpb.NewStruct(map[string]any{})
 	require.NoError(t, err)
+	cfg, _ := config.LoadDefault("")
 	reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{
 		Context: taskCtx,
-	})
+	}, cfg)
 	reporter.ResetSteps(1)
 
-	// Start the daemon loop in a separate goroutine.
-	// RunDaemon reads r.closed and reschedules itself via time.AfterFunc.
-	var wg sync.WaitGroup
-	wg.Go(func() {
-		reporter.RunDaemon()
-	})
+	// Start the daemon loop — RunDaemon spawns a goroutine internally.
+	reporter.RunDaemon()
 
-	// Close concurrently — this races with RunDaemon on r.closed.
+	// Close concurrently — this races with the daemon goroutine on r.closed.
 	require.NoError(t, reporter.Close(""))
 
-	// Cancel context so pending AfterFunc callbacks exit quickly.
+	// Cancel context so the daemon goroutine exits cleanly.
 	cancel()
-	wg.Wait()
-	time.Sleep(2 * time.Second)
 }
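For the transport tuning listed in the commit message, a minimal sketch follows. MaxIdleConnsPerHost and IdleConnTimeout are standard net/http fields, but this construction (including the 90s timeout) is illustrative rather than the commit's actual code, which is not part of the diff above:

import (
	"net/http"
	"time"
)

// newSharedHTTPClient builds the one http.Client that both the Ping and
// Runner service clients would share, so sequential RPCs reuse pooled
// TCP/TLS connections instead of re-dialing the Gitea host.
func newSharedHTTPClient() *http.Client {
	return &http.Client{
		Transport: &http.Transport{
			MaxIdleConnsPerHost: 10,               // per the commit message
			IdleConnTimeout:     90 * time.Second, // assumed; not specified in the commit
		},
	}
}

Sharing one client is what makes MaxIdleConnsPerHost effective: with separate clients, each would maintain its own idle pool to the same host.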