From f5abbba71646a0594e0d03dc4a3357bbf49123d4 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Sat, 18 Apr 2026 11:43:15 +0800 Subject: [PATCH] test(poll): cover Shutdown timeout branch reaching shutdownJobs() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add TestPoller_ShutdownForcesJobsOnTimeout to lock in the Shutdown() fix: when a task is parked on jobsCtx and the Shutdown context's deadline elapses before graceful drain, the timeout branch must reach p.shutdownJobs() and force-cancel the job — rather than blocking on <-p.done as the previous (broken) implementation did. Per @silverwind's review on #822. --- internal/app/poll/poller_test.go | 58 ++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/internal/app/poll/poller_test.go b/internal/app/poll/poller_test.go index ce8be585..6a69da49 100644 --- a/internal/app/poll/poller_test.go +++ b/internal/app/poll/poller_test.go @@ -200,3 +200,61 @@ func TestPoller_ConcurrencyLimitedByCapacity(t *testing.T) { assert.Equal(t, int64(totalTasks), runner.totalCompleted.Load(), "all tasks should have been executed") } + +// TestPoller_ShutdownForcesJobsOnTimeout locks in the fix for a +// pre-existing bug where Shutdown's timeout branch used a blocking +// `<-p.done` receive, leaving p.shutdownJobs() unreachable. With a +// task parked on jobsCtx and a Shutdown deadline shorter than the +// task's natural completion, Shutdown must force-cancel via +// shutdownJobs() and return ctx.Err() promptly — not block until the +// task would have finished on its own. 
+func TestPoller_ShutdownForcesJobsOnTimeout(t *testing.T) {
+	const (
+		runnerDelay     = 30 * time.Second
+		shutdownTimeout = 50 * time.Millisecond
+		returnBudget    = 5 * time.Second
+	)
+
+	// Hand out exactly one task; every later fetch gets an empty response.
+	var taskIssued atomic.Bool
+	fetchTask := func(_ context.Context, _ *connect_go.Request[runnerv1.FetchTaskRequest]) (*connect_go.Response[runnerv1.FetchTaskResponse], error) {
+		resp := &runnerv1.FetchTaskResponse{}
+		if !taskIssued.Swap(true) {
+			resp.Task = &runnerv1.Task{Id: 1}
+		}
+		return connect_go.NewResponse(resp), nil
+	}
+	client := mocks.NewClient(t)
+	client.On("FetchTask", mock.Anything, mock.Anything).Return(fetchTask)
+
+	// The runner's delay dwarfs the Shutdown deadline, so Run can only
+	// come back once shutdownJobs() cancels jobsCtx.
+	runner := &mockRunner{delay: runnerDelay}
+
+	cfg, err := config.LoadDefault("")
+	require.NoError(t, err)
+	cfg.Runner.FetchIntervalMax = 10 * time.Millisecond
+	cfg.Runner.FetchInterval = 10 * time.Millisecond
+	cfg.Runner.Capacity = 1
+
+	p := New(cfg, client, runner)
+	var pollers sync.WaitGroup
+	pollers.Go(p.Poll)
+
+	taskRunning := func() bool { return runner.running.Load() == 1 }
+	require.Eventually(t, taskRunning, time.Second, 10*time.Millisecond, "task should start running")
+
+	ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
+	defer cancel()
+	began := time.Now()
+	err = p.Shutdown(ctx)
+	took := time.Since(began)
+
+	require.ErrorIs(t, err, context.DeadlineExceeded)
+	// Once fixed, Shutdown returns soon after its deadline, when the forced job
+	// unwinds; the old blocking <-p.done receive sat out the full 30s delay.
+	assert.Less(t, took, returnBudget, "Shutdown must not block on the parked task; shutdownJobs() must run on timeout")
+
+	pollers.Wait()
+	assert.Equal(t, int64(1), runner.totalCompleted.Load(), "the parked task must be cancelled and unwound")
+}