mirror of
https://gitea.com/gitea/act_runner.git
synced 2026-06-13 13:24:23 +02:00
feat: complete runner-side cancellation handling (#1016)
Completes the runner side of the cancellation flow, superseding #825. Two parts: ### 1. Report cancellations correctly (`fix`) When `Reporter.Close` ran with the state still `UNSPECIFIED` and the reporter's context had been cancelled, the synthesised final state attributed the job to `RESULT_FAILURE` with an "Early termination" log row — misreporting a cancellation as a generic failure. `Close` now detects the cancelled context and finalizes the task as `RESULT_CANCELLED`. ### 2. Advertise the `cancelling` capability (`feat`) [actions-proto-go v0.6.0](https://gitea.com/gitea/actions-proto-go) adds a `capabilities` field to `RegisterRequest`/`DeclareRequest`, so the runner can now tell the server it understands the transitional cancelling state: - Bumps `gitea.dev/actions-proto-go` to `v0.6.0`. - Adds a single `RunnerCapabilities()` source of truth exposing `CapabilityCancelling`. - Sends `Capabilities` on both register and declare. With this the server records `HasCancellingSupport` and can rely on the runner running post-step cleanup before a task is finalized as `RESULT_CANCELLED`. ## Compatibility Wire-compatible against older servers: the new field uses a previously unused field number (8 on `RegisterRequest`, 3 on `DeclareRequest`) and the client uses the binary protobuf codec, so a server predating the field silently ignores it — registration and declaration succeed and the feature simply stays off. It activates only once both runner and server are on v0.6.0. ## Server side The matching Gitea change (read `GetCapabilities()`, persist `HasCancellingSupport`) is a separate PR against `gitea/gitea`. Supersedes #825. Reviewed-on: https://gitea.com/gitea/runner/pulls/1016 Reviewed-by: Zettat123 <39446+zettat123@noreply.gitea.com> Reviewed-by: wxiaoguang <29147+wxiaoguang@noreply.gitea.com>
This commit is contained in:
2
go.mod
2
go.mod
@@ -5,7 +5,7 @@ go 1.26.0
|
||||
require (
|
||||
connectrpc.com/connect v1.20.0
|
||||
dario.cat/mergo v1.0.2
|
||||
gitea.dev/actions-proto-go v0.5.0
|
||||
gitea.dev/actions-proto-go v0.6.0
|
||||
github.com/Masterminds/semver v1.5.0
|
||||
github.com/avast/retry-go/v5 v5.0.0
|
||||
github.com/containerd/errdefs v1.0.0
|
||||
|
||||
12
go.sum
12
go.sum
@@ -4,8 +4,8 @@ cyphar.com/go-pathrs v0.2.3 h1:0pH8gep37wB0BgaXrEaN1OtZhUMeS7VvaejSr6i822o=
|
||||
cyphar.com/go-pathrs v0.2.3/go.mod h1:y8f1EMG7r+hCuFf/rXsKqMJrJAUoADZGNh5/vZPKcGc=
|
||||
dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8=
|
||||
dario.cat/mergo v1.0.2/go.mod h1:E/hbnu0NxMFBjpMIE34DRGLWqDy0g5FuKDhCb31ngxA=
|
||||
gitea.dev/actions-proto-go v0.5.0 h1:Fc3DI4Fm3B3JBRXFUjegql+usoNAjjAw1cxMansfA2I=
|
||||
gitea.dev/actions-proto-go v0.5.0/go.mod h1:p4RX+D9oqiEEzzkPMXscw2CmaGuYFPWFc6xIOmDNDqs=
|
||||
gitea.dev/actions-proto-go v0.6.0 h1:gjllYQ5vmwlkqOeofTQu5qKTZpmf7kWsafoHvoPCSzY=
|
||||
gitea.dev/actions-proto-go v0.6.0/go.mod h1:p4RX+D9oqiEEzzkPMXscw2CmaGuYFPWFc6xIOmDNDqs=
|
||||
github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 h1:He8afgbRMd7mFxO99hRNu+6tazq8nFF9lIwo9JFroBk=
|
||||
github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8=
|
||||
github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
|
||||
@@ -47,8 +47,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
|
||||
github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
|
||||
github.com/docker/cli v29.5.2+incompatible h1:ubykJ1Y8LmNRGJ2BuMQ0kHOt/RO1YzGNswqWMJgivuQ=
|
||||
github.com/docker/cli v29.5.2+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8=
|
||||
github.com/docker/cli v29.5.3+incompatible h1:nbEFfz774vBwQ5KRYv7c/AghjReqnGISvrRhzjV0evs=
|
||||
github.com/docker/cli v29.5.3+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8=
|
||||
github.com/docker/docker-credential-helpers v0.9.6 h1:cT2PbRPSlnMmNTfT2TDMXRyQ1KMWHG7xoTLBcn1ZNv0=
|
||||
@@ -149,8 +147,6 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8
|
||||
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
|
||||
github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040=
|
||||
github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M=
|
||||
github.com/opencontainers/selinux v1.15.0 h1:4Gs40e/R2FvM8PC1HPaPncLLaDor8Y2WDfk5gjU9o5M=
|
||||
github.com/opencontainers/selinux v1.15.0/go.mod h1:LenyElirjUHszfxrjuFqC85HIeXZKumHcKMQtnaDlQQ=
|
||||
github.com/opencontainers/selinux v1.15.1 h1:ERxeh5caJvCzNAKdI8WQbJmB1LDTn4BuaAg8wihLBpA=
|
||||
github.com/opencontainers/selinux v1.15.1/go.mod h1:LenyElirjUHszfxrjuFqC85HIeXZKumHcKMQtnaDlQQ=
|
||||
github.com/pjbgf/sha1cd v0.6.0 h1:3WJ8Wz8gvDz29quX1OcEmkAlUg9diU4GxJHqs0/XiwU=
|
||||
@@ -254,10 +250,6 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ=
|
||||
golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
|
||||
golang.org/x/sys v0.45.0 h1:dO4czNzziLiiXplLQgBCEpCvXQ3dnkn0SdaZSYdQ+FY=
|
||||
golang.org/x/sys v0.45.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
|
||||
golang.org/x/sys v0.46.0 h1:noSf2Fq6F8DBgS+LysIkx7rIExoNHJsxOAtPp4rthXw=
|
||||
golang.org/x/sys v0.46.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"gitea.com/gitea/runner/internal/app/run"
|
||||
"gitea.com/gitea/runner/internal/pkg/client"
|
||||
"gitea.com/gitea/runner/internal/pkg/config"
|
||||
"gitea.com/gitea/runner/internal/pkg/labels"
|
||||
@@ -370,6 +371,7 @@ func doRegister(ctx context.Context, cfg *config.Config, inputs *registerInputs)
|
||||
Version: ver.Version(),
|
||||
Labels: ls,
|
||||
Ephemeral: reg.Ephemeral,
|
||||
Capabilities: run.RunnerCapabilities(),
|
||||
}))
|
||||
if err != nil {
|
||||
log.WithError(err).Error("poller: cannot register new runner")
|
||||
|
||||
@@ -37,6 +37,18 @@ import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// CapabilityCancelling tells the server this runner understands the
|
||||
// transitional cancelling state and will run post-step cleanup before
|
||||
// finalizing a task as RESULT_CANCELLED.
|
||||
const CapabilityCancelling = "cancelling"
|
||||
|
||||
// RunnerCapabilities are the capability flags this runner advertises to the
|
||||
// server during registration and declaration. The server uses them to enable
|
||||
// transitional features that require runner-side support.
|
||||
func RunnerCapabilities() []string {
|
||||
return []string{CapabilityCancelling}
|
||||
}
|
||||
|
||||
// Runner runs the pipeline.
|
||||
type Runner struct {
|
||||
name string
|
||||
@@ -506,5 +518,6 @@ func (r *Runner) Declare(ctx context.Context, labels []string) (*connect.Respons
|
||||
return r.client.Declare(ctx, connect.NewRequest(&runnerv1.DeclareRequest{
|
||||
Version: ver.Version(),
|
||||
Labels: labels,
|
||||
Capabilities: RunnerCapabilities(),
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -391,15 +391,28 @@ func (r *Reporter) Close(lastWords string) error {
|
||||
r.stateMu.Lock()
|
||||
r.closed = true
|
||||
if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED {
|
||||
// When r.ctx has been cancelled (server returned RESULT_CANCELLED via
|
||||
// rpcCtx/ReportState, see line 590) the job is being torn down on the
|
||||
// cancellation path: surface that explicitly instead of attributing it
|
||||
// to a generic failure.
|
||||
cancelled := errors.Is(r.ctx.Err(), context.Canceled)
|
||||
if lastWords == "" {
|
||||
if cancelled {
|
||||
lastWords = "Cancelled"
|
||||
} else {
|
||||
lastWords = "Early termination"
|
||||
}
|
||||
}
|
||||
for _, v := range r.state.Steps {
|
||||
if v.Result == runnerv1.Result_RESULT_UNSPECIFIED {
|
||||
v.Result = runnerv1.Result_RESULT_CANCELLED
|
||||
}
|
||||
}
|
||||
if cancelled {
|
||||
r.state.Result = runnerv1.Result_RESULT_CANCELLED
|
||||
} else {
|
||||
r.state.Result = runnerv1.Result_RESULT_FAILURE
|
||||
}
|
||||
r.logRows = append(r.logRows, &runnerv1.LogRow{
|
||||
Time: timestamppb.Now(),
|
||||
Content: lastWords,
|
||||
|
||||
@@ -850,3 +850,74 @@ func TestReporter_ServerCancelStillFlushesFinal(t *testing.T) {
|
||||
assert.True(t, finalLogNoMoreSeen.Load(), "Close() must send a final UpdateLog{NoMore:true} even after server-side cancellation")
|
||||
assert.True(t, finalTaskStateSeen.Load(), "Close() must send a final UpdateTask with the populated final state even after server-side cancellation")
|
||||
}
|
||||
|
||||
// TestReporter_CloseReportsCancelledOnCanceledCtx asserts that when Close()
|
||||
// runs on a reporter whose state has not been finalised AND whose context has
|
||||
// been cancelled, the synthesised final state carries RESULT_CANCELLED and
|
||||
// the appended log row reads "Cancelled" — not RESULT_FAILURE / "Early
|
||||
// termination". This is the runner-side half of the Running -> Cancelling ->
|
||||
// Cancelled flow: it gives Gitea an explicit cancel acknowledgement rather
|
||||
// than a generic failure when the job is torn down on the cancel path.
|
||||
func TestReporter_CloseReportsCancelledOnCanceledCtx(t *testing.T) {
|
||||
var finalState atomic.Pointer[runnerv1.TaskState]
|
||||
var finalLogRows atomic.Pointer[[]*runnerv1.LogRow]
|
||||
|
||||
client := mocks.NewClient(t)
|
||||
client.On("UpdateLog", mock.Anything, mock.Anything).Return(
|
||||
func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) {
|
||||
if req.Msg.NoMore {
|
||||
rows := append([]*runnerv1.LogRow(nil), req.Msg.Rows...)
|
||||
finalLogRows.Store(&rows)
|
||||
}
|
||||
return connect_go.NewResponse(&runnerv1.UpdateLogResponse{
|
||||
AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)),
|
||||
}), nil
|
||||
},
|
||||
)
|
||||
client.On("UpdateTask", mock.Anything, mock.Anything).Return(
|
||||
func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
|
||||
if req.Msg.State != nil && req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED {
|
||||
finalState.Store(req.Msg.State)
|
||||
}
|
||||
return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
|
||||
},
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
taskCtx, err := structpb.NewStruct(map[string]any{})
|
||||
require.NoError(t, err)
|
||||
cfg, _ := config.LoadDefault("")
|
||||
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
|
||||
reporter.ResetSteps(1)
|
||||
|
||||
// Simulate the cancellation path: r.ctx is cancelled before Close() runs.
|
||||
cancel()
|
||||
|
||||
// Skip the daemon wait inside Close().
|
||||
close(reporter.daemon)
|
||||
|
||||
// Empty lastWords so Close() picks the synthesised value.
|
||||
require.NoError(t, reporter.Close(""))
|
||||
|
||||
got := finalState.Load()
|
||||
require.NotNil(t, got, "Close() must send a final UpdateTask")
|
||||
assert.Equal(t, runnerv1.Result_RESULT_CANCELLED, got.Result,
|
||||
"final Result must be RESULT_CANCELLED when r.ctx is cancelled, not RESULT_FAILURE")
|
||||
require.Len(t, got.Steps, 1)
|
||||
assert.Equal(t, runnerv1.Result_RESULT_CANCELLED, got.Steps[0].Result,
|
||||
"unfinished steps must be marked RESULT_CANCELLED")
|
||||
|
||||
rows := finalLogRows.Load()
|
||||
require.NotNil(t, rows, "Close() must send a final UpdateLog{NoMore:true}")
|
||||
var foundCancelled, foundEarlyTermination bool
|
||||
for _, r := range *rows {
|
||||
if r.Content == "Cancelled" {
|
||||
foundCancelled = true
|
||||
}
|
||||
if r.Content == "Early termination" {
|
||||
foundEarlyTermination = true
|
||||
}
|
||||
}
|
||||
assert.True(t, foundCancelled, "final log must contain a 'Cancelled' row")
|
||||
assert.False(t, foundEarlyTermination, "final log must not contain 'Early termination' on the cancel path")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user