diff --git a/internal/app/cmd/daemon.go b/internal/app/cmd/daemon.go index 5dccc1c..fc47fd2 100644 --- a/internal/app/cmd/daemon.go +++ b/internal/app/cmd/daemon.go @@ -122,9 +122,12 @@ func runDaemon(ctx context.Context, configFile *string) func(cmd *cobra.Command, poller := poll.New(cfg, cli, runner) - poller.Poll(ctx) + go poller.Poll() - return nil + <-ctx.Done() + log.Infof("runner: %s gracefully shutting down", resp.Msg.Runner.Name) + + return poller.Shutdown(context.Background()) } } diff --git a/internal/app/poll/poller.go b/internal/app/poll/poller.go index 70a1f45..31d5a5d 100644 --- a/internal/app/poll/poller.go +++ b/internal/app/poll/poller.go @@ -25,40 +25,82 @@ type Poller struct { runner *run.Runner cfg *config.Config tasksVersion atomic.Int64 // tasksVersion used to store the version of the last task fetched from the Gitea. + + pollingCtx context.Context + shutdownPolling context.CancelFunc + + jobsCtx context.Context + shutdownJobs context.CancelFunc + + done chan struct{} } func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller { + pollingCtx, shutdownPolling := context.WithCancel(context.Background()) + + jobsCtx, shutdownJobs := context.WithCancel(context.Background()) + + done := make(chan struct{}) + return &Poller{ client: client, runner: runner, cfg: cfg, + + pollingCtx: pollingCtx, + shutdownPolling: shutdownPolling, + + jobsCtx: jobsCtx, + shutdownJobs: shutdownJobs, + + done: done, } } -func (p *Poller) Poll(ctx context.Context) { +func (p *Poller) Poll() { limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1) wg := &sync.WaitGroup{} for i := 0; i < p.cfg.Runner.Capacity; i++ { wg.Add(1) - go p.poll(ctx, wg, limiter) + go p.poll(wg, limiter) } wg.Wait() + + // signal that we shutdown + close(p.done) } -func (p *Poller) poll(ctx context.Context, wg *sync.WaitGroup, limiter *rate.Limiter) { +func (p *Poller) Shutdown(ctx context.Context) error { + p.shutdownPolling() + + select { + // gracefully shutdown + case <-p.done: + return nil + + // Our timeout for shutting down ran out + case <-ctx.Done(): + // force a shutdown of all running jobs + p.shutdownJobs() + return ctx.Err() + } +} + +func (p *Poller) poll(wg *sync.WaitGroup, limiter *rate.Limiter) { defer wg.Done() for { - if err := limiter.Wait(ctx); err != nil { - if ctx.Err() != nil { + if err := limiter.Wait(p.pollingCtx); err != nil { + if p.pollingCtx.Err() != nil { log.WithError(err).Debug("limiter wait failed") } return } - task, ok := p.fetchTask(ctx) + task, ok := p.fetchTask(p.pollingCtx) if !ok { continue } - p.runTaskWithRecover(ctx, task) + + p.runTaskWithRecover(p.jobsCtx, task) } }