diff --git a/Lib/profiling/sampling/gecko_collector.py b/Lib/profiling/sampling/gecko_collector.py index 54392af95000082..dd899c0d745a0f3 100644 --- a/Lib/profiling/sampling/gecko_collector.py +++ b/Lib/profiling/sampling/gecko_collector.py @@ -1,8 +1,11 @@ +import array import itertools +import io import json import os import platform import sys +import tempfile import threading import time @@ -61,6 +64,102 @@ PROCESS_TYPE_MAIN = 0 STACKWALK_DISABLED = 0 +# In-memory buffer before spilling to disk +DEFAULT_SPILL_BUFFER_BYTES = 128 * 1024 + + +class TypedSpillColumn: + def __init__(self, directory, basename, typecode, *, + buffer_bytes=DEFAULT_SPILL_BUFFER_BYTES): + self.path = os.path.join(directory, basename) + self.buffer = array.array(typecode) + self.max_items = max(1, buffer_bytes // self.buffer.itemsize) + + def append(self, value): + self.buffer.append(value) + if len(self.buffer) >= self.max_items: + self.flush() + + def flush(self): + with open(self.path, "ab") as file: + self.buffer.tofile(file) + self.buffer.clear() + + def iter_chunks(self): + typecode = self.buffer.typecode + block_bytes = self.max_items * self.buffer.itemsize + with open(self.path, "rb") as file: + for block in iter(lambda: file.read(block_bytes), b""): + chunk = array.array(typecode) + chunk.frombytes(block) + yield chunk + + +class NDJSONSpillColumn: + _encoder = json.JSONEncoder(separators=(",", ":")) + + def __init__(self, directory, basename, *, + buffer_bytes=DEFAULT_SPILL_BUFFER_BYTES): + self.path = os.path.join(directory, basename) + self.buffer = bytearray() + self._buffer_bytes = buffer_bytes + + def append_object(self, data): + self.buffer += (self._encoder.encode(data) + "\n").encode() + if len(self.buffer) >= self._buffer_bytes: + self.flush() + + def flush(self): + with open(self.path, "ab") as file: + file.write(self.buffer) + self.buffer.clear() + + def iter_lines(self): + with open(self.path) as file: + for line in file: + yield line.rstrip("\n") + + +class GeckoThreadSpill: + _TYPED_COLUMNS = ( + ("samples_stack", "samples-stack.bin", "q"), + ("samples_time", "samples-time.bin", "d"), + ("markers_name", "markers-name.bin", "q"), + ("markers_start_time", "markers-start-time.bin", "d"), + ("markers_end_time", "markers-end-time.bin", "d"), + ("markers_phase", "markers-phase.bin", "B"), + ("markers_category", "markers-category.bin", "I"), + ) + + def __init__(self, directory, tid): + prefix = f"thread-{tid}-" + for attr, basename, typecode in self._TYPED_COLUMNS: + setattr(self, attr, TypedSpillColumn( + directory, prefix + basename, typecode)) + self.markers_data = NDJSONSpillColumn( + directory, prefix + "markers-data.ndjson") + self.sample_count = 0 + self.marker_count = 0 + + def append_sample(self, stack_index, time_ms): + self.samples_stack.append(stack_index) + self.samples_time.append(time_ms) + self.sample_count += 1 + + def append_marker(self, name_idx, start_time, end_time, phase, category, data): + self.markers_name.append(name_idx) + self.markers_start_time.append(start_time) + self.markers_end_time.append(end_time) + self.markers_phase.append(phase) + self.markers_category.append(category) + self.markers_data.append_object(data) + self.marker_count += 1 + + def prepare_read(self): + for attr, _basename, _typecode in self._TYPED_COLUMNS: + getattr(self, attr).flush() + self.markers_data.flush() + class GeckoCollector(Collector): aggregating = True @@ -77,6 +176,9 @@ def __init__(self, sample_interval_usec, *, skip_idle=False, opcodes=False): # Per-thread data structures self.threads = {} # tid -> thread data + self.spill_dir = None + self.thread_spills = {} + self.exported = False # Global tables self.libs = [] @@ -151,6 +253,9 @@ def collect(self, stack_frames, timestamps_us=None): stack_frames: List of interpreter/thread frame info timestamps_us: List of timestamps in microseconds (None for live sampling) """ + if self.exported: + raise RuntimeError("cannot append to GeckoCollector after export") + # Handle live sampling (no timestamps provided) if timestamps_us is None: current_time = (time.monotonic() * 1000) - self.start_time @@ -259,15 +364,9 @@ def collect(self, stack_frames, timestamps_us=None): stack_index = self._process_stack(thread_data, frames) # Add samples with timestamps - samples = thread_data["samples"] - samples_stack = samples["stack"] - samples_time = samples["time"] - samples_delay = samples["eventDelay"] - + thread_spill = self.thread_spills[tid] for t in times: - samples_stack.append(stack_index) - samples_time.append(t) - samples_delay.append(None) + thread_spill.append_sample(stack_index, t) # Handle opcodes if self.opcodes_enabled and frames: @@ -294,6 +393,10 @@ def collect(self, stack_frames, timestamps_us=None): def _create_thread(self, tid, is_main_thread): """Create a new thread structure with processed profile format.""" + if self.spill_dir is None: + self.spill_dir = tempfile.TemporaryDirectory() + + self.thread_spills[tid] = GeckoThreadSpill(self.spill_dir.name, tid) thread = { "name": f"Thread-{tid}", @@ -307,15 +410,6 @@ def _create_thread(self, tid, is_main_thread): "tid": tid, "processType": "default", "processName": "Python Process", - # Sample data - processed format with direct arrays - "samples": { - "stack": [], - "time": [], - "eventDelay": [], - "weight": None, - "weightType": "samples", - "length": 0, # Will be updated on export - }, # Stack table - processed format "stackTable": { "frame": [], @@ -366,16 +460,6 @@ def _create_thread(self, tid, is_main_thread): "functionSize": [], "length": 0, }, - # Markers - processed format (arrays) - "markers": { - "data": [], - "name": [], - "startTime": [], - "endTime": [], - "phase": [], - "category": [], - "length": 0, - }, # Caches for deduplication "_stackCache": {}, "_frameCache": {}, @@ -405,17 +489,10 @@ def _add_marker(self, tid, name, start_time, end_time, category): if tid not in self.threads: return - thread_data = self.threads[tid] duration = end_time - start_time name_idx = self._intern_string(name) - markers = thread_data["markers"] - markers["name"].append(name_idx) - markers["startTime"].append(start_time) - markers["endTime"].append(end_time) - markers["phase"].append(1) # 1 = interval marker - markers["category"].append(category) - markers["data"].append({ + self.thread_spills[tid].append_marker(name_idx, start_time, end_time, 1, category, { "type": name.replace(" ", ""), "duration": duration, "tid": tid @@ -426,20 +503,13 @@ def _add_opcode_interval_marker(self, tid, opcode, lineno, col_offset, funcname, if tid not in self.threads or opcode is None: return - thread_data = self.threads[tid] opcode_info = get_opcode_info(opcode) # Use formatted opcode name (with base opcode for specialized ones) formatted_opname = format_opcode(opcode) name_idx = self._intern_string(formatted_opname) - markers = thread_data["markers"] - markers["name"].append(name_idx) - markers["startTime"].append(start_time) - markers["endTime"].append(end_time) - markers["phase"].append(1) # 1 = interval marker - markers["category"].append(CATEGORY_OPCODES) - markers["data"].append({ + self.thread_spills[tid].append_marker(name_idx, start_time, end_time, 1, CATEGORY_OPCODES, { "type": "Opcode", "opcode": opcode, "opname": formatted_opname, @@ -660,7 +730,6 @@ def _finalize_markers(self): def export(self, filename): """Export the profile to a Gecko JSON file.""" - if self.sample_count > 0 and self.last_sample_time > 0: self.interval = self.last_sample_time / self.sample_count @@ -681,19 +750,31 @@ def spin(): spinner_thread = threading.Thread(target=spin, daemon=True) spinner_thread.start() + temp_path = None + replaced = False try: - # Finalize any open markers before building profile - self._finalize_markers() - - profile = self._build_profile() - - with open(filename, "w") as f: - json.dump(profile, f, separators=(",", ":")) + self._prepare_for_serialization() + output_dir = os.path.dirname(os.path.abspath(filename)) or "." + with tempfile.NamedTemporaryFile( + "w", dir=output_dir, delete=False + ) as file: + temp_path = file.name + self._stream_profile(file) + os.replace(temp_path, filename) + replaced = True finally: + self.exported = True stop_spinner.set() spinner_thread.join(timeout=1.0) # Small delay to ensure the clear happens time.sleep(0.01) + if temp_path is not None and not replaced: + try: + os.unlink(temp_path) + except FileNotFoundError: + pass + if self.spill_dir is not None: + self.spill_dir.cleanup() print(f"Gecko profile written to {filename}") print( @@ -727,34 +808,18 @@ def _build_marker_schema(self): def _build_profile(self): """Build the complete profile structure in processed format.""" - # Convert thread data to final format - threads = [] - - for tid, thread_data in self.threads.items(): - # Update lengths - samples = thread_data["samples"] - stack_table = thread_data["stackTable"] - frame_table = thread_data["frameTable"] - func_table = thread_data["funcTable"] - resource_table = thread_data["resourceTable"] - - samples["length"] = len(samples["stack"]) - stack_table["length"] = len(stack_table["frame"]) - frame_table["length"] = len(frame_table["func"]) - func_table["length"] = len(func_table["name"]) - resource_table["length"] = len(resource_table["name"]) - thread_data["markers"]["length"] = len(thread_data["markers"]["name"]) - - # Clean up internal caches - del thread_data["_stackCache"] - del thread_data["_frameCache"] - del thread_data["_funcCache"] - del thread_data["_resourceCache"] - - threads.append(thread_data) - - # Main profile structure in processed format - profile = { + try: + self._prepare_for_serialization() + file = io.StringIO() + self._stream_profile(file) + return json.loads(file.getvalue()) + finally: + self.exported = True + if self.spill_dir is not None: + self.spill_dir.cleanup() + + def _profile_head(self): + return { "meta": { "interval": self.interval, "startTime": self.start_time, @@ -784,7 +849,10 @@ def _build_profile(self): }, }, "libs": self.libs, - "threads": threads, + } + + def _profile_tail(self): + return { "pages": [], "shared": { "stringArray": self.global_strings, @@ -792,4 +860,121 @@ def _build_profile(self): }, } - return profile + def _prepare_for_serialization(self): + if self.exported: + raise RuntimeError("GeckoCollector has already been exported") + self._finalize_markers() + for spill in self.thread_spills.values(): + spill.prepare_read() + for thread_data in self.threads.values(): + thread_data["stackTable"]["length"] = len(thread_data["stackTable"]["frame"]) + thread_data["frameTable"]["length"] = len(thread_data["frameTable"]["func"]) + thread_data["funcTable"]["length"] = len(thread_data["funcTable"]["name"]) + thread_data["resourceTable"]["length"] = len(thread_data["resourceTable"]["name"]) + + def _stream_profile(self, file): + head = json.dumps( + self._profile_head(), separators=(",", ":"), allow_nan=False + )[1:-1] + tail = json.dumps( + self._profile_tail(), separators=(",", ":"), allow_nan=False + )[1:-1] + file.write("{") + file.write(head) + file.write(',"threads":[') + for index, (tid, thread_data) in enumerate(self.threads.items()): + if index: + file.write(",") + self._stream_thread(file, tid, thread_data) + file.write("],") + file.write(tail) + file.write("}") + + def _stream_thread(self, file, tid, thread_data): + spill = self.thread_spills[tid] + metadata = { + "name": thread_data["name"], + "isMainThread": thread_data["isMainThread"], + "processStartupTime": thread_data["processStartupTime"], + "processShutdownTime": thread_data["processShutdownTime"], + "registerTime": thread_data["registerTime"], + "unregisterTime": thread_data["unregisterTime"], + "pausedRanges": thread_data["pausedRanges"], + "pid": thread_data["pid"], + "tid": thread_data["tid"], + "processType": thread_data["processType"], + "processName": thread_data["processName"], + } + file.write("{") + file.write(json.dumps(metadata, separators=(",", ":"), allow_nan=False)[1:-1]) + file.write(',"samples":') + self._stream_samples(file, spill) + for key in ( + "stackTable", + "frameTable", + "funcTable", + "resourceTable", + "nativeSymbols", + ): + file.write(',"') + file.write(key) + file.write('":') + file.write(json.dumps( + thread_data[key], separators=(",", ":"), allow_nan=False + )) + file.write(',"markers":') + self._stream_markers(file, spill) + file.write("}") + + def _stream_samples(self, file, spill): + file.write('{"stack":') + _stream_array(file, _tokens(spill.samples_stack), spill.sample_count) + file.write(',"time":') + _stream_array(file, _tokens(spill.samples_time), spill.sample_count) + file.write(',"eventDelay":') + _stream_array( + file, + ("null" for _ in range(spill.sample_count)), + spill.sample_count, + ) + file.write(',"weight":null,"weightType":"samples","length":') + file.write(repr(spill.sample_count)) + file.write("}") + + def _stream_markers(self, file, spill): + file.write('{"data":') + _stream_array(file, spill.markers_data.iter_lines(), spill.marker_count) + file.write(',"name":') + _stream_array(file, _tokens(spill.markers_name), spill.marker_count) + file.write(',"startTime":') + _stream_array(file, _tokens(spill.markers_start_time), spill.marker_count) + file.write(',"endTime":') + _stream_array(file, _tokens(spill.markers_end_time), spill.marker_count) + file.write(',"phase":') + _stream_array(file, _tokens(spill.markers_phase), spill.marker_count) + file.write(',"category":') + _stream_array(file, _tokens(spill.markers_category), spill.marker_count) + file.write(',"length":') + file.write(repr(spill.marker_count)) + file.write("}") + + +def _stream_array(file, token_iter, expected_count): + file.write("[") + count = 0 + for token in token_iter: + if count: + file.write(",") + file.write(token) + count += 1 + if count != expected_count: + raise RuntimeError( + f"streamed {count} array items, expected {expected_count}" + ) + file.write("]") + + +def _tokens(column): + for chunk in column.iter_chunks(): + for value in chunk: + yield repr(value) diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py b/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py index 390a1479fdd2975..9fe0d8f1bb66ff1 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py @@ -624,6 +624,8 @@ def test_gecko_collector_export(self): """Test Gecko profile export functionality.""" gecko_out = tempfile.NamedTemporaryFile(suffix=".json", delete=False) self.addCleanup(close_and_unlink, gecko_out) + # We cannot overwrite an open file on Windows. + gecko_out.close() collector = GeckoCollector(1000) @@ -2659,6 +2661,7 @@ def test_gecko_collector_opcodes_enabled(self): def test_gecko_opcode_state_tracking(self): """Test that GeckoCollector tracks opcode state changes.""" collector = GeckoCollector(sample_interval_usec=1000, opcodes=True) + self.addCleanup(lambda: collector.spill_dir.cleanup()) # First sample with opcode 90 (RAISE_VARARGS) frame1 = MockFrameInfo("test.py", 10, "func", opcode=90) @@ -2680,6 +2683,7 @@ def test_gecko_opcode_state_tracking(self): def test_gecko_opcode_state_change_emits_marker(self): """Test that opcode state change emits an interval marker.""" collector = GeckoCollector(sample_interval_usec=1000, opcodes=True) + self.addCleanup(lambda: collector.spill_dir.cleanup()) # First sample: opcode 90 frame1 = MockFrameInfo("test.py", 10, "func", opcode=90) @@ -2702,14 +2706,12 @@ def test_gecko_opcode_state_change_emits_marker(self): collector.collect(frames2) # Should have emitted a marker for the first opcode - thread_data = collector.threads[1] - markers = thread_data["markers"] - # At least one marker should have been added - self.assertGreater(len(markers["name"]), 0) + self.assertGreater(collector.thread_spills[1].marker_count, 0) def test_gecko_opcode_markers_not_emitted_when_disabled(self): """Test that no opcode markers when opcodes=False.""" collector = GeckoCollector(sample_interval_usec=1000, opcodes=False) + self.addCleanup(lambda: collector.spill_dir.cleanup()) frame1 = MockFrameInfo("test.py", 10, "func", opcode=90) frames1 = [ @@ -2735,6 +2737,7 @@ def test_gecko_opcode_markers_not_emitted_when_disabled(self): def test_gecko_opcode_with_none_opcode(self): """Test that None opcode doesn't cause issues.""" collector = GeckoCollector(sample_interval_usec=1000, opcodes=True) + self.addCleanup(lambda: collector.spill_dir.cleanup()) # Frame with no opcode (None) frame = MockFrameInfo("test.py", 10, "func", opcode=None) diff --git a/Misc/NEWS.d/next/Library/2026-06-03-13-51-29.gh-issue-150662.ELT8Vg.rst b/Misc/NEWS.d/next/Library/2026-06-03-13-51-29.gh-issue-150662.ELT8Vg.rst new file mode 100644 index 000000000000000..42ed6ad7cd3c65f --- /dev/null +++ b/Misc/NEWS.d/next/Library/2026-06-03-13-51-29.gh-issue-150662.ELT8Vg.rst @@ -0,0 +1,4 @@ +Fix the ``--gecko`` collector in :mod:`profiling.sampling` that kept every +sample in memory. It now writes sample and marker data to temporary files +and reads them back, ultimately building the output file at the end. Patch +by Pablo Galindo and Maurycy Pawłowski-Wieroński.