mirror of
https://gitea.com/gitea/act_runner.git
synced 2025-12-19 04:24:48 +00:00
Merge remote-tracking branch 'gitea/main' into offline_mode_support
# Conflicts: # internal/app/run/runner.go
This commit is contained in:
@@ -42,12 +42,14 @@ func Execute(ctx context.Context) {
|
||||
rootCmd.AddCommand(registerCmd)
|
||||
|
||||
// ./act_runner daemon
|
||||
var daemArgs daemonArgs
|
||||
daemonCmd := &cobra.Command{
|
||||
Use: "daemon",
|
||||
Short: "Run as a runner daemon",
|
||||
Args: cobra.MaximumNArgs(1),
|
||||
RunE: runDaemon(ctx, &configFile),
|
||||
Args: cobra.MaximumNArgs(0),
|
||||
RunE: runDaemon(ctx, &daemArgs, &configFile),
|
||||
}
|
||||
daemonCmd.Flags().BoolVar(&daemArgs.Once, "once", false, "Run one job then exit")
|
||||
rootCmd.AddCommand(daemonCmd)
|
||||
|
||||
// ./act_runner exec
|
||||
|
||||
@@ -10,10 +10,11 @@ import (
|
||||
"path"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/bufbuild/connect-go"
|
||||
"connectrpc.com/connect"
|
||||
"github.com/mattn/go-isatty"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
@@ -27,7 +28,7 @@ import (
|
||||
"gitea.com/gitea/act_runner/internal/pkg/ver"
|
||||
)
|
||||
|
||||
func runDaemon(ctx context.Context, configFile *string) func(cmd *cobra.Command, args []string) error {
|
||||
func runDaemon(ctx context.Context, daemArgs *daemonArgs, configFile *string) func(cmd *cobra.Command, args []string) error {
|
||||
return func(cmd *cobra.Command, args []string) error {
|
||||
cfg, err := config.LoadDefault(*configFile)
|
||||
if err != nil {
|
||||
@@ -88,6 +89,14 @@ func runDaemon(ctx context.Context, configFile *string) func(cmd *cobra.Command,
|
||||
}
|
||||
}
|
||||
|
||||
if !slices.Equal(reg.Labels, ls.ToStrings()) {
|
||||
reg.Labels = ls.ToStrings()
|
||||
if err := config.SaveRegistration(cfg.Runner.File, reg); err != nil {
|
||||
return fmt.Errorf("failed to save runner config: %w", err)
|
||||
}
|
||||
log.Infof("labels updated to: %v", reg.Labels)
|
||||
}
|
||||
|
||||
cli := client.New(
|
||||
reg.Address,
|
||||
cfg.Runner.Insecure,
|
||||
@@ -97,32 +106,58 @@ func runDaemon(ctx context.Context, configFile *string) func(cmd *cobra.Command,
|
||||
)
|
||||
|
||||
runner := run.NewRunner(cfg, reg, cli)
|
||||
|
||||
// declare the labels of the runner before fetching tasks
|
||||
resp, err := runner.Declare(ctx, ls.Names())
|
||||
if err != nil && connect.CodeOf(err) == connect.CodeUnimplemented {
|
||||
// Gitea instance is older version. skip declare step.
|
||||
log.Warn("Because the Gitea instance is an old version, skip declare labels and version.")
|
||||
log.Errorf("Your Gitea version is too old to support runner declare, please upgrade to v1.21 or later")
|
||||
return err
|
||||
} else if err != nil {
|
||||
log.WithError(err).Error("fail to invoke Declare")
|
||||
return err
|
||||
} else {
|
||||
log.Infof("runner: %s, with version: %s, with labels: %v, declare successfully",
|
||||
resp.Msg.Runner.Name, resp.Msg.Runner.Version, resp.Msg.Runner.Labels)
|
||||
// if declare successfully, override the labels in the.runner file with valid labels in the config file (if specified)
|
||||
reg.Labels = ls.ToStrings()
|
||||
if err := config.SaveRegistration(cfg.Runner.File, reg); err != nil {
|
||||
return fmt.Errorf("failed to save runner config: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
poller := poll.New(cfg, cli, runner)
|
||||
|
||||
poller.Poll(ctx)
|
||||
if daemArgs.Once {
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
poller.PollOnce()
|
||||
}()
|
||||
|
||||
// shutdown when we complete a job or cancel is requested
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-done:
|
||||
}
|
||||
} else {
|
||||
go poller.Poll()
|
||||
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
log.Infof("runner: %s shutdown initiated, waiting %s for running jobs to complete before shutting down", resp.Msg.Runner.Name, cfg.Runner.ShutdownTimeout)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), cfg.Runner.ShutdownTimeout)
|
||||
defer cancel()
|
||||
|
||||
err = poller.Shutdown(ctx)
|
||||
if err != nil {
|
||||
log.Warnf("runner: %s cancelled in progress jobs during shutdown", resp.Msg.Runner.Name)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
type daemonArgs struct {
|
||||
Once bool
|
||||
}
|
||||
|
||||
// initLogging setup the global logrus logger.
|
||||
func initLogging(cfg *config.Config) {
|
||||
isTerm := isatty.IsTerminal(os.Stdout.Fd())
|
||||
|
||||
@@ -484,7 +484,7 @@ func loadExecCmd(ctx context.Context) *cobra.Command {
|
||||
execCmd.PersistentFlags().BoolVarP(&execArg.noSkipCheckout, "no-skip-checkout", "", false, "Do not skip actions/checkout")
|
||||
execCmd.PersistentFlags().BoolVarP(&execArg.debug, "debug", "d", false, "enable debug log")
|
||||
execCmd.PersistentFlags().BoolVarP(&execArg.dryrun, "dryrun", "n", false, "dryrun mode")
|
||||
execCmd.PersistentFlags().StringVarP(&execArg.image, "image", "i", "node:16-bullseye", "Docker image to use. Use \"-self-hosted\" to run directly on the host.")
|
||||
execCmd.PersistentFlags().StringVarP(&execArg.image, "image", "i", "gitea/runner-images:ubuntu-latest", "Docker image to use. Use \"-self-hosted\" to run directly on the host.")
|
||||
execCmd.PersistentFlags().StringVarP(&execArg.network, "network", "", "", "Specify the network to which the container will connect")
|
||||
execCmd.PersistentFlags().StringVarP(&execArg.githubInstance, "gitea-instance", "", "", "Gitea instance to use.")
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ import (
|
||||
|
||||
pingv1 "code.gitea.io/actions-proto-go/ping/v1"
|
||||
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
|
||||
"github.com/bufbuild/connect-go"
|
||||
"connectrpc.com/connect"
|
||||
"github.com/mattn/go-isatty"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
@@ -91,10 +91,9 @@ const (
|
||||
)
|
||||
|
||||
var defaultLabels = []string{
|
||||
"ubuntu-latest:docker://node:16-bullseye",
|
||||
"ubuntu-22.04:docker://node:16-bullseye", // There's no node:16-bookworm yet
|
||||
"ubuntu-20.04:docker://node:16-bullseye",
|
||||
"ubuntu-18.04:docker://node:16-buster",
|
||||
"ubuntu-latest:docker://gitea/runner-images:ubuntu-latest",
|
||||
"ubuntu-22.04:docker://gitea/runner-images:ubuntu-22.04",
|
||||
"ubuntu-20.04:docker://gitea/runner-images:ubuntu-20.04",
|
||||
}
|
||||
|
||||
type registerInputs struct {
|
||||
@@ -179,7 +178,7 @@ func (r *registerInputs) assignToNext(stage registerStage, value string, cfg *co
|
||||
}
|
||||
|
||||
if validateLabels(r.Labels) != nil {
|
||||
log.Infoln("Invalid labels, please input again, leave blank to use the default labels (for example, ubuntu-20.04:docker://node:16-bullseye,ubuntu-18.04:docker://node:16-buster,linux_arm:host)")
|
||||
log.Infoln("Invalid labels, please input again, leave blank to use the default labels (for example, ubuntu-latest:docker://gitea/runner-images:ubuntu-latest)")
|
||||
return StageInputLabels
|
||||
}
|
||||
return StageWaitingForRegistration
|
||||
@@ -243,7 +242,7 @@ func printStageHelp(stage registerStage) {
|
||||
hostname, _ := os.Hostname()
|
||||
log.Infof("Enter the runner name (if set empty, use hostname: %s):\n", hostname)
|
||||
case StageInputLabels:
|
||||
log.Infoln("Enter the runner labels, leave blank to use the default labels (comma-separated, for example, ubuntu-20.04:docker://node:16-bullseye,ubuntu-18.04:docker://node:16-buster,linux_arm:host):")
|
||||
log.Infoln("Enter the runner labels, leave blank to use the default labels (comma-separated, for example, ubuntu-latest:docker://gitea/runner-images:ubuntu-latest):")
|
||||
case StageWaitingForRegistration:
|
||||
log.Infoln("Waiting for registration...")
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
"sync/atomic"
|
||||
|
||||
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
|
||||
"github.com/bufbuild/connect-go"
|
||||
"connectrpc.com/connect"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
@@ -25,40 +25,118 @@ type Poller struct {
|
||||
runner *run.Runner
|
||||
cfg *config.Config
|
||||
tasksVersion atomic.Int64 // tasksVersion used to store the version of the last task fetched from the Gitea.
|
||||
|
||||
pollingCtx context.Context
|
||||
shutdownPolling context.CancelFunc
|
||||
|
||||
jobsCtx context.Context
|
||||
shutdownJobs context.CancelFunc
|
||||
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller {
|
||||
pollingCtx, shutdownPolling := context.WithCancel(context.Background())
|
||||
|
||||
jobsCtx, shutdownJobs := context.WithCancel(context.Background())
|
||||
|
||||
done := make(chan struct{})
|
||||
|
||||
return &Poller{
|
||||
client: client,
|
||||
runner: runner,
|
||||
cfg: cfg,
|
||||
|
||||
pollingCtx: pollingCtx,
|
||||
shutdownPolling: shutdownPolling,
|
||||
|
||||
jobsCtx: jobsCtx,
|
||||
shutdownJobs: shutdownJobs,
|
||||
|
||||
done: done,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Poller) Poll(ctx context.Context) {
|
||||
func (p *Poller) Poll() {
|
||||
limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1)
|
||||
wg := &sync.WaitGroup{}
|
||||
for i := 0; i < p.cfg.Runner.Capacity; i++ {
|
||||
wg.Add(1)
|
||||
go p.poll(ctx, wg, limiter)
|
||||
go p.poll(wg, limiter)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// signal that we shutdown
|
||||
close(p.done)
|
||||
}
|
||||
|
||||
func (p *Poller) poll(ctx context.Context, wg *sync.WaitGroup, limiter *rate.Limiter) {
|
||||
func (p *Poller) PollOnce() {
|
||||
limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1)
|
||||
|
||||
p.pollOnce(limiter)
|
||||
|
||||
// signal that we're done
|
||||
close(p.done)
|
||||
}
|
||||
|
||||
func (p *Poller) Shutdown(ctx context.Context) error {
|
||||
p.shutdownPolling()
|
||||
|
||||
select {
|
||||
// graceful shutdown completed succesfully
|
||||
case <-p.done:
|
||||
return nil
|
||||
|
||||
// our timeout for shutting down ran out
|
||||
case <-ctx.Done():
|
||||
// when both the timeout fires and the graceful shutdown
|
||||
// completed succsfully, this branch of the select may
|
||||
// fire. Do a non-blocking check here against the graceful
|
||||
// shutdown status to avoid sending an error if we don't need to.
|
||||
_, ok := <-p.done
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
// force a shutdown of all running jobs
|
||||
p.shutdownJobs()
|
||||
|
||||
// wait for running jobs to report their status to Gitea
|
||||
_, _ = <-p.done
|
||||
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Poller) poll(wg *sync.WaitGroup, limiter *rate.Limiter) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
if err := limiter.Wait(ctx); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
p.pollOnce(limiter)
|
||||
|
||||
select {
|
||||
case <-p.pollingCtx.Done():
|
||||
return
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Poller) pollOnce(limiter *rate.Limiter) {
|
||||
for {
|
||||
if err := limiter.Wait(p.pollingCtx); err != nil {
|
||||
if p.pollingCtx.Err() != nil {
|
||||
log.WithError(err).Debug("limiter wait failed")
|
||||
}
|
||||
return
|
||||
}
|
||||
task, ok := p.fetchTask(ctx)
|
||||
task, ok := p.fetchTask(p.pollingCtx)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
p.runTaskWithRecover(ctx, task)
|
||||
|
||||
p.runTaskWithRecover(p.jobsCtx, task)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
24
internal/app/run/logging.go
Normal file
24
internal/app/run/logging.go
Normal file
@@ -0,0 +1,24 @@
|
||||
// Copyright 2024 The Gitea Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package run
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// NullLogger is used to create a new JobLogger to discard logs. This
|
||||
// will prevent these logs from being logged to the stdout, but
|
||||
// forward them to the Reporter via its hook.
|
||||
type NullLogger struct{}
|
||||
|
||||
// WithJobLogger creates a new logrus.Logger that will discard all logs.
|
||||
func (n NullLogger) WithJobLogger() *log.Logger {
|
||||
logger := log.New()
|
||||
logger.SetOutput(io.Discard)
|
||||
logger.SetLevel(log.TraceLevel)
|
||||
|
||||
return logger
|
||||
}
|
||||
@@ -13,7 +13,7 @@ import (
|
||||
"time"
|
||||
|
||||
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
|
||||
"github.com/bufbuild/connect-go"
|
||||
"connectrpc.com/connect"
|
||||
"github.com/docker/docker/api/types/container"
|
||||
"github.com/nektos/act/pkg/artifactcache"
|
||||
"github.com/nektos/act/pkg/common"
|
||||
@@ -183,10 +183,10 @@ func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report.
|
||||
runnerConfig := &runner.Config{
|
||||
// On Linux, Workdir will be like "/<parent_directory>/<owner>/<repo>"
|
||||
// On Windows, Workdir will be like "\<parent_directory>\<owner>\<repo>"
|
||||
Workdir: filepath.FromSlash(fmt.Sprintf("/%s/%s", r.cfg.Container.WorkdirParent, preset.Repository)),
|
||||
BindWorkdir: false,
|
||||
ActionCacheDir: filepath.FromSlash(r.cfg.Host.WorkdirParent),
|
||||
ActionOfflineMode: r.cfg.Cache.OfflineMode,
|
||||
Workdir: filepath.FromSlash(fmt.Sprintf("/%s/%s", strings.TrimLeft(r.cfg.Container.WorkdirParent, "/"), preset.Repository)),
|
||||
BindWorkdir: false,
|
||||
ActionCacheDir: filepath.FromSlash(r.cfg.Host.WorkdirParent),
|
||||
ActionOfflineMode: r.cfg.Cache.OfflineMode,
|
||||
|
||||
ReuseContainers: false,
|
||||
ForcePull: r.cfg.Container.ForcePull,
|
||||
@@ -224,6 +224,10 @@ func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report.
|
||||
// add logger recorders
|
||||
ctx = common.WithLoggerHook(ctx, reporter)
|
||||
|
||||
if !log.IsLevelEnabled(log.DebugLevel) {
|
||||
ctx = runner.WithJobLoggerFactory(ctx, NullLogger{})
|
||||
}
|
||||
|
||||
execErr := executor(ctx)
|
||||
reporter.SetOutputs(job.Outputs)
|
||||
return execErr
|
||||
|
||||
Reference in New Issue
Block a user