diff --git a/src/specify_cli/extensions.py b/src/specify_cli/extensions.py index 871503f0ae..7e8392c6eb 100644 --- a/src/specify_cli/extensions.py +++ b/src/specify_cli/extensions.py @@ -1702,6 +1702,44 @@ def _open_url(self, url: str, timeout: int = 10): from specify_cli.authentication.http import open_url return open_url(url, timeout) + def _validate_catalog_payload(self, catalog_data: Any, url: str) -> None: + """Validate a parsed catalog payload's shape. + + Applied to both network-fetched and cache-loaded payloads so a + once-poisoned cache (older spec-kit version, manual edit, upstream + served a bad payload before the network-side guards were added) + cannot re-crash ``_get_merged_extensions`` on subsequent calls. + + Checking only key presence would let a payload like + ``{"extensions": []}`` or ``{"extensions": null}`` slip through + here and then crash with ``AttributeError: 'list' object has no + attribute 'items'`` deep inside ``_get_merged_extensions``. The + sibling integration catalog reader already guards both the root + object and the nested mapping (see ``integrations/catalog.py``); + the extension catalog must stay consistent so a malformed payload + surfaces as the user-facing ``Invalid catalog format`` error + instead of a raw Python traceback. + + Args: + catalog_data: Parsed JSON payload from the catalog source. + url: Source URL — used in the error message so the user can + tell which catalog in a multi-catalog stack is malformed. + + Raises: + ExtensionError: If the payload's shape is invalid. + """ + if not isinstance(catalog_data, dict): + raise ExtensionError( + f"Invalid catalog format from {url}: expected a JSON object" + ) + if "schema_version" not in catalog_data or "extensions" not in catalog_data: + raise ExtensionError(f"Invalid catalog format from {url}") + if not isinstance(catalog_data.get("extensions"), dict): + raise ExtensionError( + f"Invalid catalog format from {url}: " + "'extensions' must be a JSON object" + ) + def get_active_catalogs(self) -> List[CatalogEntry]: """Get the ordered list of active catalogs. @@ -1817,21 +1855,48 @@ def _fetch_single_catalog(self, entry: CatalogEntry, force_refresh: bool = False is_valid = False if not force_refresh and cache_file.exists() and cache_meta_file.exists(): try: - metadata = json.loads(cache_meta_file.read_text()) + metadata = json.loads( + cache_meta_file.read_text(encoding="utf-8") + ) cached_at = datetime.fromisoformat(metadata.get("cached_at", "")) if cached_at.tzinfo is None: cached_at = cached_at.replace(tzinfo=timezone.utc) age = (datetime.now(timezone.utc) - cached_at).total_seconds() is_valid = age < self.CACHE_DURATION - except (json.JSONDecodeError, ValueError, KeyError, TypeError): - # If metadata is invalid or missing expected fields, treat cache as invalid + except ( + json.JSONDecodeError, + OSError, + UnicodeError, + ValueError, + KeyError, + TypeError, + ): + # Cache validity is best-effort: invalid/missing metadata + # fields, an unreadable metadata file (permissions / disk), + # or a wrongly-encoded metadata file (written by a tool + # using the system locale codec) all degrade to "cache + # invalid" so the caller falls through to a network + # refetch instead of crashing. pass - # Use cache if valid + # Use cache if valid. A previously-cached payload must clear the + # same shape checks as a freshly-fetched one — otherwise a once- + # poisoned cache (older spec-kit version, manual edit, upstream + # served a bad payload before the network-side guards were added) + # would re-crash on every invocation despite the cache being + # "valid" by age. If validation fails on the cached read, fall + # through to the network fetch path so the cache gets refreshed. if is_valid: try: - return json.loads(cache_file.read_text()) - except json.JSONDecodeError: + cached_data = json.loads(cache_file.read_text(encoding="utf-8")) + self._validate_catalog_payload(cached_data, entry.url) + return cached_data + except (json.JSONDecodeError, OSError, UnicodeError, ExtensionError): + # Cache is best-effort: a JSON-decode failure, an OS-level + # read failure (permissions / disk / handle limit), or a + # text-encoding failure on a cache file written by an older + # client all fall through to the network fetch path. Only + # the network failure is surfaced to the caller. pass # Fetch from network @@ -1839,16 +1904,25 @@ def _fetch_single_catalog(self, entry: CatalogEntry, force_refresh: bool = False with self._open_url(entry.url, timeout=10) as response: catalog_data = json.loads(response.read()) - if "schema_version" not in catalog_data or "extensions" not in catalog_data: - raise ExtensionError(f"Invalid catalog format from {entry.url}") + self._validate_catalog_payload(catalog_data, entry.url) - # Save to cache + # Save to cache. Both files are explicitly UTF-8 to match the + # ``read_text(encoding="utf-8")`` on the read side and the + # ``integrations/catalog.py:193-203`` precedent. Without this, + # platforms whose default encoding isn't UTF-8 would write + # locale-encoded bytes that the read path can't decode, forcing + # an unnecessary network refetch on every invocation. self.cache_dir.mkdir(parents=True, exist_ok=True) - cache_file.write_text(json.dumps(catalog_data, indent=2)) - cache_meta_file.write_text(json.dumps({ - "cached_at": datetime.now(timezone.utc).isoformat(), - "catalog_url": entry.url, - }, indent=2)) + cache_file.write_text( + json.dumps(catalog_data, indent=2), encoding="utf-8" + ) + cache_meta_file.write_text( + json.dumps({ + "cached_at": datetime.now(timezone.utc).isoformat(), + "catalog_url": entry.url, + }, indent=2), + encoding="utf-8", + ) return catalog_data @@ -1895,6 +1969,16 @@ def _get_merged_extensions(self, force_refresh: bool = False) -> List[Dict[str, continue for ext_id, ext_data in catalog_data.get("extensions", {}).items(): + # Per-entry guard: ``_fetch_single_catalog`` already validates + # that ``catalog_data["extensions"]`` is a mapping, but it + # does not (and should not) validate every entry shape there + # — one malformed entry shouldn't poison an otherwise valid + # catalog. Skip non-mapping entries here so a payload like + # ``{"extensions": {"foo": [], "bar": {...}}}`` still merges + # the valid entries without crashing on ``**ext_data``. + # Mirrors ``integrations/catalog.py:245``. + if not isinstance(ext_data, dict): + continue if ext_id not in merged: # Higher-priority catalog wins merged[ext_id] = { **ext_data, @@ -1911,6 +1995,12 @@ def _get_merged_extensions(self, force_refresh: bool = False) -> List[Dict[str, def is_cache_valid(self) -> bool: """Check if cached catalog is still valid. + Returns ``False`` for any read/decoding failure on the metadata + file (missing fields, malformed JSON, permissions / disk errors, + wrong text encoding) so callers fall through to a network refetch + instead of crashing. Treating cache validity as best-effort + matches the contract used by the per-URL cache check below. + Returns: True if cache exists and is within cache duration """ @@ -1918,13 +2008,22 @@ def is_cache_valid(self) -> bool: return False try: - metadata = json.loads(self.cache_metadata_file.read_text()) + metadata = json.loads( + self.cache_metadata_file.read_text(encoding="utf-8") + ) cached_at = datetime.fromisoformat(metadata.get("cached_at", "")) if cached_at.tzinfo is None: cached_at = cached_at.replace(tzinfo=timezone.utc) age_seconds = (datetime.now(timezone.utc) - cached_at).total_seconds() return age_seconds < self.CACHE_DURATION - except (json.JSONDecodeError, ValueError, KeyError, TypeError): + except ( + json.JSONDecodeError, + OSError, + UnicodeError, + ValueError, + KeyError, + TypeError, + ): return False def fetch_catalog(self, force_refresh: bool = False) -> Dict[str, Any]: @@ -1939,36 +2038,57 @@ def fetch_catalog(self, force_refresh: bool = False) -> Dict[str, Any]: Raises: ExtensionError: If catalog cannot be fetched """ - # Check cache first unless force refresh + catalog_url = self.get_catalog_url() + + # Check the cache first unless ``force_refresh`` was requested, + # then fall through to a network fetch. Match the + # ``_fetch_single_catalog`` cache contract: a poisoned or + # unreadable cache silently falls through to a network refetch + # rather than crashing the caller. ``_validate_catalog_payload`` + # is reused here so a cache written by an older client + # (pre-validation) is rejected and refreshed instead of returning + # the stale malformed payload. ``is_cache_valid`` itself swallows + # OSError/UnicodeError on the metadata read, so a cache-validity + # check can't crash this method before the read-side fallback + # runs. if not force_refresh and self.is_cache_valid(): try: - return json.loads(self.cache_file.read_text()) - except json.JSONDecodeError: + cached_data = json.loads(self.cache_file.read_text(encoding="utf-8")) + self._validate_catalog_payload(cached_data, catalog_url) + return cached_data + except (json.JSONDecodeError, OSError, UnicodeError, ExtensionError): pass # Fall through to network fetch - # Fetch from network - catalog_url = self.get_catalog_url() - try: import urllib.error with self._open_url(catalog_url, timeout=10) as response: catalog_data = json.loads(response.read()) - # Validate catalog structure - if "schema_version" not in catalog_data or "extensions" not in catalog_data: - raise ExtensionError("Invalid catalog format") - - # Save to cache + # Validate catalog structure. Reuses the same helper as + # ``_fetch_single_catalog`` so all three branches (root type, + # missing keys, nested-mapping type) stay consistent. + self._validate_catalog_payload(catalog_data, catalog_url) + + # Save to cache. Explicit UTF-8 on both writes mirrors the + # ``read_text(encoding="utf-8")`` on the read side and the + # ``integrations/catalog.py:193-203`` precedent — otherwise + # platforms whose default encoding isn't UTF-8 would write + # locale-encoded bytes the read path can't decode, forcing an + # unnecessary refetch on every invocation. self.cache_dir.mkdir(parents=True, exist_ok=True) - self.cache_file.write_text(json.dumps(catalog_data, indent=2)) + self.cache_file.write_text( + json.dumps(catalog_data, indent=2), encoding="utf-8" + ) # Save cache metadata metadata = { "cached_at": datetime.now(timezone.utc).isoformat(), "catalog_url": catalog_url, } - self.cache_metadata_file.write_text(json.dumps(metadata, indent=2)) + self.cache_metadata_file.write_text( + json.dumps(metadata, indent=2), encoding="utf-8" + ) return catalog_data diff --git a/src/specify_cli/presets.py b/src/specify_cli/presets.py index fd9d4745f1..d6bd2488d3 100644 --- a/src/specify_cli/presets.py +++ b/src/specify_cli/presets.py @@ -1860,6 +1860,48 @@ def _open_url(self, url: str, timeout: int = 10): from specify_cli.authentication.http import open_url return open_url(url, timeout) + def _validate_catalog_payload(self, catalog_data: Any, url: str) -> None: + """Validate a parsed preset-catalog payload's shape. + + Applied to both network-fetched and cache-loaded payloads so a + once-poisoned cache (older spec-kit version, manual edit, upstream + served a bad payload before the network-side guards were added) + cannot re-crash ``_get_merged_packs`` on subsequent calls. + + Checking only key presence would let a payload like + ``{"presets": []}`` or ``{"presets": null}`` slip through here and + then crash with ``AttributeError: 'list' object has no attribute + 'items'`` deep inside ``_get_merged_packs``. The sibling + integration catalog reader already guards both the root object and + the nested mapping (see ``integrations/catalog.py``); the preset + catalog must stay consistent so a malformed payload surfaces as + the user-facing ``Invalid preset catalog format`` error instead of + a raw Python traceback. + + Args: + catalog_data: Parsed JSON payload from the catalog source. + url: Source URL — used in the error message so the user can + tell which catalog in a multi-catalog stack is malformed. + + Raises: + PresetError: If the payload's shape is invalid. + """ + if not isinstance(catalog_data, dict): + raise PresetError( + f"Invalid preset catalog format from {url}: " + "expected a JSON object" + ) + if ( + "schema_version" not in catalog_data + or "presets" not in catalog_data + ): + raise PresetError(f"Invalid preset catalog format from {url}") + if not isinstance(catalog_data.get("presets"), dict): + raise PresetError( + f"Invalid preset catalog format from {url}: " + "'presets' must be a JSON object" + ) + def _load_catalog_config(self, config_path: Path) -> Optional[List[PresetCatalogEntry]]: """Load catalog stack configuration from a YAML file. @@ -2009,7 +2051,7 @@ def _is_url_cache_valid(self, url: str) -> bool: if not cache_file.exists() or not metadata_file.exists(): return False try: - metadata = json.loads(metadata_file.read_text()) + metadata = json.loads(metadata_file.read_text(encoding="utf-8")) cached_at = datetime.fromisoformat(metadata.get("cached_at", "")) if cached_at.tzinfo is None: cached_at = cached_at.replace(tzinfo=timezone.utc) @@ -2017,7 +2059,19 @@ def _is_url_cache_valid(self, url: str) -> bool: datetime.now(timezone.utc) - cached_at ).total_seconds() return age_seconds < self.CACHE_DURATION - except (json.JSONDecodeError, ValueError, KeyError, TypeError): + except ( + json.JSONDecodeError, + OSError, + UnicodeError, + ValueError, + KeyError, + TypeError, + ): + # Cache validity is best-effort: invalid/missing fields, an + # unreadable metadata file (permissions / disk), or a wrongly + # encoded one (written by a tool using the system locale + # codec) all degrade to "cache invalid" so the caller falls + # through to a network refetch instead of crashing. return False def _fetch_single_catalog(self, entry: PresetCatalogEntry, force_refresh: bool = False) -> Dict[str, Any]: @@ -2035,29 +2089,48 @@ def _fetch_single_catalog(self, entry: PresetCatalogEntry, force_refresh: bool = """ cache_file, metadata_file = self._get_cache_paths(entry.url) + # Use cache if valid. A previously-cached payload must clear the + # same shape checks as a freshly-fetched one — otherwise a once- + # poisoned cache would re-crash on every invocation despite the + # cache being "valid" by age. If validation fails on the cached + # read, fall through to the network fetch path so the cache gets + # refreshed. if not force_refresh and self._is_url_cache_valid(entry.url): try: - return json.loads(cache_file.read_text()) - except json.JSONDecodeError: + cached_data = json.loads(cache_file.read_text(encoding="utf-8")) + self._validate_catalog_payload(cached_data, entry.url) + return cached_data + except (json.JSONDecodeError, OSError, UnicodeError, PresetError): + # Cache is best-effort: a JSON-decode failure, an OS-level + # read failure (permissions / disk / handle limit), or a + # text-encoding failure on a cache file written by an + # older client all fall through to the network fetch path. + # Only the network failure is surfaced to the caller. pass try: with self._open_url(entry.url, timeout=10) as response: catalog_data = json.loads(response.read()) - if ( - "schema_version" not in catalog_data - or "presets" not in catalog_data - ): - raise PresetError("Invalid preset catalog format") + self._validate_catalog_payload(catalog_data, entry.url) + # Both files are written explicitly as UTF-8 to match the + # ``read_text(encoding="utf-8")`` on the read side and the + # ``integrations/catalog.py:193-203`` precedent. Without this, + # platforms whose default encoding isn't UTF-8 would write + # locale-encoded bytes the read path can't decode, forcing an + # unnecessary refetch on every invocation. self.cache_dir.mkdir(parents=True, exist_ok=True) - cache_file.write_text(json.dumps(catalog_data, indent=2)) + cache_file.write_text( + json.dumps(catalog_data, indent=2), encoding="utf-8" + ) metadata = { "cached_at": datetime.now(timezone.utc).isoformat(), "catalog_url": entry.url, } - metadata_file.write_text(json.dumps(metadata, indent=2)) + metadata_file.write_text( + json.dumps(metadata, indent=2), encoding="utf-8" + ) return catalog_data @@ -2083,6 +2156,17 @@ def _get_merged_packs(self, force_refresh: bool = False) -> Dict[str, Dict[str, try: data = self._fetch_single_catalog(entry, force_refresh) for pack_id, pack_data in data.get("presets", {}).items(): + # Per-entry guard: ``_fetch_single_catalog`` already + # validates that ``data["presets"]`` is a mapping, but it + # does not (and should not) validate every entry shape + # there — one malformed entry shouldn't poison an + # otherwise valid catalog. Skip non-mapping entries here + # so a payload like ``{"presets": {"foo": [], "bar": + # {...}}}`` still merges the valid entries without + # crashing on ``**pack_data``. Mirrors + # ``integrations/catalog.py:245``. + if not isinstance(pack_data, dict): + continue pack_data_with_catalog = {**pack_data, "_catalog_name": entry.name, "_install_allowed": entry.install_allowed} merged[pack_id] = pack_data_with_catalog except PresetError: @@ -2093,6 +2177,12 @@ def _get_merged_packs(self, force_refresh: bool = False) -> Dict[str, Dict[str, def is_cache_valid(self) -> bool: """Check if cached catalog is still valid. + Returns ``False`` for any read/decoding failure on the metadata + file (missing fields, malformed JSON, permissions / disk errors, + wrong text encoding) so callers fall through to a network refetch + instead of crashing. Treating cache validity as best-effort + matches the contract used by ``_is_url_cache_valid`` above. + Returns: True if cache exists and is within cache duration """ @@ -2100,7 +2190,9 @@ def is_cache_valid(self) -> bool: return False try: - metadata = json.loads(self.cache_metadata_file.read_text()) + metadata = json.loads( + self.cache_metadata_file.read_text(encoding="utf-8") + ) cached_at = datetime.fromisoformat(metadata.get("cached_at", "")) if cached_at.tzinfo is None: cached_at = cached_at.replace(tzinfo=timezone.utc) @@ -2108,7 +2200,14 @@ def is_cache_valid(self) -> bool: datetime.now(timezone.utc) - cached_at ).total_seconds() return age_seconds < self.CACHE_DURATION - except (json.JSONDecodeError, ValueError, KeyError, TypeError): + except ( + json.JSONDecodeError, + OSError, + UnicodeError, + ValueError, + KeyError, + TypeError, + ): return False def fetch_catalog(self, force_refresh: bool = False) -> Dict[str, Any]: @@ -2125,34 +2224,54 @@ def fetch_catalog(self, force_refresh: bool = False) -> Dict[str, Any]: """ catalog_url = self.get_catalog_url() + # Match the ``_fetch_single_catalog`` cache contract: a poisoned + # or unreadable cache silently falls through to a network refetch + # rather than crashing the caller. ``_validate_catalog_payload`` + # is reused here so a cache written by an older client + # (pre-validation) is rejected and refreshed instead of returning + # the stale malformed payload. if not force_refresh and self.is_cache_valid(): try: - metadata = json.loads(self.cache_metadata_file.read_text()) + metadata = json.loads( + self.cache_metadata_file.read_text(encoding="utf-8") + ) if metadata.get("catalog_url") == catalog_url: - return json.loads(self.cache_file.read_text()) - except (json.JSONDecodeError, OSError): - # Cache is corrupt or unreadable; fall through to network fetch + cached_data = json.loads( + self.cache_file.read_text(encoding="utf-8") + ) + self._validate_catalog_payload(cached_data, catalog_url) + return cached_data + except (json.JSONDecodeError, OSError, UnicodeError, PresetError): + # Cache is corrupt, unreadable, or fails the shape check; + # fall through to network fetch. pass try: with self._open_url(catalog_url, timeout=10) as response: catalog_data = json.loads(response.read()) - if ( - "schema_version" not in catalog_data - or "presets" not in catalog_data - ): - raise PresetError("Invalid preset catalog format") - + # Validate catalog structure. Reuses the same helper as + # ``_fetch_single_catalog`` so all three branches (root type, + # missing keys, nested-mapping type) stay consistent. + self._validate_catalog_payload(catalog_data, catalog_url) + + # Save to cache. Explicit UTF-8 on both writes mirrors the + # ``read_text(encoding="utf-8")`` on the read side and the + # ``integrations/catalog.py:193-203`` precedent — otherwise + # platforms whose default encoding isn't UTF-8 would write + # locale-encoded bytes the read path can't decode, forcing an + # unnecessary refetch on every invocation. self.cache_dir.mkdir(parents=True, exist_ok=True) - self.cache_file.write_text(json.dumps(catalog_data, indent=2)) + self.cache_file.write_text( + json.dumps(catalog_data, indent=2), encoding="utf-8" + ) metadata = { "cached_at": datetime.now(timezone.utc).isoformat(), "catalog_url": catalog_url, } self.cache_metadata_file.write_text( - json.dumps(metadata, indent=2) + json.dumps(metadata, indent=2), encoding="utf-8" ) return catalog_data diff --git a/tests/test_extensions.py b/tests/test_extensions.py index 153388a541..357c278b21 100644 --- a/tests/test_extensions.py +++ b/tests/test_extensions.py @@ -2577,6 +2577,342 @@ def fake_open(req, timeout=None): assert captured["req"].get_header("Authorization") == "Bearer ghp_testtoken" + @pytest.mark.parametrize( + "payload", + [ + # Root is not a JSON object. + [], + "oops", + 42, + None, + # Root is fine but ``extensions`` is the wrong type. + {"schema_version": "1.0", "extensions": []}, + {"schema_version": "1.0", "extensions": "oops"}, + {"schema_version": "1.0", "extensions": None}, + {"schema_version": "1.0", "extensions": 42}, + ], + ) + def test_fetch_single_catalog_rejects_malformed_payload(self, temp_dir, payload): + """Malformed catalog payloads raise ExtensionError, not AttributeError. + + Without this guard, a payload like ``{"extensions": []}`` would pass the + key-presence check and then crash with ``AttributeError: 'list' object + has no attribute 'items'`` deep inside ``_get_merged_extensions``. The + sibling integration catalog reader already validates both the root + object and the nested mapping (see ``integrations/catalog.py``); the + extension catalog must stay consistent. + """ + from unittest.mock import patch, MagicMock + + catalog = self._make_catalog(temp_dir) + + mock_response = MagicMock() + mock_response.read.return_value = json.dumps(payload).encode() + mock_response.__enter__ = lambda s: s + mock_response.__exit__ = MagicMock(return_value=False) + + entry = CatalogEntry( + url="https://example.com/catalog.json", + name="default", + priority=1, + install_allowed=True, + ) + + with patch.object(catalog, "_open_url", return_value=mock_response): + with pytest.raises(ExtensionError, match="Invalid catalog format"): + catalog._fetch_single_catalog(entry, force_refresh=True) + + @pytest.mark.parametrize( + "cached_payload", + [ + [], + "oops", + 42, + None, + {"schema_version": "1.0", "extensions": []}, + {"schema_version": "1.0", "extensions": "oops"}, + {"schema_version": "1.0", "extensions": None}, + ], + ) + def test_fetch_single_catalog_rejects_malformed_cached_payload( + self, temp_dir, cached_payload + ): + """A poisoned cache silently falls back to the network instead of + crashing — cached payloads pass through the same shape validation + as freshly-fetched ones. + + Without this, a cache poisoned by an older spec-kit version (or a + manual edit, or an upstream that briefly served a bad payload + before the network guards landed) would re-crash every invocation + of ``_get_merged_extensions`` despite the cache being "valid" by + age. The recovery contract is: if the cached payload fails + validation, drop it and refetch — never propagate + ``AttributeError`` to the caller. + """ + from unittest.mock import patch, MagicMock + + catalog = self._make_catalog(temp_dir) + + # Poison the default-URL cache. ``DEFAULT_CATALOG_URL`` is the + # branch that goes through ``is_cache_valid()`` (the non-default + # branch uses per-URL hashed cache files but the same code path + # below). + catalog.cache_dir.mkdir(parents=True, exist_ok=True) + catalog.cache_file.write_text(json.dumps(cached_payload)) + catalog.cache_metadata_file.write_text( + json.dumps( + { + "cached_at": datetime.now(timezone.utc).isoformat(), + "catalog_url": ExtensionCatalog.DEFAULT_CATALOG_URL, + } + ) + ) + + # Network refetch returns a valid payload so the recovery path + # can complete. + valid = { + "schema_version": "1.0", + "extensions": {"foo": {"name": "Foo", "version": "1.0.0"}}, + } + mock_response = MagicMock() + mock_response.read.return_value = json.dumps(valid).encode() + mock_response.__enter__ = lambda s: s + mock_response.__exit__ = MagicMock(return_value=False) + + entry = CatalogEntry( + url=ExtensionCatalog.DEFAULT_CATALOG_URL, + name="default", + priority=1, + install_allowed=True, + ) + + with patch.object(catalog, "_open_url", return_value=mock_response): + result = catalog._fetch_single_catalog(entry, force_refresh=False) + + # The poisoned cache was discarded and the network payload returned. + assert result == valid + + @pytest.mark.parametrize( + "payload", + [ + # Root is not a JSON object. + [], + "oops", + 42, + None, + # Root is fine but ``extensions`` is the wrong type. + {"schema_version": "1.0", "extensions": []}, + {"schema_version": "1.0", "extensions": "oops"}, + {"schema_version": "1.0", "extensions": None}, + ], + ) + def test_fetch_catalog_rejects_malformed_payload(self, temp_dir, payload): + """Legacy ``fetch_catalog`` reuses the same shape-validation helper. + + Before this change ``fetch_catalog`` only checked key presence — so + a payload like ``42`` would crash with + ``TypeError: argument of type 'int' is not iterable`` during the + ``"schema_version" in catalog_data`` check, and an entry mapping + of the wrong type would crash downstream. Reusing + ``_validate_catalog_payload`` keeps the network-side behaviour of + the legacy single-catalog method consistent with the multi-catalog + ``_fetch_single_catalog`` path. + """ + from unittest.mock import patch, MagicMock + + catalog = self._make_catalog(temp_dir) + mock_response = MagicMock() + mock_response.read.return_value = json.dumps(payload).encode() + mock_response.__enter__ = lambda s: s + mock_response.__exit__ = MagicMock(return_value=False) + + with patch.object(catalog, "_open_url", return_value=mock_response): + with pytest.raises(ExtensionError, match="Invalid catalog format"): + catalog.fetch_catalog(force_refresh=True) + + def test_fetch_catalog_recovers_from_unreadable_cache(self, temp_dir): + """An unreadable / wrong-encoded cache file silently refetches. + + The cache contract is best-effort: a JSON-decode failure, an OS + read failure (permissions / disk / handle limit), or an invalid + text encoding on a cache file written by an older client must + all fall through to the network fetch rather than crash the + caller. Covers Copilot's review point that the previous + ``except (json.JSONDecodeError,)`` was too narrow. + """ + from unittest.mock import patch, MagicMock + + catalog = self._make_catalog(temp_dir) + # Write invalid UTF-8 bytes to the cache file so ``read_text`` + # raises ``UnicodeDecodeError`` (a subclass of ``UnicodeError``). + catalog.cache_dir.mkdir(parents=True, exist_ok=True) + catalog.cache_file.write_bytes(b"\xff\xfe\x00not-utf-8") + catalog.cache_metadata_file.write_text( + json.dumps( + { + "cached_at": datetime.now(timezone.utc).isoformat(), + "catalog_url": ExtensionCatalog.DEFAULT_CATALOG_URL, + } + ), + encoding="utf-8", + ) + + valid = { + "schema_version": "1.0", + "extensions": {"foo": {"name": "Foo", "version": "1.0.0"}}, + } + mock_response = MagicMock() + mock_response.read.return_value = json.dumps(valid).encode() + mock_response.__enter__ = lambda s: s + mock_response.__exit__ = MagicMock(return_value=False) + + with patch.object(catalog, "_open_url", return_value=mock_response): + result = catalog.fetch_catalog(force_refresh=False) + + # Recovered via network rather than crashing on the unreadable cache. + assert result == valid + + def test_fetch_catalog_recovers_from_unreadable_metadata(self, temp_dir): + """A wrongly-encoded metadata file degrades to a cache miss. + + ``is_cache_valid`` is consulted *before* the cache payload is + read; if the metadata file itself can't be decoded (e.g. it was + written on a Windows host whose default codec isn't UTF-8) the + validity check must return ``False`` rather than propagate + ``UnicodeDecodeError``. Without that guard, a corrupted metadata + file would crash every invocation instead of falling through to + a network refetch. + """ + from unittest.mock import patch, MagicMock + + catalog = self._make_catalog(temp_dir) + catalog.cache_dir.mkdir(parents=True, exist_ok=True) + catalog.cache_file.write_text("{}", encoding="utf-8") + # Bytes that are not valid UTF-8 — ``read_text(encoding="utf-8")`` + # will raise ``UnicodeDecodeError`` (subclass of ``UnicodeError``). + catalog.cache_metadata_file.write_bytes(b"\xff\xfe\x00bad") + + # is_cache_valid must absorb the decode failure, not crash. + assert catalog.is_cache_valid() is False + + valid = { + "schema_version": "1.0", + "extensions": {"foo": {"name": "Foo", "version": "1.0.0"}}, + } + mock_response = MagicMock() + mock_response.read.return_value = json.dumps(valid).encode() + mock_response.__enter__ = lambda s: s + mock_response.__exit__ = MagicMock(return_value=False) + + with patch.object(catalog, "_open_url", return_value=mock_response): + result = catalog.fetch_catalog(force_refresh=False) + + assert result == valid + + def test_fetch_catalog_writes_cache_as_utf8(self, temp_dir, monkeypatch): + """Cache + metadata writes pass ``encoding="utf-8"``, observably. + + The earlier version of this test claimed to assert UTF-8 at the + byte level but actually only round-tripped a non-ASCII string + through ``json.dumps`` and ``read_text(encoding="utf-8")``. + Because ``json.dumps`` defaults to ``ensure_ascii=True``, "café" + was serialized as the all-ASCII escape ``caf\\u00e9`` before it + ever reached ``write_text`` — the bytes on disk were identical + regardless of the encoding kwarg, so a locale-encoded write + would have round-tripped just fine. The drift Copilot's review + flagged wasn't actually being caught. + + Fix: directly observe the ``encoding`` argument passed to every + ``write_text`` call made against the cache directory. This is + the production code's encoding choice, which is exactly what + the regression guard cares about; non-ASCII payload tricks are + unnecessary because the assertion is about the kwarg, not the + bytes. + """ + from unittest.mock import patch, MagicMock + from pathlib import Path as _PathCls + + catalog = self._make_catalog(temp_dir) + payload = { + "schema_version": "1.0", + "extensions": {"foo": {"name": "Foo", "version": "1.0.0"}}, + } + mock_response = MagicMock() + mock_response.read.return_value = json.dumps(payload).encode("utf-8") + mock_response.__enter__ = lambda s: s + mock_response.__exit__ = MagicMock(return_value=False) + + # Record every ``write_text`` call's encoding kwarg so the + # assertion observes the production writer's argument directly. + recorded: list[dict] = [] + real_write_text = _PathCls.write_text + + def recording_write_text(self, data, *args, **kwargs): + recorded.append( + {"path": str(self), "encoding": kwargs.get("encoding")} + ) + return real_write_text(self, data, *args, **kwargs) + + monkeypatch.setattr(_PathCls, "write_text", recording_write_text) + + with patch.object(catalog, "_open_url", return_value=mock_response): + catalog.fetch_catalog(force_refresh=True) + + # Filter to writes inside the catalog's cache directory so + # unrelated writes from other machinery don't pollute the + # assertion. + cache_writes = [ + r for r in recorded if str(catalog.cache_dir) in r["path"] + ] + assert cache_writes, "fetch_catalog made no writes to the cache dir" + for record in cache_writes: + assert record["encoding"] == "utf-8", ( + f"write_text on {record['path']} used encoding " + f"{record['encoding']!r}; expected 'utf-8'" + ) + + def test_get_merged_extensions_skips_non_mapping_entries(self, temp_dir): + """Per-entry guard: one malformed entry shouldn't poison the merge. + + ``_fetch_single_catalog`` validates that ``extensions`` is a mapping, + but it doesn't (and shouldn't) validate every entry inside it — a + single bad entry in an otherwise-valid catalog should be skipped, not + crash the whole resolve path. Mirrors the per-entry skip in + ``integrations/catalog.py``: a malformed entry returns no error, + valid entries continue to merge normally. + """ + from unittest.mock import patch, MagicMock + + catalog = self._make_catalog(temp_dir) + # Mix of valid entry, list-shaped entry, and string-shaped entry. + payload = { + "schema_version": "1.0", + "extensions": { + "good": {"name": "Good", "version": "1.0.0"}, + "bad-list": [], + "bad-str": "oops", + }, + } + mock_response = MagicMock() + mock_response.read.return_value = json.dumps(payload).encode() + mock_response.__enter__ = lambda s: s + mock_response.__exit__ = MagicMock(return_value=False) + + entry = CatalogEntry( + url="https://example.com/catalog.json", + name="default", + priority=1, + install_allowed=True, + ) + + with patch.object(catalog, "_open_url", return_value=mock_response), \ + patch.object(catalog, "get_active_catalogs", return_value=[entry]): + merged = catalog._get_merged_extensions(force_refresh=True) + + # Only the well-formed entry survives; the two malformed entries are + # silently dropped rather than raising or crashing. + assert [ext["id"] for ext in merged] == ["good"] + def test_download_extension_sends_auth_header(self, temp_dir, monkeypatch): """download_extension passes Authorization header when a provider is configured.""" from unittest.mock import patch, MagicMock diff --git a/tests/test_presets.py b/tests/test_presets.py index 8fa700fa77..9075406ca1 100644 --- a/tests/test_presets.py +++ b/tests/test_presets.py @@ -1514,6 +1514,337 @@ def fake_open(req, timeout=None): assert captured["req"].get_header("Authorization") == "Bearer ghp_testtoken" + @pytest.mark.parametrize( + "payload", + [ + # Root is not a JSON object. + [], + "oops", + 42, + None, + # Root is fine but ``presets`` is the wrong type. + {"schema_version": "1.0", "presets": []}, + {"schema_version": "1.0", "presets": "oops"}, + {"schema_version": "1.0", "presets": None}, + {"schema_version": "1.0", "presets": 42}, + ], + ) + def test_fetch_single_catalog_rejects_malformed_payload(self, project_dir, payload): + """Malformed catalog payloads raise PresetError, not AttributeError. + + Without this guard, a payload like ``{"presets": []}`` would pass the + key-presence check and then crash with ``AttributeError: 'list' object + has no attribute 'items'`` deep inside ``_get_merged_packs``. The + sibling integration catalog reader already validates both the root + object and the nested mapping (see ``integrations/catalog.py``); the + preset catalog must stay consistent. + """ + from unittest.mock import patch, MagicMock + + catalog = PresetCatalog(project_dir) + + mock_response = MagicMock() + mock_response.read.return_value = json.dumps(payload).encode() + mock_response.__enter__ = lambda s: s + mock_response.__exit__ = MagicMock(return_value=False) + + entry = PresetCatalogEntry( + url="https://example.com/catalog.json", + name="default", + priority=1, + install_allowed=True, + ) + + with patch.object(catalog, "_open_url", return_value=mock_response): + with pytest.raises(PresetError, match="Invalid preset catalog format"): + catalog._fetch_single_catalog(entry, force_refresh=True) + + @pytest.mark.parametrize( + "cached_payload", + [ + [], + "oops", + 42, + None, + {"schema_version": "1.0", "presets": []}, + {"schema_version": "1.0", "presets": "oops"}, + {"schema_version": "1.0", "presets": None}, + ], + ) + def test_fetch_single_catalog_rejects_malformed_cached_payload( + self, project_dir, cached_payload + ): + """A poisoned cache silently falls back to the network instead of + crashing — cached payloads pass through the same shape validation + as freshly-fetched ones. + + Without this, a cache poisoned by an older spec-kit version (or a + manual edit, or an upstream that briefly served a bad payload + before the network guards landed) would re-crash every invocation + of ``_get_merged_packs`` despite the cache being "valid" by age. + The recovery contract is: if the cached payload fails validation, + drop it and refetch — never propagate ``AttributeError`` to the + caller. + """ + from unittest.mock import patch, MagicMock + + catalog = PresetCatalog(project_dir) + + # Poison the default-URL cache. ``DEFAULT_CATALOG_URL`` and + # non-default URLs both flow through the same cache-load branch. + cache_file, metadata_file = catalog._get_cache_paths( + catalog.DEFAULT_CATALOG_URL + ) + cache_file.parent.mkdir(parents=True, exist_ok=True) + cache_file.write_text(json.dumps(cached_payload)) + metadata_file.write_text( + json.dumps( + { + "cached_at": datetime.now(timezone.utc).isoformat(), + "catalog_url": catalog.DEFAULT_CATALOG_URL, + } + ) + ) + + # Network refetch returns a valid payload so the recovery path + # can complete. + valid = { + "schema_version": "1.0", + "presets": {"foo": {"name": "Foo", "version": "1.0.0"}}, + } + mock_response = MagicMock() + mock_response.read.return_value = json.dumps(valid).encode() + mock_response.__enter__ = lambda s: s + mock_response.__exit__ = MagicMock(return_value=False) + + entry = PresetCatalogEntry( + url=catalog.DEFAULT_CATALOG_URL, + name="default", + priority=1, + install_allowed=True, + ) + + with patch.object(catalog, "_open_url", return_value=mock_response): + result = catalog._fetch_single_catalog(entry, force_refresh=False) + + # The poisoned cache was discarded and the network payload returned. + assert result == valid + + @pytest.mark.parametrize( + "payload", + [ + # Root is not a JSON object. + [], + "oops", + 42, + None, + # Root is fine but ``presets`` is the wrong type. + {"schema_version": "1.0", "presets": []}, + {"schema_version": "1.0", "presets": "oops"}, + {"schema_version": "1.0", "presets": None}, + ], + ) + def test_fetch_catalog_rejects_malformed_payload(self, project_dir, payload): + """Legacy ``fetch_catalog`` reuses the same shape-validation helper. + + Before this change ``fetch_catalog`` only checked key presence — + so a payload like ``42`` would crash with + ``TypeError: argument of type 'int' is not iterable`` during the + ``"schema_version" in catalog_data`` check, and an entry mapping + of the wrong type would crash downstream. Reusing + ``_validate_catalog_payload`` keeps the network-side behaviour of + the legacy single-catalog method consistent with the multi-catalog + ``_fetch_single_catalog`` path. + """ + from unittest.mock import patch, MagicMock + + catalog = PresetCatalog(project_dir) + mock_response = MagicMock() + mock_response.read.return_value = json.dumps(payload).encode() + mock_response.__enter__ = lambda s: s + mock_response.__exit__ = MagicMock(return_value=False) + + with patch.object(catalog, "_open_url", return_value=mock_response): + with pytest.raises(PresetError, match="Invalid preset catalog format"): + catalog.fetch_catalog(force_refresh=True) + + def test_fetch_catalog_recovers_from_unreadable_cache(self, project_dir): + """An unreadable / wrong-encoded cache file silently refetches. + + The cache contract is best-effort: a JSON-decode failure, an OS + read failure (permissions / disk / handle limit), or an invalid + text encoding on a cache file written by an older client must + all fall through to the network fetch rather than crash the + caller. Covers Copilot's review point that the previous + ``except (json.JSONDecodeError, OSError)`` was missing + ``UnicodeError``. + """ + from unittest.mock import patch, MagicMock + + catalog = PresetCatalog(project_dir) + catalog.cache_dir.mkdir(parents=True, exist_ok=True) + # Invalid UTF-8 bytes so ``read_text`` raises ``UnicodeDecodeError`` + # (a subclass of ``UnicodeError``). + catalog.cache_file.write_bytes(b"\xff\xfe\x00not-utf-8") + catalog.cache_metadata_file.write_text( + json.dumps( + { + "cached_at": datetime.now(timezone.utc).isoformat(), + "catalog_url": catalog.get_catalog_url(), + } + ), + encoding="utf-8", + ) + + valid = { + "schema_version": "1.0", + "presets": {"foo": {"name": "Foo", "version": "1.0.0"}}, + } + mock_response = MagicMock() + mock_response.read.return_value = json.dumps(valid).encode() + mock_response.__enter__ = lambda s: s + mock_response.__exit__ = MagicMock(return_value=False) + + with patch.object(catalog, "_open_url", return_value=mock_response): + result = catalog.fetch_catalog(force_refresh=False) + + # Recovered via network rather than crashing on the unreadable cache. + assert result == valid + + def test_fetch_catalog_recovers_from_unreadable_metadata(self, project_dir): + """A wrongly-encoded metadata file degrades to a cache miss. + + ``is_cache_valid`` is consulted *before* the cache payload is + read; if the metadata file itself can't be decoded (e.g. it was + written on a host whose default codec isn't UTF-8) the validity + check must return ``False`` rather than propagate + ``UnicodeDecodeError``. Without that guard, a corrupted metadata + file would crash every invocation instead of falling through to + a network refetch. + """ + from unittest.mock import patch, MagicMock + + catalog = PresetCatalog(project_dir) + catalog.cache_dir.mkdir(parents=True, exist_ok=True) + catalog.cache_file.write_text("{}", encoding="utf-8") + # Bytes that are not valid UTF-8 — ``read_text(encoding="utf-8")`` + # will raise ``UnicodeDecodeError`` (subclass of ``UnicodeError``). + catalog.cache_metadata_file.write_bytes(b"\xff\xfe\x00bad") + + # is_cache_valid must absorb the decode failure, not crash. + assert catalog.is_cache_valid() is False + + valid = { + "schema_version": "1.0", + "presets": {"foo": {"name": "Foo", "version": "1.0.0"}}, + } + mock_response = MagicMock() + mock_response.read.return_value = json.dumps(valid).encode() + mock_response.__enter__ = lambda s: s + mock_response.__exit__ = MagicMock(return_value=False) + + with patch.object(catalog, "_open_url", return_value=mock_response): + result = catalog.fetch_catalog(force_refresh=False) + + assert result == valid + + def test_fetch_catalog_writes_cache_as_utf8(self, project_dir, monkeypatch): + """Cache + metadata writes pass ``encoding="utf-8"``, observably. + + The earlier version of this test claimed to assert UTF-8 at the + byte level but actually only round-tripped a non-ASCII string + through ``json.dumps`` and ``read_text(encoding="utf-8")``. + Because ``json.dumps`` defaults to ``ensure_ascii=True``, "café" + was serialized as the all-ASCII escape ``caf\\u00e9`` before it + ever reached ``write_text`` — the bytes on disk were identical + regardless of the encoding kwarg. The drift Copilot's review + flagged wasn't actually being caught. + + Fix: directly observe the ``encoding`` argument passed to every + ``write_text`` call made against the cache directory. This is + the production code's encoding choice, which is exactly what + the regression guard cares about. + """ + from unittest.mock import patch, MagicMock + from pathlib import Path as _PathCls + + catalog = PresetCatalog(project_dir) + payload = { + "schema_version": "1.0", + "presets": {"foo": {"name": "Foo", "version": "1.0.0"}}, + } + mock_response = MagicMock() + mock_response.read.return_value = json.dumps(payload).encode("utf-8") + mock_response.__enter__ = lambda s: s + mock_response.__exit__ = MagicMock(return_value=False) + + # Record every ``write_text`` call's encoding kwarg so the + # assertion observes the production writer's argument directly. + recorded: list[dict] = [] + real_write_text = _PathCls.write_text + + def recording_write_text(self, data, *args, **kwargs): + recorded.append( + {"path": str(self), "encoding": kwargs.get("encoding")} + ) + return real_write_text(self, data, *args, **kwargs) + + monkeypatch.setattr(_PathCls, "write_text", recording_write_text) + + with patch.object(catalog, "_open_url", return_value=mock_response): + catalog.fetch_catalog(force_refresh=True) + + cache_writes = [ + r for r in recorded if str(catalog.cache_dir) in r["path"] + ] + assert cache_writes, "fetch_catalog made no writes to the cache dir" + for record in cache_writes: + assert record["encoding"] == "utf-8", ( + f"write_text on {record['path']} used encoding " + f"{record['encoding']!r}; expected 'utf-8'" + ) + + def test_get_merged_packs_skips_non_mapping_entries(self, project_dir): + """Per-entry guard: one malformed entry shouldn't poison the merge. + + ``_fetch_single_catalog`` validates that ``presets`` is a mapping, + but it doesn't (and shouldn't) validate every entry inside it — a + single bad entry in an otherwise-valid catalog should be skipped, + not crash the whole resolve path. Mirrors the per-entry skip in + ``integrations/catalog.py``: a malformed entry returns no error, + valid entries continue to merge normally. + """ + from unittest.mock import patch, MagicMock + + catalog = PresetCatalog(project_dir) + payload = { + "schema_version": "1.0", + "presets": { + "good": {"name": "Good", "version": "1.0.0"}, + "bad-list": [], + "bad-str": "oops", + }, + } + mock_response = MagicMock() + mock_response.read.return_value = json.dumps(payload).encode() + mock_response.__enter__ = lambda s: s + mock_response.__exit__ = MagicMock(return_value=False) + + entry = PresetCatalogEntry( + url="https://example.com/catalog.json", + name="default", + priority=1, + install_allowed=True, + ) + + with patch.object(catalog, "_open_url", return_value=mock_response), \ + patch.object(catalog, "get_active_catalogs", return_value=[entry]): + merged = catalog._get_merged_packs(force_refresh=True) + + # Only the well-formed entry survives; the two malformed entries are + # silently dropped rather than raising or crashing. + assert list(merged.keys()) == ["good"] + def test_download_pack_sends_auth_header(self, project_dir, monkeypatch): """download_pack passes Authorization header when configured.""" from unittest.mock import patch, MagicMock