Skip to content

Allow long-running jobs to be processed as they come#457

Open
antonije wants to merge 9 commits into
runpod:mainfrom
antonije:main
Open

Allow long-running jobs to be processed as they come#457
antonije wants to merge 9 commits into
runpod:mainfrom
antonije:main

Conversation

@antonije
Copy link
Copy Markdown

These changes fix the following issues:

  • long-running jobs were being blocked until the first one completes
  • jobs having double states (being in "QUEUE" and "IN_PROCESS") before actually being accepted
  • stop recreating the jobs_queue and setting initial maxsize

NOTE: I have added longer sleep in the run_jobs method to suit my needs, but feel free to edit those.

…progress

Previously, jobs were marked as "in progress" in `get_jobs()` immediately after being fetched from the server, before they were actually scheduled by `run_jobs()`. This caused jobs to appear both in the queue and in progress simultaneously, which inflated `current_occupancy()` and blocked new jobs from starting until earlier ones had finished.

The fix moves the `job_progress.add(job)` call into `run_jobs()`, right after the job is dequeued and scheduled as a task. This ensures that:

- Jobs are only marked "in progress" once they actually start running.
- Queue and progress metrics no longer overlap.
- Concurrency limits are enforced correctly (e.g. with concurrency=4, two jobs now run in parallel instead of sequentially).

As a result, jobs are dispatched immediately when capacity is available, and status logging reflects the real state of the worker.
- add a timeout of 100 milliseconds that doesn't block the code and accepts new jobs
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR updates the serverless worker’s JobScaler loop to better support long-running jobs by decoupling job acquisition from job execution, avoiding queue backpressure, and aligning job state tracking with when work actually begins.

Changes:

  • Replace the bounded asyncio.Queue(maxsize=concurrency) with an unbounded queue to prevent acquisition from blocking on long-running jobs.
  • Adjust scaling logic to update current_concurrency without recreating the queue.
  • Rework run_jobs to manage a live set of tasks and process jobs as capacity opens up; move job_progress.add() to dequeue-time to avoid “queued + in-process” double-state.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 81 to +87
while self.current_occupancy() > 0:
# not safe to scale when jobs are in flight
await asyncio.sleep(1)
continue

self.jobs_queue = asyncio.Queue(maxsize=self.current_concurrency)
log.debug(
f"JobScaler.set_scale | New concurrency set to: {self.current_concurrency}"
)
self.current_concurrency = new_concurrency
log.debug(f"JobScaler.set_scale | New concurrency set to: {self.current_concurrency}")
Runs the block in an infinite loop while the worker is alive or jobs queue is not empty.
"""
tasks = [] # Store the tasks for concurrent job processing
tasks: set[asyncio.Task] = set() # Store the tasks for concurrent job processing
Comment on lines 236 to 240
done, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED
tasks,
timeout=0.1,
return_when=asyncio.FIRST_COMPLETED,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants