From b0726c4423915424aae5c2552bfc64b5c06e1fc0 Mon Sep 17 00:00:00 2001 From: Christopher Homberger Date: Fri, 20 Feb 2026 13:01:12 +0100 Subject: [PATCH] Wait for RunDaemon Acknowledge * Always skip reporting job result from RunDaemon --- internal/pkg/report/reporter.go | 43 +++++++++++++++------------- internal/pkg/report/reporter_test.go | 7 ++++- 2 files changed, 29 insertions(+), 21 deletions(-) diff --git a/internal/pkg/report/reporter.go b/internal/pkg/report/reporter.go index e9541bb..b220b8a 100644 --- a/internal/pkg/report/reporter.go +++ b/internal/pkg/report/reporter.go @@ -37,6 +37,7 @@ type Reporter struct { state *runnerv1.TaskState stateMu sync.RWMutex outputs sync.Map + daemon chan struct{} debugOutputEnabled bool stopCommandEndToken string @@ -63,6 +64,7 @@ func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.C state: &runnerv1.TaskState{ Id: task.Id, }, + daemon: make(chan struct{}), } if task.Secrets["ACTIONS_STEP_DEBUG"] == "true" { @@ -109,6 +111,7 @@ func (r *Reporter) Fire(entry *log.Entry) error { if stage != "Main" { if v, ok := entry.Data["jobResult"]; ok { if jobResult, ok := r.parseResult(v); ok { + // We need to ensure log upload before this upload r.state.Result = jobResult r.state.StoppedAt = timestamppb.New(timestamp) for _, s := range r.state.Steps { @@ -179,15 +182,14 @@ func (r *Reporter) RunDaemon() { r.stateMu.RLock() closed := r.closed r.stateMu.RUnlock() - if closed { - return - } - if r.ctx.Err() != nil { + if closed || r.ctx.Err() != nil { + // Acknowledge close + close(r.daemon) return } _ = r.ReportLog(false) - _ = r.ReportState() + _ = r.ReportState(false) time.AfterFunc(time.Second, r.RunDaemon) } @@ -253,12 +255,19 @@ func (r *Reporter) Close(lastWords string) error { }) } r.stateMu.Unlock() + // Wait for Acknowledge + select { + case <-r.daemon: + case <-time.After(60 * time.Second): + close(r.daemon) + log.Error("No Response from RunDaemon for 60s, continue best effort") + } return 
retry.Do(func() error { if err := r.ReportLog(true); err != nil { return err } - return r.reportState() + return r.ReportState(true) }, retry.Context(r.ctx)) } @@ -297,20 +306,9 @@ func (r *Reporter) ReportLog(noMore bool) error { return nil } -// ReportState skips when closed so RunDaemon cannot send a completed state -// before Close sends the final logs. -func (r *Reporter) ReportState() error { - r.stateMu.RLock() - closed := r.closed - r.stateMu.RUnlock() - if closed { - return nil - } - return r.reportState() -} - -// reportState sends unconditionally. Used by Close after final logs are uploaded. -func (r *Reporter) reportState() error { +// ReportState only reports the job result if reportResult is true. +// RunDaemon never reports the result, even if it is already set. +func (r *Reporter) ReportState(reportResult bool) error { r.clientM.Lock() defer r.clientM.Unlock() @@ -318,6 +316,11 @@ func (r *Reporter) reportState() error { state := proto.Clone(r.state).(*runnerv1.TaskState) r.stateMu.RUnlock() + // Only report the result from Close, so that the logs are reliably sent first + if !reportResult { + state.Result = runnerv1.Result_RESULT_UNSPECIFIED + } + outputs := make(map[string]string) r.outputs.Range(func(k, v interface{}) bool { if val, ok := v.(string); ok { diff --git a/internal/pkg/report/reporter_test.go b/internal/pkg/report/reporter_test.go index 4d16e64..2381b52 100644 --- a/internal/pkg/report/reporter_test.go +++ b/internal/pkg/report/reporter_test.go @@ -178,6 +178,7 @@ func TestReporter_Fire(t *testing.T) { reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{ Context: taskCtx, }) + reporter.RunDaemon() defer func() { assert.NoError(t, reporter.Close("")) }() @@ -267,14 +268,18 @@ func TestReporter_EphemeralRunnerDeletion(t *testing.T) { // Step 3: RunDaemon's ReportState() — with the fix, this returns early // because closed=true, preventing the server from deleting the runner. 
- assert.NoError(t, reporter.ReportState()) + assert.NoError(t, reporter.ReportState(false)) assert.False(t, runnerDeleted, "runner must not be deleted by RunDaemon's ReportState") // Step 4: Close's final log upload succeeds because the runner is still alive. // Flush pending rows first, then send the noMore signal (matching Close's retry behavior). assert.NoError(t, reporter.ReportLog(false)) + // Acknowledge Close as done in daemon + close(reporter.daemon) err = reporter.ReportLog(true) assert.NoError(t, err, "final log upload must not fail: runner should not be deleted before Close finishes sending logs") + err = reporter.ReportState(true) + assert.NoError(t, err, "final state update should work: runner should not be deleted before Close finishes sending logs") } func TestReporter_RunDaemonClose_Race(t *testing.T) {