mirror of
https://gitea.com/gitea/act_runner.git
synced 2026-05-11 18:33:24 +02:00
fix: ensure dbfs_data is cleaned up after task completion (#952)
Fixes #950. After #819, the daemon flushes logs eagerly on the job-result entry (via the `stateNotify` path), so `Close()` typically runs `ReportLog(true)` with an empty buffer. Gitea's `UpdateLog` handler short-circuits on `len(Rows)==0` before honoring `NoMore`, so the final request never runs `TransferLogs` and `dbfs_data` rows leak. The server-side short-circuit is latent since the original Actions implementation in 2023; #819 made it deterministically reachable. Workaround: inject a sentinel row in `Close()` after the daemon has exited so the final `UpdateLog` always carries at least one row. Done after the daemon waits so the sentinel can't be flushed before `ReportLog(true)` reads it. https://github.com/go-gitea/gitea/pull/37631 drops the empty-rows short-circuit when `NoMore=true`; that would work with or without this PR. Reviewed-on: https://gitea.com/gitea/runner/pulls/952 Reviewed-by: Nicolas <bircni@icloud.com> Reviewed-by: Zettat123 <39446+zettat123@noreply.gitea.com> Co-authored-by: silverwind <me@silverwind.io> Co-committed-by: silverwind <me@silverwind.io>
This commit is contained in:
@@ -416,6 +416,21 @@ func (r *Reporter) Close(lastWords string) error {
|
|||||||
log.Error("No Response from RunDaemon for 60s, continue best effort")
|
log.Error("No Response from RunDaemon for 60s, continue best effort")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Gitea's UpdateLog short-circuits on len(Rows)==0 before honoring NoMore,
|
||||||
|
// so a final empty request never runs TransferLogs and dbfs_data leaks.
|
||||||
|
// Inject a sentinel row after the daemon has exited so it can't be flushed
|
||||||
|
// before ReportLog(true).
|
||||||
|
// TODO: Remove after https://github.com/go-gitea/gitea/pull/37631 is in all
|
||||||
|
// supported branches, e.g. v1.28+.
|
||||||
|
r.stateMu.Lock()
|
||||||
|
if len(r.logRows) == 0 {
|
||||||
|
r.logRows = append(r.logRows, &runnerv1.LogRow{
|
||||||
|
Time: timestamppb.Now(),
|
||||||
|
Content: "",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
r.stateMu.Unlock()
|
||||||
|
|
||||||
// Report the job outcome even when all log upload retry attempts have been exhausted
|
// Report the job outcome even when all log upload retry attempts have been exhausted
|
||||||
return errors.Join(
|
return errors.Join(
|
||||||
retry.Do(func() error {
|
retry.Do(func() error {
|
||||||
|
|||||||
@@ -598,6 +598,68 @@ func TestReporter_StateNotifyFlush(t *testing.T) {
|
|||||||
"step transition should have triggered immediate state flush via stateNotify")
|
"step transition should have triggered immediate state flush via stateNotify")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Regression test for https://gitea.com/gitea/runner/issues/950: Close() must
|
||||||
|
// always send a final UpdateLog with NoMore=true carrying at least one row,
|
||||||
|
// otherwise the server's len(Rows)==0 short-circuit skips TransferLogs.
|
||||||
|
// TODO: Remove after https://github.com/go-gitea/gitea/pull/37631 is in all
|
||||||
|
// supported branches, e.g. v1.28+.
|
||||||
|
func TestReporter_CloseAlwaysSendsRowsWithNoMore(t *testing.T) {
|
||||||
|
var lastReq atomic.Pointer[runnerv1.UpdateLogRequest]
|
||||||
|
var noMoreCalls atomic.Int64
|
||||||
|
|
||||||
|
client := mocks.NewClient(t)
|
||||||
|
client.On("UpdateLog", mock.Anything, mock.Anything).Return(
|
||||||
|
func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) {
|
||||||
|
lastReq.Store(req.Msg)
|
||||||
|
if req.Msg.NoMore {
|
||||||
|
noMoreCalls.Add(1)
|
||||||
|
}
|
||||||
|
return connect_go.NewResponse(&runnerv1.UpdateLogResponse{
|
||||||
|
AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)),
|
||||||
|
}), nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
client.On("UpdateTask", mock.Anything, mock.Anything).Return(
|
||||||
|
func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
|
||||||
|
return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
taskCtx, err := structpb.NewStruct(map[string]any{})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Intervals large enough that no daemon-driven flush fires during the test.
|
||||||
|
cfg, _ := config.LoadDefault("")
|
||||||
|
cfg.Runner.LogReportInterval = 10 * time.Second
|
||||||
|
cfg.Runner.LogReportMaxLatency = 10 * time.Second
|
||||||
|
cfg.Runner.StateReportInterval = 10 * time.Second
|
||||||
|
cfg.Runner.LogReportBatchSize = 1000
|
||||||
|
|
||||||
|
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
|
||||||
|
reporter.ResetSteps(1)
|
||||||
|
reporter.RunDaemon()
|
||||||
|
|
||||||
|
// Simulate a successful job whose log buffer was already drained by the
|
||||||
|
// daemon (logOffset > 0, logRows empty, terminal Result set). This is the
|
||||||
|
// state Close() lands in for the typical successful job under #819.
|
||||||
|
reporter.stateMu.Lock()
|
||||||
|
reporter.logOffset = 5
|
||||||
|
reporter.state.Result = runnerv1.Result_RESULT_SUCCESS
|
||||||
|
reporter.state.Steps[0].Result = runnerv1.Result_RESULT_SUCCESS
|
||||||
|
reporter.state.StoppedAt = timestamppb.Now()
|
||||||
|
reporter.stateMu.Unlock()
|
||||||
|
|
||||||
|
require.NoError(t, reporter.Close(""))
|
||||||
|
|
||||||
|
require.Equal(t, int64(1), noMoreCalls.Load(), "Close must send exactly one UpdateLog with NoMore=true")
|
||||||
|
final := lastReq.Load()
|
||||||
|
require.NotNil(t, final)
|
||||||
|
assert.True(t, final.NoMore, "final UpdateLog must carry NoMore=true")
|
||||||
|
assert.NotEmpty(t, final.Rows, "final UpdateLog must carry at least one row")
|
||||||
|
}
|
||||||
|
|
||||||
// TestReporter_StateHeartbeat verifies that ReportState sends a heartbeat
|
// TestReporter_StateHeartbeat verifies that ReportState sends a heartbeat
|
||||||
// UpdateTask once stateReportInterval has elapsed since the last successful
|
// UpdateTask once stateReportInterval has elapsed since the last successful
|
||||||
// report, even when nothing has changed. Without this, long-running silent
|
// report, even when nothing has changed. Without this, long-running silent
|
||||||
|
|||||||
Reference in New Issue
Block a user