From 8d9c675be7cc57b614f31f7a50ab712674612e50 Mon Sep 17 00:00:00 2001 From: mday-io Date: Wed, 3 Jun 2026 13:46:08 -0400 Subject: [PATCH 1/3] fix(clickhouse): support multi-gateway projects with catalog-aware engines ClickHouse's UNSUPPORTED catalog_support caused 2-level FQNs that broke sqlglot MappingSchema when mixed with 3-level FQNs from Trino or other catalog-aware gateways. Fix: auto-inject a virtual catalog (gateway name) for UNSUPPORTED adapters when catalog-aware peers exist, then strip it before any SQL reaches ClickHouse. Signed-off-by: mday-io --- sqlmesh/core/config/scheduler.py | 18 +++- sqlmesh/core/context.py | 6 ++ sqlmesh/core/engine_adapter/base.py | 23 ++++ sqlmesh/core/engine_adapter/clickhouse.py | 35 ++++++ tests/core/engine_adapter/test_clickhouse.py | 37 +++++++ tests/core/test_context.py | 108 +++++++++++++++++++ 6 files changed, 226 insertions(+), 1 deletion(-) diff --git a/sqlmesh/core/config/scheduler.py b/sqlmesh/core/config/scheduler.py index 9d9d1d3c79..0567592b11 100644 --- a/sqlmesh/core/config/scheduler.py +++ b/sqlmesh/core/config/scheduler.py @@ -138,9 +138,25 @@ def create_plan_evaluator(self, context: GenericContext) -> PlanEvaluator: def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str, str]: default_catalogs_per_gateway: t.Dict[str, str] = {} + unsupported_gateways = [] + for gateway, adapter in context.engine_adapters.items(): - if catalog := adapter.default_catalog: + if adapter.catalog_support.is_unsupported: + unsupported_gateways.append((gateway, adapter)) + elif catalog := adapter.default_catalog: default_catalogs_per_gateway[gateway] = catalog + + # When catalog-aware gateways exist, assign the gateway name as a virtual catalog for + # catalog-unsupported gateways that opt in (e.g. ClickHouse) so that all models in the + # project have a uniform 3-level FQN and the MappingSchema nesting level check passes. + # Only adapters that explicitly return True from supports_virtual_catalog() are mutated; + # other UNSUPPORTED adapters are left unchanged to avoid silent breakage. + if default_catalogs_per_gateway and unsupported_gateways: + for gateway, adapter in unsupported_gateways: + if adapter.supports_virtual_catalog(): + adapter.inject_virtual_catalog(gateway) + default_catalogs_per_gateway[gateway] = gateway + return default_catalogs_per_gateway diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index 9d5fe2ff88..5d539c158a 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -487,6 +487,12 @@ def engine_adapter(self) -> EngineAdapter: @property def snapshot_evaluator(self) -> SnapshotEvaluator: if not self._snapshot_evaluator: + # Ensure virtual catalog injection (via default_catalog_per_gateway) has run before + # cloning adapters with with_settings(). Adapters that support virtual catalogs (e.g. + # ClickHouse alongside catalog-aware gateways) mutate _default_catalog during + # get_default_catalog_per_gateway. with_settings() forwards _default_catalog to the + # clone, so the mutation must happen first or the clones will miss the virtual catalog. + self.default_catalog_per_gateway # noqa: B018 self._snapshot_evaluator = SnapshotEvaluator( { gateway: adapter.with_settings(execute_log_level=logging.INFO) diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index 5465ea1197..e2ec6e07a7 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -218,6 +218,29 @@ def comments_enabled(self) -> bool: def catalog_support(self) -> CatalogSupport: return CatalogSupport.UNSUPPORTED + def supports_virtual_catalog(self) -> bool: + """Return True if this adapter can accept a virtual catalog for multi-gateway nesting alignment. + + When a project mixes catalog-aware gateways (e.g. DuckDB) with catalog-unsupported gateways + (e.g. ClickHouse), all adapters need a uniform 3-level FQN so MappingSchema nesting stays + consistent. Adapters that return True here opt in to receiving an injected virtual catalog + via inject_virtual_catalog(), which causes the set_catalog decorator to strip the catalog + from DDL expressions rather than raising UnsupportedCatalogOperationError. + """ + return False + + def inject_virtual_catalog(self, catalog: str) -> None: + """Inject a virtual catalog name for multi-gateway nesting alignment. + + Only call this on adapters that return True from supports_virtual_catalog(). After + injection, catalog_support should return SINGLE_CATALOG_ONLY so the set_catalog decorator + strips the virtual catalog from DDL expressions instead of raising an error. + """ + raise NotImplementedError( + f"{self.dialect} does not support virtual catalog injection. " + "Override supports_virtual_catalog() to return True and implement inject_virtual_catalog()." + ) + @cached_property def schema_differ(self) -> SchemaDiffer: return SchemaDiffer( diff --git a/sqlmesh/core/engine_adapter/clickhouse.py b/sqlmesh/core/engine_adapter/clickhouse.py index c41681ade2..946579ab6d 100644 --- a/sqlmesh/core/engine_adapter/clickhouse.py +++ b/sqlmesh/core/engine_adapter/clickhouse.py @@ -8,6 +8,7 @@ from sqlmesh.core.engine_adapter.mixins import LogicalMergeMixin from sqlmesh.core.engine_adapter.base import EngineAdapterWithIndexSupport from sqlmesh.core.engine_adapter.shared import ( + CatalogSupport, DataObject, DataObjectType, EngineRunMode, @@ -42,6 +43,22 @@ class ClickhouseEngineAdapter(EngineAdapterWithIndexSupport, LogicalMergeMixin): DEFAULT_TABLE_ENGINE = "MergeTree" ORDER_BY_TABLE_ENGINE_REGEX = "^.*?MergeTree.*$" + @property + def catalog_support(self) -> CatalogSupport: + # When a virtual catalog has been injected via inject_virtual_catalog() (to align + # nesting levels with catalog-aware gateways in the same project), treat ClickHouse as + # SINGLE_CATALOG_ONLY so the set_catalog decorator strips the virtual catalog from DDL + # expressions instead of raising UnsupportedCatalogOperationError. + if self._default_catalog: + return CatalogSupport.SINGLE_CATALOG_ONLY + return CatalogSupport.UNSUPPORTED + + def supports_virtual_catalog(self) -> bool: + return True + + def inject_virtual_catalog(self, catalog: str) -> None: + self._default_catalog = catalog + @property def engine_run_mode(self) -> EngineRunMode: if self._extra_config.get("cloud_mode"): @@ -172,10 +189,28 @@ def create_schema( Clickhouse has a two-level naming scheme [database].[table]. """ + from sqlmesh.utils.errors import SQLMeshError + properties_copy = properties.copy() if self.engine_run_mode.is_cluster: properties_copy.append(exp.OnCluster(this=exp.to_identifier(self.cluster))) + # ClickHouse does not support catalogs. When a virtual catalog has been injected + # (self._default_catalog is set), strip it from the schema name. This mirrors the + # SINGLE_CATALOG_ONLY branch in the set_catalog decorator, which does not apply here + # because this override is not wrapped by @set_catalog(). + if self._default_catalog: + schema_exp = to_schema(schema_name) + catalog_name = schema_exp.catalog + if catalog_name: + if catalog_name != self._default_catalog: + raise SQLMeshError( + f"clickhouse requires that all catalog operations be against a single catalog: " + f"{self._default_catalog}. Provided catalog: {catalog_name}" + ) + schema_exp.set("catalog", None) + schema_name = schema_exp + # can't call super() because it will try to set a catalog return self._create_schema( schema_name=schema_name, diff --git a/tests/core/engine_adapter/test_clickhouse.py b/tests/core/engine_adapter/test_clickhouse.py index b2ff0592d2..30f8d8c226 100644 --- a/tests/core/engine_adapter/test_clickhouse.py +++ b/tests/core/engine_adapter/test_clickhouse.py @@ -1407,3 +1407,40 @@ def test_exchange_tables( 'RENAME TABLE "table2" TO "table1"', 'DROP TABLE IF EXISTS "__temp_table1_abcd"', ] + + +def test_virtual_catalog_ddl_stripping(make_mocked_engine_adapter: t.Callable): + """After inject_virtual_catalog(), create_schema() with the virtual catalog prefix must strip + the catalog and execute without raising, and with a wrong catalog must raise SQLMeshError.""" + from sqlmesh.utils.errors import SQLMeshError + + adapter = make_mocked_engine_adapter(ClickhouseEngineAdapter) + + assert adapter.supports_virtual_catalog() is True + adapter.inject_virtual_catalog("clickhouse_gw") + + # catalog_support must switch to SINGLE_CATALOG_ONLY after injection + from sqlmesh.core.engine_adapter.shared import CatalogSupport + + assert adapter.catalog_support == CatalogSupport.SINGLE_CATALOG_ONLY + assert adapter._default_catalog == "clickhouse_gw" + + # create_schema with the virtual catalog prefix must strip the catalog and not raise + adapter.create_schema("clickhouse_gw.mydb") + assert to_sql_calls(adapter) == ['CREATE DATABASE IF NOT EXISTS "mydb"'] + + # create_schema with a wrong catalog must raise SQLMeshError + with pytest.raises(SQLMeshError, match="clickhouse_gw"): + adapter.create_schema("wrong_catalog.mydb") + + +def test_supports_virtual_catalog_returns_true(): + """ClickhouseEngineAdapter.supports_virtual_catalog() must return True without any connection.""" + from unittest.mock import MagicMock + + adapter = ClickhouseEngineAdapter( + lambda *a, **k: MagicMock(), + dialect="clickhouse", + ) + assert adapter.supports_virtual_catalog() is True + assert adapter._default_catalog is None diff --git a/tests/core/test_context.py b/tests/core/test_context.py index 49b7e56e55..b46e7f477c 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -399,6 +399,114 @@ def test_multiple_gateways(tmp_path: Path): assert context.dag._sorted == ['"db"."staging"."stg_model"', '"db"."main"."final_model"'] +def test_multi_gateway_catalog_aware_and_unsupported(tmp_path: Path, mocker): + """ClickHouse (catalog UNSUPPORTED) alongside DuckDB (catalog FULL_SUPPORT) must not raise a + nesting-level SchemaError when models are loaded. + + Expected behaviour after the fix: + - get_default_catalog_per_gateway assigns the gateway name as a virtual catalog for + catalog-unsupported gateways when catalog-aware gateways are present. + - ClickHouse models end up with a 3-level FQN so the MappingSchema nesting is uniform. + - The virtual catalog is stripped from DDL expressions (not raised as an error) because the + adapter's catalog_support flips to SINGLE_CATALOG_ONLY when _default_catalog is set. + """ + + from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig + from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter + from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter + from sqlmesh.core.engine_adapter.shared import CatalogSupport + + db_path = str(tmp_path / "db.db") + + # Build a real DuckDB adapter for the primary gateway. + duck_adapter = DuckDBEngineAdapter( + lambda *a, **k: __import__("duckdb").connect(db_path), + dialect="duckdb", + ) + + # Build a minimal ClickHouse adapter stub — no real connection needed. + ch_adapter = ClickhouseEngineAdapter( + lambda *a, **k: mocker.NonCallableMock(), + dialect="clickhouse", + ) + + # Simulate the context's engine_adapters dict and call the scheduler directly. + engine_adapters = { + "duckdb_gw": duck_adapter, + "clickhouse_gw": ch_adapter, + } + + ctx_mock = mocker.MagicMock() + ctx_mock.engine_adapters = engine_adapters + + scheduler = BuiltInSchedulerConfig() + catalog_per_gw = scheduler.get_default_catalog_per_gateway(ctx_mock) + + # DuckDB gateway must have a real catalog entry. + assert "duckdb_gw" in catalog_per_gw + # DuckDB's default catalog is the database filename without extension. + assert catalog_per_gw["duckdb_gw"] == "db" + # ClickHouse gateway must now also have a virtual catalog equal to its gateway name. + assert "clickhouse_gw" in catalog_per_gw + assert catalog_per_gw["clickhouse_gw"] == "clickhouse_gw" + + # The ClickHouse adapter's _default_catalog must be mutated to the virtual catalog name. + assert ch_adapter._default_catalog == "clickhouse_gw" + + # The adapter's catalog_support must now be SINGLE_CATALOG_ONLY (not UNSUPPORTED), + # so that the set_catalog decorator strips the virtual catalog instead of raising. + assert ch_adapter.catalog_support == CatalogSupport.SINGLE_CATALOG_ONLY + + # Loading models for both gateways must not raise a SchemaError. + duckdb_model = load_sql_based_model( + parse("MODEL(name main.duckdb_tbl, kind FULL, gateway duckdb_gw);\nSELECT 1 AS col"), + default_catalog="db", + ) + ch_model = load_sql_based_model( + parse("MODEL(name mydb.ch_tbl, kind FULL, gateway clickhouse_gw);\nSELECT 1 AS col"), + default_catalog="clickhouse_gw", + ) + + # Both models must have 3-level FQNs so MappingSchema nesting is uniform. + assert duckdb_model.fqn.count(".") == 2, ( + f"Expected 3-level FQN for duckdb model, got: {duckdb_model.fqn}" + ) + assert ch_model.fqn.count(".") == 2, f"Expected 3-level FQN for ch model, got: {ch_model.fqn}" + + # Both models loaded into the same MappingSchema must not raise a nesting SchemaError. + from sqlglot.schema import MappingSchema + + schema = MappingSchema(normalize=False) + schema.add_table(duckdb_model.fqn, duckdb_model.columns_to_types or {}) + schema.add_table(ch_model.fqn, ch_model.columns_to_types or {}) + + +def test_single_gateway_clickhouse_no_virtual_catalog(mocker): + """When ClickHouse is the only gateway (no catalog-aware peer), it must NOT receive a virtual + catalog. Models remain 2-level and catalog_support stays UNSUPPORTED.""" + from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig + from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter + from sqlmesh.core.engine_adapter.shared import CatalogSupport + + ch_adapter = ClickhouseEngineAdapter( + lambda *a, **k: mocker.NonCallableMock(), + dialect="clickhouse", + ) + + ctx_mock = mocker.MagicMock() + ctx_mock.engine_adapters = {"clickhouse_gw": ch_adapter} + + scheduler = BuiltInSchedulerConfig() + catalog_per_gw = scheduler.get_default_catalog_per_gateway(ctx_mock) + + # With only a catalog-unsupported gateway there must be no entry at all. + assert "clickhouse_gw" not in catalog_per_gw + + # The adapter must remain unchanged — no virtual catalog injected. + assert ch_adapter._default_catalog is None + assert ch_adapter.catalog_support == CatalogSupport.UNSUPPORTED + + def test_plan_execution_time(): context = Context(config=Config()) context.upsert_model( From 212efc0dfa6dad8ce8b470815ca5fbfc159d541b Mon Sep 17 00:00:00 2001 From: mday-io Date: Wed, 3 Jun 2026 15:30:10 -0400 Subject: [PATCH 2/3] Clarify virtual catalog --- docs/integrations/engines/clickhouse.md | 42 ++++++++++++++++++- sqlmesh/core/config/connection.py | 7 +++- sqlmesh/core/config/scheduler.py | 6 ++- sqlmesh/core/engine_adapter/clickhouse.py | 5 ++- tests/core/engine_adapter/test_clickhouse.py | 36 ++++++++++++++-- tests/core/test_context.py | 44 +++++++++++++++++--- 6 files changed, 127 insertions(+), 13 deletions(-) diff --git a/docs/integrations/engines/clickhouse.md b/docs/integrations/engines/clickhouse.md index 14e931b046..6074e75e48 100644 --- a/docs/integrations/engines/clickhouse.md +++ b/docs/integrations/engines/clickhouse.md @@ -420,6 +420,45 @@ If a model has many records in each partition, you may see additional performanc Choose a model's time partitioning granularity based on the characteristics of the data it will process, making sure the total number of partitions is 1000 or fewer. +## Multi-gateway setup + +ClickHouse does not have a catalog concept — its fully-qualified table names are two-level (`database.table`), not three-level (`catalog.database.table`). + +When a SQLMesh project uses ClickHouse alongside a catalog-aware gateway such as Trino or BigQuery, the two gateway types produce FQNs with different nesting depths. SQLMesh's internal schema tracking requires uniform nesting, so it assigns a **virtual catalog** to ClickHouse models at load time. + +### How the virtual catalog works + +- SQLMesh automatically detects the nesting mismatch and injects a virtual catalog into each ClickHouse adapter when a catalog-aware gateway is also present. +- ClickHouse models will appear with three-level FQNs in `sqlmesh plan` output and logs — for example, `__ch_prod__.mydb.mytable` for a gateway named `ch_prod`. +- The virtual catalog prefix is **never sent to ClickHouse**. It is stripped from every DDL and DML statement before execution. +- When ClickHouse is the only gateway in a project, no virtual catalog is assigned and models remain two-level. + +### Adding a second gateway to an existing ClickHouse-only project + +If your project previously used ClickHouse as the only gateway, your models were fingerprinted with 2-level FQNs (`db.table`). Adding a catalog-aware gateway for the first time causes all ClickHouse models to be treated as new versions (their FQNs change to `__{gateway_name}__.db.table`), triggering a full re-materialization on the next `sqlmesh apply`. This is a one-time cost. + +### Virtual catalog naming + +By default, the virtual catalog name is derived from **the gateway name you chose in your config**, wrapped in double underscores — for example, a gateway named `clickhouse` produces `__clickhouse__`, and a gateway named `ch_prod` produces `__ch_prod__`. The double-underscore wrapping makes it visually clear that this is an internal SQLMesh concept, not a real ClickHouse object. + +You can override the default name by setting `virtual_catalog` in your ClickHouse connection configuration: + +```yaml +gateways: + clickhouse: + connection: + type: clickhouse + host: my-clickhouse-host + username: default + virtual_catalog: ch_virtual # optional; defaults to __{gateway_name}__ (e.g. __clickhouse__) + trino: + connection: + type: trino + ... +``` + +With this configuration, ClickHouse models will appear as `ch_virtual.mydb.mytable` in plan output instead of `__clickhouse__.mydb.mytable`. + ## Local/Built-in Scheduler **Engine Adapter Type**: `clickhouse` @@ -446,4 +485,5 @@ If a model has many records in each partition, you may see additional performanc | `server_host_name` | The ClickHouse server hostname as identified by the CN or SNI of its TLS certificate. Set this to avoid SSL errors when connecting through a proxy or tunnel with a different hostname. | string | N | | `tls_mode` | Controls advanced TLS behavior. proxy and strict do not invoke ClickHouse mutual TLS connection, but do send client cert and key. mutual assumes ClickHouse mutual TLS auth with a client certificate. | string | N | | `connection_settings` | Additional [connection settings](https://clickhouse.com/docs/integrations/python#settings-argument) | dict | N | -| `connection_pool_options` | Additional [options](https://clickhouse.com/docs/integrations/python#customizing-the-http-connection-pool) for the HTTP connection pool | dict | N | \ No newline at end of file +| `connection_pool_options` | Additional [options](https://clickhouse.com/docs/integrations/python#customizing-the-http-connection-pool) for the HTTP connection pool | dict | N | +| `virtual_catalog` | Override the virtual catalog name used when ClickHouse runs alongside a catalog-aware gateway (e.g. Trino). Defaults to `__{gateway_name}__`. See [Multi-gateway setup](#multi-gateway-setup) for details. | string | N | \ No newline at end of file diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 343414eab2..ff53ef8dab 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -2085,6 +2085,7 @@ class ClickhouseConnectionConfig(ConnectionConfig): password: t.Optional[str] = None port: t.Optional[int] = None cluster: t.Optional[str] = None + virtual_catalog: t.Optional[str] = None connect_timeout: int = 10 send_receive_timeout: int = 300 query_limit: int = 0 @@ -2180,7 +2181,11 @@ def cloud_mode(self) -> bool: @property def _extra_engine_config(self) -> t.Dict[str, t.Any]: - return {"cluster": self.cluster, "cloud_mode": self.cloud_mode} + return { + "cluster": self.cluster, + "cloud_mode": self.cloud_mode, + "virtual_catalog": self.virtual_catalog, + } @property def _static_connection_kwargs(self) -> t.Dict[str, t.Any]: diff --git a/sqlmesh/core/config/scheduler.py b/sqlmesh/core/config/scheduler.py index 0567592b11..4cce9b0f76 100644 --- a/sqlmesh/core/config/scheduler.py +++ b/sqlmesh/core/config/scheduler.py @@ -155,7 +155,11 @@ def get_default_catalog_per_gateway(self, context: GenericContext) -> t.Dict[str for gateway, adapter in unsupported_gateways: if adapter.supports_virtual_catalog(): adapter.inject_virtual_catalog(gateway) - default_catalogs_per_gateway[gateway] = gateway + # Read the actual virtual catalog name back from the adapter — it may differ + # from the gateway name if the user configured a custom virtual_catalog value. + # inject_virtual_catalog() always sets _default_catalog so default_catalog + # cannot return None at this point. + default_catalogs_per_gateway[gateway] = adapter.default_catalog # type: ignore[assignment] return default_catalogs_per_gateway diff --git a/sqlmesh/core/engine_adapter/clickhouse.py b/sqlmesh/core/engine_adapter/clickhouse.py index 946579ab6d..284f3dd485 100644 --- a/sqlmesh/core/engine_adapter/clickhouse.py +++ b/sqlmesh/core/engine_adapter/clickhouse.py @@ -56,8 +56,9 @@ def catalog_support(self) -> CatalogSupport: def supports_virtual_catalog(self) -> bool: return True - def inject_virtual_catalog(self, catalog: str) -> None: - self._default_catalog = catalog + def inject_virtual_catalog(self, gateway: str) -> None: + configured = self._extra_config.get("virtual_catalog") + self._default_catalog = f"__{gateway}__" if configured is None else configured @property def engine_run_mode(self) -> EngineRunMode: diff --git a/tests/core/engine_adapter/test_clickhouse.py b/tests/core/engine_adapter/test_clickhouse.py index 30f8d8c226..3075e6ecea 100644 --- a/tests/core/engine_adapter/test_clickhouse.py +++ b/tests/core/engine_adapter/test_clickhouse.py @@ -1423,14 +1423,15 @@ def test_virtual_catalog_ddl_stripping(make_mocked_engine_adapter: t.Callable): from sqlmesh.core.engine_adapter.shared import CatalogSupport assert adapter.catalog_support == CatalogSupport.SINGLE_CATALOG_ONLY - assert adapter._default_catalog == "clickhouse_gw" + # The default synthetic virtual catalog wraps the gateway name in double underscores. + assert adapter._default_catalog == "__clickhouse_gw__" # create_schema with the virtual catalog prefix must strip the catalog and not raise - adapter.create_schema("clickhouse_gw.mydb") + adapter.create_schema("__clickhouse_gw__.mydb") assert to_sql_calls(adapter) == ['CREATE DATABASE IF NOT EXISTS "mydb"'] # create_schema with a wrong catalog must raise SQLMeshError - with pytest.raises(SQLMeshError, match="clickhouse_gw"): + with pytest.raises(SQLMeshError, match="__clickhouse_gw__"): adapter.create_schema("wrong_catalog.mydb") @@ -1444,3 +1445,32 @@ def test_supports_virtual_catalog_returns_true(): ) assert adapter.supports_virtual_catalog() is True assert adapter._default_catalog is None + + +def test_inject_virtual_catalog_uses_custom_config(make_mocked_engine_adapter: t.Callable): + """When virtual_catalog is set in _extra_config, inject_virtual_catalog uses that value + instead of the synthetic __gateway_name__ default.""" + adapter = make_mocked_engine_adapter( + ClickhouseEngineAdapter, + virtual_catalog="my_custom_catalog", + ) + + adapter.inject_virtual_catalog("clickhouse_gw") + + # The user-configured value must take precedence over the synthetic default. + assert adapter._default_catalog == "my_custom_catalog" + + from sqlmesh.core.engine_adapter.shared import CatalogSupport + + assert adapter.catalog_support == CatalogSupport.SINGLE_CATALOG_ONLY + + +def test_clickhouse_connection_config_virtual_catalog_extra_engine_config(): + """virtual_catalog set on ClickhouseConnectionConfig must appear in _extra_engine_config + so that the value reaches the adapter's _extra_config dict.""" + from sqlmesh.core.config.connection import ClickhouseConnectionConfig + + config = ClickhouseConnectionConfig( + host="localhost", username="user", virtual_catalog="my_catalog" + ) + assert config._extra_engine_config.get("virtual_catalog") == "my_catalog" diff --git a/tests/core/test_context.py b/tests/core/test_context.py index b46e7f477c..19089ee8af 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -446,12 +446,12 @@ def test_multi_gateway_catalog_aware_and_unsupported(tmp_path: Path, mocker): assert "duckdb_gw" in catalog_per_gw # DuckDB's default catalog is the database filename without extension. assert catalog_per_gw["duckdb_gw"] == "db" - # ClickHouse gateway must now also have a virtual catalog equal to its gateway name. + # ClickHouse gateway must now also have a virtual catalog wrapped in double underscores. assert "clickhouse_gw" in catalog_per_gw - assert catalog_per_gw["clickhouse_gw"] == "clickhouse_gw" + assert catalog_per_gw["clickhouse_gw"] == "__clickhouse_gw__" - # The ClickHouse adapter's _default_catalog must be mutated to the virtual catalog name. - assert ch_adapter._default_catalog == "clickhouse_gw" + # The ClickHouse adapter's _default_catalog must be mutated to the synthetic virtual catalog. + assert ch_adapter._default_catalog == "__clickhouse_gw__" # The adapter's catalog_support must now be SINGLE_CATALOG_ONLY (not UNSUPPORTED), # so that the set_catalog decorator strips the virtual catalog instead of raising. @@ -464,7 +464,7 @@ def test_multi_gateway_catalog_aware_and_unsupported(tmp_path: Path, mocker): ) ch_model = load_sql_based_model( parse("MODEL(name mydb.ch_tbl, kind FULL, gateway clickhouse_gw);\nSELECT 1 AS col"), - default_catalog="clickhouse_gw", + default_catalog="__clickhouse_gw__", ) # Both models must have 3-level FQNs so MappingSchema nesting is uniform. @@ -507,6 +507,40 @@ def test_single_gateway_clickhouse_no_virtual_catalog(mocker): assert ch_adapter.catalog_support == CatalogSupport.UNSUPPORTED +def test_multi_gateway_clickhouse_custom_virtual_catalog(tmp_path: Path, mocker): + """When virtual_catalog is configured on the ClickHouse connection, that value is used as the + virtual catalog instead of the synthetic __gateway_name__ default.""" + from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig + from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter + from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter + from sqlmesh.core.engine_adapter.shared import CatalogSupport + + db_path = str(tmp_path / "db.db") + + duck_adapter = DuckDBEngineAdapter( + lambda *a, **k: __import__("duckdb").connect(db_path), + dialect="duckdb", + ) + + # Pass virtual_catalog via _extra_config (the same path used by ClickhouseConnectionConfig). + ch_adapter = ClickhouseEngineAdapter( + lambda *a, **k: mocker.NonCallableMock(), + dialect="clickhouse", + virtual_catalog="my_custom_catalog", + ) + + ctx_mock = mocker.MagicMock() + ctx_mock.engine_adapters = {"duckdb_gw": duck_adapter, "clickhouse_gw": ch_adapter} + + scheduler = BuiltInSchedulerConfig() + catalog_per_gw = scheduler.get_default_catalog_per_gateway(ctx_mock) + + # The configured virtual_catalog value must be used, not __clickhouse_gw__. + assert catalog_per_gw["clickhouse_gw"] == "my_custom_catalog" + assert ch_adapter._default_catalog == "my_custom_catalog" + assert ch_adapter.catalog_support == CatalogSupport.SINGLE_CATALOG_ONLY + + def test_plan_execution_time(): context = Context(config=Config()) context.upsert_model( From 89696e506c4b0fd70e14f070e1f43cdd7709b522 Mon Sep 17 00:00:00 2001 From: mday-io Date: Wed, 3 Jun 2026 15:53:29 -0400 Subject: [PATCH 3/3] harden virtual catalog injection and config validation --- sqlmesh/core/config/connection.py | 9 +++++++++ sqlmesh/core/context.py | 16 ++++++++++------ sqlmesh/core/engine_adapter/clickhouse.py | 7 +++---- tests/core/engine_adapter/test_clickhouse.py | 16 ++++++++++++++++ tests/core/test_context.py | 17 +++++++++++++++++ 5 files changed, 55 insertions(+), 10 deletions(-) diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index ff53ef8dab..ca68f89b36 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -2120,6 +2120,15 @@ class ClickhouseConnectionConfig(ConnectionConfig): _engine_import_validator = _get_engine_import_validator("clickhouse_connect", "clickhouse") + @field_validator("virtual_catalog") + def validate_virtual_catalog(cls, v: t.Optional[str]) -> t.Optional[str]: + if v is not None and not v.strip(): + raise ConfigError( + "virtual_catalog cannot be an empty string. " + "Omit the field to use the default synthetic prefix (____)." + ) + return v + @property def _connection_kwargs_keys(self) -> t.Set[str]: kwargs = { diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index 5d539c158a..19271debc1 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -487,12 +487,7 @@ def engine_adapter(self) -> EngineAdapter: @property def snapshot_evaluator(self) -> SnapshotEvaluator: if not self._snapshot_evaluator: - # Ensure virtual catalog injection (via default_catalog_per_gateway) has run before - # cloning adapters with with_settings(). Adapters that support virtual catalogs (e.g. - # ClickHouse alongside catalog-aware gateways) mutate _default_catalog during - # get_default_catalog_per_gateway. with_settings() forwards _default_catalog to the - # clone, so the mutation must happen first or the clones will miss the virtual catalog. - self.default_catalog_per_gateway # noqa: B018 + self._ensure_virtual_catalog_injection() self._snapshot_evaluator = SnapshotEvaluator( { gateway: adapter.with_settings(execute_log_level=logging.INFO) @@ -503,6 +498,15 @@ def snapshot_evaluator(self) -> SnapshotEvaluator: ) return self._snapshot_evaluator + def _ensure_virtual_catalog_injection(self) -> None: + """Ensure virtual catalog injection has run before adapters are cloned for SnapshotEvaluator. + + Injection is a side effect of get_default_catalog_per_gateway. In normal usage it fires + earlier (default_catalog is accessed during model loading), but this guard covers the edge + case where snapshot_evaluator is accessed directly on a fresh context before any model ops. + """ + _ = self.default_catalog_per_gateway + def execution_context( self, deployability_index: t.Optional[DeployabilityIndex] = None, diff --git a/sqlmesh/core/engine_adapter/clickhouse.py b/sqlmesh/core/engine_adapter/clickhouse.py index 284f3dd485..8408ed73e8 100644 --- a/sqlmesh/core/engine_adapter/clickhouse.py +++ b/sqlmesh/core/engine_adapter/clickhouse.py @@ -45,10 +45,9 @@ class ClickhouseEngineAdapter(EngineAdapterWithIndexSupport, LogicalMergeMixin): @property def catalog_support(self) -> CatalogSupport: - # When a virtual catalog has been injected via inject_virtual_catalog() (to align - # nesting levels with catalog-aware gateways in the same project), treat ClickHouse as - # SINGLE_CATALOG_ONLY so the set_catalog decorator strips the virtual catalog from DDL - # expressions instead of raising UnsupportedCatalogOperationError. + # This property is intentionally dynamic: it transitions from UNSUPPORTED to + # SINGLE_CATALOG_ONLY after inject_virtual_catalog() sets _default_catalog. Callers must + # not cache the result — always read it live so they see the post-injection state. if self._default_catalog: return CatalogSupport.SINGLE_CATALOG_ONLY return CatalogSupport.UNSUPPORTED diff --git a/tests/core/engine_adapter/test_clickhouse.py b/tests/core/engine_adapter/test_clickhouse.py index 3075e6ecea..b72aa422a4 100644 --- a/tests/core/engine_adapter/test_clickhouse.py +++ b/tests/core/engine_adapter/test_clickhouse.py @@ -1474,3 +1474,19 @@ def test_clickhouse_connection_config_virtual_catalog_extra_engine_config(): host="localhost", username="user", virtual_catalog="my_catalog" ) assert config._extra_engine_config.get("virtual_catalog") == "my_catalog" + + +def test_clickhouse_connection_config_virtual_catalog_empty_string_rejected(): + """virtual_catalog: "" is a footgun — the empty string propagates to _default_catalog, + which is falsy, so catalog_support stays UNSUPPORTED and the nesting error persists. + Reject it at config parse time with a clear message.""" + import pytest + + from sqlmesh.core.config.connection import ClickhouseConnectionConfig + from sqlmesh.utils.errors import ConfigError + + with pytest.raises(ConfigError, match="virtual_catalog cannot be an empty string"): + ClickhouseConnectionConfig(host="localhost", username="user", virtual_catalog="") + + with pytest.raises(ConfigError, match="virtual_catalog cannot be an empty string"): + ClickhouseConnectionConfig(host="localhost", username="user", virtual_catalog=" ") diff --git a/tests/core/test_context.py b/tests/core/test_context.py index 19089ee8af..761207a0d3 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -541,6 +541,23 @@ def test_multi_gateway_clickhouse_custom_virtual_catalog(tmp_path: Path, mocker) assert ch_adapter.catalog_support == CatalogSupport.SINGLE_CATALOG_ONLY +def test_snapshot_evaluator_calls_ensure_virtual_catalog_injection(mocker): + """snapshot_evaluator must call _ensure_virtual_catalog_injection before cloning adapters. + + This guards the edge case where snapshot_evaluator is the first property accessed on a fresh + context — before default_catalog fires during model loading — and ensures virtual catalog + injection still happens even in that order. + """ + ctx = Context(config=Config()) + ctx._snapshot_evaluator = None # force re-initialization + + inject_spy = mocker.patch.object(ctx, "_ensure_virtual_catalog_injection") + + _ = ctx.snapshot_evaluator + + inject_spy.assert_called_once() + + def test_plan_execution_time(): context = Context(config=Config()) context.upsert_model(