42 changes: 41 additions & 1 deletion docs/integrations/engines/clickhouse.md
@@ -420,6 +420,45 @@ If a model has many records in each partition, you may see additional performanc

Choose a model's time partitioning granularity based on the characteristics of the data it will process, making sure the total number of partitions is 1000 or fewer.

## Multi-gateway setup

ClickHouse does not have a catalog concept — its fully-qualified table names are two-level (`database.table`), not three-level (`catalog.database.table`).

When a SQLMesh project uses ClickHouse alongside a catalog-aware gateway such as Trino or BigQuery, the two gateway types produce FQNs with different nesting depths. SQLMesh's internal schema tracking requires uniform nesting, so it assigns a **virtual catalog** to ClickHouse models at load time.

### How the virtual catalog works

- SQLMesh automatically detects the nesting mismatch and injects a virtual catalog into each ClickHouse adapter when a catalog-aware gateway is also present.
- ClickHouse models will appear with three-level FQNs in `sqlmesh plan` output and logs — for example, `__ch_prod__.mydb.mytable` for a gateway named `ch_prod`.
- The virtual catalog prefix is **never sent to ClickHouse**. It is stripped from every DDL and DML statement before execution.
- When ClickHouse is the only gateway in a project, no virtual catalog is assigned and models remain two-level.
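
The add-then-strip behavior in the list above can be sketched in plain Python. This is only an illustration under simplifying assumptions: the helper names `to_tracked_fqn` and `to_engine_name` are hypothetical, and names are treated as simple dot-separated strings, whereas SQLMesh itself works on parsed SQL expressions.

```python
from typing import Optional


def to_tracked_fqn(name: str, virtual_catalog: Optional[str]) -> str:
    """Return the name used for internal tracking (hypothetical helper).

    A two-level ClickHouse name gains the virtual catalog only when one has
    been assigned, i.e. when a catalog-aware gateway is also present.
    """
    parts = name.split(".")
    if virtual_catalog and len(parts) == 2:
        return ".".join([virtual_catalog, *parts])
    return name


def to_engine_name(fqn: str, virtual_catalog: Optional[str]) -> str:
    """Return the name that would be sent to ClickHouse (hypothetical helper)."""
    parts = fqn.split(".")
    if virtual_catalog and len(parts) == 3 and parts[0] == virtual_catalog:
        return ".".join(parts[1:])
    return fqn


# Mixed project: the gateway "ch_prod" has the default virtual catalog.
tracked = to_tracked_fqn("mydb.mytable", "__ch_prod__")
print(tracked)                                 # __ch_prod__.mydb.mytable
print(to_engine_name(tracked, "__ch_prod__"))  # mydb.mytable

# ClickHouse-only project: no virtual catalog, so names stay two-level.
print(to_tracked_fqn("mydb.mytable", None))    # mydb.mytable
```

The round trip is lossless: the prefix exists only on the SQLMesh side, so removing it always recovers the original `database.table` name.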

### Adding a second gateway to an existing ClickHouse-only project

If your project previously used ClickHouse as the only gateway, your models were fingerprinted with two-level FQNs (`db.table`). Adding a catalog-aware gateway for the first time changes those FQNs to `__{gateway_name}__.db.table` (or to `{virtual_catalog}.db.table` if you set `virtual_catalog`), so SQLMesh treats every ClickHouse model as a new version and fully re-materializes it the next time a plan is applied. This is a one-time cost.

### Virtual catalog naming

By default, the virtual catalog name is derived from **the gateway name you chose in your config**, wrapped in double underscores — for example, a gateway named `clickhouse` produces `__clickhouse__`, and a gateway named `ch_prod` produces `__ch_prod__`. The double-underscore wrapping makes it visually clear that this is an internal SQLMesh concept, not a real ClickHouse object.

You can override the default name by setting `virtual_catalog` in your ClickHouse connection configuration:

```yaml
gateways:
  clickhouse:
    connection:
      type: clickhouse
      host: my-clickhouse-host
      username: default
      virtual_catalog: ch_virtual  # optional; defaults to __{gateway_name}__ (e.g. __clickhouse__)
  trino:
    connection:
      type: trino
      ...
```

With this configuration, ClickHouse models will appear as `ch_virtual.mydb.mytable` in plan output instead of `__clickhouse__.mydb.mytable`.
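
The naming rule can be summarized in a few lines of Python. This is a hedged sketch rather than SQLMesh's actual code path: the function name `resolve_virtual_catalog` is hypothetical, and in SQLMesh the blank-value check happens during config validation while the name is chosen inside the ClickHouse adapter.

```python
from typing import Optional


def resolve_virtual_catalog(gateway_name: str, configured: Optional[str] = None) -> str:
    """Pick the virtual catalog name for a ClickHouse gateway (illustrative only)."""
    if configured is not None:
        if not configured.strip():
            # An empty or whitespace-only override is rejected instead of used.
            raise ValueError("virtual_catalog cannot be an empty string")
        return configured
    # Default: the gateway name wrapped in double underscores.
    return f"__{gateway_name}__"


print(resolve_virtual_catalog("clickhouse"))                # __clickhouse__
print(resolve_virtual_catalog("ch_prod"))                   # __ch_prod__
print(resolve_virtual_catalog("clickhouse", "ch_virtual"))  # ch_virtual
```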

## Local/Built-in Scheduler

**Engine Adapter Type**: `clickhouse`
@@ -446,4 +485,5 @@ If a model has many records in each partition, you may see additional performanc
| `server_host_name` | The ClickHouse server hostname as identified by the CN or SNI of its TLS certificate. Set this to avoid SSL errors when connecting through a proxy or tunnel with a different hostname. | string | N |
| `tls_mode` | Controls advanced TLS behavior. `proxy` and `strict` do not invoke ClickHouse mutual TLS connection, but do send client cert and key. `mutual` assumes ClickHouse mutual TLS auth with a client certificate. | string | N |
| `connection_settings` | Additional [connection settings](https://clickhouse.com/docs/integrations/python#settings-argument) | dict | N |
| `connection_pool_options` | Additional [options](https://clickhouse.com/docs/integrations/python#customizing-the-http-connection-pool) for the HTTP connection pool | dict | N |
| `virtual_catalog` | Override the virtual catalog name used when ClickHouse runs alongside a catalog-aware gateway (e.g. Trino). Defaults to `__{gateway_name}__`. See [Multi-gateway setup](#multi-gateway-setup) for details. | string | N |
16 changes: 15 additions & 1 deletion sqlmesh/core/config/connection.py
@@ -2085,6 +2085,7 @@ class ClickhouseConnectionConfig(ConnectionConfig):
    password: t.Optional[str] = None
    port: t.Optional[int] = None
    cluster: t.Optional[str] = None
    virtual_catalog: t.Optional[str] = None
    connect_timeout: int = 10
    send_receive_timeout: int = 300
    query_limit: int = 0
@@ -2119,6 +2120,15 @@ class ClickhouseConnectionConfig(ConnectionConfig):

    _engine_import_validator = _get_engine_import_validator("clickhouse_connect", "clickhouse")

    @field_validator("virtual_catalog")
    def validate_virtual_catalog(cls, v: t.Optional[str]) -> t.Optional[str]:
        if v is not None and not v.strip():
            raise ConfigError(
                "virtual_catalog cannot be an empty string. "
                "Omit the field to use the default synthetic prefix (__<gateway_name>__)."
            )
        return v

    @property
    def _connection_kwargs_keys(self) -> t.Set[str]:
        kwargs = {
@@ -2180,7 +2190,11 @@ def cloud_mode(self) -> bool:

    @property
    def _extra_engine_config(self) -> t.Dict[str, t.Any]:
        return {
            "cluster": self.cluster,
            "cloud_mode": self.cloud_mode,
            "virtual_catalog": self.virtual_catalog,
        }

    @property
    def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
22 changes: 21 additions & 1 deletion sqlmesh/core/config/scheduler.py
@@ -138,9 +138,29 @@ def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator:

    def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str, str]:
        default_catalogs_per_gateway: t.Dict[str, str] = {}
        unsupported_gateways = []

        for gateway, adapter in context.engine_adapters.items():
            if adapter.catalog_support.is_unsupported:
                unsupported_gateways.append((gateway, adapter))
            elif catalog := adapter.default_catalog:
                default_catalogs_per_gateway[gateway] = catalog

        # When catalog-aware gateways exist, assign a virtual catalog (by default derived from
        # the gateway name) to catalog-unsupported gateways that opt in (e.g. ClickHouse) so that
        # all models in the project have a uniform 3-level FQN and the MappingSchema nesting
        # level check passes. Only adapters that explicitly return True from
        # supports_virtual_catalog() are mutated; other UNSUPPORTED adapters are left unchanged
        # to avoid silent breakage.
        if default_catalogs_per_gateway and unsupported_gateways:
            for gateway, adapter in unsupported_gateways:
                if adapter.supports_virtual_catalog():
                    adapter.inject_virtual_catalog(gateway)
                    # Read the actual virtual catalog name back from the adapter; it may differ
                    # from the gateway name if the user configured a custom virtual_catalog value.
                    # inject_virtual_catalog() always sets _default_catalog so default_catalog
                    # cannot return None at this point.
                    default_catalogs_per_gateway[gateway] = adapter.default_catalog  # type: ignore[assignment]

        return default_catalogs_per_gateway
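
The opt-in flow in this method can be tried in isolation with stand-in objects. The sketch below is an assumption-laden illustration, not SQLMesh code: `StubAdapter` and `default_catalog_per_gateway` are hypothetical names, and "has no default catalog" stands in for `CatalogSupport.UNSUPPORTED`.

```python
from typing import Dict, List, Optional, Tuple


class StubAdapter:
    """Stand-in for an engine adapter; not a SQLMesh class."""

    def __init__(self, default_catalog: Optional[str] = None, opts_in: bool = False) -> None:
        self.default_catalog = default_catalog
        self._opts_in = opts_in

    @property
    def catalog_unsupported(self) -> bool:
        # Simplification: no catalog yet means the engine has no catalog concept.
        return self.default_catalog is None

    def supports_virtual_catalog(self) -> bool:
        return self._opts_in

    def inject_virtual_catalog(self, gateway: str) -> None:
        self.default_catalog = f"__{gateway}__"


def default_catalog_per_gateway(adapters: Dict[str, StubAdapter]) -> Dict[str, str]:
    catalogs: Dict[str, str] = {}
    unsupported: List[Tuple[str, StubAdapter]] = []
    for gateway, adapter in adapters.items():
        if adapter.catalog_unsupported:
            unsupported.append((gateway, adapter))
        elif adapter.default_catalog:
            catalogs[gateway] = adapter.default_catalog
    # Inject only when at least one catalog-aware gateway is present.
    if catalogs and unsupported:
        for gateway, adapter in unsupported:
            if adapter.supports_virtual_catalog():
                adapter.inject_virtual_catalog(gateway)
                assert adapter.default_catalog is not None
                catalogs[gateway] = adapter.default_catalog
    return catalogs


mixed = {
    "trino": StubAdapter("hive"),
    "ch_prod": StubAdapter(opts_in=True),
    "other": StubAdapter(),  # catalog-unsupported and not opted in: left alone
}
print(default_catalog_per_gateway(mixed))
# {'trino': 'hive', 'ch_prod': '__ch_prod__'}

print(default_catalog_per_gateway({"ch_prod": StubAdapter(opts_in=True)}))
# {}
```

The second call shows the ClickHouse-only case: with no catalog-aware gateway in the mapping, nothing is injected and the result is empty.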


10 changes: 10 additions & 0 deletions sqlmesh/core/context.py
@@ -487,6 +487,7 @@ def engine_adapter(self) -> EngineAdapter:
    @property
    def snapshot_evaluator(self) -> SnapshotEvaluator:
        if not self._snapshot_evaluator:
            self._ensure_virtual_catalog_injection()
            self._snapshot_evaluator = SnapshotEvaluator(
                {
                    gateway: adapter.with_settings(execute_log_level=logging.INFO)
@@ -497,6 +498,15 @@ def snapshot_evaluator(self) -> SnapshotEvaluator:
            )
        return self._snapshot_evaluator

    def _ensure_virtual_catalog_injection(self) -> None:
        """Ensure virtual catalog injection has run before adapters are cloned for SnapshotEvaluator.

        Injection is a side effect of get_default_catalog_per_gateway. In normal usage it fires
        earlier (default_catalog is accessed during model loading), but this guard covers the edge
        case where snapshot_evaluator is accessed directly on a fresh context before any model ops.
        """
        _ = self.default_catalog_per_gateway

    def execution_context(
        self,
        deployability_index: t.Optional[DeployabilityIndex] = None,
23 changes: 23 additions & 0 deletions sqlmesh/core/engine_adapter/base.py
@@ -218,6 +218,29 @@ def comments_enabled(self) -> bool:
    def catalog_support(self) -> CatalogSupport:
        return CatalogSupport.UNSUPPORTED

    def supports_virtual_catalog(self) -> bool:
        """Return True if this adapter can accept a virtual catalog for multi-gateway nesting alignment.

        When a project mixes catalog-aware gateways (e.g. DuckDB) with catalog-unsupported gateways
        (e.g. ClickHouse), all adapters need a uniform 3-level FQN so MappingSchema nesting stays
        consistent. Adapters that return True here opt in to receiving an injected virtual catalog
        via inject_virtual_catalog(), which causes the set_catalog decorator to strip the catalog
        from DDL expressions rather than raising UnsupportedCatalogOperationError.
        """
        return False

    def inject_virtual_catalog(self, catalog: str) -> None:
        """Inject a virtual catalog name for multi-gateway nesting alignment.

        Only call this on adapters that return True from supports_virtual_catalog(). After
        injection, catalog_support should return SINGLE_CATALOG_ONLY so the set_catalog decorator
        strips the virtual catalog from DDL expressions instead of raising an error.
        """
        raise NotImplementedError(
            f"{self.dialect} does not support virtual catalog injection. "
            "Override supports_virtual_catalog() to return True and implement inject_virtual_catalog()."
        )
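
The contract described in these two docstrings can be illustrated with a minimal opt-in adapter. Everything here is a stand-in written for the example: `CatalogSupport` is a reduced two-member enum, and `BaseAdapter` and `OptInAdapter` are hypothetical classes, not SQLMesh's own.

```python
from enum import Enum
from typing import Optional


class CatalogSupport(Enum):
    # Reduced stand-in for SQLMesh's enum; only the two states used here.
    UNSUPPORTED = "unsupported"
    SINGLE_CATALOG_ONLY = "single_catalog_only"


class BaseAdapter:
    dialect = "example"

    def __init__(self) -> None:
        self._default_catalog: Optional[str] = None

    @property
    def catalog_support(self) -> CatalogSupport:
        return CatalogSupport.UNSUPPORTED

    def supports_virtual_catalog(self) -> bool:
        return False

    def inject_virtual_catalog(self, catalog: str) -> None:
        raise NotImplementedError(f"{self.dialect} does not support virtual catalog injection.")


class OptInAdapter(BaseAdapter):
    @property
    def catalog_support(self) -> CatalogSupport:
        # Computed on every read so callers observe the post-injection state.
        if self._default_catalog:
            return CatalogSupport.SINGLE_CATALOG_ONLY
        return CatalogSupport.UNSUPPORTED

    def supports_virtual_catalog(self) -> bool:
        return True

    def inject_virtual_catalog(self, catalog: str) -> None:
        self._default_catalog = catalog


adapter = OptInAdapter()
print(adapter.catalog_support.name)  # UNSUPPORTED
adapter.inject_virtual_catalog("__ch_prod__")
print(adapter.catalog_support.name)  # SINGLE_CATALOG_ONLY
```

Because the property changes value after injection, a caller that stored the first result would keep treating the adapter as catalog-unsupported. That is the hazard the ClickHouse override's comment warns about.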

    @cached_property
    def schema_differ(self) -> SchemaDiffer:
        return SchemaDiffer(
35 changes: 35 additions & 0 deletions sqlmesh/core/engine_adapter/clickhouse.py
@@ -8,6 +8,7 @@
from sqlmesh.core.engine_adapter.mixins import LogicalMergeMixin
from sqlmesh.core.engine_adapter.base import EngineAdapterWithIndexSupport
from sqlmesh.core.engine_adapter.shared import (
    CatalogSupport,
    DataObject,
    DataObjectType,
    EngineRunMode,
@@ -42,6 +43,22 @@ class ClickhouseEngineAdapter(EngineAdapterWithIndexSupport, LogicalMergeMixin):
    DEFAULT_TABLE_ENGINE = "MergeTree"
    ORDER_BY_TABLE_ENGINE_REGEX = "^.*?MergeTree.*$"

    @property
    def catalog_support(self) -> CatalogSupport:
        # This property is intentionally dynamic: it transitions from UNSUPPORTED to
        # SINGLE_CATALOG_ONLY after inject_virtual_catalog() sets _default_catalog. Callers must
        # not cache the result; always read it live so they see the post-injection state.
        if self._default_catalog:
            return CatalogSupport.SINGLE_CATALOG_ONLY
        return CatalogSupport.UNSUPPORTED

    def supports_virtual_catalog(self) -> bool:
        return True

    def inject_virtual_catalog(self, gateway: str) -> None:
        configured = self._extra_config.get("virtual_catalog")
        self._default_catalog = f"__{gateway}__" if configured is None else configured

    @property
    def engine_run_mode(self) -> EngineRunMode:
        if self._extra_config.get("cloud_mode"):
@@ -172,10 +189,28 @@ def create_schema(

        Clickhouse has a two-level naming scheme [database].[table].
        """
        from sqlmesh.utils.errors import SQLMeshError

        properties_copy = properties.copy()
        if self.engine_run_mode.is_cluster:
            properties_copy.append(exp.OnCluster(this=exp.to_identifier(self.cluster)))

        # ClickHouse does not support catalogs. When a virtual catalog has been injected
        # (self._default_catalog is set), strip it from the schema name. This mirrors the
        # SINGLE_CATALOG_ONLY branch in the set_catalog decorator, which does not apply here
        # because this override is not wrapped by @set_catalog().
        if self._default_catalog:
            schema_exp = to_schema(schema_name)
            catalog_name = schema_exp.catalog
            if catalog_name:
                if catalog_name != self._default_catalog:
                    raise SQLMeshError(
                        f"clickhouse requires that all catalog operations be against a single catalog: "
                        f"{self._default_catalog}. Provided catalog: {catalog_name}"
                    )
                schema_exp.set("catalog", None)
                schema_name = schema_exp

        # can't call super() because it will try to set a catalog
        return self._create_schema(
            schema_name=schema_name,
83 changes: 83 additions & 0 deletions tests/core/engine_adapter/test_clickhouse.py
@@ -1407,3 +1407,86 @@ def test_exchange_tables(
        'RENAME TABLE "table2" TO "table1"',
        'DROP TABLE IF EXISTS "__temp_table1_abcd"',
    ]


def test_virtual_catalog_ddl_stripping(make_mocked_engine_adapter: t.Callable):
    """After inject_virtual_catalog(), create_schema() with the virtual catalog prefix must strip
    the catalog and execute without raising, and with a wrong catalog must raise SQLMeshError."""
    from sqlmesh.utils.errors import SQLMeshError

    adapter = make_mocked_engine_adapter(ClickhouseEngineAdapter)

    assert adapter.supports_virtual_catalog() is True
    adapter.inject_virtual_catalog("clickhouse_gw")

    # catalog_support must switch to SINGLE_CATALOG_ONLY after injection
    from sqlmesh.core.engine_adapter.shared import CatalogSupport

    assert adapter.catalog_support == CatalogSupport.SINGLE_CATALOG_ONLY
    # The default synthetic virtual catalog wraps the gateway name in double underscores.
    assert adapter._default_catalog == "__clickhouse_gw__"

    # create_schema with the virtual catalog prefix must strip the catalog and not raise
    adapter.create_schema("__clickhouse_gw__.mydb")
    assert to_sql_calls(adapter) == ['CREATE DATABASE IF NOT EXISTS "mydb"']

    # create_schema with a wrong catalog must raise SQLMeshError
    with pytest.raises(SQLMeshError, match="__clickhouse_gw__"):
        adapter.create_schema("wrong_catalog.mydb")


def test_supports_virtual_catalog_returns_true():
    """ClickhouseEngineAdapter.supports_virtual_catalog() must return True without any connection."""
    from unittest.mock import MagicMock

    adapter = ClickhouseEngineAdapter(
        lambda *a, **k: MagicMock(),
        dialect="clickhouse",
    )
    assert adapter.supports_virtual_catalog() is True
    assert adapter._default_catalog is None


def test_inject_virtual_catalog_uses_custom_config(make_mocked_engine_adapter: t.Callable):
    """When virtual_catalog is set in _extra_config, inject_virtual_catalog uses that value
    instead of the synthetic __gateway_name__ default."""
    adapter = make_mocked_engine_adapter(
        ClickhouseEngineAdapter,
        virtual_catalog="my_custom_catalog",
    )

    adapter.inject_virtual_catalog("clickhouse_gw")

    # The user-configured value must take precedence over the synthetic default.
    assert adapter._default_catalog == "my_custom_catalog"

    from sqlmesh.core.engine_adapter.shared import CatalogSupport

    assert adapter.catalog_support == CatalogSupport.SINGLE_CATALOG_ONLY


def test_clickhouse_connection_config_virtual_catalog_extra_engine_config():
    """virtual_catalog set on ClickhouseConnectionConfig must appear in _extra_engine_config
    so that the value reaches the adapter's _extra_config dict."""
    from sqlmesh.core.config.connection import ClickhouseConnectionConfig

    config = ClickhouseConnectionConfig(
        host="localhost", username="user", virtual_catalog="my_catalog"
    )
    assert config._extra_engine_config.get("virtual_catalog") == "my_catalog"


def test_clickhouse_connection_config_virtual_catalog_empty_string_rejected():
    """virtual_catalog: "" is a footgun: the empty string propagates to _default_catalog,
    which is falsy, so catalog_support stays UNSUPPORTED and the nesting error persists.
    Reject it at config parse time with a clear message."""
    import pytest

    from sqlmesh.core.config.connection import ClickhouseConnectionConfig
    from sqlmesh.utils.errors import ConfigError

    with pytest.raises(ConfigError, match="virtual_catalog cannot be an empty string"):
        ClickhouseConnectionConfig(host="localhost", username="user", virtual_catalog="")

    with pytest.raises(ConfigError, match="virtual_catalog cannot be an empty string"):
        ClickhouseConnectionConfig(host="localhost", username="user", virtual_catalog="   ")