mirror of
https://gitea.com/gitea/act_runner.git
synced 2026-02-04 12:24:48 +00:00
Compare commits
8 Commits
v0.1.2
...
acc5afc428
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
acc5afc428 | ||
|
|
27a1a90d25 | ||
|
|
83ec0ba909 | ||
|
|
ed86e2f15a | ||
|
|
d4bebccc12 | ||
|
|
c75b67e892 | ||
|
|
bc6031eff7 | ||
|
|
c69c353d93 |
@@ -4,17 +4,34 @@ on:
|
||||
push:
|
||||
branches: [ main ]
|
||||
|
||||
env:
|
||||
GOPATH: /go_path
|
||||
GOCACHE: /go_cache
|
||||
|
||||
jobs:
|
||||
goreleaser:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
with:
|
||||
fetch-depth: 0
|
||||
- run: git fetch --force --tags
|
||||
fetch-depth: 0 # all history for all branches and tags
|
||||
- uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: '>=1.20.1'
|
||||
- uses: https://gitea.com/actions/go-hashfiles@v0.0.1
|
||||
id: hash-go
|
||||
with:
|
||||
patterns: |
|
||||
go.mod
|
||||
go.sum
|
||||
- name: cache go
|
||||
id: cache-go
|
||||
uses: https://github.com/actions/cache@v3
|
||||
with:
|
||||
path: |
|
||||
/go_path
|
||||
/go_cache
|
||||
key: go_path-${{ steps.hash-go.outputs.hash }}
|
||||
- name: goreleaser
|
||||
uses: https://github.com/goreleaser/goreleaser-action@v4
|
||||
with:
|
||||
|
||||
@@ -5,17 +5,34 @@ on:
|
||||
tags:
|
||||
- '*'
|
||||
|
||||
env:
|
||||
GOPATH: /go_path
|
||||
GOCACHE: /go_cache
|
||||
|
||||
jobs:
|
||||
goreleaser:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
with:
|
||||
fetch-depth: 0
|
||||
- run: git fetch --force --tags
|
||||
fetch-depth: 0 # all history for all branches and tags
|
||||
- uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: '>=1.20.1'
|
||||
- uses: https://gitea.com/actions/go-hashfiles@v0.0.1
|
||||
id: hash-go
|
||||
with:
|
||||
patterns: |
|
||||
go.mod
|
||||
go.sum
|
||||
- name: cache go
|
||||
id: cache-go
|
||||
uses: https://github.com/actions/cache@v3
|
||||
with:
|
||||
path: |
|
||||
/go_path
|
||||
/go_cache
|
||||
key: go_path-${{ steps.hash-go.outputs.hash }}
|
||||
- name: Import GPG key
|
||||
id: import_gpg
|
||||
uses: https://github.com/crazy-max/ghaction-import-gpg@v5
|
||||
|
||||
@@ -4,7 +4,6 @@ on:
|
||||
- pull_request
|
||||
|
||||
env:
|
||||
GOPROXY: https://goproxy.io,direct
|
||||
GOPATH: /go_path
|
||||
GOCACHE: /go_cache
|
||||
|
||||
@@ -13,31 +12,27 @@ jobs:
|
||||
name: check and test
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: cache go path
|
||||
id: cache-go-path
|
||||
uses: https://github.com/actions/cache@v3
|
||||
with:
|
||||
path: /go_path
|
||||
key: go_path-${{ github.repository }}-${{ github.ref_name }}
|
||||
restore-keys: |
|
||||
go_path-${{ github.repository }}-
|
||||
go_path-
|
||||
- name: cache go cache
|
||||
id: cache-go-cache
|
||||
uses: https://github.com/actions/cache@v3
|
||||
with:
|
||||
path: /go_cache
|
||||
key: go_cache-${{ github.repository }}-${{ github.ref_name }}
|
||||
restore-keys: |
|
||||
go_cache-${{ github.repository }}-
|
||||
go_cache-
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: 1.20
|
||||
- uses: actions/checkout@v3
|
||||
go-version: '>=1.20.1'
|
||||
- uses: https://gitea.com/actions/go-hashfiles@v0.0.1
|
||||
id: hash-go
|
||||
with:
|
||||
patterns: |
|
||||
go.mod
|
||||
go.sum
|
||||
- name: cache go
|
||||
id: cache-go
|
||||
uses: https://github.com/actions/cache@v3
|
||||
with:
|
||||
path: |
|
||||
/go_path
|
||||
/go_cache
|
||||
key: go_path-${{ steps.hash-go.outputs.hash }}
|
||||
- name: vet checks
|
||||
run: make vet
|
||||
- name: build
|
||||
run: make build
|
||||
- name: test
|
||||
run: make test
|
||||
run: make test
|
||||
|
||||
5
go.mod
5
go.mod
@@ -3,7 +3,7 @@ module gitea.com/gitea/act_runner
|
||||
go 1.20
|
||||
|
||||
require (
|
||||
code.gitea.io/actions-proto-go v0.2.0
|
||||
code.gitea.io/actions-proto-go v0.2.1
|
||||
code.gitea.io/gitea-vet v0.2.3-0.20230113022436-2b1561217fa5
|
||||
github.com/avast/retry-go/v4 v4.3.1
|
||||
github.com/bufbuild/connect-go v1.3.1
|
||||
@@ -12,6 +12,7 @@ require (
|
||||
github.com/go-chi/render v1.0.2
|
||||
github.com/joho/godotenv v1.5.1
|
||||
github.com/mattn/go-isatty v0.0.17
|
||||
github.com/mattn/go-sqlite3 v1.14.9
|
||||
github.com/nektos/act v0.0.0
|
||||
github.com/sirupsen/logrus v1.9.0
|
||||
github.com/spf13/cobra v1.6.1
|
||||
@@ -108,4 +109,4 @@ require (
|
||||
modernc.org/token v1.0.0 // indirect
|
||||
)
|
||||
|
||||
replace github.com/nektos/act => gitea.com/gitea/act v0.243.2-0.20230323041428-929ea6df751b
|
||||
replace github.com/nektos/act => gitea.com/gitea/act v0.243.3-0.20230420082431-e12252a43a3f
|
||||
|
||||
8
go.sum
8
go.sum
@@ -1,11 +1,11 @@
|
||||
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||
code.gitea.io/actions-proto-go v0.2.0 h1:nYh9nhhfk67YA4wVNLsCzd//RCvXnljwXClJ33+HPVk=
|
||||
code.gitea.io/actions-proto-go v0.2.0/go.mod h1:00ys5QDo1iHN1tHNvvddAcy2W/g+425hQya1cCSvq9A=
|
||||
code.gitea.io/actions-proto-go v0.2.1 h1:ToMN/8thz2q10TuCq8dL2d8mI+/pWpJcHCvG+TELwa0=
|
||||
code.gitea.io/actions-proto-go v0.2.1/go.mod h1:00ys5QDo1iHN1tHNvvddAcy2W/g+425hQya1cCSvq9A=
|
||||
code.gitea.io/gitea-vet v0.2.3-0.20230113022436-2b1561217fa5 h1:daBEK2GQeqGikJESctP5Cu1i33z5ztAD4kyQWiw185M=
|
||||
code.gitea.io/gitea-vet v0.2.3-0.20230113022436-2b1561217fa5/go.mod h1:zcNbT/aJEmivCAhfmkHOlT645KNOf9W2KnkLgFjGGfE=
|
||||
gitea.com/gitea/act v0.243.2-0.20230323041428-929ea6df751b h1:AFuo+/Avbvo4JAId/K0rb0RHh/OQfFGMfQQWQ5SQcNA=
|
||||
gitea.com/gitea/act v0.243.2-0.20230323041428-929ea6df751b/go.mod h1:iLHCXqOPUElA2nSyHo4wtxSmvdkym3WU7CkP3AxF39Q=
|
||||
gitea.com/gitea/act v0.243.3-0.20230420082431-e12252a43a3f h1:Tbel/BObxGRyvx2q5GwBVg9IaJldhtVoonjwaQAARHU=
|
||||
gitea.com/gitea/act v0.243.3-0.20230420082431-e12252a43a3f/go.mod h1:mabw6AZAiDgxGlK83orWLrNERSPvgBJzEUS3S7u2bHI=
|
||||
gitea.com/xorm/sqlfiddle v0.0.0-20180821085327-62ce714f951a h1:lSA0F4e9A2NcQSqGqTOXqu2aRi/XEQxDCBwM8yJtE6s=
|
||||
gitea.com/xorm/sqlfiddle v0.0.0-20180821085327-62ce714f951a/go.mod h1:EXuID2Zs0pAQhH8yz+DNjUbjppKQzKFAn28TMYPB6IU=
|
||||
gitee.com/travelliu/dm v1.8.11192/go.mod h1:DHTzyhCrM843x9VdKVbZ+GKXGRbKM2sJ4LxihRxShkE=
|
||||
|
||||
11
internal/app/artifactcache/db_cgo.go
Normal file
11
internal/app/artifactcache/db_cgo.go
Normal file
@@ -0,0 +1,11 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
//go:build cgo
|
||||
// +build cgo
|
||||
|
||||
package artifactcache
|
||||
|
||||
import _ "github.com/mattn/go-sqlite3"
|
||||
|
||||
var sqliteDriverName = "sqlite3"
|
||||
11
internal/app/artifactcache/db_nocgo.go
Normal file
11
internal/app/artifactcache/db_nocgo.go
Normal file
@@ -0,0 +1,11 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
//go:build !cgo
|
||||
// +build !cgo
|
||||
|
||||
package artifactcache
|
||||
|
||||
import _ "modernc.org/sqlite"
|
||||
|
||||
var sqliteDriverName = "sqlite"
|
||||
@@ -19,7 +19,6 @@ import (
|
||||
"github.com/go-chi/chi/v5/middleware"
|
||||
"github.com/go-chi/render"
|
||||
log "github.com/sirupsen/logrus"
|
||||
_ "modernc.org/sqlite"
|
||||
"xorm.io/builder"
|
||||
"xorm.io/xorm"
|
||||
)
|
||||
@@ -56,7 +55,7 @@ func StartHandler(dir, outboundIP string, port uint16) (*Handler, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
e, err := xorm.NewEngine("sqlite", filepath.Join(dir, "sqlite.db"))
|
||||
e, err := xorm.NewEngine(sqliteDriverName, filepath.Join(dir, "sqlite.db"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@
|
||||
package run
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
@@ -112,16 +111,11 @@ func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report.
|
||||
|
||||
reporter.Logf("%s(version:%s) received task %v of job %v, be triggered by event: %s", r.name, ver.Version(), task.Id, task.Context.Fields["job"].GetStringValue(), task.Context.Fields["event_name"].GetStringValue())
|
||||
|
||||
workflow, err := model.ReadWorkflow(bytes.NewReader(task.WorkflowPayload))
|
||||
workflow, jobID, err := generateWorkflow(task)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
jobIDs := workflow.GetJobIDs()
|
||||
if len(jobIDs) != 1 {
|
||||
return fmt.Errorf("multiple jobs found: %v", jobIDs)
|
||||
}
|
||||
jobID := jobIDs[0]
|
||||
plan, err := model.CombineWorkflowPlanner(workflow).PlanJob(jobID)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -209,5 +203,7 @@ func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report.
|
||||
// add logger recorders
|
||||
ctx = common.WithLoggerHook(ctx, reporter)
|
||||
|
||||
return executor(ctx)
|
||||
execErr := executor(ctx)
|
||||
reporter.SetOutputs(job.Outputs)
|
||||
return execErr
|
||||
}
|
||||
|
||||
54
internal/app/run/workflow.go
Normal file
54
internal/app/run/workflow.go
Normal file
@@ -0,0 +1,54 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package run
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
|
||||
"github.com/nektos/act/pkg/model"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
func generateWorkflow(task *runnerv1.Task) (*model.Workflow, string, error) {
|
||||
workflow, err := model.ReadWorkflow(bytes.NewReader(task.WorkflowPayload))
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
jobIDs := workflow.GetJobIDs()
|
||||
if len(jobIDs) != 1 {
|
||||
return nil, "", fmt.Errorf("multiple jobs found: %v", jobIDs)
|
||||
}
|
||||
jobID := jobIDs[0]
|
||||
|
||||
needJobIDs := make([]string, 0, len(task.Needs))
|
||||
for id, need := range task.Needs {
|
||||
needJobIDs = append(needJobIDs, id)
|
||||
needJob := &model.Job{
|
||||
Outputs: need.Outputs,
|
||||
Result: strings.ToLower(strings.TrimPrefix(need.Result.String(), "RESULT_")),
|
||||
}
|
||||
workflow.Jobs[id] = needJob
|
||||
}
|
||||
sort.Strings(needJobIDs)
|
||||
|
||||
rawNeeds := yaml.Node{
|
||||
Kind: yaml.SequenceNode,
|
||||
Content: make([]*yaml.Node, 0, len(needJobIDs)),
|
||||
}
|
||||
for _, id := range needJobIDs {
|
||||
rawNeeds.Content = append(rawNeeds.Content, &yaml.Node{
|
||||
Kind: yaml.ScalarNode,
|
||||
Value: id,
|
||||
})
|
||||
}
|
||||
|
||||
workflow.Jobs[jobID].RawNeeds = rawNeeds
|
||||
|
||||
return workflow, jobID, nil
|
||||
}
|
||||
74
internal/app/run/workflow_test.go
Normal file
74
internal/app/run/workflow_test.go
Normal file
@@ -0,0 +1,74 @@
|
||||
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package run
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
|
||||
"github.com/nektos/act/pkg/model"
|
||||
"github.com/stretchr/testify/require"
|
||||
"gotest.tools/v3/assert"
|
||||
)
|
||||
|
||||
func Test_generateWorkflow(t *testing.T) {
|
||||
type args struct {
|
||||
task *runnerv1.Task
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
assert func(t *testing.T, wf *model.Workflow)
|
||||
want1 string
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "has needs",
|
||||
args: args{
|
||||
task: &runnerv1.Task{
|
||||
WorkflowPayload: []byte(`
|
||||
name: Build and deploy
|
||||
on: push
|
||||
|
||||
jobs:
|
||||
job9:
|
||||
needs: build
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- run: ./deploy --build ${{ needs.job1.outputs.output1 }}
|
||||
- run: ./deploy --build ${{ needs.job2.outputs.output2 }}
|
||||
`),
|
||||
Needs: map[string]*runnerv1.TaskNeed{
|
||||
"job1": {
|
||||
Outputs: map[string]string{
|
||||
"output1": "output1 value",
|
||||
},
|
||||
Result: runnerv1.Result_RESULT_SUCCESS,
|
||||
},
|
||||
"job2": {
|
||||
Outputs: map[string]string{
|
||||
"output2": "output2 value",
|
||||
},
|
||||
Result: runnerv1.Result_RESULT_SUCCESS,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
assert: func(t *testing.T, wf *model.Workflow) {
|
||||
assert.DeepEqual(t, wf.GetJob("job9").Needs(), []string{"job1", "job2"})
|
||||
},
|
||||
want1: "job9",
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, got1, err := generateWorkflow(tt.args.task)
|
||||
require.NoError(t, err)
|
||||
tt.assert(t, got)
|
||||
assert.Equal(t, got1, tt.want1)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -31,8 +31,10 @@ type Reporter struct {
|
||||
logOffset int
|
||||
logRows []*runnerv1.LogRow
|
||||
logReplacer *strings.Replacer
|
||||
state *runnerv1.TaskState
|
||||
stateM sync.RWMutex
|
||||
|
||||
state *runnerv1.TaskState
|
||||
stateMu sync.RWMutex
|
||||
outputs sync.Map
|
||||
}
|
||||
|
||||
func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.Client, task *runnerv1.Task) *Reporter {
|
||||
@@ -56,8 +58,8 @@ func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.C
|
||||
}
|
||||
|
||||
func (r *Reporter) ResetSteps(l int) {
|
||||
r.stateM.Lock()
|
||||
defer r.stateM.Unlock()
|
||||
r.stateMu.Lock()
|
||||
defer r.stateMu.Unlock()
|
||||
for i := 0; i < l; i++ {
|
||||
r.state.Steps = append(r.state.Steps, &runnerv1.StepState{
|
||||
Id: int64(i),
|
||||
@@ -70,8 +72,8 @@ func (r *Reporter) Levels() []log.Level {
|
||||
}
|
||||
|
||||
func (r *Reporter) Fire(entry *log.Entry) error {
|
||||
r.stateM.Lock()
|
||||
defer r.stateM.Unlock()
|
||||
r.stateMu.Lock()
|
||||
defer r.stateMu.Unlock()
|
||||
|
||||
log.WithFields(entry.Data).Trace(entry.Message)
|
||||
|
||||
@@ -155,9 +157,13 @@ func (r *Reporter) RunDaemon() {
|
||||
}
|
||||
|
||||
func (r *Reporter) Logf(format string, a ...interface{}) {
|
||||
r.stateM.Lock()
|
||||
defer r.stateM.Unlock()
|
||||
r.stateMu.Lock()
|
||||
defer r.stateMu.Unlock()
|
||||
|
||||
r.logf(format, a...)
|
||||
}
|
||||
|
||||
func (r *Reporter) logf(format string, a ...interface{}) {
|
||||
if !r.duringSteps() {
|
||||
r.logRows = append(r.logRows, &runnerv1.LogRow{
|
||||
Time: timestamppb.Now(),
|
||||
@@ -166,10 +172,30 @@ func (r *Reporter) Logf(format string, a ...interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Reporter) SetOutputs(outputs map[string]string) {
|
||||
r.stateMu.Lock()
|
||||
defer r.stateMu.Unlock()
|
||||
|
||||
for k, v := range outputs {
|
||||
if len(k) > 255 {
|
||||
r.logf("ignore output because the key is too long: %q", k)
|
||||
continue
|
||||
}
|
||||
if l := len(v); l > 1024*1024 {
|
||||
log.Println("ignore output because the value is too long:", k, l)
|
||||
r.logf("ignore output because the value %q is too long: %d", k, l)
|
||||
}
|
||||
if _, ok := r.outputs.Load(k); ok {
|
||||
continue
|
||||
}
|
||||
r.outputs.Store(k, v)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Reporter) Close(lastWords string) error {
|
||||
r.closed = true
|
||||
|
||||
r.stateM.Lock()
|
||||
r.stateMu.Lock()
|
||||
if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED {
|
||||
if lastWords == "" {
|
||||
lastWords = "Early termination"
|
||||
@@ -184,14 +210,14 @@ func (r *Reporter) Close(lastWords string) error {
|
||||
Time: timestamppb.Now(),
|
||||
Content: lastWords,
|
||||
})
|
||||
return nil
|
||||
r.state.StoppedAt = timestamppb.Now()
|
||||
} else if lastWords != "" {
|
||||
r.logRows = append(r.logRows, &runnerv1.LogRow{
|
||||
Time: timestamppb.Now(),
|
||||
Content: lastWords,
|
||||
})
|
||||
}
|
||||
r.stateM.Unlock()
|
||||
r.stateMu.Unlock()
|
||||
|
||||
return retry.Do(func() error {
|
||||
if err := r.ReportLog(true); err != nil {
|
||||
@@ -205,9 +231,9 @@ func (r *Reporter) ReportLog(noMore bool) error {
|
||||
r.clientM.Lock()
|
||||
defer r.clientM.Unlock()
|
||||
|
||||
r.stateM.RLock()
|
||||
r.stateMu.RLock()
|
||||
rows := r.logRows
|
||||
r.stateM.RUnlock()
|
||||
r.stateMu.RUnlock()
|
||||
|
||||
resp, err := r.client.UpdateLog(r.ctx, connect.NewRequest(&runnerv1.UpdateLogRequest{
|
||||
TaskId: r.state.Id,
|
||||
@@ -224,10 +250,10 @@ func (r *Reporter) ReportLog(noMore bool) error {
|
||||
return fmt.Errorf("submitted logs are lost")
|
||||
}
|
||||
|
||||
r.stateM.Lock()
|
||||
r.stateMu.Lock()
|
||||
r.logRows = r.logRows[ack-r.logOffset:]
|
||||
r.logOffset = ack
|
||||
r.stateM.Unlock()
|
||||
r.stateMu.Unlock()
|
||||
|
||||
if noMore && ack < r.logOffset+len(rows) {
|
||||
return fmt.Errorf("not all logs are submitted")
|
||||
@@ -240,21 +266,45 @@ func (r *Reporter) ReportState() error {
|
||||
r.clientM.Lock()
|
||||
defer r.clientM.Unlock()
|
||||
|
||||
r.stateM.RLock()
|
||||
r.stateMu.RLock()
|
||||
state := proto.Clone(r.state).(*runnerv1.TaskState)
|
||||
r.stateM.RUnlock()
|
||||
r.stateMu.RUnlock()
|
||||
|
||||
outputs := make(map[string]string)
|
||||
r.outputs.Range(func(k, v interface{}) bool {
|
||||
if val, ok := v.(string); ok {
|
||||
outputs[k.(string)] = val
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{
|
||||
State: state,
|
||||
State: state,
|
||||
Outputs: outputs,
|
||||
}))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, k := range resp.Msg.SentOutputs {
|
||||
r.outputs.Store(k, struct{}{})
|
||||
}
|
||||
|
||||
if resp.Msg.State != nil && resp.Msg.State.Result == runnerv1.Result_RESULT_CANCELLED {
|
||||
r.cancel()
|
||||
}
|
||||
|
||||
var noSent []string
|
||||
r.outputs.Range(func(k, v interface{}) bool {
|
||||
if _, ok := v.(string); ok {
|
||||
noSent = append(noSent, k.(string))
|
||||
}
|
||||
return true
|
||||
})
|
||||
if len(noSent) > 0 {
|
||||
return fmt.Errorf("there are still outputs that have not been sent: %v", noSent)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user