From 295eecb9afd1f784866b7d9d79102406c812e393 Mon Sep 17 00:00:00 2001 From: silverwind Date: Sun, 10 May 2026 20:55:57 +0000 Subject: [PATCH] fix: ensure `dbfs_data` is cleaned up after task completion (#952) Fixes #950. After #819, the daemon flushes logs eagerly on the job-result entry (via the `stateNotify` path), so `Close()` typically runs `ReportLog(true)` with an empty buffer. Gitea's `UpdateLog` handler short-circuits on `len(Rows)==0` before honoring `NoMore`, so the final request never runs `TransferLogs` and `dbfs_data` rows leak. The server-side short-circuit is latent since the original Actions implementation in 2023; #819 made it deterministically reachable. Workaround: inject a sentinel row in `Close()` after the daemon has exited so the final `UpdateLog` always carries at least one row. Done after the daemon waits so the sentinel can't be flushed before `ReportLog(true)` reads it. https://github.com/go-gitea/gitea/pull/37631 drops the empty-rows short-circuit when `NoMore=true`; that would work with or without this PR. Reviewed-on: https://gitea.com/gitea/runner/pulls/952 Reviewed-by: Nicolas Reviewed-by: Zettat123 <39446+zettat123@noreply.gitea.com> Co-authored-by: silverwind Co-committed-by: silverwind --- internal/pkg/report/reporter.go | 15 +++++++ internal/pkg/report/reporter_test.go | 62 ++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/internal/pkg/report/reporter.go b/internal/pkg/report/reporter.go index e87580d9..eda52b49 100644 --- a/internal/pkg/report/reporter.go +++ b/internal/pkg/report/reporter.go @@ -416,6 +416,21 @@ func (r *Reporter) Close(lastWords string) error { log.Error("No Response from RunDaemon for 60s, continue best effort") } + // Gitea's UpdateLog short-circuits on len(Rows)==0 before honoring NoMore, + // so a final empty request never runs TransferLogs and dbfs_data leaks. + // Inject a sentinel row after the daemon has exited so it can't be flushed + // before ReportLog(true). + // TODO: Remove after https://github.com/go-gitea/gitea/pull/37631 is in all + // supported branches, e.g. v1.28+. + r.stateMu.Lock() + if len(r.logRows) == 0 { + r.logRows = append(r.logRows, &runnerv1.LogRow{ + Time: timestamppb.Now(), + Content: "", + }) + } + r.stateMu.Unlock() + // Report the job outcome even when all log upload retry attempts have been exhausted return errors.Join( retry.Do(func() error { diff --git a/internal/pkg/report/reporter_test.go b/internal/pkg/report/reporter_test.go index 0b114608..f12ee1b4 100644 --- a/internal/pkg/report/reporter_test.go +++ b/internal/pkg/report/reporter_test.go @@ -598,6 +598,68 @@ func TestReporter_StateNotifyFlush(t *testing.T) { "step transition should have triggered immediate state flush via stateNotify") } +// Regression test for https://gitea.com/gitea/runner/issues/950: Close() must +// always send a final UpdateLog with NoMore=true carrying at least one row, +// otherwise the server's len(Rows)==0 short-circuit skips TransferLogs. +// TODO: Remove after https://github.com/go-gitea/gitea/pull/37631 is in all +// supported branches, e.g. v1.28+. +func TestReporter_CloseAlwaysSendsRowsWithNoMore(t *testing.T) { + var lastReq atomic.Pointer[runnerv1.UpdateLogRequest] + var noMoreCalls atomic.Int64 + + client := mocks.NewClient(t) + client.On("UpdateLog", mock.Anything, mock.Anything).Return( + func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) { + lastReq.Store(req.Msg) + if req.Msg.NoMore { + noMoreCalls.Add(1) + } + return connect_go.NewResponse(&runnerv1.UpdateLogResponse{ + AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)), + }), nil + }, + ) + client.On("UpdateTask", mock.Anything, mock.Anything).Return( + func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) { + return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil + }, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + taskCtx, err := structpb.NewStruct(map[string]any{}) + require.NoError(t, err) + + // Intervals large enough that no daemon-driven flush fires during the test. + cfg, _ := config.LoadDefault("") + cfg.Runner.LogReportInterval = 10 * time.Second + cfg.Runner.LogReportMaxLatency = 10 * time.Second + cfg.Runner.StateReportInterval = 10 * time.Second + cfg.Runner.LogReportBatchSize = 1000 + + reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg) + reporter.ResetSteps(1) + reporter.RunDaemon() + + // Simulate a successful job whose log buffer was already drained by the + // daemon (logOffset > 0, logRows empty, terminal Result set). This is the + // state Close() lands in for the typical successful job under #819. + reporter.stateMu.Lock() + reporter.logOffset = 5 + reporter.state.Result = runnerv1.Result_RESULT_SUCCESS + reporter.state.Steps[0].Result = runnerv1.Result_RESULT_SUCCESS + reporter.state.StoppedAt = timestamppb.Now() + reporter.stateMu.Unlock() + + require.NoError(t, reporter.Close("")) + + require.Equal(t, int64(1), noMoreCalls.Load(), "Close must send exactly one UpdateLog with NoMore=true") + final := lastReq.Load() + require.NotNil(t, final) + assert.True(t, final.NoMore, "final UpdateLog must carry NoMore=true") + assert.NotEmpty(t, final.Rows, "final UpdateLog must carry at least one row") +} + // TestReporter_StateHeartbeat verifies that ReportState sends a heartbeat // UpdateTask once stateReportInterval has elapsed since the last successful // report, even when nothing has changed. Without this, long-running silent