mirror of
https://gitea.com/gitea/act_runner.git
synced 2026-06-10 02:54:23 +02:00
feat: upload job summary when supported (#917)
- Add GitHub-style Actions **job summaries** support (writes to `GITHUB_STEP_SUMMARY` / `workflow/SUMMARY.md`) and render them in the run UI. - Gitea stores summaries internally (DB) and serves them in the run view payload. - `act_runner` uploads the summary **only when Gitea advertises support** (`X-Gitea-Actions-Capabilities: job-summary`), and warns on upload failures without failing the job. ## Compatibility - New Gitea + old runner: no upload → no summary shown (no behavior change) - New runner + old Gitea: capability not advertised → runner skips upload (no behavior change) ## Issue - Fixes go-gitea/gitea#23721 Reviewed-on: https://gitea.com/gitea/runner/pulls/917 Reviewed-by: Lunny Xiao <xiaolunwen@gmail.com> Reviewed-by: Zettat123 <39446+zettat123@noreply.gitea.com>
This commit is contained in:
@@ -5,15 +5,46 @@
|
||||
package runner
|
||||
|
||||
import (
|
||||
"archive/tar"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"path"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
"unicode"
|
||||
|
||||
"gitea.com/gitea/runner/act/common"
|
||||
"gitea.com/gitea/runner/act/container"
|
||||
"gitea.com/gitea/runner/act/model"
|
||||
)
|
||||
|
||||
const maxJobSummaryBytes = 1024 * 1024
|
||||
|
||||
// jobSummaryTruncationMarker is appended to a summary that exceeded the size limit
|
||||
// so the rendered output makes the truncation visible instead of silently cutting off.
|
||||
const jobSummaryTruncationMarker = "\n\n---\n\n*Job summary truncated: it exceeded the maximum allowed size.*\n"
|
||||
|
||||
var (
|
||||
jobSummaryUploadRetryDelay = time.Second
|
||||
// jobSummaryUploadRequestTimeout bounds a single step upload request. It is kept
|
||||
// below jobSummaryUploadPhaseTimeout so one slow or unreachable request times out
|
||||
// and lets the remaining steps still upload within the phase budget, instead of a
|
||||
// single stuck request consuming the whole phase.
|
||||
jobSummaryUploadRequestTimeout = 5 * time.Second
|
||||
// jobSummaryUploadPhaseTimeout bounds the total time spent uploading all step
|
||||
// summaries. The uploads run inside the job cleanup budget that is also used to
|
||||
// stop and remove the container, so a slow or unreachable endpoint must not be
|
||||
// allowed to consume it; this keeps the remaining budget available for teardown.
|
||||
jobSummaryUploadPhaseTimeout = 15 * time.Second
|
||||
)
|
||||
|
||||
type jobInfo interface {
|
||||
matrix() map[string]any
|
||||
steps() []*model.Step
|
||||
@@ -80,8 +111,10 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
|
||||
return common.NewErrorExecutor(err)
|
||||
}
|
||||
|
||||
stepIdx := stepModel.Number
|
||||
preExec := step.pre()
|
||||
preSteps = append(preSteps, useStepLogger(rc, stepModel, stepStagePre, func(ctx context.Context) error {
|
||||
rc.CurrentStepIndex = stepIdx
|
||||
preErr := preExec(ctx)
|
||||
if preErr != nil {
|
||||
reportStepError(ctx, preErr)
|
||||
@@ -93,6 +126,7 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
|
||||
|
||||
stepExec := step.main()
|
||||
steps = append(steps, useStepLogger(rc, stepModel, stepStageMain, func(ctx context.Context) error {
|
||||
rc.CurrentStepIndex = stepIdx
|
||||
err := stepExec(ctx)
|
||||
if err != nil {
|
||||
reportStepError(ctx, err)
|
||||
@@ -104,6 +138,7 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
|
||||
|
||||
postFn := step.post()
|
||||
postExec := useStepLogger(rc, stepModel, stepStagePost, func(ctx context.Context) error {
|
||||
rc.CurrentStepIndex = stepIdx
|
||||
err := postFn(ctx)
|
||||
if err != nil {
|
||||
reportStepError(ctx, err)
|
||||
@@ -129,6 +164,7 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
|
||||
defer cancel()
|
||||
|
||||
logger := common.Logger(ctx)
|
||||
tryUploadJobSummary(ctx, rc)
|
||||
// For Gitea
|
||||
// We don't need to call `stopServiceContainers` here since it will be called by following `info.stopContainer`
|
||||
// logger.Infof("Cleaning up services for job %s", rc.JobName)
|
||||
@@ -235,6 +271,180 @@ func setJobOutputs(ctx context.Context, rc *RunContext) {
|
||||
}
|
||||
}
|
||||
|
||||
func tryUploadJobSummary(ctx context.Context, rc *RunContext) {
|
||||
if rc == nil || rc.JobContainer == nil || rc.Config == nil {
|
||||
return
|
||||
}
|
||||
// Bound the whole upload phase so a slow or unreachable endpoint cannot consume
|
||||
// the job cleanup budget reserved for stopping and removing the container.
|
||||
ctx, cancel := context.WithTimeout(ctx, jobSummaryUploadPhaseTimeout)
|
||||
defer cancel()
|
||||
env := rc.GetEnv()
|
||||
caps := strings.TrimSpace(env["GITEA_ACTIONS_CAPABILITIES"])
|
||||
if !hasJobSummaryCapability(caps) {
|
||||
// Server did not advertise support. Do not attempt upload.
|
||||
return
|
||||
}
|
||||
runtimeURL := strings.TrimSpace(env["ACTIONS_RUNTIME_URL"])
|
||||
runtimeToken := strings.TrimSpace(env["ACTIONS_RUNTIME_TOKEN"])
|
||||
runID := strings.TrimSpace(env["GITEA_RUN_ID"])
|
||||
if runtimeURL == "" || runtimeToken == "" || runID == "" {
|
||||
return
|
||||
}
|
||||
if rc.Run == nil || rc.Run.Job() == nil {
|
||||
return
|
||||
}
|
||||
// The numeric ActionRunJob ID is not exposed in the proto Task message or task context,
|
||||
// but the server signs it into the ACTIONS_RUNTIME_TOKEN JWT claims. We decode the
|
||||
// unverified claims to retrieve it; the server re-verifies the token on the request.
|
||||
jobID := extractJobIDFromRuntimeToken(runtimeToken)
|
||||
if jobID <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
base := strings.TrimRight(runtimeURL, "/") + "/_apis/pipelines/workflows/" + runID +
|
||||
"/jobs/" + strconv.FormatInt(jobID, 10) + "/steps/"
|
||||
actPath := rc.JobContainer.GetActPath()
|
||||
// Reuse a single client across all step uploads so connections can be pooled.
|
||||
client := &http.Client{Timeout: jobSummaryUploadRequestTimeout}
|
||||
for i := range rc.Run.Job().Steps {
|
||||
summaryPath := path.Join(actPath, "workflow", "step-summary-"+strconv.Itoa(i)+".md")
|
||||
body, ok := readSingleFileFromContainerArchive(ctx, rc.JobContainer, summaryPath, maxJobSummaryBytes)
|
||||
if !ok || len(body) == 0 {
|
||||
continue
|
||||
}
|
||||
uploadJobSummary(ctx, client, base+strconv.Itoa(i)+"/summary", runtimeToken, body)
|
||||
}
|
||||
}
|
||||
|
||||
// extractJobIDFromRuntimeToken returns the JobID claim from an ACTIONS_RUNTIME_TOKEN JWT
|
||||
// without verifying its signature. Returns 0 if the token is unparseable or has no JobID.
|
||||
func extractJobIDFromRuntimeToken(token string) int64 {
|
||||
parts := strings.Split(token, ".")
|
||||
if len(parts) != 3 {
|
||||
return 0
|
||||
}
|
||||
payload, err := base64.RawURLEncoding.DecodeString(parts[1])
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
var claims struct {
|
||||
JobID int64 `json:"JobID"`
|
||||
}
|
||||
if err := json.Unmarshal(payload, &claims); err != nil {
|
||||
return 0
|
||||
}
|
||||
return claims.JobID
|
||||
}
|
||||
|
||||
func hasJobSummaryCapability(caps string) bool {
|
||||
return slices.Contains(strings.FieldsFunc(caps, func(r rune) bool {
|
||||
return r == ',' || unicode.IsSpace(r)
|
||||
}), "job-summary")
|
||||
}
|
||||
|
||||
func uploadJobSummary(ctx context.Context, client *http.Client, url, runtimeToken string, body []byte) {
|
||||
logger := common.Logger(ctx)
|
||||
|
||||
var lastStatus int
|
||||
var lastErr error
|
||||
for attempt := 0; attempt < 2; attempt++ {
|
||||
status, err := putJobSummary(ctx, client, url, runtimeToken, body)
|
||||
if err == nil && status/100 == 2 {
|
||||
return
|
||||
}
|
||||
lastStatus = status
|
||||
lastErr = err
|
||||
if attempt == 1 || !isTransientJobSummaryUploadFailure(status, err) {
|
||||
break
|
||||
}
|
||||
timer := time.NewTimer(jobSummaryUploadRetryDelay)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
lastErr = ctx.Err()
|
||||
attempt = 1
|
||||
case <-timer.C:
|
||||
}
|
||||
}
|
||||
|
||||
// Best-effort only; do not fail job, but log because capability was advertised.
|
||||
if lastErr != nil {
|
||||
logger.WithError(lastErr).Warn("job summary upload failed")
|
||||
return
|
||||
}
|
||||
logger.Warnf("job summary upload failed: status=%d", lastStatus)
|
||||
}
|
||||
|
||||
func putJobSummary(ctx context.Context, client *http.Client, url, runtimeToken string, body []byte) (int, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
req.Header.Set("Authorization", "Bearer "+runtimeToken)
|
||||
req.Header.Set("Content-Type", "text/markdown; charset=utf-8")
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
_, _ = io.Copy(io.Discard, resp.Body)
|
||||
return resp.StatusCode, nil
|
||||
}
|
||||
|
||||
func isTransientJobSummaryUploadFailure(status int, err error) bool {
|
||||
return err != nil || status == http.StatusRequestTimeout || status == http.StatusTooManyRequests || status/100 == 5
|
||||
}
|
||||
|
||||
func readSingleFileFromContainerArchive(ctx context.Context, env container.ExecutionsEnvironment, p string, maxBytes int64) ([]byte, bool) {
|
||||
rc, err := env.GetContainerArchive(ctx, p)
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
defer rc.Close()
|
||||
|
||||
tr := tar.NewReader(rc)
|
||||
for {
|
||||
header, err := tr.Next()
|
||||
if err == io.EOF {
|
||||
return nil, false
|
||||
}
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
if header.Typeflag != tar.TypeReg {
|
||||
continue
|
||||
}
|
||||
if !archiveEntryMatchesPath(header.Name, p) {
|
||||
continue
|
||||
}
|
||||
// Summaries larger than the limit are truncated rather than dropped, so the
|
||||
// user still gets the leading content (mirroring how GitHub caps oversized
|
||||
// step summaries instead of discarding them). Read one extra byte so an
|
||||
// over-limit file is detected from the actual stream rather than trusting
|
||||
// header.Size, then cap the returned content at maxBytes.
|
||||
b, err := io.ReadAll(io.LimitReader(tr, maxBytes+1))
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
if int64(len(b)) > maxBytes {
|
||||
// Reserve room for the marker so the marked-up result still fits in maxBytes.
|
||||
marker := []byte(jobSummaryTruncationMarker)
|
||||
keep := max(maxBytes-int64(len(marker)), 0)
|
||||
b = append(b[:keep], marker...)
|
||||
common.Logger(ctx).Warnf("job summary truncated: path=%s max=%d", p, maxBytes)
|
||||
}
|
||||
return b, true
|
||||
}
|
||||
}
|
||||
|
||||
func archiveEntryMatchesPath(entryName, requestedPath string) bool {
|
||||
entryName = path.Clean(strings.TrimPrefix(entryName, "/"))
|
||||
requestedPath = path.Clean(strings.TrimPrefix(requestedPath, "/"))
|
||||
return entryName == requestedPath || entryName == path.Base(requestedPath)
|
||||
}
|
||||
|
||||
func useStepLogger(rc *RunContext, stepModel *model.Step, stage stepStage, executor common.Executor) common.Executor {
|
||||
return func(ctx context.Context) error {
|
||||
ctx = withStepLogger(ctx, stepModel.Number, stepModel.ID, rc.ExprEval.Interpolate(ctx, stepModel.String()), stage.String())
|
||||
|
||||
Reference in New Issue
Block a user