test(poll): cover Shutdown timeout branch reaching shutdownJobs()

Add TestPoller_ShutdownForcesJobsOnTimeout to lock in the Shutdown()
fix: when a task is parked on jobsCtx and the Shutdown context's
deadline elapses before graceful drain, the timeout branch must reach
p.shutdownJobs() and force-cancel the job — rather than blocking on
<-p.done as the previous (broken) implementation did.

Per @silverwind's review on #822.
This commit is contained in:
Bo-Yi Wu
2026-04-18 11:43:15 +08:00
parent a60030390d
commit f5abbba716

View File

@@ -200,3 +200,61 @@ func TestPoller_ConcurrencyLimitedByCapacity(t *testing.T) {
assert.Equal(t, int64(totalTasks), runner.totalCompleted.Load(),
"all tasks should have been executed")
}
// TestPoller_ShutdownForcesJobsOnTimeout locks in the fix for a
// pre-existing bug where Shutdown's timeout branch used a blocking
// `<-p.done` receive, leaving p.shutdownJobs() unreachable. With a
// task parked on jobsCtx and a Shutdown deadline shorter than the
// task's natural completion, Shutdown must force-cancel via
// shutdownJobs() and return ctx.Err() promptly — not block until the
// task would have finished on its own.
func TestPoller_ShutdownForcesJobsOnTimeout(t *testing.T) {
var served atomic.Bool
cli := mocks.NewClient(t)
cli.On("FetchTask", mock.Anything, mock.Anything).Return(
func(_ context.Context, _ *connect_go.Request[runnerv1.FetchTaskRequest]) (*connect_go.Response[runnerv1.FetchTaskResponse], error) {
if served.CompareAndSwap(false, true) {
return connect_go.NewResponse(&runnerv1.FetchTaskResponse{
Task: &runnerv1.Task{Id: 1},
}), nil
}
return connect_go.NewResponse(&runnerv1.FetchTaskResponse{}), nil
},
)
// delay >> Shutdown timeout: Run only returns when jobsCtx is
// cancelled by shutdownJobs().
runner := &mockRunner{delay: 30 * time.Second}
cfg, err := config.LoadDefault("")
require.NoError(t, err)
cfg.Runner.Capacity = 1
cfg.Runner.FetchInterval = 10 * time.Millisecond
cfg.Runner.FetchIntervalMax = 10 * time.Millisecond
poller := New(cfg, cli, runner)
var wg sync.WaitGroup
wg.Go(poller.Poll)
require.Eventually(t, func() bool {
return runner.running.Load() == 1
}, time.Second, 10*time.Millisecond, "task should start running")
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
start := time.Now()
err = poller.Shutdown(ctx)
elapsed := time.Since(start)
require.ErrorIs(t, err, context.DeadlineExceeded)
// With the fix, Shutdown returns shortly after the deadline once
// the forced job unwinds. Without the fix, the blocking <-p.done
// would hang for the full 30s mockRunner delay.
assert.Less(t, elapsed, 5*time.Second,
"Shutdown must not block on the parked task; shutdownJobs() must run on timeout")
wg.Wait()
assert.Equal(t, int64(1), runner.totalCompleted.Load(),
"the parked task must be cancelled and unwound")
}