From b3507195274f2283c3e43eca0ccc75864d4494d8 Mon Sep 17 00:00:00 2001 From: Rowan Bohde Date: Thu, 25 Apr 2024 13:53:18 -0500 Subject: [PATCH] feat: allow graceful shutdowns Add a Shutdown(context.Context) method to the Poller. This will first shutdown all active polling, preventing any new jobs from spawning. It will then wait for either all jobs to finish, or for the context to finish. If the context finishes, it will then force all jobs to end, and then exit. --- internal/app/cmd/daemon.go | 7 +++-- internal/app/poll/poller.go | 56 ++++++++++++++++++++++++++++++++----- 2 files changed, 54 insertions(+), 9 deletions(-) diff --git a/internal/app/cmd/daemon.go b/internal/app/cmd/daemon.go index 5dccc1c..fc47fd2 100644 --- a/internal/app/cmd/daemon.go +++ b/internal/app/cmd/daemon.go @@ -122,9 +122,12 @@ func runDaemon(ctx context.Context, configFile *string) func(cmd *cobra.Command, poller := poll.New(cfg, cli, runner) - poller.Poll(ctx) + go poller.Poll() - return nil + <-ctx.Done() + log.Infof("runner: %s gracefully shutting down", resp.Msg.Runner.Name) + + return poller.Shutdown(context.Background()) } } diff --git a/internal/app/poll/poller.go b/internal/app/poll/poller.go index 70a1f45..31d5a5d 100644 --- a/internal/app/poll/poller.go +++ b/internal/app/poll/poller.go @@ -25,40 +25,82 @@ type Poller struct { runner *run.Runner cfg *config.Config tasksVersion atomic.Int64 // tasksVersion used to store the version of the last task fetched from the Gitea. + + pollingCtx context.Context + shutdownPolling context.CancelFunc + + jobsCtx context.Context + shutdownJobs context.CancelFunc + + done chan struct{} } func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller { + pollingCtx, shutdownPolling := context.WithCancel(context.Background()) + + jobsCtx, shutdownJobs := context.WithCancel(context.Background()) + + done := make(chan struct{}) + return &Poller{ client: client, runner: runner, cfg: cfg, + + pollingCtx: pollingCtx, + shutdownPolling: shutdownPolling, + + jobsCtx: jobsCtx, + shutdownJobs: shutdownJobs, + + done: done, } } -func (p *Poller) Poll(ctx context.Context) { +func (p *Poller) Poll() { limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1) wg := &sync.WaitGroup{} for i := 0; i < p.cfg.Runner.Capacity; i++ { wg.Add(1) - go p.poll(ctx, wg, limiter) + go p.poll(wg, limiter) } wg.Wait() + + // signal that we shutdown + close(p.done) } -func (p *Poller) poll(ctx context.Context, wg *sync.WaitGroup, limiter *rate.Limiter) { +func (p *Poller) Shutdown(ctx context.Context) error { + p.shutdownPolling() + + select { + // gracefully shutdown + case <-p.done: + return nil + + // Our timeout for shutting down ran out + case <-ctx.Done(): + // force a shutdown of all running jobs + p.shutdownJobs() + return ctx.Err() + } +} + +func (p *Poller) poll(wg *sync.WaitGroup, limiter *rate.Limiter) { defer wg.Done() for { - if err := limiter.Wait(ctx); err != nil { - if ctx.Err() != nil { + if err := limiter.Wait(p.pollingCtx); err != nil { + if p.pollingCtx.Err() != nil { log.WithError(err).Debug("limiter wait failed") } return } - task, ok := p.fetchTask(ctx) + task, ok := p.fetchTask(p.pollingCtx) if !ok { continue } - p.runTaskWithRecover(ctx, task) + + p.runTaskWithRecover(p.jobsCtx, task) } }