Skip to content
178 changes: 149 additions & 29 deletions src/specify_cli/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1702,6 +1702,44 @@ def _open_url(self, url: str, timeout: int = 10):
from specify_cli.authentication.http import open_url
return open_url(url, timeout)

def _validate_catalog_payload(self, catalog_data: Any, url: str) -> None:
"""Validate a parsed catalog payload's shape.

Applied to both network-fetched and cache-loaded payloads so a
once-poisoned cache (older spec-kit version, manual edit, upstream
served a bad payload before the network-side guards were added)
cannot re-crash ``_get_merged_extensions`` on subsequent calls.

Checking only key presence would let a payload like
``{"extensions": []}`` or ``{"extensions": null}`` slip through
here and then crash with ``AttributeError: 'list' object has no
attribute 'items'`` deep inside ``_get_merged_extensions``. The
sibling integration catalog reader already guards both the root
object and the nested mapping (see ``integrations/catalog.py``);
the extension catalog must stay consistent so a malformed payload
surfaces as the user-facing ``Invalid catalog format`` error
instead of a raw Python traceback.

Args:
catalog_data: Parsed JSON payload from the catalog source.
url: Source URL — used in the error message so the user can
tell which catalog in a multi-catalog stack is malformed.

Raises:
ExtensionError: If the payload's shape is invalid.
"""
if not isinstance(catalog_data, dict):
raise ExtensionError(
f"Invalid catalog format from {url}: expected a JSON object"
)
if "schema_version" not in catalog_data or "extensions" not in catalog_data:
raise ExtensionError(f"Invalid catalog format from {url}")
if not isinstance(catalog_data.get("extensions"), dict):
raise ExtensionError(
f"Invalid catalog format from {url}: "
"'extensions' must be a JSON object"
)

def get_active_catalogs(self) -> List[CatalogEntry]:
"""Get the ordered list of active catalogs.

Expand Down Expand Up @@ -1817,38 +1855,74 @@ def _fetch_single_catalog(self, entry: CatalogEntry, force_refresh: bool = False
is_valid = False
if not force_refresh and cache_file.exists() and cache_meta_file.exists():
try:
metadata = json.loads(cache_meta_file.read_text())
metadata = json.loads(
cache_meta_file.read_text(encoding="utf-8")
)
cached_at = datetime.fromisoformat(metadata.get("cached_at", ""))
if cached_at.tzinfo is None:
cached_at = cached_at.replace(tzinfo=timezone.utc)
age = (datetime.now(timezone.utc) - cached_at).total_seconds()
is_valid = age < self.CACHE_DURATION
except (json.JSONDecodeError, ValueError, KeyError, TypeError):
# If metadata is invalid or missing expected fields, treat cache as invalid
except (
json.JSONDecodeError,
OSError,
UnicodeError,
ValueError,
KeyError,
TypeError,
):
# Cache validity is best-effort: invalid/missing metadata
# fields, an unreadable metadata file (permissions / disk),
# or a wrongly-encoded metadata file (written by a tool
# using the system locale codec) all degrade to "cache
# invalid" so the caller falls through to a network
# refetch instead of crashing.
pass

# Use cache if valid
# Use cache if valid. A previously-cached payload must clear the
# same shape checks as a freshly-fetched one — otherwise a once-
# poisoned cache (older spec-kit version, manual edit, upstream
# served a bad payload before the network-side guards were added)
# would re-crash on every invocation despite the cache being
# "valid" by age. If validation fails on the cached read, fall
# through to the network fetch path so the cache gets refreshed.
if is_valid:
try:
return json.loads(cache_file.read_text())
except json.JSONDecodeError:
cached_data = json.loads(cache_file.read_text(encoding="utf-8"))
self._validate_catalog_payload(cached_data, entry.url)
return cached_data
except (json.JSONDecodeError, OSError, UnicodeError, ExtensionError):
# Cache is best-effort: a JSON-decode failure, an OS-level
# read failure (permissions / disk / handle limit), or a
# text-encoding failure on a cache file written by an older
# client all fall through to the network fetch path. Only
# the network failure is surfaced to the caller.
pass

# Fetch from network
try:
with self._open_url(entry.url, timeout=10) as response:
catalog_data = json.loads(response.read())

if "schema_version" not in catalog_data or "extensions" not in catalog_data:
raise ExtensionError(f"Invalid catalog format from {entry.url}")
self._validate_catalog_payload(catalog_data, entry.url)

# Save to cache
# Save to cache. Both files are explicitly UTF-8 to match the
# ``read_text(encoding="utf-8")`` on the read side and the
# ``integrations/catalog.py:193-203`` precedent. Without this,
# platforms whose default encoding isn't UTF-8 would write
# locale-encoded bytes that the read path can't decode, forcing
# an unnecessary network refetch on every invocation.
self.cache_dir.mkdir(parents=True, exist_ok=True)
cache_file.write_text(json.dumps(catalog_data, indent=2))
cache_meta_file.write_text(json.dumps({
"cached_at": datetime.now(timezone.utc).isoformat(),
"catalog_url": entry.url,
}, indent=2))
cache_file.write_text(
json.dumps(catalog_data, indent=2), encoding="utf-8"
)
cache_meta_file.write_text(
json.dumps({
"cached_at": datetime.now(timezone.utc).isoformat(),
"catalog_url": entry.url,
}, indent=2),
encoding="utf-8",
)

return catalog_data

Expand Down Expand Up @@ -1895,6 +1969,16 @@ def _get_merged_extensions(self, force_refresh: bool = False) -> List[Dict[str,
continue

for ext_id, ext_data in catalog_data.get("extensions", {}).items():
# Per-entry guard: ``_fetch_single_catalog`` already validates
# that ``catalog_data["extensions"]`` is a mapping, but it
# does not (and should not) validate every entry shape there
# — one malformed entry shouldn't poison an otherwise valid
# catalog. Skip non-mapping entries here so a payload like
# ``{"extensions": {"foo": [], "bar": {...}}}`` still merges
# the valid entries without crashing on ``**ext_data``.
# Mirrors ``integrations/catalog.py:245``.
if not isinstance(ext_data, dict):
continue
if ext_id not in merged: # Higher-priority catalog wins
merged[ext_id] = {
**ext_data,
Expand All @@ -1911,20 +1995,35 @@ def _get_merged_extensions(self, force_refresh: bool = False) -> List[Dict[str,
def is_cache_valid(self) -> bool:
"""Check if cached catalog is still valid.

Returns ``False`` for any read/decoding failure on the metadata
file (missing fields, malformed JSON, permissions / disk errors,
wrong text encoding) so callers fall through to a network refetch
instead of crashing. Treating cache validity as best-effort
matches the contract used by the per-URL cache check below.

Returns:
True if cache exists and is within cache duration
"""
if not self.cache_file.exists() or not self.cache_metadata_file.exists():
return False

try:
metadata = json.loads(self.cache_metadata_file.read_text())
metadata = json.loads(
self.cache_metadata_file.read_text(encoding="utf-8")
)
cached_at = datetime.fromisoformat(metadata.get("cached_at", ""))
if cached_at.tzinfo is None:
cached_at = cached_at.replace(tzinfo=timezone.utc)
age_seconds = (datetime.now(timezone.utc) - cached_at).total_seconds()
return age_seconds < self.CACHE_DURATION
except (json.JSONDecodeError, ValueError, KeyError, TypeError):
except (
json.JSONDecodeError,
OSError,
UnicodeError,
ValueError,
KeyError,
TypeError,
):
return False

def fetch_catalog(self, force_refresh: bool = False) -> Dict[str, Any]:
Expand All @@ -1939,36 +2038,57 @@ def fetch_catalog(self, force_refresh: bool = False) -> Dict[str, Any]:
Raises:
ExtensionError: If catalog cannot be fetched
"""
# Check cache first unless force refresh
catalog_url = self.get_catalog_url()

# Check the cache first unless ``force_refresh`` was requested,
# then fall through to a network fetch. Match the
# ``_fetch_single_catalog`` cache contract: a poisoned or
# unreadable cache silently falls through to a network refetch
# rather than crashing the caller. ``_validate_catalog_payload``
# is reused here so a cache written by an older client
# (pre-validation) is rejected and refreshed instead of returning
# the stale malformed payload. ``is_cache_valid`` itself swallows
# OSError/UnicodeError on the metadata read, so a cache-validity
# check can't crash this method before the read-side fallback
# runs.
if not force_refresh and self.is_cache_valid():
try:
return json.loads(self.cache_file.read_text())
except json.JSONDecodeError:
cached_data = json.loads(self.cache_file.read_text(encoding="utf-8"))
self._validate_catalog_payload(cached_data, catalog_url)
return cached_data
Comment thread
Quratulain-bilal marked this conversation as resolved.
except (json.JSONDecodeError, OSError, UnicodeError, ExtensionError):
pass # Fall through to network fetch

# Fetch from network
catalog_url = self.get_catalog_url()

try:
import urllib.error

with self._open_url(catalog_url, timeout=10) as response:
catalog_data = json.loads(response.read())

# Validate catalog structure
if "schema_version" not in catalog_data or "extensions" not in catalog_data:
raise ExtensionError("Invalid catalog format")

# Save to cache
# Validate catalog structure. Reuses the same helper as
# ``_fetch_single_catalog`` so all three branches (root type,
# missing keys, nested-mapping type) stay consistent.
self._validate_catalog_payload(catalog_data, catalog_url)

# Save to cache. Explicit UTF-8 on both writes mirrors the
# ``read_text(encoding="utf-8")`` on the read side and the
# ``integrations/catalog.py:193-203`` precedent — otherwise
# platforms whose default encoding isn't UTF-8 would write
# locale-encoded bytes the read path can't decode, forcing an
# unnecessary refetch on every invocation.
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.cache_file.write_text(json.dumps(catalog_data, indent=2))
self.cache_file.write_text(
json.dumps(catalog_data, indent=2), encoding="utf-8"
)

# Save cache metadata
metadata = {
"cached_at": datetime.now(timezone.utc).isoformat(),
"catalog_url": catalog_url,
}
self.cache_metadata_file.write_text(json.dumps(metadata, indent=2))
self.cache_metadata_file.write_text(
json.dumps(metadata, indent=2), encoding="utf-8"
)

return catalog_data

Expand Down
Loading