mirror of
https://gitea.com/gitea/act_runner.git
synced 2026-05-07 15:53:24 +02:00
Compare commits
2 Commits
7c6f1261d4
...
e56b984c04
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e56b984c04 | ||
|
|
fa5334eb24 |
@@ -10,6 +10,7 @@ import (
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"gitea.com/gitea/act_runner/internal/pkg/client"
|
||||
@@ -48,6 +49,10 @@ type Reporter struct {
|
||||
outputs sync.Map
|
||||
daemon chan struct{}
|
||||
|
||||
// Unix-nanos of the last successful UpdateTask. Atomic so the heartbeat
|
||||
// guard in ReportState reads it without contending stateMu.
|
||||
lastReportedAtNanos atomic.Int64
|
||||
|
||||
// Adaptive batching control
|
||||
logReportInterval time.Duration
|
||||
logReportMaxLatency time.Duration
|
||||
@@ -489,8 +494,12 @@ func (r *Reporter) ReportState(reportResult bool) error {
|
||||
|
||||
// Consume stateChanged atomically with the snapshot; restored on error
|
||||
// below so a concurrent Fire() during UpdateTask isn't silently lost.
|
||||
// Heartbeat at stateReportInterval even when nothing changed, so the server
|
||||
// doesn't time out long-running silent jobs as orphaned (#826).
|
||||
last := r.lastReportedAtNanos.Load()
|
||||
withinHeartbeatInterval := last != 0 && time.Since(time.Unix(0, last)) < r.stateReportInterval
|
||||
r.stateMu.Lock()
|
||||
if !reportResult && !r.stateChanged && len(outputs) == 0 {
|
||||
if !reportResult && !r.stateChanged && len(outputs) == 0 && withinHeartbeatInterval {
|
||||
r.stateMu.Unlock()
|
||||
return nil
|
||||
}
|
||||
@@ -517,6 +526,7 @@ func (r *Reporter) ReportState(reportResult bool) error {
|
||||
return err
|
||||
}
|
||||
metrics.ReportStateTotal.WithLabelValues(metrics.LabelResultSuccess).Inc()
|
||||
r.lastReportedAtNanos.Store(time.Now().UnixNano())
|
||||
|
||||
for _, k := range resp.Msg.SentOutputs {
|
||||
r.outputs.Store(k, struct{}{})
|
||||
|
||||
@@ -597,3 +597,45 @@ func TestReporter_StateNotifyFlush(t *testing.T) {
|
||||
}, 500*time.Millisecond, 10*time.Millisecond,
|
||||
"step transition should have triggered immediate state flush via stateNotify")
|
||||
}
|
||||
|
||||
// TestReporter_StateHeartbeat verifies that ReportState sends a heartbeat
|
||||
// UpdateTask once stateReportInterval has elapsed since the last successful
|
||||
// report, even when nothing has changed. Without this, long-running silent
|
||||
// jobs (no log output, no step transitions) cause the server to time the
|
||||
// task out and cancel it (#826).
|
||||
func TestReporter_StateHeartbeat(t *testing.T) {
|
||||
var updateTaskCalls atomic.Int64
|
||||
|
||||
client := mocks.NewClient(t)
|
||||
client.On("UpdateTask", mock.Anything, mock.Anything).Return(
|
||||
func(_ context.Context, _ *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
|
||||
updateTaskCalls.Add(1)
|
||||
return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
|
||||
},
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
taskCtx, err := structpb.NewStruct(map[string]any{})
|
||||
require.NoError(t, err)
|
||||
cfg, _ := config.LoadDefault("")
|
||||
cfg.Runner.StateReportInterval = 50 * time.Millisecond
|
||||
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
|
||||
reporter.ResetSteps(1)
|
||||
|
||||
// First call has no prior report — sends to seed lastReportedAt.
|
||||
reporter.stateMu.Lock()
|
||||
reporter.stateChanged = true
|
||||
reporter.stateMu.Unlock()
|
||||
require.NoError(t, reporter.ReportState(false))
|
||||
require.Equal(t, int64(1), updateTaskCalls.Load())
|
||||
|
||||
// Second call immediately after with nothing changed — must skip.
|
||||
require.NoError(t, reporter.ReportState(false))
|
||||
assert.Equal(t, int64(1), updateTaskCalls.Load(), "no-op ReportState within stateReportInterval must skip")
|
||||
|
||||
// After stateReportInterval elapses, a heartbeat must fire even with no changes.
|
||||
time.Sleep(2 * cfg.Runner.StateReportInterval)
|
||||
require.NoError(t, reporter.ReportState(false))
|
||||
assert.Equal(t, int64(2), updateTaskCalls.Load(), "ReportState must heartbeat after stateReportInterval even with no state change")
|
||||
}
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
while [ ! -d /etc/s6/docker/supervise ]; do
|
||||
sleep 0.1
|
||||
done
|
||||
|
||||
s6-svwait -U /etc/s6/docker
|
||||
|
||||
exec run.sh
|
||||
|
||||
Reference in New Issue
Block a user