5 Commits

Author SHA1 Message Date
Bo-Yi Wu
7031b3507d fix(report): swap log timer defaults so maxLatencyTimer is effective
- Change log_report_interval default from 3s to 5s (periodic sweep)
- Change log_report_max_latency default from 5s to 3s (single-line guarantee)
- Reverse config validation to warn when maxLatency >= interval
- Add TestReporter_MaxLatencyTimer to verify single-line flush
- Add TestReporter_BatchSizeFlush to verify batch threshold flush
- Add TestReporter_StateNotifyFlush to verify step transition flush
2026-04-11 22:54:42 +08:00
Bo-Yi Wu
fc4eef3e0d fix(poll): fetch task before sleeping to avoid startup delay
- Reorder pollOnce to fetch first and sleep after, matching the
  original rate.Limiter burst=1 behavior where the first poll
  returns immediately
- Remove unused ReportInterval config field that was never shipped
2026-04-11 10:27:24 +08:00
Bo-Yi Wu
cce2dd9b9b fix(config): validate and fix invalid config combinations
- Warn and auto-correct when fetch_interval_max < fetch_interval
- Warn and auto-correct when log_report_max_latency < log_report_interval
- log_report_batch_size <= 0 already handled by existing default check

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 23:43:15 +08:00
Bo-Yi Wu
ec07b8c00b perf: reduce runner-to-server connection load with adaptive reporting and polling
- Replace fixed 1s RunDaemon timer with event-driven select loop using
  separate log (3s) and state (5s) tickers for periodic flush
- Add batch-size threshold (default 100 rows) to flush logs immediately
  during bursty output like npm install
- Add max-latency timer (default 5s) to guarantee single log lines are
  delivered within a bounded time
- Trigger immediate flush on step transitions (start/stop) and job
  result for responsive frontend UX
- Skip ReportLog when no pending rows and ReportState when state is
  unchanged to eliminate no-op HTTP requests
- Replace fixed-rate polling with exponential backoff and jitter to
  prevent thundering herd on idle runners
- Tune HTTP client with MaxIdleConnsPerHost=10 and share a single
  http.Client between Ping and Runner service clients
- Add configurable options: log_report_interval, log_report_max_latency,
  log_report_batch_size, state_report_interval, fetch_interval_max

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 22:41:38 +08:00
Lunny Xiao
90c1275f0e Upgrade yaml (#816)
~wait https://gitea.com/gitea/act/pulls/157~

Reviewed-on: https://gitea.com/gitea/act_runner/pulls/816
Reviewed-by: Zettat123 <39446+zettat123@noreply.gitea.com>
2026-03-28 16:18:47 +00:00
9 changed files with 480 additions and 89 deletions

7
go.mod
View File

@@ -14,15 +14,14 @@ require (
github.com/sirupsen/logrus v1.9.4
github.com/spf13/cobra v1.10.2
github.com/stretchr/testify v1.11.1
go.yaml.in/yaml/v4 v4.0.0-rc.3
golang.org/x/term v0.40.0
golang.org/x/time v0.14.0
golang.org/x/time v0.14.0 // indirect
google.golang.org/protobuf v1.36.11
gopkg.in/yaml.v3 v3.0.1
gotest.tools/v3 v3.5.2
)
require go.yaml.in/yaml/v4 v4.0.0-rc.3
require (
cyphar.com/go-pathrs v0.2.3 // indirect
dario.cat/mergo v1.0.2 // indirect
@@ -110,7 +109,7 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
)
replace github.com/nektos/act => gitea.com/gitea/act v0.261.8
replace github.com/nektos/act => gitea.com/gitea/act v0.261.10
// Remove after github.com/docker/distribution is updated to support distribution/reference v0.6.0
// (pulled in via moby/buildkit, breaks on undefined: reference.SplitHostname)

4
go.sum
View File

@@ -8,8 +8,8 @@ cyphar.com/go-pathrs v0.2.3 h1:0pH8gep37wB0BgaXrEaN1OtZhUMeS7VvaejSr6i822o=
cyphar.com/go-pathrs v0.2.3/go.mod h1:y8f1EMG7r+hCuFf/rXsKqMJrJAUoADZGNh5/vZPKcGc=
dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8=
dario.cat/mergo v1.0.2/go.mod h1:E/hbnu0NxMFBjpMIE34DRGLWqDy0g5FuKDhCb31ngxA=
gitea.com/gitea/act v0.261.8 h1:rUWB5GOZOubfe2VteKb7XP3HRIbcW3UUmfh7bVAgQcA=
gitea.com/gitea/act v0.261.8/go.mod h1:lTp4136rwbZiZS3ZVQeHCvd4qRAZ7LYeiRBqOSdMY/4=
gitea.com/gitea/act v0.261.10 h1:ndwbtuMXXz1dpYF2iwY1/PkgKNETo4jmPXfinTZt8cs=
gitea.com/gitea/act v0.261.10/go.mod h1:oIkqQHvU0lfuIWwcpqa4FmU+t3prA89tgkuHUTsrI2c=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 h1:He8afgbRMd7mFxO99hRNu+6tazq8nFF9lIwo9JFroBk=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8=
github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c h1:udKWzYgxTojEKWjV8V+WSxDXJ4NFATAsZjh8iIbsQIg=

View File

@@ -7,13 +7,14 @@ import (
"context"
"errors"
"fmt"
"math/rand/v2"
"sync"
"sync/atomic"
"time"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
"connectrpc.com/connect"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"gitea.com/gitea/act_runner/internal/app/run"
"gitea.com/gitea/act_runner/internal/pkg/client"
@@ -33,6 +34,9 @@ type Poller struct {
shutdownJobs context.CancelFunc
done chan struct{}
consecutiveEmpty atomic.Int64 // count of consecutive polls with no task available
consecutiveErrors atomic.Int64 // count of consecutive fetch errors
}
func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller {
@@ -58,11 +62,10 @@ func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller {
}
func (p *Poller) Poll() {
limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1)
wg := &sync.WaitGroup{}
for i := 0; i < p.cfg.Runner.Capacity; i++ {
wg.Add(1)
go p.poll(wg, limiter)
go p.poll(wg)
}
wg.Wait()
@@ -71,9 +74,7 @@ func (p *Poller) Poll() {
}
func (p *Poller) PollOnce() {
limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1)
p.pollOnce(limiter)
p.pollOnce()
// signal that we're done
close(p.done)
@@ -108,10 +109,10 @@ func (p *Poller) Shutdown(ctx context.Context) error {
}
}
func (p *Poller) poll(wg *sync.WaitGroup, limiter *rate.Limiter) {
func (p *Poller) poll(wg *sync.WaitGroup) {
defer wg.Done()
for {
p.pollOnce(limiter)
p.pollOnce()
select {
case <-p.pollingCtx.Done():
@@ -122,19 +123,57 @@ func (p *Poller) poll(wg *sync.WaitGroup, limiter *rate.Limiter) {
}
}
func (p *Poller) pollOnce(limiter *rate.Limiter) {
// calculateInterval returns the polling interval with exponential backoff based on
// consecutive empty or error responses. The interval starts at FetchInterval and
// doubles with each consecutive empty/error, capped at FetchIntervalMax.
func (p *Poller) calculateInterval() time.Duration {
base := p.cfg.Runner.FetchInterval
maxInterval := p.cfg.Runner.FetchIntervalMax
n := max(p.consecutiveEmpty.Load(), p.consecutiveErrors.Load())
if n <= 1 {
return base
}
// Capped exponential backoff: base * 2^(n-1), max shift=5 so multiplier <= 32
shift := min(n-1, 5)
interval := base * time.Duration(int64(1)<<shift)
return min(interval, maxInterval)
}
// addJitter adds +/- 20% random jitter to the given duration to avoid thundering herd.
func addJitter(d time.Duration) time.Duration {
if d <= 0 {
return d
}
// jitter range: [-20%, +20%] of d
jitterRange := int64(d) * 2 / 5 // 40% total range
if jitterRange <= 0 {
return d
}
jitter := rand.Int64N(jitterRange) - jitterRange/2
return d + time.Duration(jitter)
}
func (p *Poller) pollOnce() {
for {
if err := limiter.Wait(p.pollingCtx); err != nil {
if p.pollingCtx.Err() != nil {
log.WithError(err).Debug("limiter wait failed")
}
return
}
task, ok := p.fetchTask(p.pollingCtx)
if !ok {
interval := addJitter(p.calculateInterval())
timer := time.NewTimer(interval)
select {
case <-timer.C:
case <-p.pollingCtx.Done():
timer.Stop()
return
}
continue
}
// Got a task — reset backoff counters for fast subsequent polling.
p.consecutiveEmpty.Store(0)
p.consecutiveErrors.Store(0)
p.runTaskWithRecover(p.jobsCtx, task)
return
}
@@ -167,10 +206,15 @@ func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
}
if err != nil {
log.WithError(err).Error("failed to fetch task")
p.consecutiveErrors.Add(1)
return nil, false
}
// Successful response — reset error counter.
p.consecutiveErrors.Store(0)
if resp == nil || resp.Msg == nil {
p.consecutiveEmpty.Add(1)
return nil, false
}
@@ -179,6 +223,7 @@ func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
}
if resp.Msg.Task == nil {
p.consecutiveEmpty.Add(1)
return nil, false
}

View File

@@ -98,7 +98,7 @@ func (r *Runner) Run(ctx context.Context, task *runnerv1.Task) error {
ctx, cancel := context.WithTimeout(ctx, r.cfg.Runner.Timeout)
defer cancel()
reporter := report.NewReporter(ctx, cancel, r.client, task)
reporter := report.NewReporter(ctx, cancel, r.client, task, r.cfg)
var runErr error
defer func() {
lastWords := ""

View File

@@ -8,6 +8,7 @@ import (
"crypto/tls"
"net/http"
"strings"
"time"
"code.gitea.io/actions-proto-go/ping/v1/pingv1connect"
"code.gitea.io/actions-proto-go/runner/v1/runnerv1connect"
@@ -15,16 +16,17 @@ import (
)
func getHTTPClient(endpoint string, insecure bool) *http.Client {
transport := &http.Transport{
MaxIdleConns: 10,
MaxIdleConnsPerHost: 10, // All requests go to one host; default is 2 which causes frequent reconnects.
IdleConnTimeout: 90 * time.Second,
}
if strings.HasPrefix(endpoint, "https://") && insecure {
return &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
transport.TLSClientConfig = &tls.Config{
InsecureSkipVerify: true,
}
}
return http.DefaultClient
return &http.Client{Transport: transport}
}
// New returns a new runner client.
@@ -47,14 +49,15 @@ func New(endpoint string, insecure bool, uuid, token, version string, opts ...co
}
})))
httpClient := getHTTPClient(endpoint, insecure)
return &HTTPClient{
PingServiceClient: pingv1connect.NewPingServiceClient(
getHTTPClient(endpoint, insecure),
httpClient,
baseURL,
opts...,
),
RunnerServiceClient: runnerv1connect.NewRunnerServiceClient(
getHTTPClient(endpoint, insecure),
httpClient,
baseURL,
opts...,
),

View File

@@ -32,6 +32,24 @@ runner:
fetch_timeout: 5s
# The interval for fetching the job from the Gitea instance.
fetch_interval: 2s
# The maximum interval for fetching the job from the Gitea instance.
# The runner uses exponential backoff when idle, increasing the interval up to this maximum.
# Set to 0 or same as fetch_interval to disable backoff.
fetch_interval_max: 60s
# The base interval for periodic log flush to the Gitea instance.
# Logs may be sent earlier if the buffer reaches log_report_batch_size
# or if log_report_max_latency expires after the first buffered row.
log_report_interval: 5s
# The maximum time a log row can wait before being sent.
# This ensures even a single log line appears on the frontend within this duration.
# Must be less than log_report_interval to have any effect.
log_report_max_latency: 3s
# Flush logs immediately when the buffer reaches this many rows.
# This ensures bursty output (e.g., npm install) is delivered promptly.
log_report_batch_size: 100
# The interval for reporting task state (step status, timing) to the Gitea instance.
# State is also reported immediately on step transitions (start/stop).
state_report_interval: 5s
# The github_mirror of a runner is used to specify the mirror address of the github that pulls the action repository.
# It works when something like `uses: actions/checkout@v4` is used and DEFAULT_ACTIONS_URL is set to github,
# and github_mirror is not empty. In this case,

View File

@@ -22,17 +22,22 @@ type Log struct {
// Runner represents the configuration for the runner.
type Runner struct {
File string `yaml:"file"` // File specifies the file path for the runner.
Capacity int `yaml:"capacity"` // Capacity specifies the capacity of the runner.
Envs map[string]string `yaml:"envs"` // Envs stores environment variables for the runner.
EnvFile string `yaml:"env_file"` // EnvFile specifies the path to the file containing environment variables for the runner.
Timeout time.Duration `yaml:"timeout"` // Timeout specifies the duration for runner timeout.
ShutdownTimeout time.Duration `yaml:"shutdown_timeout"` // ShutdownTimeout specifies the duration to wait for running jobs to complete during a shutdown of the runner.
Insecure bool `yaml:"insecure"` // Insecure indicates whether the runner operates in an insecure mode.
FetchTimeout time.Duration `yaml:"fetch_timeout"` // FetchTimeout specifies the timeout duration for fetching resources.
FetchInterval time.Duration `yaml:"fetch_interval"` // FetchInterval specifies the interval duration for fetching resources.
Labels []string `yaml:"labels"` // Labels specify the labels of the runner. Labels are declared on each startup
GithubMirror string `yaml:"github_mirror"` // GithubMirror defines what mirrors should be used when using github
File string `yaml:"file"` // File specifies the file path for the runner.
Capacity int `yaml:"capacity"` // Capacity specifies the capacity of the runner.
Envs map[string]string `yaml:"envs"` // Envs stores environment variables for the runner.
EnvFile string `yaml:"env_file"` // EnvFile specifies the path to the file containing environment variables for the runner.
Timeout time.Duration `yaml:"timeout"` // Timeout specifies the duration for runner timeout.
ShutdownTimeout time.Duration `yaml:"shutdown_timeout"` // ShutdownTimeout specifies the duration to wait for running jobs to complete during a shutdown of the runner.
Insecure bool `yaml:"insecure"` // Insecure indicates whether the runner operates in an insecure mode.
FetchTimeout time.Duration `yaml:"fetch_timeout"` // FetchTimeout specifies the timeout duration for fetching resources.
FetchInterval time.Duration `yaml:"fetch_interval"` // FetchInterval specifies the interval duration for fetching resources.
FetchIntervalMax time.Duration `yaml:"fetch_interval_max"` // FetchIntervalMax specifies the maximum backoff interval when idle.
LogReportInterval time.Duration `yaml:"log_report_interval"` // LogReportInterval specifies the base interval for periodic log flush.
LogReportMaxLatency time.Duration `yaml:"log_report_max_latency"` // LogReportMaxLatency specifies the max time a log row can wait before being sent.
LogReportBatchSize int `yaml:"log_report_batch_size"` // LogReportBatchSize triggers immediate log flush when buffer reaches this size.
StateReportInterval time.Duration `yaml:"state_report_interval"` // StateReportInterval specifies the interval for state reporting.
Labels []string `yaml:"labels"` // Labels specify the labels of the runner. Labels are declared on each startup
GithubMirror string `yaml:"github_mirror"` // GithubMirror defines what mirrors should be used when using github
}
// Cache represents the configuration for caching.
@@ -137,6 +142,32 @@ func LoadDefault(file string) (*Config, error) {
if cfg.Runner.FetchInterval <= 0 {
cfg.Runner.FetchInterval = 2 * time.Second
}
if cfg.Runner.FetchIntervalMax <= 0 {
cfg.Runner.FetchIntervalMax = 60 * time.Second
}
if cfg.Runner.LogReportInterval <= 0 {
cfg.Runner.LogReportInterval = 5 * time.Second
}
if cfg.Runner.LogReportMaxLatency <= 0 {
cfg.Runner.LogReportMaxLatency = 3 * time.Second
}
if cfg.Runner.LogReportBatchSize <= 0 {
cfg.Runner.LogReportBatchSize = 100
}
if cfg.Runner.StateReportInterval <= 0 {
cfg.Runner.StateReportInterval = 5 * time.Second
}
// Validate and fix invalid config combinations to prevent confusing behavior.
if cfg.Runner.FetchIntervalMax < cfg.Runner.FetchInterval {
log.Warnf("fetch_interval_max (%v) is less than fetch_interval (%v), setting fetch_interval_max to fetch_interval",
cfg.Runner.FetchIntervalMax, cfg.Runner.FetchInterval)
cfg.Runner.FetchIntervalMax = cfg.Runner.FetchInterval
}
if cfg.Runner.LogReportMaxLatency >= cfg.Runner.LogReportInterval {
log.Warnf("log_report_max_latency (%v) >= log_report_interval (%v), the max-latency timer will never fire before the periodic ticker; consider lowering log_report_max_latency",
cfg.Runner.LogReportMaxLatency, cfg.Runner.LogReportInterval)
}
// although `container.network_mode` will be deprecated, but we have to be compatible with it for now.
if cfg.Container.NetworkMode != "" && cfg.Container.Network == "" {

View File

@@ -20,6 +20,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
"gitea.com/gitea/act_runner/internal/pkg/client"
"gitea.com/gitea/act_runner/internal/pkg/config"
)
type Reporter struct {
@@ -35,16 +36,27 @@ type Reporter struct {
logReplacer *strings.Replacer
oldnew []string
state *runnerv1.TaskState
stateMu sync.RWMutex
outputs sync.Map
daemon chan struct{}
state *runnerv1.TaskState
stateChanged bool
stateMu sync.RWMutex
outputs sync.Map
daemon chan struct{}
// Adaptive batching control
logReportInterval time.Duration
logReportMaxLatency time.Duration
logBatchSize int
stateReportInterval time.Duration
// Event notification channels (non-blocking, buffered 1)
logNotify chan struct{} // signal: new log rows arrived
stateNotify chan struct{} // signal: step transition (start/stop)
debugOutputEnabled bool
stopCommandEndToken string
}
func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.Client, task *runnerv1.Task) *Reporter {
func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.Client, task *runnerv1.Task, cfg *config.Config) *Reporter {
var oldnew []string
if v := task.Context.Fields["token"].GetStringValue(); v != "" {
oldnew = append(oldnew, v, "***")
@@ -57,11 +69,17 @@ func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.C
}
rv := &Reporter{
ctx: ctx,
cancel: cancel,
client: client,
oldnew: oldnew,
logReplacer: strings.NewReplacer(oldnew...),
ctx: ctx,
cancel: cancel,
client: client,
oldnew: oldnew,
logReplacer: strings.NewReplacer(oldnew...),
logReportInterval: cfg.Runner.LogReportInterval,
logReportMaxLatency: cfg.Runner.LogReportMaxLatency,
logBatchSize: cfg.Runner.LogReportBatchSize,
stateReportInterval: cfg.Runner.StateReportInterval,
logNotify: make(chan struct{}, 1),
stateNotify: make(chan struct{}, 1),
state: &runnerv1.TaskState{
Id: task.Id,
},
@@ -108,11 +126,42 @@ func isJobStepEntry(entry *log.Entry) bool {
return true
}
func (r *Reporter) Fire(entry *log.Entry) error {
r.stateMu.Lock()
defer r.stateMu.Unlock()
// notifyLog sends a non-blocking signal that new log rows are available.
func (r *Reporter) notifyLog() {
select {
case r.logNotify <- struct{}{}:
default:
}
}
log.WithFields(entry.Data).Trace(entry.Message)
// notifyState sends a non-blocking signal that a UX-critical state change occurred (step start/stop, job result).
func (r *Reporter) notifyState() {
select {
case r.stateNotify <- struct{}{}:
default:
}
}
// unlockAndNotify releases stateMu and sends channel notifications.
// Must be called with stateMu held.
func (r *Reporter) unlockAndNotify(urgentState bool) {
r.stateMu.Unlock()
r.notifyLog()
if urgentState {
r.notifyState()
}
}
func (r *Reporter) Fire(entry *log.Entry) error {
urgentState := false
r.stateMu.Lock()
r.stateChanged = true
if log.IsLevelEnabled(log.TraceLevel) {
log.WithFields(entry.Data).Trace(entry.Message)
}
timestamp := entry.Time
if r.state.StartedAt == nil {
@@ -135,11 +184,13 @@ func (r *Reporter) Fire(entry *log.Entry) error {
}
}
}
urgentState = true
}
}
if !r.duringSteps() {
r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))
}
r.unlockAndNotify(urgentState)
return nil
}
@@ -153,11 +204,13 @@ func (r *Reporter) Fire(entry *log.Entry) error {
if !r.duringSteps() {
r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))
}
r.unlockAndNotify(false)
return nil
}
if step.StartedAt == nil {
step.StartedAt = timestamppb.New(timestamp)
urgentState = true
}
// Force reporting log errors as raw output to prevent silent failures
@@ -185,26 +238,91 @@ func (r *Reporter) Fire(entry *log.Entry) error {
}
step.Result = stepResult
step.StoppedAt = timestamppb.New(timestamp)
urgentState = true
}
}
r.unlockAndNotify(urgentState)
return nil
}
func (r *Reporter) RunDaemon() {
r.stateMu.RLock()
closed := r.closed
r.stateMu.RUnlock()
if closed || r.ctx.Err() != nil {
// Acknowledge close
close(r.daemon)
return
go r.runDaemonLoop()
}
func (r *Reporter) stopLatencyTimer(active *bool, timer *time.Timer) {
if *active {
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
*active = false
}
}
_ = r.ReportLog(false)
_ = r.ReportState(false)
func (r *Reporter) runDaemonLoop() {
logTicker := time.NewTicker(r.logReportInterval)
stateTicker := time.NewTicker(r.stateReportInterval)
time.AfterFunc(time.Second, r.RunDaemon)
// maxLatencyTimer ensures the first buffered log row is sent within logReportMaxLatency.
// Start inactive — it is armed when the first log row arrives in an empty buffer.
maxLatencyTimer := time.NewTimer(0)
if !maxLatencyTimer.Stop() {
<-maxLatencyTimer.C
}
maxLatencyActive := false
defer logTicker.Stop()
defer stateTicker.Stop()
defer maxLatencyTimer.Stop()
for {
select {
case <-logTicker.C:
_ = r.ReportLog(false)
r.stopLatencyTimer(&maxLatencyActive, maxLatencyTimer)
case <-stateTicker.C:
_ = r.ReportState(false)
case <-r.logNotify:
r.stateMu.RLock()
n := len(r.logRows)
r.stateMu.RUnlock()
if n >= r.logBatchSize {
_ = r.ReportLog(false)
r.stopLatencyTimer(&maxLatencyActive, maxLatencyTimer)
} else if !maxLatencyActive && n > 0 {
maxLatencyTimer.Reset(r.logReportMaxLatency)
maxLatencyActive = true
}
case <-r.stateNotify:
// Step transition or job result — flush both immediately for frontend UX.
_ = r.ReportLog(false)
_ = r.ReportState(false)
r.stopLatencyTimer(&maxLatencyActive, maxLatencyTimer)
case <-maxLatencyTimer.C:
maxLatencyActive = false
_ = r.ReportLog(false)
case <-r.ctx.Done():
close(r.daemon)
return
}
r.stateMu.RLock()
closed := r.closed
r.stateMu.RUnlock()
if closed {
close(r.daemon)
return
}
}
}
func (r *Reporter) Logf(format string, a ...any) {
@@ -268,6 +386,10 @@ func (r *Reporter) Close(lastWords string) error {
})
}
r.stateMu.Unlock()
// Wake up the daemon loop so it detects closed promptly.
r.notifyLog()
// Wait for Acknowledge
select {
case <-r.daemon:
@@ -295,6 +417,10 @@ func (r *Reporter) ReportLog(noMore bool) error {
rows := r.logRows
r.stateMu.RUnlock()
if !noMore && len(rows) == 0 {
return nil
}
resp, err := r.client.UpdateLog(r.ctx, connect.NewRequest(&runnerv1.UpdateLogRequest{
TaskId: r.state.Id,
Index: int64(r.logOffset),
@@ -329,15 +455,7 @@ func (r *Reporter) ReportState(reportResult bool) error {
r.clientM.Lock()
defer r.clientM.Unlock()
r.stateMu.RLock()
state := proto.Clone(r.state).(*runnerv1.TaskState)
r.stateMu.RUnlock()
// Only report result from Close to reliable sent logs
if !reportResult {
state.Result = runnerv1.Result_RESULT_UNSPECIFIED
}
// Build the outputs map first (single Range pass instead of two).
outputs := make(map[string]string)
r.outputs.Range(func(k, v any) bool {
if val, ok := v.(string); ok {
@@ -346,6 +464,23 @@ func (r *Reporter) ReportState(reportResult bool) error {
return true
})
r.stateMu.RLock()
changed := r.stateChanged
r.stateMu.RUnlock()
// Early return avoids the expensive proto.Clone on the common no-op path.
if !reportResult && !changed && len(outputs) == 0 {
return nil
}
r.stateMu.RLock()
state := proto.Clone(r.state).(*runnerv1.TaskState)
r.stateMu.RUnlock()
if !reportResult {
state.Result = runnerv1.Result_RESULT_UNSPECIFIED
}
resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{
State: state,
Outputs: outputs,
@@ -354,6 +489,10 @@ func (r *Reporter) ReportState(reportResult bool) error {
return err
}
r.stateMu.Lock()
r.stateChanged = false
r.stateMu.Unlock()
for _, k := range resp.Msg.SentOutputs {
r.outputs.Store(k, struct{}{})
}

View File

@@ -6,8 +6,9 @@ package report
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@@ -21,6 +22,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
"gitea.com/gitea/act_runner/internal/pkg/client/mocks"
"gitea.com/gitea/act_runner/internal/pkg/config"
)
func TestReporter_parseLogRow(t *testing.T) {
@@ -175,9 +177,10 @@ func TestReporter_Fire(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
taskCtx, err := structpb.NewStruct(map[string]any{})
require.NoError(t, err)
cfg, _ := config.LoadDefault("")
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{
Context: taskCtx,
})
}, cfg)
reporter.RunDaemon()
defer func() {
require.NoError(t, reporter.Close(""))
@@ -252,7 +255,8 @@ func TestReporter_EphemeralRunnerDeletion(t *testing.T) {
defer cancel()
taskCtx, err := structpb.NewStruct(map[string]any{})
require.NoError(t, err)
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx})
cfg, _ := config.LoadDefault("")
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
reporter.ResetSteps(1)
// Fire a log entry to create pending data
@@ -315,23 +319,175 @@ func TestReporter_RunDaemonClose_Race(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
taskCtx, err := structpb.NewStruct(map[string]any{})
require.NoError(t, err)
cfg, _ := config.LoadDefault("")
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{
Context: taskCtx,
})
}, cfg)
reporter.ResetSteps(1)
// Start the daemon loop in a separate goroutine.
// RunDaemon reads r.closed and reschedules itself via time.AfterFunc.
var wg sync.WaitGroup
wg.Go(func() {
reporter.RunDaemon()
})
// Start the daemon loop — RunDaemon spawns a goroutine internally.
reporter.RunDaemon()
// Close concurrently — this races with RunDaemon on r.closed.
// Close concurrently — this races with the daemon goroutine on r.closed.
require.NoError(t, reporter.Close(""))
// Cancel context so pending AfterFunc callbacks exit quickly.
// Cancel context so the daemon goroutine exits cleanly.
cancel()
wg.Wait()
time.Sleep(2 * time.Second)
}
// TestReporter_MaxLatencyTimer verifies that the maxLatencyTimer flushes a
// single buffered log row before the periodic logTicker fires.
//
// Setup: logReportInterval=10s (effectively never), maxLatency=100ms.
// Fire one log line, then assert UpdateLog is called within 500ms.
func TestReporter_MaxLatencyTimer(t *testing.T) {
var updateLogCalls atomic.Int64
client := mocks.NewClient(t)
client.On("UpdateLog", mock.Anything, mock.Anything).Return(
func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) {
updateLogCalls.Add(1)
return connect_go.NewResponse(&runnerv1.UpdateLogResponse{
AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)),
}), nil
},
)
client.On("UpdateTask", mock.Anything, mock.Anything).Maybe().Return(
func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
},
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
taskCtx, err := structpb.NewStruct(map[string]any{})
require.NoError(t, err)
// Custom config: logTicker=10s (won't fire during test), maxLatency=100ms
cfg, _ := config.LoadDefault("")
cfg.Runner.LogReportInterval = 10 * time.Second
cfg.Runner.LogReportMaxLatency = 100 * time.Millisecond
cfg.Runner.LogReportBatchSize = 1000 // won't trigger batch flush
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
reporter.ResetSteps(1)
reporter.RunDaemon()
defer func() {
_ = reporter.Close("")
}()
// Fire a single log line — not enough to trigger batch flush
require.NoError(t, reporter.Fire(&log.Entry{
Message: "single log line",
Data: log.Fields{"stage": "Main", "stepNumber": 0, "raw_output": true},
}))
// maxLatencyTimer should flush within ~100ms. Wait up to 500ms.
assert.Eventually(t, func() bool {
return updateLogCalls.Load() > 0
}, 500*time.Millisecond, 10*time.Millisecond,
"maxLatencyTimer should have flushed the log before logTicker (10s)")
}
// TestReporter_BatchSizeFlush verifies that reaching logBatchSize triggers
// an immediate log flush without waiting for any timer.
func TestReporter_BatchSizeFlush(t *testing.T) {
var updateLogCalls atomic.Int64
client := mocks.NewClient(t)
client.On("UpdateLog", mock.Anything, mock.Anything).Return(
func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) {
updateLogCalls.Add(1)
return connect_go.NewResponse(&runnerv1.UpdateLogResponse{
AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)),
}), nil
},
)
client.On("UpdateTask", mock.Anything, mock.Anything).Maybe().Return(
func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
},
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
taskCtx, err := structpb.NewStruct(map[string]any{})
require.NoError(t, err)
// Custom config: large timers, small batch size
cfg, _ := config.LoadDefault("")
cfg.Runner.LogReportInterval = 10 * time.Second
cfg.Runner.LogReportMaxLatency = 10 * time.Second
cfg.Runner.LogReportBatchSize = 5
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
reporter.ResetSteps(1)
reporter.RunDaemon()
defer func() {
_ = reporter.Close("")
}()
// Fire exactly batchSize log lines
for i := range 5 {
require.NoError(t, reporter.Fire(&log.Entry{
Message: fmt.Sprintf("log line %d", i),
Data: log.Fields{"stage": "Main", "stepNumber": 0, "raw_output": true},
}))
}
// Batch threshold should trigger immediate flush
assert.Eventually(t, func() bool {
return updateLogCalls.Load() > 0
}, 500*time.Millisecond, 10*time.Millisecond,
"batch size threshold should have triggered immediate flush")
}
// TestReporter_StateNotifyFlush verifies that step transitions trigger
// an immediate state flush via the stateNotify channel.
func TestReporter_StateNotifyFlush(t *testing.T) {
var updateTaskCalls atomic.Int64
client := mocks.NewClient(t)
client.On("UpdateLog", mock.Anything, mock.Anything).Maybe().Return(
func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) {
return connect_go.NewResponse(&runnerv1.UpdateLogResponse{
AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)),
}), nil
},
)
client.On("UpdateTask", mock.Anything, mock.Anything).Return(
func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
updateTaskCalls.Add(1)
return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
},
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
taskCtx, err := structpb.NewStruct(map[string]any{})
require.NoError(t, err)
// Custom config: large state interval so only stateNotify can trigger
cfg, _ := config.LoadDefault("")
cfg.Runner.StateReportInterval = 10 * time.Second
cfg.Runner.LogReportInterval = 10 * time.Second
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
reporter.ResetSteps(1)
reporter.RunDaemon()
defer func() {
_ = reporter.Close("")
}()
// Fire a log entry that starts a step — this triggers stateNotify
require.NoError(t, reporter.Fire(&log.Entry{
Message: "step starting",
Data: log.Fields{"stage": "Main", "stepNumber": 0, "raw_output": true},
}))
// stateNotify should trigger immediate UpdateTask call
assert.Eventually(t, func() bool {
return updateTaskCalls.Load() > 0
}, 500*time.Millisecond, 10*time.Millisecond,
"step transition should have triggered immediate state flush via stateNotify")
}