From fbc25e0d0a2fa2c776177ffbb5367714104bb799 Mon Sep 17 00:00:00 2001 From: Nicolas Chiaruttini Date: Wed, 3 Jun 2026 23:36:00 +0200 Subject: [PATCH] Fix "thread death" TOCTOU race in python_worker In Worker._process_input(), non-main tasks assigned task._thread before calling start(). The janitor thread (_cleanup_threads) flags any task where task._thread is set and the thread is not alive. A just-constructed, not-yet-started thread is not alive, so if the janitor's 0.05s poll landed in the window between Thread() construction and start(), it would wrongly fail the task with "thread death" even though the thread was about to run fine. Assign task._thread only after start() returns, closing the window. Also add test_thread_death_stress, which floods the worker with many concurrent tiny tasks (none of which can legitimately die) to surface the race. It failed reliably before the fix and passes consistently after. Fixes apposed/appose#15. Co-Authored-By: Claude Opus 4.8 --- src/appose/python_worker.py | 11 +++++++++-- tests/test_service.py | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/src/appose/python_worker.py b/src/appose/python_worker.py index d4c2429..12c1371 100644 --- a/src/appose/python_worker.py +++ b/src/appose/python_worker.py @@ -222,8 +222,15 @@ def _process_input(self) -> None: else: # Create a thread and save a reference to it, in case its script # kills the thread. This happens e.g. if it calls sys.exit. - task._thread = Thread(target=task._run, name=f"Appose-{uuid}") - task._thread.start() + # + # Assign task._thread only AFTER start() returns. Otherwise the + # janitor (_cleanup_threads) can observe task._thread set while + # the thread is not yet alive (the window between Thread() + # construction and start()) and spuriously fail the task with + # "thread death". See apposed/appose#15. + t = Thread(target=task._run, name=f"Appose-{uuid}") + t.start() + task._thread = t elif request_type == RequestType.CANCEL: task = self.tasks.get(uuid) diff --git a/tests/test_service.py b/tests/test_service.py index 5a4afcc..501fbce 100644 --- a/tests/test_service.py +++ b/tests/test_service.py @@ -7,6 +7,7 @@ from appose.service import ResponseType, TaskException, TaskStatus from tests.test_base import execute_and_assert, maybe_debug from pathlib import Path +import threading import time import os import re @@ -354,3 +355,39 @@ def test_task_result_null(): # result() should return None. assert task.result() is None + + +def test_thread_death_stress(): + """Floods the worker with many concurrent tiny tasks to surface the + spurious 'thread death' race (apposed/appose#15). No task here can + legitimately die, so any 'thread death' is the bug.""" + env = appose.system() + n_threads = 16 + n_tasks = 200 # per thread + errors = [] + err_lock = threading.Lock() + submit_lock = threading.Lock() # serialize stdin writes only + + with env.python() as service: + maybe_debug(service) + + def worker(): + for _ in range(n_tasks): + with submit_lock: + task = service.task("task.outputs['result'] = 1") + task.start() + try: + task.wait_for() + except Exception as e: + with err_lock: + errors.append(str(e)) + + threads = [threading.Thread(target=worker) for _ in range(n_threads)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors, ( + f"{len(errors)}/{n_threads * n_tasks} tasks failed; sample: {errors[:5]}" + )