Wait for RunDaemon Acknowledge

* Always skip reporting job result from RunDaemon
This commit is contained in:
Christopher Homberger
2026-02-20 13:01:12 +01:00
parent 97c1320234
commit b0726c4423
2 changed files with 29 additions and 21 deletions

View File

@@ -37,6 +37,7 @@ type Reporter struct {
state *runnerv1.TaskState state *runnerv1.TaskState
stateMu sync.RWMutex stateMu sync.RWMutex
outputs sync.Map outputs sync.Map
daemon chan struct{}
debugOutputEnabled bool debugOutputEnabled bool
stopCommandEndToken string stopCommandEndToken string
@@ -63,6 +64,7 @@ func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.C
state: &runnerv1.TaskState{ state: &runnerv1.TaskState{
Id: task.Id, Id: task.Id,
}, },
daemon: make(chan struct{}),
} }
if task.Secrets["ACTIONS_STEP_DEBUG"] == "true" { if task.Secrets["ACTIONS_STEP_DEBUG"] == "true" {
@@ -109,6 +111,7 @@ func (r *Reporter) Fire(entry *log.Entry) error {
if stage != "Main" { if stage != "Main" {
if v, ok := entry.Data["jobResult"]; ok { if v, ok := entry.Data["jobResult"]; ok {
if jobResult, ok := r.parseResult(v); ok { if jobResult, ok := r.parseResult(v); ok {
// We need to ensure log upload before this upload
r.state.Result = jobResult r.state.Result = jobResult
r.state.StoppedAt = timestamppb.New(timestamp) r.state.StoppedAt = timestamppb.New(timestamp)
for _, s := range r.state.Steps { for _, s := range r.state.Steps {
@@ -179,15 +182,14 @@ func (r *Reporter) RunDaemon() {
r.stateMu.RLock() r.stateMu.RLock()
closed := r.closed closed := r.closed
r.stateMu.RUnlock() r.stateMu.RUnlock()
if closed { if closed || r.ctx.Err() != nil {
return // Acknowledge close
} close(r.daemon)
if r.ctx.Err() != nil {
return return
} }
_ = r.ReportLog(false) _ = r.ReportLog(false)
_ = r.ReportState() _ = r.ReportState(false)
time.AfterFunc(time.Second, r.RunDaemon) time.AfterFunc(time.Second, r.RunDaemon)
} }
@@ -253,12 +255,19 @@ func (r *Reporter) Close(lastWords string) error {
}) })
} }
r.stateMu.Unlock() r.stateMu.Unlock()
// Wait for Acknowledge
select {
case <-r.daemon:
case <-time.After(60 * time.Second):
close(r.daemon)
log.Error("No Response from RunDaemon for 60s, continue best effort")
}
return retry.Do(func() error { return retry.Do(func() error {
if err := r.ReportLog(true); err != nil { if err := r.ReportLog(true); err != nil {
return err return err
} }
return r.reportState() return r.ReportState(true)
}, retry.Context(r.ctx)) }, retry.Context(r.ctx))
} }
@@ -297,20 +306,9 @@ func (r *Reporter) ReportLog(noMore bool) error {
return nil return nil
} }
// ReportState skips when closed so RunDaemon cannot send a completed state // ReportState only reports the job result if reportResult is true
// before Close sends the final logs. // RunDaemon never reports results even if result is set
func (r *Reporter) ReportState() error { func (r *Reporter) ReportState(reportResult bool) error {
r.stateMu.RLock()
closed := r.closed
r.stateMu.RUnlock()
if closed {
return nil
}
return r.reportState()
}
// reportState sends unconditionally. Used by Close after final logs are uploaded.
func (r *Reporter) reportState() error {
r.clientM.Lock() r.clientM.Lock()
defer r.clientM.Unlock() defer r.clientM.Unlock()
@@ -318,6 +316,11 @@ func (r *Reporter) reportState() error {
state := proto.Clone(r.state).(*runnerv1.TaskState) state := proto.Clone(r.state).(*runnerv1.TaskState)
r.stateMu.RUnlock() r.stateMu.RUnlock()
// Only report result from Close to reliable sent logs
if !reportResult {
state.Result = runnerv1.Result_RESULT_UNSPECIFIED
}
outputs := make(map[string]string) outputs := make(map[string]string)
r.outputs.Range(func(k, v interface{}) bool { r.outputs.Range(func(k, v interface{}) bool {
if val, ok := v.(string); ok { if val, ok := v.(string); ok {

View File

@@ -178,6 +178,7 @@ func TestReporter_Fire(t *testing.T) {
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{ reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{
Context: taskCtx, Context: taskCtx,
}) })
reporter.RunDaemon()
defer func() { defer func() {
assert.NoError(t, reporter.Close("")) assert.NoError(t, reporter.Close(""))
}() }()
@@ -267,14 +268,18 @@ func TestReporter_EphemeralRunnerDeletion(t *testing.T) {
// Step 3: RunDaemon's ReportState() — with the fix, this returns early // Step 3: RunDaemon's ReportState() — with the fix, this returns early
// because closed=true, preventing the server from deleting the runner. // because closed=true, preventing the server from deleting the runner.
assert.NoError(t, reporter.ReportState()) assert.NoError(t, reporter.ReportState(false))
assert.False(t, runnerDeleted, "runner must not be deleted by RunDaemon's ReportState") assert.False(t, runnerDeleted, "runner must not be deleted by RunDaemon's ReportState")
// Step 4: Close's final log upload succeeds because the runner is still alive. // Step 4: Close's final log upload succeeds because the runner is still alive.
// Flush pending rows first, then send the noMore signal (matching Close's retry behavior). // Flush pending rows first, then send the noMore signal (matching Close's retry behavior).
assert.NoError(t, reporter.ReportLog(false)) assert.NoError(t, reporter.ReportLog(false))
// Acknowledge Close as done in daemon
close(reporter.daemon)
err = reporter.ReportLog(true) err = reporter.ReportLog(true)
assert.NoError(t, err, "final log upload must not fail: runner should not be deleted before Close finishes sending logs") assert.NoError(t, err, "final log upload must not fail: runner should not be deleted before Close finishes sending logs")
err = reporter.ReportState(true)
assert.NoError(t, err, "final state update should work: runner should not be deleted before Close finishes sending logs")
} }
func TestReporter_RunDaemonClose_Race(t *testing.T) { func TestReporter_RunDaemonClose_Race(t *testing.T) {