From 822af5029f28f8f04eaf52cae5c75d57a4c15bcd Mon Sep 17 00:00:00 2001 From: Nicolas Date: Thu, 11 Jun 2026 09:00:31 +0000 Subject: [PATCH] feat: complete runner-side cancellation handling (#1016) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Completes the runner side of the cancellation flow, superseding #825. Two parts: ### 1. Report cancellations correctly (`fix`) When `Reporter.Close` ran with the state still `UNSPECIFIED` and the reporter's context had been cancelled, the synthesised final state attributed the job to `RESULT_FAILURE` with an "Early termination" log row — misreporting a cancellation as a generic failure. `Close` now detects the cancelled context and finalizes the task as `RESULT_CANCELLED`. ### 2. Advertise the `cancelling` capability (`feat`) [actions-proto-go v0.6.0](https://gitea.com/gitea/actions-proto-go) adds a `capabilities` field to `RegisterRequest`/`DeclareRequest`, so the runner can now tell the server it understands the transitional cancelling state: - Bumps `gitea.dev/actions-proto-go` to `v0.6.0`. - Adds a single `RunnerCapabilities()` source of truth exposing `CapabilityCancelling`. - Sends `Capabilities` on both register and declare. With this the server records `HasCancellingSupport` and can rely on the runner running post-step cleanup before a task is finalized as `RESULT_CANCELLED`. ## Compatibility Wire-compatible against older servers: the new field uses a previously unused field number (8 on `RegisterRequest`, 3 on `DeclareRequest`) and the client uses the binary protobuf codec, so a server predating the field silently ignores it — registration and declaration succeed and the feature simply stays off. It activates only once both runner and server are on v0.6.0. ## Server side The matching Gitea change (read `GetCapabilities()`, persist `HasCancellingSupport`) is a separate PR against `gitea/gitea`. Supersedes #825. Reviewed-on: https://gitea.com/gitea/runner/pulls/1016 Reviewed-by: Zettat123 <39446+zettat123@noreply.gitea.com> Reviewed-by: wxiaoguang <29147+wxiaoguang@noreply.gitea.com> --- go.mod | 2 +- go.sum | 12 +---- internal/app/cmd/register.go | 12 +++-- internal/app/run/runner.go | 17 ++++++- internal/pkg/report/reporter.go | 17 ++++++- internal/pkg/report/reporter_test.go | 71 ++++++++++++++++++++++++++++ 6 files changed, 111 insertions(+), 20 deletions(-) diff --git a/go.mod b/go.mod index 166cbb59..e3bb32e6 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.26.0 require ( connectrpc.com/connect v1.20.0 dario.cat/mergo v1.0.2 - gitea.dev/actions-proto-go v0.5.0 + gitea.dev/actions-proto-go v0.6.0 github.com/Masterminds/semver v1.5.0 github.com/avast/retry-go/v5 v5.0.0 github.com/containerd/errdefs v1.0.0 diff --git a/go.sum b/go.sum index 9d6807c3..a6945151 100644 --- a/go.sum +++ b/go.sum @@ -4,8 +4,8 @@ cyphar.com/go-pathrs v0.2.3 h1:0pH8gep37wB0BgaXrEaN1OtZhUMeS7VvaejSr6i822o= cyphar.com/go-pathrs v0.2.3/go.mod h1:y8f1EMG7r+hCuFf/rXsKqMJrJAUoADZGNh5/vZPKcGc= dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8= dario.cat/mergo v1.0.2/go.mod h1:E/hbnu0NxMFBjpMIE34DRGLWqDy0g5FuKDhCb31ngxA= -gitea.dev/actions-proto-go v0.5.0 h1:Fc3DI4Fm3B3JBRXFUjegql+usoNAjjAw1cxMansfA2I= -gitea.dev/actions-proto-go v0.5.0/go.mod h1:p4RX+D9oqiEEzzkPMXscw2CmaGuYFPWFc6xIOmDNDqs= +gitea.dev/actions-proto-go v0.6.0 h1:gjllYQ5vmwlkqOeofTQu5qKTZpmf7kWsafoHvoPCSzY= +gitea.dev/actions-proto-go v0.6.0/go.mod h1:p4RX+D9oqiEEzzkPMXscw2CmaGuYFPWFc6xIOmDNDqs= github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 h1:He8afgbRMd7mFxO99hRNu+6tazq8nFF9lIwo9JFroBk= github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= @@ -47,8 +47,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= -github.com/docker/cli v29.5.2+incompatible h1:ubykJ1Y8LmNRGJ2BuMQ0kHOt/RO1YzGNswqWMJgivuQ= -github.com/docker/cli v29.5.2+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= github.com/docker/cli v29.5.3+incompatible h1:nbEFfz774vBwQ5KRYv7c/AghjReqnGISvrRhzjV0evs= github.com/docker/cli v29.5.3+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= github.com/docker/docker-credential-helpers v0.9.6 h1:cT2PbRPSlnMmNTfT2TDMXRyQ1KMWHG7xoTLBcn1ZNv0= @@ -149,8 +147,6 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040= github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M= -github.com/opencontainers/selinux v1.15.0 h1:4Gs40e/R2FvM8PC1HPaPncLLaDor8Y2WDfk5gjU9o5M= -github.com/opencontainers/selinux v1.15.0/go.mod h1:LenyElirjUHszfxrjuFqC85HIeXZKumHcKMQtnaDlQQ= github.com/opencontainers/selinux v1.15.1 h1:ERxeh5caJvCzNAKdI8WQbJmB1LDTn4BuaAg8wihLBpA= github.com/opencontainers/selinux v1.15.1/go.mod h1:LenyElirjUHszfxrjuFqC85HIeXZKumHcKMQtnaDlQQ= github.com/pjbgf/sha1cd v0.6.0 h1:3WJ8Wz8gvDz29quX1OcEmkAlUg9diU4GxJHqs0/XiwU= @@ -254,10 +250,6 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ= -golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= -golang.org/x/sys v0.45.0 h1:dO4czNzziLiiXplLQgBCEpCvXQ3dnkn0SdaZSYdQ+FY= -golang.org/x/sys v0.45.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/sys v0.46.0 h1:noSf2Fq6F8DBgS+LysIkx7rIExoNHJsxOAtPp4rthXw= golang.org/x/sys v0.46.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/internal/app/cmd/register.go b/internal/app/cmd/register.go index 9ba5228c..2063c366 100644 --- a/internal/app/cmd/register.go +++ b/internal/app/cmd/register.go @@ -14,6 +14,7 @@ import ( "strings" "time" + "gitea.com/gitea/runner/internal/app/run" "gitea.com/gitea/runner/internal/pkg/client" "gitea.com/gitea/runner/internal/pkg/config" "gitea.com/gitea/runner/internal/pkg/labels" @@ -365,11 +366,12 @@ func doRegister(ctx context.Context, cfg *config.Config, inputs *registerInputs) } // register new runner. resp, err := cli.Register(ctx, connect.NewRequest(&runnerv1.RegisterRequest{ - Name: reg.Name, - Token: reg.Token, - Version: ver.Version(), - Labels: ls, - Ephemeral: reg.Ephemeral, + Name: reg.Name, + Token: reg.Token, + Version: ver.Version(), + Labels: ls, + Ephemeral: reg.Ephemeral, + Capabilities: run.RunnerCapabilities(), })) if err != nil { log.WithError(err).Error("poller: cannot register new runner") diff --git a/internal/app/run/runner.go b/internal/app/run/runner.go index 9bcb9f4a..69e8c2cc 100644 --- a/internal/app/run/runner.go +++ b/internal/app/run/runner.go @@ -37,6 +37,18 @@ import ( log "github.com/sirupsen/logrus" ) +// CapabilityCancelling tells the server this runner understands the +// transitional cancelling state and will run post-step cleanup before +// finalizing a task as RESULT_CANCELLED. +const CapabilityCancelling = "cancelling" + +// RunnerCapabilities are the capability flags this runner advertises to the +// server during registration and declaration. The server uses them to enable +// transitional features that require runner-side support. +func RunnerCapabilities() []string { + return []string{CapabilityCancelling} +} + // Runner runs the pipeline. type Runner struct { name string @@ -504,7 +516,8 @@ func (r *Runner) RunningCount() int64 { func (r *Runner) Declare(ctx context.Context, labels []string) (*connect.Response[runnerv1.DeclareResponse], error) { return r.client.Declare(ctx, connect.NewRequest(&runnerv1.DeclareRequest{ - Version: ver.Version(), - Labels: labels, + Version: ver.Version(), + Labels: labels, + Capabilities: RunnerCapabilities(), })) } diff --git a/internal/pkg/report/reporter.go b/internal/pkg/report/reporter.go index b911b564..6cc0c353 100644 --- a/internal/pkg/report/reporter.go +++ b/internal/pkg/report/reporter.go @@ -391,15 +391,28 @@ func (r *Reporter) Close(lastWords string) error { r.stateMu.Lock() r.closed = true if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED { + // When r.ctx has been cancelled (server returned RESULT_CANCELLED via + // rpcCtx/ReportState, see line 590) the job is being torn down on the + // cancellation path: surface that explicitly instead of attributing it + // to a generic failure. + cancelled := errors.Is(r.ctx.Err(), context.Canceled) if lastWords == "" { - lastWords = "Early termination" + if cancelled { + lastWords = "Cancelled" + } else { + lastWords = "Early termination" + } } for _, v := range r.state.Steps { if v.Result == runnerv1.Result_RESULT_UNSPECIFIED { v.Result = runnerv1.Result_RESULT_CANCELLED } } - r.state.Result = runnerv1.Result_RESULT_FAILURE + if cancelled { + r.state.Result = runnerv1.Result_RESULT_CANCELLED + } else { + r.state.Result = runnerv1.Result_RESULT_FAILURE + } r.logRows = append(r.logRows, &runnerv1.LogRow{ Time: timestamppb.Now(), Content: lastWords, diff --git a/internal/pkg/report/reporter_test.go b/internal/pkg/report/reporter_test.go index 5c031b71..606e1eaa 100644 --- a/internal/pkg/report/reporter_test.go +++ b/internal/pkg/report/reporter_test.go @@ -850,3 +850,74 @@ func TestReporter_ServerCancelStillFlushesFinal(t *testing.T) { assert.True(t, finalLogNoMoreSeen.Load(), "Close() must send a final UpdateLog{NoMore:true} even after server-side cancellation") assert.True(t, finalTaskStateSeen.Load(), "Close() must send a final UpdateTask with the populated final state even after server-side cancellation") } + +// TestReporter_CloseReportsCancelledOnCanceledCtx asserts that when Close() +// runs on a reporter whose state has not been finalised AND whose context has +// been cancelled, the synthesised final state carries RESULT_CANCELLED and +// the appended log row reads "Cancelled" — not RESULT_FAILURE / "Early +// termination". This is the runner-side half of the Running -> Cancelling -> +// Cancelled flow: it gives Gitea an explicit cancel acknowledgement rather +// than a generic failure when the job is torn down on the cancel path. +func TestReporter_CloseReportsCancelledOnCanceledCtx(t *testing.T) { + var finalState atomic.Pointer[runnerv1.TaskState] + var finalLogRows atomic.Pointer[[]*runnerv1.LogRow] + + client := mocks.NewClient(t) + client.On("UpdateLog", mock.Anything, mock.Anything).Return( + func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) { + if req.Msg.NoMore { + rows := append([]*runnerv1.LogRow(nil), req.Msg.Rows...) + finalLogRows.Store(&rows) + } + return connect_go.NewResponse(&runnerv1.UpdateLogResponse{ + AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)), + }), nil + }, + ) + client.On("UpdateTask", mock.Anything, mock.Anything).Return( + func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) { + if req.Msg.State != nil && req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED { + finalState.Store(req.Msg.State) + } + return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil + }, + ) + + ctx, cancel := context.WithCancel(context.Background()) + taskCtx, err := structpb.NewStruct(map[string]any{}) + require.NoError(t, err) + cfg, _ := config.LoadDefault("") + reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg) + reporter.ResetSteps(1) + + // Simulate the cancellation path: r.ctx is cancelled before Close() runs. + cancel() + + // Skip the daemon wait inside Close(). + close(reporter.daemon) + + // Empty lastWords so Close() picks the synthesised value. + require.NoError(t, reporter.Close("")) + + got := finalState.Load() + require.NotNil(t, got, "Close() must send a final UpdateTask") + assert.Equal(t, runnerv1.Result_RESULT_CANCELLED, got.Result, + "final Result must be RESULT_CANCELLED when r.ctx is cancelled, not RESULT_FAILURE") + require.Len(t, got.Steps, 1) + assert.Equal(t, runnerv1.Result_RESULT_CANCELLED, got.Steps[0].Result, + "unfinished steps must be marked RESULT_CANCELLED") + + rows := finalLogRows.Load() + require.NotNil(t, rows, "Close() must send a final UpdateLog{NoMore:true}") + var foundCancelled, foundEarlyTermination bool + for _, r := range *rows { + if r.Content == "Cancelled" { + foundCancelled = true + } + if r.Content == "Early termination" { + foundEarlyTermination = true + } + } + assert.True(t, foundCancelled, "final log must contain a 'Cancelled' row") + assert.False(t, foundEarlyTermination, "final log must not contain 'Early termination' on the cancel path") +}