Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ async def _select_assignment(
master_job_provisioning_data=preconditions.master_job_provisioning_data,
volumes=preconditions.prepared_job_volumes.volumes,
exclude_not_available=True,
skip_backend_offers_on_pool_capacity=True,
)

if fleet_model is None:
Expand Down
150 changes: 88 additions & 62 deletions src/dstack/_internal/server/services/runs/plan.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import math
from collections.abc import Hashable, Mapping
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Union

Expand Down Expand Up @@ -280,6 +281,22 @@ async def select_run_candidate_fleet_models_with_filters(
return fleet_models_with_instances, fleet_models_without_instances


@dataclass
class _FleetCandidate:
fleet_model: FleetModel
fleet_spec: FleetSpec
instance_offers: list[tuple[InstanceModel, InstanceOfferWithAvailability]]
min_instance_offer_price: float
has_pool_capacity: bool


@dataclass
class _FleetCandidateWithBackendOffers:
candidate: _FleetCandidate
backend_offers: list[tuple[Backend, InstanceOfferWithAvailability]]
sort_key: tuple[bool, float, float]


async def find_optimal_fleet_with_offers(
project: ProjectModel,
fleet_models: list[FleetModel],
Expand All @@ -289,6 +306,7 @@ async def find_optimal_fleet_with_offers(
master_job_provisioning_data: Optional[JobProvisioningData],
volumes: Optional[list[list[Volume]]],
exclude_not_available: bool,
skip_backend_offers_on_pool_capacity: bool = False,
) -> tuple[
Optional[FleetModel],
list[tuple[InstanceModel, InstanceOfferWithAvailability]],
Expand Down Expand Up @@ -320,34 +338,27 @@ async def find_optimal_fleet_with_offers(
# Then choose a fleet with the cheapest pool offer among all fleets with pool offers.
# If there are no fleets with pool offers, choose a fleet with a cheapest backend offer.
# TODO: Consider trying all backend offers and then choosing a fleet.
candidate_fleets_with_offers: list[
tuple[
FleetModel,
list[tuple[InstanceModel, InstanceOfferWithAvailability]],
list[tuple[Backend, InstanceOfferWithAvailability]],
int,
int,
tuple[int, float, float],
]
] = []
for candidate_fleet_model in fleet_models:
candidate_fleet_spec = get_fleet_spec(candidate_fleet_model)

# First step: consider instance offers.
candidates: list[_FleetCandidate] = []
for fleet_model in fleet_models:
fleet_spec = get_fleet_spec(fleet_model)
if (
is_multinode_job(job)
and candidate_fleet_spec.configuration.placement != InstanceGroupPlacement.CLUSTER
and fleet_spec.configuration.placement != InstanceGroupPlacement.CLUSTER
):
# Limit multinode runs to cluster fleets to guarantee best connectivity.
continue

if not _run_can_fit_into_fleet(run_spec, candidate_fleet_model, candidate_fleet_spec):
if not _run_can_fit_into_fleet(run_spec, fleet_model, fleet_spec):
logger.debug(
"Skipping fleet %s from consideration: run cannot fit into fleet",
candidate_fleet_model.name,
fleet_model.name,
)
continue

all_instance_offers = get_instance_offers_in_fleet(
fleet_model=candidate_fleet_model,
fleet_model=fleet_model,
run_spec=run_spec,
job=job,
# No need to pass master_job_provisioning_data for master job
Expand All @@ -357,61 +368,76 @@ async def find_optimal_fleet_with_offers(
exclude_not_available=False,
)
available_instance_offers = _exclude_non_available_instance_offers(all_instance_offers)
instance_offers = (
available_instance_offers if exclude_not_available else all_instance_offers
)
has_pool_capacity = nodes_required_num <= len(available_instance_offers)
min_instance_offer_price = _get_min_instance_or_backend_offer_price(
available_instance_offers
candidates.append(
_FleetCandidate(
fleet_model=fleet_model,
fleet_spec=fleet_spec,
instance_offers=(
available_instance_offers if exclude_not_available else all_instance_offers
),
min_instance_offer_price=_get_min_instance_or_backend_offer_price(
available_instance_offers
),
has_pool_capacity=nodes_required_num <= len(available_instance_offers),
)
)

backend_offers = await _get_backend_offers_in_fleet(
project=project,
fleet_model=candidate_fleet_model,
fleet_spec=candidate_fleet_spec,
run_spec=run_spec,
job=job,
volumes=volumes,
max_offers=_PER_FLEET_MAX_OFFERS,
)
# If any candidate fleet has pool capacity, the optimal fleet will be one of
# those, so backend offers from any fleet won't affect selection — skip them entirely when allowed.
skip_backend_offers = skip_backend_offers_on_pool_capacity and any(
candidate.has_pool_capacity for candidate in candidates
)

# Second step: gather backend offers unless skipped.
candidates_with_backend_offers: list[_FleetCandidateWithBackendOffers] = []
for candidate in candidates:
backend_offers: list[tuple[Backend, InstanceOfferWithAvailability]]
if skip_backend_offers:
backend_offers = []
else:
backend_offers = await _get_backend_offers_in_fleet(
project=project,
fleet_model=candidate.fleet_model,
fleet_spec=candidate.fleet_spec,
run_spec=run_spec,
job=job,
volumes=volumes,
max_offers=_PER_FLEET_MAX_OFFERS,
)
available_backend_offers = _exclude_non_available_backend_offers(backend_offers)
min_backend_offer_price = _get_min_instance_or_backend_offer_price(
available_backend_offers
)

fleet_priority = (
not has_pool_capacity,
min_instance_offer_price,
min_backend_offer_price,
)
candidate_fleets_with_offers.append(
(
candidate_fleet_model,
instance_offers,
backend_offers,
len(available_instance_offers),
len(available_backend_offers),
fleet_priority,
candidates_with_backend_offers.append(
_FleetCandidateWithBackendOffers(
candidate=candidate,
backend_offers=backend_offers,
# Pool-capacity fleets first; then cheapest pool offer; then cheapest backend.
sort_key=(
not candidate.has_pool_capacity,
candidate.min_instance_offer_price,
_get_min_instance_or_backend_offer_price(available_backend_offers),
),
)
)

if len(candidate_fleets_with_offers) == 0:
if not candidates_with_backend_offers:
return None, [], []

candidate_fleets_with_offers.sort(key=lambda t: t[-1])
optimal_fleet_model, instance_offers = candidate_fleets_with_offers[0][:2]
# Refetch backend offers without limit to return all offers for the optimal fleet.
backend_offers = await _get_backend_offers_in_fleet(
project=project,
fleet_model=optimal_fleet_model,
run_spec=run_spec,
job=job,
volumes=volumes,
max_offers=None,
)
if exclude_not_available:
backend_offers = _exclude_non_available_backend_offers(backend_offers)
optimal = min(candidates_with_backend_offers, key=lambda c: c.sort_key)
optimal_fleet_model = optimal.candidate.fleet_model
instance_offers = optimal.candidate.instance_offers
if skip_backend_offers:
backend_offers = []
else:
# Refetch backend offers without limit to return all offers for the optimal fleet.
backend_offers = await _get_backend_offers_in_fleet(
project=project,
fleet_model=optimal_fleet_model,
run_spec=run_spec,
job=job,
volumes=volumes,
max_offers=None,
)
if exclude_not_available:
backend_offers = _exclude_non_available_backend_offers(backend_offers)
return optimal_fleet_model, instance_offers, backend_offers


Expand Down
Loading