2 Commits

Author SHA1 Message Date
Bo-Yi Wu
dc91ce340e test(poll): add concurrency test for single-poller capacity control
- Introduce TaskRunner interface to decouple Poller from concrete run.Runner
- Add TestPoller_ConcurrencyLimitedByCapacity verifying max concurrent
  tasks respects capacity and FetchTask is never called concurrently
- Mock runner respects context cancellation for proper shutdown testing
2026-04-16 15:16:25 +08:00
Bo-Yi Wu
272a446326 refactor: use single poller with semaphore-based capacity control
Previously, capacity=N spawned N independent polling goroutines, each
making FetchTask RPCs to the Gitea server concurrently. This caused
unnecessary connection load on the server proportional to the runner's
capacity setting.

Replace the N-goroutine model with a single polling loop that uses a
buffered channel as a semaphore to control concurrent task execution.
The poller acquires a capacity slot before fetching; when at capacity,
it blocks without issuing RPCs. Fetched tasks are dispatched to
independent goroutines that release their slot on completion.

Also fix a pre-existing bug in Shutdown() where the timeout branch
used a blocking receive on p.done instead of a non-blocking select,
which prevented shutdownJobs() from ever being called on timeout.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-16 14:56:13 +08:00
14 changed files with 79 additions and 112 deletions

1
.gitignore vendored
View File

@@ -2,6 +2,7 @@
.env
.runner
coverage.txt
/gitea-vet
/config.yaml
# Jetbrains

View File

@@ -13,7 +13,6 @@ linters:
- forbidigo
- gocheckcompilerdirectives
- gocritic
- goheader
- govet
- ineffassign
- mirror
@@ -86,11 +85,6 @@ linters:
enable:
- nilness
- unusedwrite
goheader:
values:
regexp:
HEADER: 'Copyright \d{4} The Gitea Authors\. All rights reserved\.(\nCopyright [^\n]+)*\nSPDX-License-Identifier: MIT'
template: '{{ HEADER }}'
exclusions:
generated: lax
presets:
@@ -107,16 +101,9 @@ issues:
max-same-issues: 0
formatters:
enable:
- gci
- gofmt
- gofumpt
settings:
gci:
custom-order: true
sections:
- standard
- prefix(gitea.com/gitea/act_runner)
- blank
- default
gofumpt:
extra-rules: true
exclusions:

View File

@@ -1,5 +1,6 @@
DIST := dist
EXECUTABLE := act_runner
GOFMT ?= gofumpt -l
DIST_DIRS := $(DIST)/binaries $(DIST)/release
GO ?= go
SHASUM ?= shasum -a 256
@@ -11,6 +12,7 @@ GXZ_PAGAGE ?= github.com/ulikunitz/xz/cmd/gxz@v0.5.10
LINUX_ARCHS ?= linux/amd64,linux/arm64
DARWIN_ARCHS ?= darwin-12/amd64,darwin-12/arm64
WINDOWS_ARCHS ?= windows/amd64
GO_FMT_FILES := $(shell find . -type f -name "*.go" ! -name "generated.*")
GOFILES := $(shell find . -type f -name "*.go" -o -name "go.mod" ! -name "generated.*")
DOCKER_IMAGE ?= gitea/act_runner
@@ -66,14 +68,19 @@ else
endif
endif
GO_PACKAGES_TO_VET ?= $(filter-out gitea.com/gitea/act_runner/internal/pkg/client/mocks,$(shell $(GO) list ./...))
TAGS ?=
LDFLAGS ?= -X "gitea.com/gitea/act_runner/internal/pkg/ver.version=v$(RELASE_VERSION)"
all: build
.PHONY: fmt
fmt:
$(GO) run $(GOLANGCI_LINT_PACKAGE) fmt
@hash gofumpt > /dev/null 2>&1; if [ $$? -ne 0 ]; then \
$(GO) install mvdan.cc/gofumpt@latest; \
fi
$(GOFMT) -w $(GO_FMT_FILES)
.PHONY: go-check
go-check:
@@ -86,20 +93,23 @@ go-check:
fi
.PHONY: fmt-check
fmt-check: fmt
@diff=$$(git diff --color=always); \
fmt-check:
@hash gofumpt > /dev/null 2>&1; if [ $$? -ne 0 ]; then \
$(GO) install mvdan.cc/gofumpt@latest; \
fi
@diff=$$($(GOFMT) -d $(GO_FMT_FILES)); \
if [ -n "$$diff" ]; then \
echo "Please run 'make fmt' and commit the result:"; \
printf "%s" "$${diff}"; \
echo "$${diff}"; \
exit 1; \
fi
fi;
.PHONY: deps-tools
deps-tools: ## install tool dependencies
$(GO) install $(GOVULNCHECK_PACKAGE)
.PHONY: lint
lint: lint-go
lint: lint-go vet
.PHONY: lint-go
lint-go: ## lint go files
@@ -129,6 +139,12 @@ tidy-check: tidy
test: fmt-check security-check
@$(GO) test -race -v -cover -coverprofile coverage.txt ./... && echo "\n==>\033[32m Ok\033[m\n" || exit 1
.PHONY: vet
vet:
@echo "Running go vet..."
@$(GO) build code.gitea.io/gitea-vet
@$(GO) vet -vettool=gitea-vet $(GO_PACKAGES_TO_VET)
install: $(GOFILES)
$(GO) install -v -tags '$(TAGS)' -ldflags '$(EXTLDFLAGS)-s -w $(LDFLAGS)'

11
build.go Normal file
View File

@@ -0,0 +1,11 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
//go:build vendor
package main
import (
// for vet
_ "code.gitea.io/gitea-vet"
)

2
go.mod
View File

@@ -4,6 +4,7 @@ go 1.26.0
require (
code.gitea.io/actions-proto-go v0.4.1
code.gitea.io/gitea-vet v0.2.3
connectrpc.com/connect v1.19.1
github.com/avast/retry-go/v4 v4.7.0
github.com/docker/docker v25.0.13+incompatible
@@ -109,6 +110,7 @@ require (
golang.org/x/net v0.50.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.41.0 // indirect
golang.org/x/tools v0.42.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/grpc v1.67.0 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect

8
go.sum
View File

@@ -1,5 +1,7 @@
code.gitea.io/actions-proto-go v0.4.1 h1:l0EYhjsgpUe/1VABo2eK7zcoNX2W44WOnb0MSLrKfls=
code.gitea.io/actions-proto-go v0.4.1/go.mod h1:mn7Wkqz6JbnTOHQpot3yDeHx+O5C9EGhMEE+htvHBas=
code.gitea.io/gitea-vet v0.2.3 h1:gdFmm6WOTM65rE8FUBTRzeQZYzXePKSSB1+r574hWwI=
code.gitea.io/gitea-vet v0.2.3/go.mod h1:zcNbT/aJEmivCAhfmkHOlT645KNOf9W2KnkLgFjGGfE=
connectrpc.com/connect v1.19.1 h1:R5M57z05+90EfEvCY1b7hBxDVOUl45PrtXtAV2fOC14=
connectrpc.com/connect v1.19.1/go.mod h1:tN20fjdGlewnSFeZxLKb0xwIZ6ozc3OQs2hTXy4du9w=
cyphar.com/go-pathrs v0.2.3 h1:0pH8gep37wB0BgaXrEaN1OtZhUMeS7VvaejSr6i822o=
@@ -222,6 +224,7 @@ github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHo
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.etcd.io/bbolt v1.3.8/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw=
@@ -265,6 +268,8 @@ golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8=
golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -303,8 +308,11 @@ golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200325010219-a49f79bcc224/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k=
golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

View File

@@ -8,10 +8,10 @@ import (
"fmt"
"os"
"github.com/spf13/cobra"
"gitea.com/gitea/act_runner/internal/pkg/config"
"gitea.com/gitea/act_runner/internal/pkg/ver"
"github.com/spf13/cobra"
)
func Execute(ctx context.Context) {

View File

@@ -16,6 +16,11 @@ import (
"strings"
"time"
"connectrpc.com/connect"
"github.com/mattn/go-isatty"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"gitea.com/gitea/act_runner/internal/app/poll"
"gitea.com/gitea/act_runner/internal/app/run"
"gitea.com/gitea/act_runner/internal/pkg/client"
@@ -24,11 +29,6 @@ import (
"gitea.com/gitea/act_runner/internal/pkg/labels"
"gitea.com/gitea/act_runner/internal/pkg/metrics"
"gitea.com/gitea/act_runner/internal/pkg/ver"
"connectrpc.com/connect"
"github.com/mattn/go-isatty"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
func runDaemon(ctx context.Context, daemArgs *daemonArgs, configFile *string) func(cmd *cobra.Command, args []string) error {

View File

@@ -14,17 +14,17 @@ import (
"strings"
"time"
"gitea.com/gitea/act_runner/internal/pkg/client"
"gitea.com/gitea/act_runner/internal/pkg/config"
"gitea.com/gitea/act_runner/internal/pkg/labels"
"gitea.com/gitea/act_runner/internal/pkg/ver"
pingv1 "code.gitea.io/actions-proto-go/ping/v1"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
"connectrpc.com/connect"
"github.com/mattn/go-isatty"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"gitea.com/gitea/act_runner/internal/pkg/client"
"gitea.com/gitea/act_runner/internal/pkg/config"
"gitea.com/gitea/act_runner/internal/pkg/labels"
"gitea.com/gitea/act_runner/internal/pkg/ver"
)
// runRegister registers a runner to the server

View File

@@ -12,13 +12,13 @@ import (
"sync/atomic"
"time"
"gitea.com/gitea/act_runner/internal/pkg/client"
"gitea.com/gitea/act_runner/internal/pkg/config"
"gitea.com/gitea/act_runner/internal/pkg/metrics"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
"connectrpc.com/connect"
log "github.com/sirupsen/logrus"
"gitea.com/gitea/act_runner/internal/pkg/client"
"gitea.com/gitea/act_runner/internal/pkg/config"
"gitea.com/gitea/act_runner/internal/pkg/metrics"
)
// TaskRunner abstracts task execution so the poller can be tested

View File

@@ -11,14 +11,14 @@ import (
"testing"
"time"
"gitea.com/gitea/act_runner/internal/pkg/client/mocks"
"gitea.com/gitea/act_runner/internal/pkg/config"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
connect_go "connectrpc.com/connect"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"gitea.com/gitea/act_runner/internal/pkg/client/mocks"
"gitea.com/gitea/act_runner/internal/pkg/config"
)
// TestPoller_WorkerStateCounters verifies that workerState correctly tracks
@@ -200,61 +200,3 @@ func TestPoller_ConcurrencyLimitedByCapacity(t *testing.T) {
assert.Equal(t, int64(totalTasks), runner.totalCompleted.Load(),
"all tasks should have been executed")
}
// TestPoller_ShutdownForcesJobsOnTimeout locks in the fix for a
// pre-existing bug where Shutdown's timeout branch used a blocking
// `<-p.done` receive, leaving p.shutdownJobs() unreachable. With a
// task parked on jobsCtx and a Shutdown deadline shorter than the
// task's natural completion, Shutdown must force-cancel via
// shutdownJobs() and return ctx.Err() promptly — not block until the
// task would have finished on its own.
func TestPoller_ShutdownForcesJobsOnTimeout(t *testing.T) {
var served atomic.Bool
cli := mocks.NewClient(t)
cli.On("FetchTask", mock.Anything, mock.Anything).Return(
func(_ context.Context, _ *connect_go.Request[runnerv1.FetchTaskRequest]) (*connect_go.Response[runnerv1.FetchTaskResponse], error) {
if served.CompareAndSwap(false, true) {
return connect_go.NewResponse(&runnerv1.FetchTaskResponse{
Task: &runnerv1.Task{Id: 1},
}), nil
}
return connect_go.NewResponse(&runnerv1.FetchTaskResponse{}), nil
},
)
// delay >> Shutdown timeout: Run only returns when jobsCtx is
// cancelled by shutdownJobs().
runner := &mockRunner{delay: 30 * time.Second}
cfg, err := config.LoadDefault("")
require.NoError(t, err)
cfg.Runner.Capacity = 1
cfg.Runner.FetchInterval = 10 * time.Millisecond
cfg.Runner.FetchIntervalMax = 10 * time.Millisecond
poller := New(cfg, cli, runner)
var wg sync.WaitGroup
wg.Go(poller.Poll)
require.Eventually(t, func() bool {
return runner.running.Load() == 1
}, time.Second, 10*time.Millisecond, "task should start running")
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
start := time.Now()
err = poller.Shutdown(ctx)
elapsed := time.Since(start)
require.ErrorIs(t, err, context.DeadlineExceeded)
// With the fix, Shutdown returns shortly after the deadline once
// the forced job unwinds. Without the fix, the blocking <-p.done
// would hang for the full 30s mockRunner delay.
assert.Less(t, elapsed, 5*time.Second,
"Shutdown must not block on the parked task; shutdownJobs() must run on timeout")
wg.Wait()
assert.Equal(t, int64(1), runner.totalCompleted.Load(),
"the parked task must be cancelled and unwound")
}

View File

@@ -15,13 +15,6 @@ import (
"sync/atomic"
"time"
"gitea.com/gitea/act_runner/internal/pkg/client"
"gitea.com/gitea/act_runner/internal/pkg/config"
"gitea.com/gitea/act_runner/internal/pkg/labels"
"gitea.com/gitea/act_runner/internal/pkg/metrics"
"gitea.com/gitea/act_runner/internal/pkg/report"
"gitea.com/gitea/act_runner/internal/pkg/ver"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
"connectrpc.com/connect"
"github.com/docker/docker/api/types/container"
@@ -30,6 +23,13 @@ import (
"github.com/nektos/act/pkg/model"
"github.com/nektos/act/pkg/runner"
log "github.com/sirupsen/logrus"
"gitea.com/gitea/act_runner/internal/pkg/client"
"gitea.com/gitea/act_runner/internal/pkg/config"
"gitea.com/gitea/act_runner/internal/pkg/labels"
"gitea.com/gitea/act_runner/internal/pkg/metrics"
"gitea.com/gitea/act_runner/internal/pkg/report"
"gitea.com/gitea/act_runner/internal/pkg/ver"
)
// Runner runs the pipeline.

View File

@@ -12,16 +12,16 @@ import (
"sync"
"time"
"gitea.com/gitea/act_runner/internal/pkg/client"
"gitea.com/gitea/act_runner/internal/pkg/config"
"gitea.com/gitea/act_runner/internal/pkg/metrics"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
"connectrpc.com/connect"
"github.com/avast/retry-go/v4"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"gitea.com/gitea/act_runner/internal/pkg/client"
"gitea.com/gitea/act_runner/internal/pkg/config"
"gitea.com/gitea/act_runner/internal/pkg/metrics"
)
type Reporter struct {

View File

@@ -12,9 +12,6 @@ import (
"testing"
"time"
"gitea.com/gitea/act_runner/internal/pkg/client/mocks"
"gitea.com/gitea/act_runner/internal/pkg/config"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
connect_go "connectrpc.com/connect"
log "github.com/sirupsen/logrus"
@@ -23,6 +20,9 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
"gitea.com/gitea/act_runner/internal/pkg/client/mocks"
"gitea.com/gitea/act_runner/internal/pkg/config"
)
func TestReporter_parseLogRow(t *testing.T) {