diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index 2161eb895..a665d1222 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -516,6 +516,7 @@ async def _select_assignment( master_job_provisioning_data=preconditions.master_job_provisioning_data, volumes=preconditions.prepared_job_volumes.volumes, exclude_not_available=True, + skip_backend_offers_on_pool_capacity=True, ) if fleet_model is None: diff --git a/src/dstack/_internal/server/services/runs/plan.py b/src/dstack/_internal/server/services/runs/plan.py index 0493f308e..942bd1302 100644 --- a/src/dstack/_internal/server/services/runs/plan.py +++ b/src/dstack/_internal/server/services/runs/plan.py @@ -1,5 +1,6 @@ import math from collections.abc import Hashable, Mapping +from dataclasses import dataclass from enum import Enum from typing import Optional, Union @@ -280,6 +281,22 @@ async def select_run_candidate_fleet_models_with_filters( return fleet_models_with_instances, fleet_models_without_instances +@dataclass +class _FleetCandidate: + fleet_model: FleetModel + fleet_spec: FleetSpec + instance_offers: list[tuple[InstanceModel, InstanceOfferWithAvailability]] + min_instance_offer_price: float + has_pool_capacity: bool + + +@dataclass +class _FleetCandidateWithBackendOffers: + candidate: _FleetCandidate + backend_offers: list[tuple[Backend, InstanceOfferWithAvailability]] + sort_key: tuple[bool, float, float] + + async def find_optimal_fleet_with_offers( project: ProjectModel, fleet_models: list[FleetModel], @@ -289,6 +306,7 @@ async def find_optimal_fleet_with_offers( master_job_provisioning_data: Optional[JobProvisioningData], volumes: Optional[list[list[Volume]]], exclude_not_available: bool, + skip_backend_offers_on_pool_capacity: bool = False, ) -> tuple[ Optional[FleetModel], list[tuple[InstanceModel, InstanceOfferWithAvailability]], @@ -320,34 +338,27 @@ async def find_optimal_fleet_with_offers( # Then choose a fleet with the cheapest pool offer among all fleets with pool offers. # If there are no fleets with pool offers, choose a fleet with a cheapest backend offer. # TODO: Consider trying all backend offers and then choosing a fleet. - candidate_fleets_with_offers: list[ - tuple[ - FleetModel, - list[tuple[InstanceModel, InstanceOfferWithAvailability]], - list[tuple[Backend, InstanceOfferWithAvailability]], - int, - int, - tuple[int, float, float], - ] - ] = [] - for candidate_fleet_model in fleet_models: - candidate_fleet_spec = get_fleet_spec(candidate_fleet_model) + + # First step: consider instance offers. + candidates: list[_FleetCandidate] = [] + for fleet_model in fleet_models: + fleet_spec = get_fleet_spec(fleet_model) if ( is_multinode_job(job) - and candidate_fleet_spec.configuration.placement != InstanceGroupPlacement.CLUSTER + and fleet_spec.configuration.placement != InstanceGroupPlacement.CLUSTER ): # Limit multinode runs to cluster fleets to guarantee best connectivity. continue - if not _run_can_fit_into_fleet(run_spec, candidate_fleet_model, candidate_fleet_spec): + if not _run_can_fit_into_fleet(run_spec, fleet_model, fleet_spec): logger.debug( "Skipping fleet %s from consideration: run cannot fit into fleet", - candidate_fleet_model.name, + fleet_model.name, ) continue all_instance_offers = get_instance_offers_in_fleet( - fleet_model=candidate_fleet_model, + fleet_model=fleet_model, run_spec=run_spec, job=job, # No need to pass master_job_provisioning_data for master job @@ -357,61 +368,76 @@ async def find_optimal_fleet_with_offers( exclude_not_available=False, ) available_instance_offers = _exclude_non_available_instance_offers(all_instance_offers) - instance_offers = ( - available_instance_offers if exclude_not_available else all_instance_offers - ) - has_pool_capacity = nodes_required_num <= len(available_instance_offers) - min_instance_offer_price = _get_min_instance_or_backend_offer_price( - available_instance_offers + candidates.append( + _FleetCandidate( + fleet_model=fleet_model, + fleet_spec=fleet_spec, + instance_offers=( + available_instance_offers if exclude_not_available else all_instance_offers + ), + min_instance_offer_price=_get_min_instance_or_backend_offer_price( + available_instance_offers + ), + has_pool_capacity=nodes_required_num <= len(available_instance_offers), + ) ) - backend_offers = await _get_backend_offers_in_fleet( - project=project, - fleet_model=candidate_fleet_model, - fleet_spec=candidate_fleet_spec, - run_spec=run_spec, - job=job, - volumes=volumes, - max_offers=_PER_FLEET_MAX_OFFERS, - ) + # If any candidate fleet has pool capacity, the optimal fleet will be one of + # those, so backend offers from any fleet won't affect selection — skip them entirely when allowed. + skip_backend_offers = skip_backend_offers_on_pool_capacity and any( + candidate.has_pool_capacity for candidate in candidates + ) + # Second step: gather backend offers unless skipped. + candidates_with_backend_offers: list[_FleetCandidateWithBackendOffers] = [] + for candidate in candidates: + backend_offers: list[tuple[Backend, InstanceOfferWithAvailability]] + if skip_backend_offers: + backend_offers = [] + else: + backend_offers = await _get_backend_offers_in_fleet( + project=project, + fleet_model=candidate.fleet_model, + fleet_spec=candidate.fleet_spec, + run_spec=run_spec, + job=job, + volumes=volumes, + max_offers=_PER_FLEET_MAX_OFFERS, + ) available_backend_offers = _exclude_non_available_backend_offers(backend_offers) - min_backend_offer_price = _get_min_instance_or_backend_offer_price( - available_backend_offers - ) - - fleet_priority = ( - not has_pool_capacity, - min_instance_offer_price, - min_backend_offer_price, - ) - candidate_fleets_with_offers.append( - ( - candidate_fleet_model, - instance_offers, - backend_offers, - len(available_instance_offers), - len(available_backend_offers), - fleet_priority, + candidates_with_backend_offers.append( + _FleetCandidateWithBackendOffers( + candidate=candidate, + backend_offers=backend_offers, + # Pool-capacity fleets first; then cheapest pool offer; then cheapest backend. + sort_key=( + not candidate.has_pool_capacity, + candidate.min_instance_offer_price, + _get_min_instance_or_backend_offer_price(available_backend_offers), + ), ) ) - if len(candidate_fleets_with_offers) == 0: + if not candidates_with_backend_offers: return None, [], [] - candidate_fleets_with_offers.sort(key=lambda t: t[-1]) - optimal_fleet_model, instance_offers = candidate_fleets_with_offers[0][:2] - # Refetch backend offers without limit to return all offers for the optimal fleet. - backend_offers = await _get_backend_offers_in_fleet( - project=project, - fleet_model=optimal_fleet_model, - run_spec=run_spec, - job=job, - volumes=volumes, - max_offers=None, - ) - if exclude_not_available: - backend_offers = _exclude_non_available_backend_offers(backend_offers) + optimal = min(candidates_with_backend_offers, key=lambda c: c.sort_key) + optimal_fleet_model = optimal.candidate.fleet_model + instance_offers = optimal.candidate.instance_offers + if skip_backend_offers: + backend_offers = [] + else: + # Refetch backend offers without limit to return all offers for the optimal fleet. + backend_offers = await _get_backend_offers_in_fleet( + project=project, + fleet_model=optimal_fleet_model, + run_spec=run_spec, + job=job, + volumes=volumes, + max_offers=None, + ) + if exclude_not_available: + backend_offers = _exclude_non_available_backend_offers(backend_offers) return optimal_fleet_model, instance_offers, backend_offers