From b9b8da31f3a7ea8068952f69b659bafeac77519b Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 1 Jun 2026 12:20:38 +0500 Subject: [PATCH] Reduce pipeline processing latency via fetch hints and skip flag --- .../background/pipeline_tasks/jobs_running.py | 22 ++++++++++++++++++- .../pipeline_tasks/jobs_submitted.py | 1 + 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py index 45bd39a2d..068add9a6 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py @@ -321,6 +321,12 @@ async def process(self, item: JobRunningPipelineItem): job_model=context.job_model, result=result, ) + new_status = result.job_update_map.get("status") + if new_status == JobStatus.PULLING: + self._pipeline_hinter.hint_fetch(JobModel.__name__) + # Hint run pipeline for fast run transition to RUNNING status. + if new_status == JobStatus.RUNNING and context.job_model.run.status != RunStatus.RUNNING: + self._pipeline_hinter.hint_fetch(RunModel.__name__) @dataclass @@ -609,7 +615,9 @@ async def _refetch_locked_job_model( ) .options(joinedload(JobModel.instance).joinedload(InstanceModel.project)) .options(joinedload(JobModel.probes).load_only(ProbeModel.success_streak)) - .options(joinedload(JobModel.run).load_only(RunModel.id, RunModel.run_spec)) + .options( + joinedload(JobModel.run).load_only(RunModel.id, RunModel.run_spec, RunModel.status) + ) .execution_options(populate_existing=True) ) return res.unique().scalar_one_or_none() @@ -981,6 +989,18 @@ async def _apply_process_result( if result.new_probe_models: session.add_all(result.new_probe_models) + # Set RunModel.skip_min_processing_interval for fast run transition to RUNNING status. + # Cross-pipeline write is ok: worst case skip_min_processing_interval is overridden. + if ( + result.job_update_map.get("status") == JobStatus.RUNNING + and job_model.run.status != RunStatus.RUNNING + ): + await session.execute( + update(RunModel) + .where(RunModel.id == job_model.run_id) + .values(skip_min_processing_interval=True) + ) + _emit_result_events(session=session, job_model=job_model, result=result) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index 2161eb895..e58e5530a 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -345,6 +345,7 @@ async def process(self, item: JobSubmittedPipelineItem): context=context, assignment=assignment, ) + self._pipeline_hinter.hint_fetch(JobModel.__name__) @dataclass