mirror of
https://gitea.com/gitea/act_runner.git
synced 2025-12-16 19:14:46 +00:00
Compare commits
11 Commits
c701ba4787
...
v0.2.4
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
12999b61dd | ||
|
|
49a2fcc138 | ||
|
|
8f88e4f15a | ||
|
|
a1bb3b56fd | ||
|
|
1a7ec5f339 | ||
|
|
5d01cb8904 | ||
|
|
dcf84d8a53 | ||
|
|
73adae040d | ||
|
|
db662b3690 | ||
|
|
cf92a979e2 | ||
|
|
87058716fb |
@@ -1,14 +1,11 @@
|
|||||||
linters:
|
linters:
|
||||||
enable:
|
enable:
|
||||||
- gosimple
|
- gosimple
|
||||||
- deadcode
|
|
||||||
- typecheck
|
- typecheck
|
||||||
- govet
|
- govet
|
||||||
- errcheck
|
- errcheck
|
||||||
- staticcheck
|
- staticcheck
|
||||||
- unused
|
- unused
|
||||||
- structcheck
|
|
||||||
- varcheck
|
|
||||||
- dupl
|
- dupl
|
||||||
#- gocyclo # The cyclomatic complexety of a lot of functions is too high, we should refactor those another time.
|
#- gocyclo # The cyclomatic complexety of a lot of functions is too high, we should refactor those another time.
|
||||||
- gofmt
|
- gofmt
|
||||||
@@ -112,7 +109,6 @@ issues:
|
|||||||
- gocritic
|
- gocritic
|
||||||
- linters:
|
- linters:
|
||||||
- unused
|
- unused
|
||||||
- deadcode
|
|
||||||
text: "swagger"
|
text: "swagger"
|
||||||
- path: contrib/pr/checkout.go
|
- path: contrib/pr/checkout.go
|
||||||
linters:
|
linters:
|
||||||
@@ -154,9 +150,6 @@ issues:
|
|||||||
- path: cmd/dump.go
|
- path: cmd/dump.go
|
||||||
linters:
|
linters:
|
||||||
- dupl
|
- dupl
|
||||||
- path: services/webhook/webhook.go
|
|
||||||
linters:
|
|
||||||
- structcheck
|
|
||||||
- text: "commentFormatting: put a space between `//` and comment text"
|
- text: "commentFormatting: put a space between `//` and comment text"
|
||||||
linters:
|
linters:
|
||||||
- gocritic
|
- gocritic
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
FROM golang:1.20-alpine3.18 as builder
|
FROM golang:1.20.5-alpine3.18 as builder
|
||||||
# Do not remove `git` here, it is required for getting runner version when executing `make build`
|
# Do not remove `git` here, it is required for getting runner version when executing `make build`
|
||||||
RUN apk add --no-cache make git
|
RUN apk add --no-cache make git
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
FROM golang:1.20-alpine3.18 as builder
|
FROM golang:1.20.5-alpine3.18 as builder
|
||||||
# Do not remove `git` here, it is required for getting runner version when executing `make build`
|
# Do not remove `git` here, it is required for getting runner version when executing `make build`
|
||||||
RUN apk add --no-cache make git
|
RUN apk add --no-cache make git
|
||||||
|
|
||||||
|
|||||||
@@ -26,6 +26,13 @@ make docker
|
|||||||
|
|
||||||
## Quickstart
|
## Quickstart
|
||||||
|
|
||||||
|
Actions are disabled by default, so you need to add the following to the configuration file of your Gitea instance to enable it:
|
||||||
|
|
||||||
|
```ini
|
||||||
|
[actions]
|
||||||
|
ENABLED=true
|
||||||
|
```
|
||||||
|
|
||||||
### Register
|
### Register
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
@@ -36,7 +43,7 @@ And you will be asked to input:
|
|||||||
|
|
||||||
1. Gitea instance URL, like `http://192.168.8.8:3000/`. You should use your gitea instance ROOT_URL as the instance argument
|
1. Gitea instance URL, like `http://192.168.8.8:3000/`. You should use your gitea instance ROOT_URL as the instance argument
|
||||||
and you should not use `localhost` or `127.0.0.1` as instance IP;
|
and you should not use `localhost` or `127.0.0.1` as instance IP;
|
||||||
2. Runner token, you can get it from `http://192.168.8.8:3000/admin/runners`;
|
2. Runner token, you can get it from `http://192.168.8.8:3000/admin/actions/runners`;
|
||||||
3. Runner name, you can just leave it blank;
|
3. Runner name, you can just leave it blank;
|
||||||
4. Runner labels, you can just leave it blank.
|
4. Runner labels, you can just leave it blank.
|
||||||
|
|
||||||
|
|||||||
4
go.mod
4
go.mod
@@ -3,7 +3,7 @@ module gitea.com/gitea/act_runner
|
|||||||
go 1.20
|
go 1.20
|
||||||
|
|
||||||
require (
|
require (
|
||||||
code.gitea.io/actions-proto-go v0.3.0
|
code.gitea.io/actions-proto-go v0.3.1
|
||||||
code.gitea.io/gitea-vet v0.2.3-0.20230113022436-2b1561217fa5
|
code.gitea.io/gitea-vet v0.2.3-0.20230113022436-2b1561217fa5
|
||||||
github.com/avast/retry-go/v4 v4.3.1
|
github.com/avast/retry-go/v4 v4.3.1
|
||||||
github.com/bufbuild/connect-go v1.3.1
|
github.com/bufbuild/connect-go v1.3.1
|
||||||
@@ -89,4 +89,4 @@ require (
|
|||||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||||
)
|
)
|
||||||
|
|
||||||
replace github.com/nektos/act => gitea.com/gitea/act v0.246.2-0.20230703034344-3813f40cba18
|
replace github.com/nektos/act => gitea.com/gitea/act v0.246.2-0.20230717034634-cdc6d4bc6a38
|
||||||
|
|||||||
8
go.sum
8
go.sum
@@ -1,9 +1,9 @@
|
|||||||
code.gitea.io/actions-proto-go v0.3.0 h1:9Tvg8+TaaCXPKi6EnWl9vVgs2VZsj1Cs5afnsHa4AmM=
|
code.gitea.io/actions-proto-go v0.3.1 h1:PMyiQtBKb8dNnpEO2R5rcZdXSis+UQZVo/SciMtR1aU=
|
||||||
code.gitea.io/actions-proto-go v0.3.0/go.mod h1:00ys5QDo1iHN1tHNvvddAcy2W/g+425hQya1cCSvq9A=
|
code.gitea.io/actions-proto-go v0.3.1/go.mod h1:00ys5QDo1iHN1tHNvvddAcy2W/g+425hQya1cCSvq9A=
|
||||||
code.gitea.io/gitea-vet v0.2.3-0.20230113022436-2b1561217fa5 h1:daBEK2GQeqGikJESctP5Cu1i33z5ztAD4kyQWiw185M=
|
code.gitea.io/gitea-vet v0.2.3-0.20230113022436-2b1561217fa5 h1:daBEK2GQeqGikJESctP5Cu1i33z5ztAD4kyQWiw185M=
|
||||||
code.gitea.io/gitea-vet v0.2.3-0.20230113022436-2b1561217fa5/go.mod h1:zcNbT/aJEmivCAhfmkHOlT645KNOf9W2KnkLgFjGGfE=
|
code.gitea.io/gitea-vet v0.2.3-0.20230113022436-2b1561217fa5/go.mod h1:zcNbT/aJEmivCAhfmkHOlT645KNOf9W2KnkLgFjGGfE=
|
||||||
gitea.com/gitea/act v0.246.2-0.20230703034344-3813f40cba18 h1:UN4x0o3LKZCsNLPtbk2E1e38XQKsL7XI/XaRh7ckw1g=
|
gitea.com/gitea/act v0.246.2-0.20230717034634-cdc6d4bc6a38 h1:whUEO/qPkYfpbL1he9TuIIzz2P4v6xEwb2lT6E/4F7A=
|
||||||
gitea.com/gitea/act v0.246.2-0.20230703034344-3813f40cba18/go.mod h1:oU/5klyP5O+J2psPS3t50t09+SNVg+fZ/jN4lDZAq1U=
|
gitea.com/gitea/act v0.246.2-0.20230717034634-cdc6d4bc6a38/go.mod h1:oU/5klyP5O+J2psPS3t50t09+SNVg+fZ/jN4lDZAq1U=
|
||||||
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8=
|
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8=
|
||||||
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
|
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
|
||||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||||
|
|||||||
@@ -79,7 +79,7 @@ func runDaemon(ctx context.Context, configFile *string) func(cmd *cobra.Command,
|
|||||||
cfg.Container.DockerHost = dockerSocketPath
|
cfg.Container.DockerHost = dockerSocketPath
|
||||||
}
|
}
|
||||||
// check the scheme, if the scheme is not npipe or unix
|
// check the scheme, if the scheme is not npipe or unix
|
||||||
// set cfg.Container.DockerHost to "-" because it can't be mounted to the job conatiner
|
// set cfg.Container.DockerHost to "-" because it can't be mounted to the job container
|
||||||
if protoIndex := strings.Index(cfg.Container.DockerHost, "://"); protoIndex != -1 {
|
if protoIndex := strings.Index(cfg.Container.DockerHost, "://"); protoIndex != -1 {
|
||||||
scheme := cfg.Container.DockerHost[:protoIndex]
|
scheme := cfg.Container.DockerHost[:protoIndex]
|
||||||
if !strings.EqualFold(scheme, "npipe") && !strings.EqualFold(scheme, "unix") {
|
if !strings.EqualFold(scheme, "npipe") && !strings.EqualFold(scheme, "unix") {
|
||||||
|
|||||||
@@ -39,7 +39,7 @@ type executeArgs struct {
|
|||||||
envs []string
|
envs []string
|
||||||
envfile string
|
envfile string
|
||||||
secrets []string
|
secrets []string
|
||||||
defaultActionsUrl string
|
defaultActionsURL string
|
||||||
insecureSecrets bool
|
insecureSecrets bool
|
||||||
privileged bool
|
privileged bool
|
||||||
usernsMode string
|
usernsMode string
|
||||||
@@ -252,7 +252,7 @@ func runExecList(ctx context.Context, planner model.WorkflowPlanner, execArgs *e
|
|||||||
var filterPlan *model.Plan
|
var filterPlan *model.Plan
|
||||||
|
|
||||||
// Determine the event name to be filtered
|
// Determine the event name to be filtered
|
||||||
var filterEventName string = ""
|
var filterEventName string
|
||||||
|
|
||||||
if len(execArgs.event) > 0 {
|
if len(execArgs.event) > 0 {
|
||||||
log.Infof("Using chosed event for filtering: %s", execArgs.event)
|
log.Infof("Using chosed event for filtering: %s", execArgs.event)
|
||||||
@@ -289,7 +289,7 @@ func runExecList(ctx context.Context, planner model.WorkflowPlanner, execArgs *e
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
printList(filterPlan)
|
_ = printList(filterPlan)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -359,11 +359,11 @@ func runExec(ctx context.Context, execArgs *executeArgs) func(cmd *cobra.Command
|
|||||||
execArgs.cacheHandler = handler
|
execArgs.cacheHandler = handler
|
||||||
|
|
||||||
if len(execArgs.artifactServerAddr) == 0 {
|
if len(execArgs.artifactServerAddr) == 0 {
|
||||||
if ip := common.GetOutboundIP(); ip == nil {
|
ip := common.GetOutboundIP()
|
||||||
|
if ip == nil {
|
||||||
return fmt.Errorf("unable to determine outbound IP address")
|
return fmt.Errorf("unable to determine outbound IP address")
|
||||||
} else {
|
|
||||||
execArgs.artifactServerAddr = ip.String()
|
|
||||||
}
|
}
|
||||||
|
execArgs.artifactServerAddr = ip.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(execArgs.artifactServerPath) == 0 {
|
if len(execArgs.artifactServerPath) == 0 {
|
||||||
@@ -407,7 +407,7 @@ func runExec(ctx context.Context, execArgs *executeArgs) func(cmd *cobra.Command
|
|||||||
ContainerNamePrefix: fmt.Sprintf("GITEA-ACTIONS-TASK-%s", eventName),
|
ContainerNamePrefix: fmt.Sprintf("GITEA-ACTIONS-TASK-%s", eventName),
|
||||||
ContainerMaxLifetime: maxLifetime,
|
ContainerMaxLifetime: maxLifetime,
|
||||||
ContainerNetworkMode: container.NetworkMode(execArgs.network),
|
ContainerNetworkMode: container.NetworkMode(execArgs.network),
|
||||||
DefaultActionInstance: execArgs.defaultActionsUrl,
|
DefaultActionInstance: execArgs.defaultActionsURL,
|
||||||
PlatformPicker: func(_ []string) string {
|
PlatformPicker: func(_ []string) string {
|
||||||
return execArgs.image
|
return execArgs.image
|
||||||
},
|
},
|
||||||
@@ -423,7 +423,7 @@ func runExec(ctx context.Context, execArgs *executeArgs) func(cmd *cobra.Command
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !execArgs.debug {
|
if !execArgs.debug {
|
||||||
logLevel := log.Level(log.InfoLevel)
|
logLevel := log.InfoLevel
|
||||||
config.JobLoggerLevel = &logLevel
|
config.JobLoggerLevel = &logLevel
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -480,7 +480,7 @@ func loadExecCmd(ctx context.Context) *cobra.Command {
|
|||||||
execCmd.PersistentFlags().StringVarP(&execArg.artifactServerPath, "artifact-server-path", "", ".", "Defines the path where the artifact server stores uploads and retrieves downloads from. If not specified the artifact server will not start.")
|
execCmd.PersistentFlags().StringVarP(&execArg.artifactServerPath, "artifact-server-path", "", ".", "Defines the path where the artifact server stores uploads and retrieves downloads from. If not specified the artifact server will not start.")
|
||||||
execCmd.PersistentFlags().StringVarP(&execArg.artifactServerAddr, "artifact-server-addr", "", "", "Defines the address where the artifact server listens")
|
execCmd.PersistentFlags().StringVarP(&execArg.artifactServerAddr, "artifact-server-addr", "", "", "Defines the address where the artifact server listens")
|
||||||
execCmd.PersistentFlags().StringVarP(&execArg.artifactServerPort, "artifact-server-port", "", "34567", "Defines the port where the artifact server listens (will only bind to localhost).")
|
execCmd.PersistentFlags().StringVarP(&execArg.artifactServerPort, "artifact-server-port", "", "34567", "Defines the port where the artifact server listens (will only bind to localhost).")
|
||||||
execCmd.PersistentFlags().StringVarP(&execArg.defaultActionsUrl, "default-actions-url", "", "https://github.com", "Defines the default url of action instance.")
|
execCmd.PersistentFlags().StringVarP(&execArg.defaultActionsURL, "default-actions-url", "", "https://github.com", "Defines the default url of action instance.")
|
||||||
execCmd.PersistentFlags().BoolVarP(&execArg.noSkipCheckout, "no-skip-checkout", "", false, "Do not skip actions/checkout")
|
execCmd.PersistentFlags().BoolVarP(&execArg.noSkipCheckout, "no-skip-checkout", "", false, "Do not skip actions/checkout")
|
||||||
execCmd.PersistentFlags().BoolVarP(&execArg.debug, "debug", "d", false, "enable debug log")
|
execCmd.PersistentFlags().BoolVarP(&execArg.debug, "debug", "d", false, "enable debug log")
|
||||||
execCmd.PersistentFlags().BoolVarP(&execArg.dryrun, "dryrun", "n", false, "dryrun mode")
|
execCmd.PersistentFlags().BoolVarP(&execArg.dryrun, "dryrun", "n", false, "dryrun mode")
|
||||||
|
|||||||
@@ -47,12 +47,12 @@ func runRegister(ctx context.Context, regArgs *registerArgs, configFile *string)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if regArgs.NoInteractive {
|
if regArgs.NoInteractive {
|
||||||
if err := registerNoInteractive(*configFile, regArgs); err != nil {
|
if err := registerNoInteractive(ctx, *configFile, regArgs); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
go func() {
|
go func() {
|
||||||
if err := registerInteractive(*configFile); err != nil {
|
if err := registerInteractive(ctx, *configFile); err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -187,7 +187,7 @@ func (r *registerInputs) assignToNext(stage registerStage, value string, cfg *co
|
|||||||
return StageUnknown
|
return StageUnknown
|
||||||
}
|
}
|
||||||
|
|
||||||
func registerInteractive(configFile string) error {
|
func registerInteractive(ctx context.Context, configFile string) error {
|
||||||
var (
|
var (
|
||||||
reader = bufio.NewReader(os.Stdin)
|
reader = bufio.NewReader(os.Stdin)
|
||||||
stage = StageInputInstance
|
stage = StageInputInstance
|
||||||
@@ -213,11 +213,10 @@ func registerInteractive(configFile string) error {
|
|||||||
|
|
||||||
if stage == StageWaitingForRegistration {
|
if stage == StageWaitingForRegistration {
|
||||||
log.Infof("Registering runner, name=%s, instance=%s, labels=%v.", inputs.RunnerName, inputs.InstanceAddr, inputs.Labels)
|
log.Infof("Registering runner, name=%s, instance=%s, labels=%v.", inputs.RunnerName, inputs.InstanceAddr, inputs.Labels)
|
||||||
if err := doRegister(cfg, inputs); err != nil {
|
if err := doRegister(ctx, cfg, inputs); err != nil {
|
||||||
return fmt.Errorf("Failed to register runner: %w", err)
|
return fmt.Errorf("Failed to register runner: %w", err)
|
||||||
} else {
|
|
||||||
log.Infof("Runner registered successfully.")
|
|
||||||
}
|
}
|
||||||
|
log.Infof("Runner registered successfully.")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -250,7 +249,7 @@ func printStageHelp(stage registerStage) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func registerNoInteractive(configFile string, regArgs *registerArgs) error {
|
func registerNoInteractive(ctx context.Context, configFile string, regArgs *registerArgs) error {
|
||||||
cfg, err := config.LoadDefault(configFile)
|
cfg, err := config.LoadDefault(configFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -282,16 +281,14 @@ func registerNoInteractive(configFile string, regArgs *registerArgs) error {
|
|||||||
log.WithError(err).Errorf("Invalid input, please re-run act command.")
|
log.WithError(err).Errorf("Invalid input, please re-run act command.")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := doRegister(cfg, inputs); err != nil {
|
if err := doRegister(ctx, cfg, inputs); err != nil {
|
||||||
return fmt.Errorf("Failed to register runner: %w", err)
|
return fmt.Errorf("Failed to register runner: %w", err)
|
||||||
}
|
}
|
||||||
log.Infof("Runner registered successfully.")
|
log.Infof("Runner registered successfully.")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func doRegister(cfg *config.Config, inputs *registerInputs) error {
|
func doRegister(ctx context.Context, cfg *config.Config, inputs *registerInputs) error {
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
// initial http client
|
// initial http client
|
||||||
cli := client.New(
|
cli := client.New(
|
||||||
inputs.InstanceAddr,
|
inputs.InstanceAddr,
|
||||||
@@ -307,7 +304,7 @@ func doRegister(cfg *config.Config, inputs *registerInputs) error {
|
|||||||
}))
|
}))
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return nil
|
return ctx.Err()
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
|
|||||||
@@ -6,7 +6,9 @@ package poll
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
|
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
|
||||||
"github.com/bufbuild/connect-go"
|
"github.com/bufbuild/connect-go"
|
||||||
@@ -19,9 +21,10 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Poller struct {
|
type Poller struct {
|
||||||
client client.Client
|
client client.Client
|
||||||
runner *run.Runner
|
runner *run.Runner
|
||||||
cfg *config.Config
|
cfg *config.Config
|
||||||
|
tasksVersion atomic.Int64 // tasksVersion used to store the version of the last task fetched from the Gitea.
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller {
|
func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller {
|
||||||
@@ -55,9 +58,20 @@ func (p *Poller) poll(ctx context.Context, wg *sync.WaitGroup, limiter *rate.Lim
|
|||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := p.runner.Run(ctx, task); err != nil {
|
p.runTaskWithRecover(ctx, task)
|
||||||
log.WithError(err).Error("failed to run task")
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Poller) runTaskWithRecover(ctx context.Context, task *runnerv1.Task) {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
err := fmt.Errorf("panic: %v", r)
|
||||||
|
log.WithError(err).Error("panic in runTaskWithRecover")
|
||||||
}
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if err := p.runner.Run(ctx, task); err != nil {
|
||||||
|
log.WithError(err).Error("failed to run task")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -65,7 +79,11 @@ func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
|
|||||||
reqCtx, cancel := context.WithTimeout(ctx, p.cfg.Runner.FetchTimeout)
|
reqCtx, cancel := context.WithTimeout(ctx, p.cfg.Runner.FetchTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
resp, err := p.client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{}))
|
// Load the version value that was in the cache when the request was sent.
|
||||||
|
v := p.tasksVersion.Load()
|
||||||
|
resp, err := p.client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{
|
||||||
|
TasksVersion: v,
|
||||||
|
}))
|
||||||
if errors.Is(err, context.DeadlineExceeded) {
|
if errors.Is(err, context.DeadlineExceeded) {
|
||||||
err = nil
|
err = nil
|
||||||
}
|
}
|
||||||
@@ -74,8 +92,20 @@ func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
|
|||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
if resp == nil || resp.Msg == nil || resp.Msg.Task == nil {
|
if resp == nil || resp.Msg == nil {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if resp.Msg.TasksVersion > v {
|
||||||
|
p.tasksVersion.CompareAndSwap(v, resp.Msg.TasksVersion)
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.Msg.Task == nil {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// got a task, set `tasksVersion` to zero to focre query db in next request.
|
||||||
|
p.tasksVersion.CompareAndSwap(resp.Msg.TasksVersion, 0)
|
||||||
|
|
||||||
return resp.Msg.Task, true
|
return resp.Msg.Task, true
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -91,10 +91,9 @@ func NewRunner(cfg *config.Config, reg *config.Registration, cli client.Client)
|
|||||||
func (r *Runner) Run(ctx context.Context, task *runnerv1.Task) error {
|
func (r *Runner) Run(ctx context.Context, task *runnerv1.Task) error {
|
||||||
if _, ok := r.runningTasks.Load(task.Id); ok {
|
if _, ok := r.runningTasks.Load(task.Id); ok {
|
||||||
return fmt.Errorf("task %d is already running", task.Id)
|
return fmt.Errorf("task %d is already running", task.Id)
|
||||||
} else {
|
|
||||||
r.runningTasks.Store(task.Id, struct{}{})
|
|
||||||
defer r.runningTasks.Delete(task.Id)
|
|
||||||
}
|
}
|
||||||
|
r.runningTasks.Store(task.Id, struct{}{})
|
||||||
|
defer r.runningTasks.Delete(task.Id)
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(ctx, r.cfg.Runner.Timeout)
|
ctx, cancel := context.WithTimeout(ctx, r.cfg.Runner.Timeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ import (
|
|||||||
"github.com/bufbuild/connect-go"
|
"github.com/bufbuild/connect-go"
|
||||||
)
|
)
|
||||||
|
|
||||||
func getHttpClient(endpoint string, insecure bool) *http.Client {
|
func getHTTPClient(endpoint string, insecure bool) *http.Client {
|
||||||
if strings.HasPrefix(endpoint, "https://") && insecure {
|
if strings.HasPrefix(endpoint, "https://") && insecure {
|
||||||
return &http.Client{
|
return &http.Client{
|
||||||
Transport: &http.Transport{
|
Transport: &http.Transport{
|
||||||
@@ -49,12 +49,12 @@ func New(endpoint string, insecure bool, uuid, token, version string, opts ...co
|
|||||||
|
|
||||||
return &HTTPClient{
|
return &HTTPClient{
|
||||||
PingServiceClient: pingv1connect.NewPingServiceClient(
|
PingServiceClient: pingv1connect.NewPingServiceClient(
|
||||||
getHttpClient(endpoint, insecure),
|
getHTTPClient(endpoint, insecure),
|
||||||
baseURL,
|
baseURL,
|
||||||
opts...,
|
opts...,
|
||||||
),
|
),
|
||||||
RunnerServiceClient: runnerv1connect.NewRunnerServiceClient(
|
RunnerServiceClient: runnerv1connect.NewRunnerServiceClient(
|
||||||
getHttpClient(endpoint, insecure),
|
getHTTPClient(endpoint, insecure),
|
||||||
baseURL,
|
baseURL,
|
||||||
opts...,
|
opts...,
|
||||||
),
|
),
|
||||||
|
|||||||
@@ -1,5 +1,8 @@
|
|||||||
# Example configuration file, it's safe to copy this as the default config file without any modification.
|
# Example configuration file, it's safe to copy this as the default config file without any modification.
|
||||||
|
|
||||||
|
# You don't have to copy this file to your instance,
|
||||||
|
# just run `./act_runner generate-config > config.yaml` to generate a config file.
|
||||||
|
|
||||||
log:
|
log:
|
||||||
# The level of logging, can be trace, debug, info, warn, error, fatal
|
# The level of logging, can be trace, debug, info, warn, error, fatal
|
||||||
level: info
|
level: info
|
||||||
|
|||||||
@@ -55,9 +55,8 @@ func TestParse(t *testing.T) {
|
|||||||
if tt.wantErr {
|
if tt.wantErr {
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
return
|
return
|
||||||
} else {
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
}
|
||||||
|
require.NoError(t, err)
|
||||||
assert.DeepEqual(t, got, tt.want)
|
assert.DeepEqual(t, got, tt.want)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user