mirror of
https://gitea.com/gitea/act_runner.git
synced 2026-03-22 06:45:03 +01:00
Merge branch 'act-runner-actions-oss-act' of gitea.com:gitea/act_runner into act-runner-actions-oss-act
This commit is contained in:
2
Makefile
2
Makefile
@@ -125,7 +125,7 @@ tidy-check: tidy
|
|||||||
fi
|
fi
|
||||||
|
|
||||||
test: fmt-check security-check
|
test: fmt-check security-check
|
||||||
@$(GO) test -v -cover -coverprofile coverage.txt ./... && echo "\n==>\033[32m Ok\033[m\n" || exit 1
|
@$(GO) test -race -v -cover -coverprofile coverage.txt ./... && echo "\n==>\033[32m Ok\033[m\n" || exit 1
|
||||||
|
|
||||||
.PHONY: vet
|
.PHONY: vet
|
||||||
vet:
|
vet:
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ package report
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"regexp"
|
"regexp"
|
||||||
"slices"
|
"slices"
|
||||||
@@ -38,6 +39,7 @@ type Reporter struct {
|
|||||||
state *runnerv1.TaskState
|
state *runnerv1.TaskState
|
||||||
stateMu sync.RWMutex
|
stateMu sync.RWMutex
|
||||||
outputs sync.Map
|
outputs sync.Map
|
||||||
|
daemon chan struct{}
|
||||||
|
|
||||||
debugOutputEnabled bool
|
debugOutputEnabled bool
|
||||||
stopCommandEndToken string
|
stopCommandEndToken string
|
||||||
@@ -66,6 +68,7 @@ func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.C
|
|||||||
state: &runnerv1.TaskState{
|
state: &runnerv1.TaskState{
|
||||||
Id: task.Id,
|
Id: task.Id,
|
||||||
},
|
},
|
||||||
|
daemon: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
if task.Secrets["ACTIONS_STEP_DEBUG"] == "true" {
|
if task.Secrets["ACTIONS_STEP_DEBUG"] == "true" {
|
||||||
@@ -103,6 +106,18 @@ func appendIfNotNil[T any](s []*T, v *T) []*T {
|
|||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isJobStepEntry is used to not report composite step results incorrectly as step result
|
||||||
|
// returns true if the logentry is on job level
|
||||||
|
// returns false for composite action step messages
|
||||||
|
func isJobStepEntry(entry *log.Entry) bool {
|
||||||
|
if v, ok := entry.Data["stepID"]; ok {
|
||||||
|
if v, ok := v.([]string); ok && len(v) > 1 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func (r *Reporter) Fire(entry *log.Entry) error {
|
func (r *Reporter) Fire(entry *log.Entry) error {
|
||||||
r.stateMu.Lock()
|
r.stateMu.Lock()
|
||||||
defer r.stateMu.Unlock()
|
defer r.stateMu.Unlock()
|
||||||
@@ -119,6 +134,7 @@ func (r *Reporter) Fire(entry *log.Entry) error {
|
|||||||
if stage != "Main" {
|
if stage != "Main" {
|
||||||
if v, ok := entry.Data["jobResult"]; ok {
|
if v, ok := entry.Data["jobResult"]; ok {
|
||||||
if jobResult, ok := r.parseResult(v); ok {
|
if jobResult, ok := r.parseResult(v); ok {
|
||||||
|
// We need to ensure log upload before this upload
|
||||||
r.state.Result = jobResult
|
r.state.Result = jobResult
|
||||||
r.state.StoppedAt = timestamppb.New(timestamp)
|
r.state.StoppedAt = timestamppb.New(timestamp)
|
||||||
for _, s := range r.state.Steps {
|
for _, s := range r.state.Steps {
|
||||||
@@ -178,7 +194,7 @@ func (r *Reporter) Fire(entry *log.Entry) error {
|
|||||||
} else if !r.duringSteps() {
|
} else if !r.duringSteps() {
|
||||||
r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))
|
r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))
|
||||||
}
|
}
|
||||||
if v, ok := entry.Data["stepResult"]; ok {
|
if v, ok := entry.Data["stepResult"]; ok && isJobStepEntry(entry) {
|
||||||
if stepResult, ok := r.parseResult(v); ok {
|
if stepResult, ok := r.parseResult(v); ok {
|
||||||
if step.LogLength == 0 {
|
if step.LogLength == 0 {
|
||||||
step.LogIndex = int64(r.logOffset + len(r.logRows))
|
step.LogIndex = int64(r.logOffset + len(r.logRows))
|
||||||
@@ -192,15 +208,17 @@ func (r *Reporter) Fire(entry *log.Entry) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *Reporter) RunDaemon() {
|
func (r *Reporter) RunDaemon() {
|
||||||
if r.closed {
|
r.stateMu.RLock()
|
||||||
return
|
closed := r.closed
|
||||||
}
|
r.stateMu.RUnlock()
|
||||||
if r.ctx.Err() != nil {
|
if closed || r.ctx.Err() != nil {
|
||||||
|
// Acknowledge close
|
||||||
|
close(r.daemon)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
_ = r.ReportLog(false)
|
_ = r.ReportLog(false)
|
||||||
_ = r.ReportState()
|
_ = r.ReportState(false)
|
||||||
|
|
||||||
time.AfterFunc(time.Second, r.RunDaemon)
|
time.AfterFunc(time.Second, r.RunDaemon)
|
||||||
}
|
}
|
||||||
@@ -242,9 +260,8 @@ func (r *Reporter) SetOutputs(outputs map[string]string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *Reporter) Close(lastWords string) error {
|
func (r *Reporter) Close(lastWords string) error {
|
||||||
r.closed = true
|
|
||||||
|
|
||||||
r.stateMu.Lock()
|
r.stateMu.Lock()
|
||||||
|
r.closed = true
|
||||||
if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED {
|
if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED {
|
||||||
if lastWords == "" {
|
if lastWords == "" {
|
||||||
lastWords = "Early termination"
|
lastWords = "Early termination"
|
||||||
@@ -267,13 +284,23 @@ func (r *Reporter) Close(lastWords string) error {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
r.stateMu.Unlock()
|
r.stateMu.Unlock()
|
||||||
|
// Wait for Acknowledge
|
||||||
return retry.Do(func() error {
|
select {
|
||||||
if err := r.ReportLog(true); err != nil {
|
case <-r.daemon:
|
||||||
return err
|
case <-time.After(60 * time.Second):
|
||||||
|
close(r.daemon)
|
||||||
|
log.Error("No Response from RunDaemon for 60s, continue best effort")
|
||||||
}
|
}
|
||||||
return r.ReportState()
|
|
||||||
}, retry.Context(r.ctx))
|
// Report the job outcome even when all log upload retry attempts have been exhausted
|
||||||
|
return errors.Join(
|
||||||
|
retry.Do(func() error {
|
||||||
|
return r.ReportLog(true)
|
||||||
|
}, retry.Context(r.ctx)),
|
||||||
|
retry.Do(func() error {
|
||||||
|
return r.ReportState(true)
|
||||||
|
}, retry.Context(r.ctx)),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Reporter) ReportLog(noMore bool) error {
|
func (r *Reporter) ReportLog(noMore bool) error {
|
||||||
@@ -301,17 +328,20 @@ func (r *Reporter) ReportLog(noMore bool) error {
|
|||||||
|
|
||||||
r.stateMu.Lock()
|
r.stateMu.Lock()
|
||||||
r.logRows = r.logRows[ack-r.logOffset:]
|
r.logRows = r.logRows[ack-r.logOffset:]
|
||||||
|
submitted := r.logOffset + len(rows)
|
||||||
r.logOffset = ack
|
r.logOffset = ack
|
||||||
r.stateMu.Unlock()
|
r.stateMu.Unlock()
|
||||||
|
|
||||||
if noMore && ack < r.logOffset+len(rows) {
|
if noMore && ack < submitted {
|
||||||
return fmt.Errorf("not all logs are submitted")
|
return fmt.Errorf("not all logs are submitted")
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Reporter) ReportState() error {
|
// ReportState only reports the job result if reportResult is true
|
||||||
|
// RunDaemon never reports results even if result is set
|
||||||
|
func (r *Reporter) ReportState(reportResult bool) error {
|
||||||
r.clientM.Lock()
|
r.clientM.Lock()
|
||||||
defer r.clientM.Unlock()
|
defer r.clientM.Unlock()
|
||||||
|
|
||||||
@@ -319,6 +349,11 @@ func (r *Reporter) ReportState() error {
|
|||||||
state := proto.Clone(r.state).(*runnerv1.TaskState)
|
state := proto.Clone(r.state).(*runnerv1.TaskState)
|
||||||
r.stateMu.RUnlock()
|
r.stateMu.RUnlock()
|
||||||
|
|
||||||
|
// Only report result from Close to reliable sent logs
|
||||||
|
if !reportResult {
|
||||||
|
state.Result = runnerv1.Result_RESULT_UNSPECIFIED
|
||||||
|
}
|
||||||
|
|
||||||
outputs := make(map[string]string)
|
outputs := make(map[string]string)
|
||||||
r.outputs.Range(func(k, v interface{}) bool {
|
r.outputs.Range(func(k, v interface{}) bool {
|
||||||
if val, ok := v.(string); ok {
|
if val, ok := v.(string); ok {
|
||||||
|
|||||||
@@ -5,8 +5,11 @@ package report
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
|
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
|
||||||
connect_go "connectrpc.com/connect"
|
connect_go "connectrpc.com/connect"
|
||||||
@@ -15,6 +18,7 @@ import (
|
|||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"google.golang.org/protobuf/types/known/structpb"
|
"google.golang.org/protobuf/types/known/structpb"
|
||||||
|
"google.golang.org/protobuf/types/known/timestamppb"
|
||||||
|
|
||||||
"gitea.com/gitea/act_runner/internal/pkg/client/mocks"
|
"gitea.com/gitea/act_runner/internal/pkg/client/mocks"
|
||||||
)
|
)
|
||||||
@@ -174,6 +178,7 @@ func TestReporter_Fire(t *testing.T) {
|
|||||||
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{
|
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{
|
||||||
Context: taskCtx,
|
Context: taskCtx,
|
||||||
})
|
})
|
||||||
|
reporter.RunDaemon()
|
||||||
defer func() {
|
defer func() {
|
||||||
assert.NoError(t, reporter.Close(""))
|
assert.NoError(t, reporter.Close(""))
|
||||||
}()
|
}()
|
||||||
@@ -191,7 +196,144 @@ func TestReporter_Fire(t *testing.T) {
|
|||||||
assert.NoError(t, reporter.Fire(&log.Entry{Message: "::debug::debug log line", Data: dataStep0}))
|
assert.NoError(t, reporter.Fire(&log.Entry{Message: "::debug::debug log line", Data: dataStep0}))
|
||||||
assert.NoError(t, reporter.Fire(&log.Entry{Message: "::debug::debug log line", Data: dataStep0}))
|
assert.NoError(t, reporter.Fire(&log.Entry{Message: "::debug::debug log line", Data: dataStep0}))
|
||||||
assert.NoError(t, reporter.Fire(&log.Entry{Message: "regular log line", Data: dataStep0}))
|
assert.NoError(t, reporter.Fire(&log.Entry{Message: "regular log line", Data: dataStep0}))
|
||||||
|
assert.NoError(t, reporter.Fire(&log.Entry{Message: "composite step result", Data: map[string]interface{}{
|
||||||
|
"stage": "Main",
|
||||||
|
"stepID": []string{"0", "0"},
|
||||||
|
"stepNumber": 0,
|
||||||
|
"raw_output": true,
|
||||||
|
"stepResult": "failure",
|
||||||
|
}}))
|
||||||
|
assert.Equal(t, runnerv1.Result_RESULT_UNSPECIFIED, reporter.state.Steps[0].Result)
|
||||||
|
assert.NoError(t, reporter.Fire(&log.Entry{Message: "step result", Data: map[string]interface{}{
|
||||||
|
"stage": "Main",
|
||||||
|
"stepNumber": 0,
|
||||||
|
"raw_output": true,
|
||||||
|
"stepResult": "success",
|
||||||
|
}}))
|
||||||
|
assert.Equal(t, runnerv1.Result_RESULT_SUCCESS, reporter.state.Steps[0].Result)
|
||||||
|
|
||||||
assert.Equal(t, int64(3), reporter.state.Steps[0].LogLength)
|
assert.Equal(t, int64(5), reporter.state.Steps[0].LogLength)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestReporter_EphemeralRunnerDeletion reproduces the exact scenario from
|
||||||
|
// https://gitea.com/gitea/act_runner/issues/793:
|
||||||
|
//
|
||||||
|
// 1. RunDaemon calls ReportLog(false) — runner is still alive
|
||||||
|
// 2. Close() updates state to Result=FAILURE (between RunDaemon's ReportLog and ReportState)
|
||||||
|
// 3. RunDaemon's ReportState() would clone the completed state and send it,
|
||||||
|
// but the fix makes ReportState return early when closed, preventing this
|
||||||
|
// 4. Close's ReportLog(true) succeeds because the runner was not deleted
|
||||||
|
func TestReporter_EphemeralRunnerDeletion(t *testing.T) {
|
||||||
|
runnerDeleted := false
|
||||||
|
|
||||||
|
client := mocks.NewClient(t)
|
||||||
|
client.On("UpdateLog", mock.Anything, mock.Anything).Return(
|
||||||
|
func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) {
|
||||||
|
if runnerDeleted {
|
||||||
|
return nil, fmt.Errorf("runner has been deleted")
|
||||||
|
}
|
||||||
|
return connect_go.NewResponse(&runnerv1.UpdateLogResponse{
|
||||||
|
AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)),
|
||||||
|
}), nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
client.On("UpdateTask", mock.Anything, mock.Anything).Maybe().Return(
|
||||||
|
func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
|
||||||
|
// Server deletes ephemeral runner when it receives a completed state
|
||||||
|
if req.Msg.State != nil && req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED {
|
||||||
|
runnerDeleted = true
|
||||||
|
}
|
||||||
|
return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
taskCtx, err := structpb.NewStruct(map[string]interface{}{})
|
||||||
|
require.NoError(t, err)
|
||||||
|
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx})
|
||||||
|
reporter.ResetSteps(1)
|
||||||
|
|
||||||
|
// Fire a log entry to create pending data
|
||||||
|
assert.NoError(t, reporter.Fire(&log.Entry{
|
||||||
|
Message: "build output",
|
||||||
|
Data: log.Fields{"stage": "Main", "stepNumber": 0, "raw_output": true},
|
||||||
|
}))
|
||||||
|
|
||||||
|
// Step 1: RunDaemon calls ReportLog(false) — runner is still alive
|
||||||
|
assert.NoError(t, reporter.ReportLog(false))
|
||||||
|
|
||||||
|
// Step 2: Close() updates state — sets Result=FAILURE and marks steps cancelled.
|
||||||
|
// In the real race, this happens while RunDaemon is between ReportLog and ReportState.
|
||||||
|
reporter.stateMu.Lock()
|
||||||
|
reporter.closed = true
|
||||||
|
for _, v := range reporter.state.Steps {
|
||||||
|
if v.Result == runnerv1.Result_RESULT_UNSPECIFIED {
|
||||||
|
v.Result = runnerv1.Result_RESULT_CANCELLED
|
||||||
|
}
|
||||||
|
}
|
||||||
|
reporter.state.Result = runnerv1.Result_RESULT_FAILURE
|
||||||
|
reporter.logRows = append(reporter.logRows, &runnerv1.LogRow{
|
||||||
|
Time: timestamppb.Now(),
|
||||||
|
Content: "Early termination",
|
||||||
|
})
|
||||||
|
reporter.state.StoppedAt = timestamppb.Now()
|
||||||
|
reporter.stateMu.Unlock()
|
||||||
|
|
||||||
|
// Step 3: RunDaemon's ReportState() — with the fix, this returns early
|
||||||
|
// because closed=true, preventing the server from deleting the runner.
|
||||||
|
assert.NoError(t, reporter.ReportState(false))
|
||||||
|
assert.False(t, runnerDeleted, "runner must not be deleted by RunDaemon's ReportState")
|
||||||
|
|
||||||
|
// Step 4: Close's final log upload succeeds because the runner is still alive.
|
||||||
|
// Flush pending rows first, then send the noMore signal (matching Close's retry behavior).
|
||||||
|
assert.NoError(t, reporter.ReportLog(false))
|
||||||
|
// Acknowledge Close as done in daemon
|
||||||
|
close(reporter.daemon)
|
||||||
|
err = reporter.ReportLog(true)
|
||||||
|
assert.NoError(t, err, "final log upload must not fail: runner should not be deleted before Close finishes sending logs")
|
||||||
|
err = reporter.ReportState(true)
|
||||||
|
assert.NoError(t, err, "final state update should work: runner should not be deleted before Close finishes sending logs")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReporter_RunDaemonClose_Race(t *testing.T) {
|
||||||
|
client := mocks.NewClient(t)
|
||||||
|
client.On("UpdateLog", mock.Anything, mock.Anything).Return(
|
||||||
|
func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) {
|
||||||
|
return connect_go.NewResponse(&runnerv1.UpdateLogResponse{
|
||||||
|
AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)),
|
||||||
|
}), nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
client.On("UpdateTask", mock.Anything, mock.Anything).Return(
|
||||||
|
func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
|
||||||
|
return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
taskCtx, err := structpb.NewStruct(map[string]interface{}{})
|
||||||
|
require.NoError(t, err)
|
||||||
|
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{
|
||||||
|
Context: taskCtx,
|
||||||
|
})
|
||||||
|
reporter.ResetSteps(1)
|
||||||
|
|
||||||
|
// Start the daemon loop in a separate goroutine.
|
||||||
|
// RunDaemon reads r.closed and reschedules itself via time.AfterFunc.
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
reporter.RunDaemon()
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Close concurrently — this races with RunDaemon on r.closed.
|
||||||
|
assert.NoError(t, reporter.Close(""))
|
||||||
|
|
||||||
|
// Cancel context so pending AfterFunc callbacks exit quickly.
|
||||||
|
cancel()
|
||||||
|
wg.Wait()
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user