fix(report): swap log timer defaults so maxLatencyTimer is effective

- Change log_report_interval default from 3s to 5s (periodic sweep)
- Change log_report_max_latency default from 5s to 3s (single-line guarantee)
- Flip config validation to warn, without clamping, when maxLatency >= interval
- Add TestReporter_MaxLatencyTimer to verify single-line flush
- Add TestReporter_BatchSizeFlush to verify batch threshold flush
- Add TestReporter_StateNotifyFlush to verify step transition flush
Bo-Yi Wu
2026-04-11 22:54:42 +08:00
parent fc4eef3e0d
commit 7031b3507d
3 changed files with 169 additions and 8 deletions

View File

@@ -37,11 +37,13 @@ runner:
   # Set to 0 or same as fetch_interval to disable backoff.
   fetch_interval_max: 60s
   # The base interval for periodic log flush to the Gitea instance.
-  # Logs may be sent earlier if the buffer reaches log_report_batch_size.
-  log_report_interval: 3s
+  # Logs may be sent earlier if the buffer reaches log_report_batch_size
+  # or if log_report_max_latency expires after the first buffered row.
+  log_report_interval: 5s
   # The maximum time a log row can wait before being sent.
   # This ensures even a single log line appears on the frontend within this duration.
-  log_report_max_latency: 5s
+  # Must be less than log_report_interval to have any effect.
+  log_report_max_latency: 3s
   # Flush logs immediately when the buffer reaches this many rows.
   # This ensures bursty output (e.g., npm install) is delivered promptly.
   log_report_batch_size: 100
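
The interplay between the three knobs above is easiest to see in code. Below is a minimal, self-contained sketch of the flush pattern the commit message describes; it is not the runner's actual reporter implementation. The names logTicker and maxLatencyTimer are taken from the commit, and everything else (the row channel, the flush helper, the printed reasons) is assumed purely for illustration. With maxLatency (3s) below interval (5s), a single quiet log row is delivered once the max-latency timer expires; if maxLatency were >= interval, the periodic ticker would always fire first and the timer would never matter, which is what the swapped defaults fix.

// Minimal sketch of the two-timer flush loop (illustration only, assumed names).
package main

import (
	"fmt"
	"time"
)

func runFlushLoop(rows <-chan string, interval, maxLatency time.Duration, batchSize int) {
	var buf []string

	// Periodic background sweep.
	logTicker := time.NewTicker(interval)
	defer logTicker.Stop()

	// Armed only while the buffer is non-empty; guarantees a lone row is sent promptly.
	maxLatencyTimer := time.NewTimer(maxLatency)
	maxLatencyTimer.Stop()

	flush := func(reason string) {
		if len(buf) == 0 {
			return
		}
		fmt.Printf("flush %d row(s): %s\n", len(buf), reason)
		buf = buf[:0]
		maxLatencyTimer.Stop()
	}

	for {
		select {
		case row, ok := <-rows:
			if !ok {
				flush("shutdown")
				return
			}
			if len(buf) == 0 {
				// First buffered row starts the latency clock.
				maxLatencyTimer.Reset(maxLatency)
			}
			buf = append(buf, row)
			if len(buf) >= batchSize {
				flush("batch size reached")
			}
		case <-maxLatencyTimer.C:
			flush("max latency expired")
		case <-logTicker.C:
			flush("periodic interval")
		}
	}
}

func main() {
	rows := make(chan string, 1)
	go runFlushLoop(rows, 5*time.Second, 3*time.Second, 100)

	rows <- "single log line" // flushed after ~3s by the max-latency timer, not the 5s sweep
	time.Sleep(4 * time.Second)
	close(rows)
}

Under the old defaults (interval 3s, maxLatency 5s) the ticker case always wins, so swapping the two values, rather than changing the loop, is what makes the max-latency guarantee real.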

View File

@@ -146,10 +146,10 @@ func LoadDefault(file string) (*Config, error) {
 		cfg.Runner.FetchIntervalMax = 60 * time.Second
 	}
 	if cfg.Runner.LogReportInterval <= 0 {
-		cfg.Runner.LogReportInterval = 3 * time.Second
+		cfg.Runner.LogReportInterval = 5 * time.Second
 	}
 	if cfg.Runner.LogReportMaxLatency <= 0 {
-		cfg.Runner.LogReportMaxLatency = 5 * time.Second
+		cfg.Runner.LogReportMaxLatency = 3 * time.Second
 	}
 	if cfg.Runner.LogReportBatchSize <= 0 {
 		cfg.Runner.LogReportBatchSize = 100
@@ -164,10 +164,9 @@ func LoadDefault(file string) (*Config, error) {
 			cfg.Runner.FetchIntervalMax, cfg.Runner.FetchInterval)
 		cfg.Runner.FetchIntervalMax = cfg.Runner.FetchInterval
 	}
-	if cfg.Runner.LogReportMaxLatency < cfg.Runner.LogReportInterval {
-		log.Warnf("log_report_max_latency (%v) is less than log_report_interval (%v), setting log_report_max_latency to log_report_interval",
+	if cfg.Runner.LogReportMaxLatency >= cfg.Runner.LogReportInterval {
+		log.Warnf("log_report_max_latency (%v) >= log_report_interval (%v), the max-latency timer will never fire before the periodic ticker; consider lowering log_report_max_latency",
 			cfg.Runner.LogReportMaxLatency, cfg.Runner.LogReportInterval)
-		cfg.Runner.LogReportMaxLatency = cfg.Runner.LogReportInterval
 	}
 	// although `container.network_mode` will be deprecated, but we have to be compatible with it for now.
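
One behavioural detail of the hunk above: the loader no longer rewrites the operator's value when the two durations are inconsistent, it only warns. A standalone sketch of that warn-only rule (illustration only; nothing beyond what the hunk itself shows is assumed to exist in the project):

// Warn-only check mirroring the new validation semantics (sketch, not project code).
package main

import (
	"log"
	"time"
)

func checkLogTimers(maxLatency, interval time.Duration) {
	if maxLatency >= interval {
		log.Printf("log_report_max_latency (%v) >= log_report_interval (%v): "+
			"the max-latency timer will never fire before the periodic ticker; consider lowering it",
			maxLatency, interval)
	}
	// Unlike the previous behaviour, maxLatency is left untouched here.
}

func main() {
	checkLogTimers(3*time.Second, 5*time.Second)  // new defaults: silent
	checkLogTimers(10*time.Second, 5*time.Second) // misconfiguration: warning only
}

With the new defaults (3s against 5s) the check stays silent; the warning only appears for configurations where the max-latency timer could never beat the ticker anyway.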

View File

@@ -6,8 +6,11 @@ package report
 import (
 	"context"
 	"errors"
+	"fmt"
 	"strings"
+	"sync/atomic"
 	"testing"
+	"time"

 	runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
 	connect_go "connectrpc.com/connect"
@@ -331,3 +334,160 @@ func TestReporter_RunDaemonClose_Race(t *testing.T) {
 	// Cancel context so the daemon goroutine exits cleanly.
 	cancel()
 }
+
+// TestReporter_MaxLatencyTimer verifies that the maxLatencyTimer flushes a
+// single buffered log row before the periodic logTicker fires.
+//
+// Setup: logReportInterval=10s (effectively never), maxLatency=100ms.
+// Fire one log line, then assert UpdateLog is called within 500ms.
+func TestReporter_MaxLatencyTimer(t *testing.T) {
+	var updateLogCalls atomic.Int64
+
+	client := mocks.NewClient(t)
+	client.On("UpdateLog", mock.Anything, mock.Anything).Return(
+		func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) {
+			updateLogCalls.Add(1)
+			return connect_go.NewResponse(&runnerv1.UpdateLogResponse{
+				AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)),
+			}), nil
+		},
+	)
+	client.On("UpdateTask", mock.Anything, mock.Anything).Maybe().Return(
+		func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
+			return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
+		},
+	)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	taskCtx, err := structpb.NewStruct(map[string]any{})
+	require.NoError(t, err)
+
+	// Custom config: logTicker=10s (won't fire during test), maxLatency=100ms
+	cfg, _ := config.LoadDefault("")
+	cfg.Runner.LogReportInterval = 10 * time.Second
+	cfg.Runner.LogReportMaxLatency = 100 * time.Millisecond
+	cfg.Runner.LogReportBatchSize = 1000 // won't trigger batch flush
+
+	reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
+	reporter.ResetSteps(1)
+	reporter.RunDaemon()
+	defer func() {
+		_ = reporter.Close("")
+	}()
+
+	// Fire a single log line — not enough to trigger batch flush
+	require.NoError(t, reporter.Fire(&log.Entry{
+		Message: "single log line",
+		Data:    log.Fields{"stage": "Main", "stepNumber": 0, "raw_output": true},
+	}))
+
+	// maxLatencyTimer should flush within ~100ms. Wait up to 500ms.
+	assert.Eventually(t, func() bool {
+		return updateLogCalls.Load() > 0
+	}, 500*time.Millisecond, 10*time.Millisecond,
+		"maxLatencyTimer should have flushed the log before logTicker (10s)")
+}
+
+// TestReporter_BatchSizeFlush verifies that reaching logBatchSize triggers
+// an immediate log flush without waiting for any timer.
+func TestReporter_BatchSizeFlush(t *testing.T) {
+	var updateLogCalls atomic.Int64
+
+	client := mocks.NewClient(t)
+	client.On("UpdateLog", mock.Anything, mock.Anything).Return(
+		func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) {
+			updateLogCalls.Add(1)
+			return connect_go.NewResponse(&runnerv1.UpdateLogResponse{
+				AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)),
+			}), nil
+		},
+	)
+	client.On("UpdateTask", mock.Anything, mock.Anything).Maybe().Return(
+		func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
+			return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
+		},
+	)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	taskCtx, err := structpb.NewStruct(map[string]any{})
+	require.NoError(t, err)
+
+	// Custom config: large timers, small batch size
+	cfg, _ := config.LoadDefault("")
+	cfg.Runner.LogReportInterval = 10 * time.Second
+	cfg.Runner.LogReportMaxLatency = 10 * time.Second
+	cfg.Runner.LogReportBatchSize = 5
+
+	reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
+	reporter.ResetSteps(1)
+	reporter.RunDaemon()
+	defer func() {
+		_ = reporter.Close("")
+	}()
+
+	// Fire exactly batchSize log lines
+	for i := range 5 {
+		require.NoError(t, reporter.Fire(&log.Entry{
+			Message: fmt.Sprintf("log line %d", i),
+			Data:    log.Fields{"stage": "Main", "stepNumber": 0, "raw_output": true},
+		}))
+	}
+
+	// Batch threshold should trigger immediate flush
+	assert.Eventually(t, func() bool {
+		return updateLogCalls.Load() > 0
+	}, 500*time.Millisecond, 10*time.Millisecond,
+		"batch size threshold should have triggered immediate flush")
+}
+
+// TestReporter_StateNotifyFlush verifies that step transitions trigger
+// an immediate state flush via the stateNotify channel.
+func TestReporter_StateNotifyFlush(t *testing.T) {
+	var updateTaskCalls atomic.Int64
+
+	client := mocks.NewClient(t)
+	client.On("UpdateLog", mock.Anything, mock.Anything).Maybe().Return(
+		func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) {
+			return connect_go.NewResponse(&runnerv1.UpdateLogResponse{
+				AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)),
+			}), nil
+		},
+	)
+	client.On("UpdateTask", mock.Anything, mock.Anything).Return(
+		func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
+			updateTaskCalls.Add(1)
+			return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
+		},
+	)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	taskCtx, err := structpb.NewStruct(map[string]any{})
+	require.NoError(t, err)
+
+	// Custom config: large state interval so only stateNotify can trigger
+	cfg, _ := config.LoadDefault("")
+	cfg.Runner.StateReportInterval = 10 * time.Second
+	cfg.Runner.LogReportInterval = 10 * time.Second
+
+	reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
+	reporter.ResetSteps(1)
+	reporter.RunDaemon()
+	defer func() {
+		_ = reporter.Close("")
+	}()
+
+	// Fire a log entry that starts a step — this triggers stateNotify
+	require.NoError(t, reporter.Fire(&log.Entry{
+		Message: "step starting",
+		Data:    log.Fields{"stage": "Main", "stepNumber": 0, "raw_output": true},
+	}))
+
+	// stateNotify should trigger immediate UpdateTask call
+	assert.Eventually(t, func() bool {
+		return updateTaskCalls.Load() > 0
+	}, 500*time.Millisecond, 10*time.Millisecond,
+		"step transition should have triggered immediate state flush via stateNotify")
+}