diff --git a/src/specify_cli/workflows/engine.py b/src/specify_cli/workflows/engine.py index 2f68fab244..468f749693 100644 --- a/src/specify_cli/workflows/engine.py +++ b/src/specify_cli/workflows/engine.py @@ -265,16 +265,49 @@ def _validate_steps( class RunState: """Manages workflow run state for persistence and resume.""" + # ``run_id`` is interpolated into a filesystem path (``runs/``) + # by both ``save()`` and ``load()``. Constrain it to a charset that + # cannot contain path separators (``/`` ``\``), parent-directory + # segments (``..``), or NULs — anything that could escape the + # ``.specify/workflows/runs/`` directory or be mis-interpreted by the + # filesystem. The first-character anchor blocks IDs that start with + # ``-`` (which would be mistaken for a CLI flag in error messages + # and shell completions). + _RUN_ID_PATTERN = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9_-]*$") + + @classmethod + def _validate_run_id(cls, run_id: str) -> None: + """Raise ``ValueError`` if ``run_id`` is not a safe path component. + + This is the single source of truth for what counts as a valid + ``run_id``. ``__init__`` calls it to reject malformed IDs at + construction time; ``load`` calls it *before* interpolating the + ID into a path so a malicious value cannot probe or read files + outside ``.specify/workflows/runs//``. + """ + if not isinstance(run_id, str) or not cls._RUN_ID_PATTERN.match(run_id): + raise ValueError( + f"Invalid run_id {run_id!r}: must be alphanumeric with " + "hyphens/underscores only (and must start with an " + "alphanumeric character)." + ) + def __init__( self, run_id: str | None = None, workflow_id: str = "", project_root: Path | None = None, ) -> None: - self.run_id = run_id or str(uuid.uuid4())[:8] - if not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9_-]*$', self.run_id): - msg = f"Invalid run_id {self.run_id!r}: must be alphanumeric with hyphens/underscores only." - raise ValueError(msg) + # ``run_id is None`` (omitted) → auto-generate. An explicit empty + # string is *not* the same as "omitted" and must be validated like + # any other caller-provided value — otherwise ``__init__("")`` + # would silently substitute a UUID while ``load("")`` rejects, and + # the two entry points would diverge on the empty-string vector. + if run_id is None: + self.run_id = str(uuid.uuid4())[:8] + else: + self.run_id = run_id + self._validate_run_id(self.run_id) self.workflow_id = workflow_id self.project_root = project_root or Path(".") self.status = RunStatus.CREATED @@ -315,7 +348,20 @@ def save(self) -> None: @classmethod def load(cls, run_id: str, project_root: Path) -> RunState: - """Load a run state from disk.""" + """Load a run state from disk. + + Validates ``run_id`` against ``_RUN_ID_PATTERN`` *before* building + the lookup path. Without this guard, a caller passing a value like + ``../escape`` (e.g. via ``specify workflow resume`` CLI argument) + would interpolate path-traversal segments into + ``runs_dir`` below, letting ``state_path.exists()`` probe arbitrary + paths and ``json.load`` read attacker-planted JSON from outside + the project's ``runs/`` directory. ``__init__`` already runs this + check on the stored ``state_data["run_id"]``, but that fires + *after* the file lookup — too late to prevent the disclosure. + Mirrors the precedent in ``agents._ensure_within_directory``. + """ + cls._validate_run_id(run_id) runs_dir = project_root / ".specify" / "workflows" / "runs" / run_id state_path = runs_dir / "state.json" if not state_path.exists(): diff --git a/tests/test_workflows.py b/tests/test_workflows.py index 7995512d6f..19edaf1eb5 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -2412,6 +2412,112 @@ def test_load_not_found(self, project_dir): with pytest.raises(FileNotFoundError): RunState.load("nonexistent", project_dir) + @pytest.mark.parametrize( + "malicious_run_id", + [ + # Parent-directory traversal — the classic path-escape vector. + "../escape", + "..", + "../../etc/passwd", + # Embedded path separators — both POSIX and Windows. + "foo/bar", + "foo\\bar", + # Leading non-alphanumeric characters that the existing + # pattern's anchor blocks (would be mistaken for CLI flags + # or hidden files in shell completions / error messages). + ".hidden", + "-flag", + # NUL byte — some filesystems treat the prefix as a valid + # path and silently truncate at the NUL. + "foo\x00bar", + # Empty string — degenerate case, matches no file but the + # validator should reject it before any I/O. + "", + ], + ) + def test_load_rejects_path_traversal(self, project_dir, malicious_run_id): + """``RunState.load`` validates ``run_id`` before touching the + filesystem. + + Without this guard, a value like ``../escape`` passed via + ``specify workflow resume`` would interpolate path-traversal + segments into the lookup path. ``state_path.exists()`` would + probe arbitrary paths the process can read (a file-existence + oracle) and ``json.load`` would happily parse attacker-planted + JSON from outside ``.specify/workflows/runs/``. The check must + fire *before* the path is built — ``__init__``'s identical + regex on ``state_data["run_id"]`` fires too late. + """ + from specify_cli.workflows.engine import RunState + + # Plant a state.json *outside* the legitimate ``runs/`` directory + # at the location ``../escape`` would traverse to, so a missing + # guard would surface as a successful load rather than a + # ``FileNotFoundError`` (which would be ambiguous with the + # not-found case). + runs_dir = project_dir / ".specify" / "workflows" / "runs" + runs_dir.mkdir(parents=True, exist_ok=True) + attacker_dir = project_dir / ".specify" / "workflows" / "escape" + attacker_dir.mkdir(exist_ok=True) + (attacker_dir / "state.json").write_text( + json.dumps( + { + "run_id": "pwned", + "workflow_id": "attacker-owned", + "status": "created", + } + ), + encoding="utf-8", + ) + + with pytest.raises(ValueError, match="Invalid run_id"): + RunState.load(malicious_run_id, project_dir) + + @pytest.mark.parametrize( + "bad_run_id", + [ + # One vector per category from ``test_load_rejects_path_traversal`` + # — enough to prove both entry points agree without re-running + # the full attack matrix here. + "../escape", # parent-directory traversal + "foo/bar", # embedded path separator + ".hidden", # leading non-alphanumeric + "", # empty / degenerate + ], + ) + def test_init_and_load_share_validation(self, project_dir, bad_run_id): + """``__init__`` *and* ``load`` reject the same malformed IDs. + + The two entry points must stay in sync — drift would let an ID + slip in via one path that the other would reject, producing + confusing crashes mid-workflow. The previous version of this + test only exercised ``__init__`` and ``_validate_run_id`` (the + shared helper), so a regression in ``load`` — e.g. someone + deleting the ``cls._validate_run_id(run_id)`` call there — could + slip through despite ``__init__`` and the helper staying + aligned. We now hit ``load`` directly with the same vector so + any drift between the two call sites is caught by this test. + """ + from specify_cli.workflows.engine import RunState + + # ``__init__`` rejects up front. + with pytest.raises(ValueError, match="Invalid run_id"): + RunState(run_id=bad_run_id) + + # The shared helper rejects the value too (sanity check that the + # ``__init__`` rejection came from the validator, not some + # unrelated constructor failure). + with pytest.raises(ValueError, match="Invalid run_id"): + RunState._validate_run_id(bad_run_id) + + # And ``load`` rejects it *before* touching the filesystem. This + # is the assertion the previous version was missing: without it, + # a regression in ``load`` (e.g. forgetting to call the + # validator before building the path) would not be caught even + # though ``__init__`` and the helper still agreed. + with pytest.raises(ValueError, match="Invalid run_id"): + RunState.load(bad_run_id, project_dir) + def test_append_log(self, project_dir): from specify_cli.workflows.engine import RunState