diff --git a/internal/pkg/config/config.example.yaml b/internal/pkg/config/config.example.yaml index 81c31756..2d4b9d98 100644 --- a/internal/pkg/config/config.example.yaml +++ b/internal/pkg/config/config.example.yaml @@ -37,11 +37,13 @@ runner: # Set to 0 or same as fetch_interval to disable backoff. fetch_interval_max: 60s # The base interval for periodic log flush to the Gitea instance. - # Logs may be sent earlier if the buffer reaches log_report_batch_size. - log_report_interval: 3s + # Logs may be sent earlier if the buffer reaches log_report_batch_size + # or if log_report_max_latency expires after the first buffered row. + log_report_interval: 5s # The maximum time a log row can wait before being sent. # This ensures even a single log line appears on the frontend within this duration. - log_report_max_latency: 5s + # Must be less than log_report_interval to have any effect. + log_report_max_latency: 3s # Flush logs immediately when the buffer reaches this many rows. # This ensures bursty output (e.g., npm install) is delivered promptly. log_report_batch_size: 100 diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go index eefba633..814634c8 100644 --- a/internal/pkg/config/config.go +++ b/internal/pkg/config/config.go @@ -146,10 +146,10 @@ func LoadDefault(file string) (*Config, error) { cfg.Runner.FetchIntervalMax = 60 * time.Second } if cfg.Runner.LogReportInterval <= 0 { - cfg.Runner.LogReportInterval = 3 * time.Second + cfg.Runner.LogReportInterval = 5 * time.Second } if cfg.Runner.LogReportMaxLatency <= 0 { - cfg.Runner.LogReportMaxLatency = 5 * time.Second + cfg.Runner.LogReportMaxLatency = 3 * time.Second } if cfg.Runner.LogReportBatchSize <= 0 { cfg.Runner.LogReportBatchSize = 100 @@ -164,10 +164,9 @@ func LoadDefault(file string) (*Config, error) { cfg.Runner.FetchIntervalMax, cfg.Runner.FetchInterval) cfg.Runner.FetchIntervalMax = cfg.Runner.FetchInterval } - if cfg.Runner.LogReportMaxLatency < cfg.Runner.LogReportInterval { - log.Warnf("log_report_max_latency (%v) is less than log_report_interval (%v), setting log_report_max_latency to log_report_interval", + if cfg.Runner.LogReportMaxLatency >= cfg.Runner.LogReportInterval { + log.Warnf("log_report_max_latency (%v) >= log_report_interval (%v), the max-latency timer will never fire before the periodic ticker; consider lowering log_report_max_latency", cfg.Runner.LogReportMaxLatency, cfg.Runner.LogReportInterval) - cfg.Runner.LogReportMaxLatency = cfg.Runner.LogReportInterval } // although `container.network_mode` will be deprecated, but we have to be compatible with it for now. diff --git a/internal/pkg/report/reporter_test.go b/internal/pkg/report/reporter_test.go index 030d3c9a..62a5fb52 100644 --- a/internal/pkg/report/reporter_test.go +++ b/internal/pkg/report/reporter_test.go @@ -6,8 +6,11 @@ package report import ( "context" "errors" + "fmt" "strings" + "sync/atomic" "testing" + "time" runnerv1 "code.gitea.io/actions-proto-go/runner/v1" connect_go "connectrpc.com/connect" @@ -331,3 +334,160 @@ func TestReporter_RunDaemonClose_Race(t *testing.T) { // Cancel context so the daemon goroutine exits cleanly. cancel() } + +// TestReporter_MaxLatencyTimer verifies that the maxLatencyTimer flushes a +// single buffered log row before the periodic logTicker fires. +// +// Setup: logReportInterval=10s (effectively never), maxLatency=100ms. +// Fire one log line, then assert UpdateLog is called within 500ms. +func TestReporter_MaxLatencyTimer(t *testing.T) { + var updateLogCalls atomic.Int64 + + client := mocks.NewClient(t) + client.On("UpdateLog", mock.Anything, mock.Anything).Return( + func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) { + updateLogCalls.Add(1) + return connect_go.NewResponse(&runnerv1.UpdateLogResponse{ + AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)), + }), nil + }, + ) + client.On("UpdateTask", mock.Anything, mock.Anything).Maybe().Return( + func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) { + return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil + }, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + taskCtx, err := structpb.NewStruct(map[string]any{}) + require.NoError(t, err) + + // Custom config: logTicker=10s (won't fire during test), maxLatency=100ms + cfg, _ := config.LoadDefault("") + cfg.Runner.LogReportInterval = 10 * time.Second + cfg.Runner.LogReportMaxLatency = 100 * time.Millisecond + cfg.Runner.LogReportBatchSize = 1000 // won't trigger batch flush + + reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg) + reporter.ResetSteps(1) + reporter.RunDaemon() + defer func() { + _ = reporter.Close("") + }() + + // Fire a single log line — not enough to trigger batch flush + require.NoError(t, reporter.Fire(&log.Entry{ + Message: "single log line", + Data: log.Fields{"stage": "Main", "stepNumber": 0, "raw_output": true}, + })) + + // maxLatencyTimer should flush within ~100ms. Wait up to 500ms. + assert.Eventually(t, func() bool { + return updateLogCalls.Load() > 0 + }, 500*time.Millisecond, 10*time.Millisecond, + "maxLatencyTimer should have flushed the log before logTicker (10s)") +} + +// TestReporter_BatchSizeFlush verifies that reaching logBatchSize triggers +// an immediate log flush without waiting for any timer. +func TestReporter_BatchSizeFlush(t *testing.T) { + var updateLogCalls atomic.Int64 + + client := mocks.NewClient(t) + client.On("UpdateLog", mock.Anything, mock.Anything).Return( + func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) { + updateLogCalls.Add(1) + return connect_go.NewResponse(&runnerv1.UpdateLogResponse{ + AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)), + }), nil + }, + ) + client.On("UpdateTask", mock.Anything, mock.Anything).Maybe().Return( + func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) { + return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil + }, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + taskCtx, err := structpb.NewStruct(map[string]any{}) + require.NoError(t, err) + + // Custom config: large timers, small batch size + cfg, _ := config.LoadDefault("") + cfg.Runner.LogReportInterval = 10 * time.Second + cfg.Runner.LogReportMaxLatency = 10 * time.Second + cfg.Runner.LogReportBatchSize = 5 + + reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg) + reporter.ResetSteps(1) + reporter.RunDaemon() + defer func() { + _ = reporter.Close("") + }() + + // Fire exactly batchSize log lines + for i := range 5 { + require.NoError(t, reporter.Fire(&log.Entry{ + Message: fmt.Sprintf("log line %d", i), + Data: log.Fields{"stage": "Main", "stepNumber": 0, "raw_output": true}, + })) + } + + // Batch threshold should trigger immediate flush + assert.Eventually(t, func() bool { + return updateLogCalls.Load() > 0 + }, 500*time.Millisecond, 10*time.Millisecond, + "batch size threshold should have triggered immediate flush") +} + +// TestReporter_StateNotifyFlush verifies that step transitions trigger +// an immediate state flush via the stateNotify channel. +func TestReporter_StateNotifyFlush(t *testing.T) { + var updateTaskCalls atomic.Int64 + + client := mocks.NewClient(t) + client.On("UpdateLog", mock.Anything, mock.Anything).Maybe().Return( + func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) { + return connect_go.NewResponse(&runnerv1.UpdateLogResponse{ + AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)), + }), nil + }, + ) + client.On("UpdateTask", mock.Anything, mock.Anything).Return( + func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) { + updateTaskCalls.Add(1) + return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil + }, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + taskCtx, err := structpb.NewStruct(map[string]any{}) + require.NoError(t, err) + + // Custom config: large state interval so only stateNotify can trigger + cfg, _ := config.LoadDefault("") + cfg.Runner.StateReportInterval = 10 * time.Second + cfg.Runner.LogReportInterval = 10 * time.Second + + reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg) + reporter.ResetSteps(1) + reporter.RunDaemon() + defer func() { + _ = reporter.Close("") + }() + + // Fire a log entry that starts a step — this triggers stateNotify + require.NoError(t, reporter.Fire(&log.Entry{ + Message: "step starting", + Data: log.Fields{"stage": "Main", "stepNumber": 0, "raw_output": true}, + })) + + // stateNotify should trigger immediate UpdateTask call + assert.Eventually(t, func() bool { + return updateTaskCalls.Load() > 0 + }, 500*time.Millisecond, 10*time.Millisecond, + "step transition should have triggered immediate state flush via stateNotify") +}