From 7523da8194f3b858621e57258b596b28943a611b Mon Sep 17 00:00:00 2001 From: Rusty Johnson Date: Wed, 13 May 2026 10:22:12 -0700 Subject: [PATCH 1/5] rsjohnson --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 44e0723..a5271dc 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,4 @@ +# rsjohnson # Python Performance Lab: Sharpening Your Instincts A PyCon US 2026 hands-on tutorial. You optimize intentionally slow Python code From 7bee88f1d312a23900887d44ab5a06eda9746804 Mon Sep 17 00:00:00 2001 From: Rusty Johnson Date: Wed, 13 May 2026 10:32:03 -0700 Subject: [PATCH 2/5] rsjohnson: v2 --- rounds/1_histogram/baseline.py | 35 ++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/rounds/1_histogram/baseline.py b/rounds/1_histogram/baseline.py index 81982fa..a338e9e 100644 --- a/rounds/1_histogram/baseline.py +++ b/rounds/1_histogram/baseline.py @@ -4,21 +4,24 @@ tokens) in a binary payload. """ +from pathlib import Path -def compute_histogram(path: str) -> dict[bytes, int]: - """Frequency of every 2-byte bigram in the file at ``path``.""" - # Step 1: read the whole file into memory as a single bytes object. - with open(path, "rb") as f: - data = f.read() +# ------------------------------------------------------------------------------------------------- - # Step 2: slide a 2-byte window across the buffer. For ``b"ABCD"`` the - # iterations produce ``b"AB"``, ``b"BC"``, then ``b"CD"``. For each window, - # bump the matching bucket in a ``dict`` keyed by the bigram itself. - counts: dict[bytes, int] = {} - for i in range(len(data) - 1): - bigram = data[i : i + 2] - if bigram in counts: - counts[bigram] += 1 - else: - counts[bigram] = 1 - return counts +def compute_histogram(path): + """Return frequency of every 2-byte bigram in the file at path.""" + counts = [0] * 65536 + previous = None + + with Path(path).open("rb") as file: + while chunk := file.read(1024 * 1024): + for byte in chunk: + if previous is not None: + counts[(previous << 8) | byte] += 1 + previous = byte + + return { + bigram.to_bytes(2, "big"): count + for bigram, count in enumerate(counts) + if count + } From 14d3c3fd97af6cff21cb6ebad53ed1dce2884874 Mon Sep 17 00:00:00 2001 From: Rusty Johnson Date: Wed, 13 May 2026 11:03:45 -0700 Subject: [PATCH 3/5] rsjohnson: new version --- rounds/2_corruption/baseline.py | 156 ++++++++++++++++++++++++-------- 1 file changed, 119 insertions(+), 37 deletions(-) diff --git a/rounds/2_corruption/baseline.py b/rounds/2_corruption/baseline.py index f58a4b2..6ae30bd 100644 --- a/rounds/2_corruption/baseline.py +++ b/rounds/2_corruption/baseline.py @@ -1,46 +1,128 @@ -"""Round 2 baseline: corruption scanner. +from __future__ import annotations -Compares two equally-sized binary files and reports every contiguous run of -differing bytes as ``(offset, length)``. -""" +import mmap +import os +from os import PathLike +from typing import Union -from __future__ import annotations +import numpy as np + + +Pathish = Union[str, bytes, PathLike[str], PathLike[bytes]] + + +def find_corruptions( + ref_path: Pathish, + cor_path: Pathish, + *, + chunk_size: int = 1 << 26, # 64 MiB +) -> list[tuple[int, int]]: + """ + Return [(offset, length), ...] for every differing byte range. 
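+
+    Example: if bytes 3, 4, 5 and then 17, 18 differ, the result is
+    [(3, 3), (17, 2)].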
+ + Optimizations: + - checks file sizes before reading + - memory-maps both files + - compares bytes using NumPy's native vectorized code + - records only transition points, not every differing offset + - handles corruption ranges that cross chunk boundaries + """ + ref_size = os.path.getsize(ref_path) + cor_size = os.path.getsize(cor_path) -def find_corruptions(ref_path: str, cor_path: str) -> list[tuple[int, int]]: - """Return ``[(offset, length), ...]`` for every differing byte range.""" - # Step 1: read both files fully into memory as bytes objects. - with open(ref_path, "rb") as f: - ref = f.read() - with open(cor_path, "rb") as f: - cor = f.read() - if len(ref) != len(cor): + if ref_size != cor_size: raise ValueError("reference and corrupted files differ in length") - # Step 2: walk both buffers in lockstep and record every position where - # the two files disagree. The result is a sorted list of standalone byte - # offsets, e.g. [3, 4, 5, 17, 18]. - diffs: list[int] = [] - for i in range(len(ref)): - if ref[i] != cor[i]: - diffs.append(i) - - # Step 3: collapse runs of consecutive offsets into (start, length) ranges. - # The list from step 2 becomes [(3, 3), (17, 2)]: starting at 3 there are - # three differing bytes, then starting at 17 there are two more. - if not diffs: + if ref_size == 0: return [] + + if chunk_size <= 0: + raise ValueError("chunk_size must be positive") + + chunk_size = min(chunk_size, ref_size) + ranges: list[tuple[int, int]] = [] - start = diffs[0] - prev = diffs[0] - for pos in diffs[1:]: - if pos == prev + 1: - # Still inside the current run; extend it. - prev = pos - else: - # Gap. Close the current run and start a new one. - ranges.append((start, prev - start + 1)) - start = pos - prev = pos - ranges.append((start, prev - start + 1)) # Close the final run. + append = ranges.append + + in_run = False + run_start = 0 + + # Reuse this buffer so we do not allocate a new boolean array per chunk. + diff_buffer = np.empty(chunk_size, dtype=np.bool_) + + with open(ref_path, "rb") as ref_file, open(cor_path, "rb") as cor_file: + with ( + mmap.mmap(ref_file.fileno(), 0, access=mmap.ACCESS_READ) as ref_map, + mmap.mmap(cor_file.fileno(), 0, access=mmap.ACCESS_READ) as cor_map, + ): + for offset in range(0, ref_size, chunk_size): + stop = min(offset + chunk_size, ref_size) + length = stop - offset + + ref_chunk = np.frombuffer( + ref_map, + dtype=np.uint8, + count=length, + offset=offset, + ) + cor_chunk = np.frombuffer( + cor_map, + dtype=np.uint8, + count=length, + offset=offset, + ) + + diff = diff_buffer[:length] + np.not_equal(ref_chunk, cor_chunk, out=diff) + + # Fast path: this entire chunk is identical. + if not bool(diff.any()): + if in_run: + append((run_start, offset - run_start)) + in_run = False + + del ref_chunk, cor_chunk, diff + continue + + # Fast path: this entire chunk differs. + if bool(diff.all()): + if not in_run: + run_start = offset + in_run = True + + del ref_chunk, cor_chunk, diff + continue + + # Handle a transition at the chunk boundary. + first_is_diff = bool(diff[0]) + if first_is_diff != in_run: + if in_run: + append((run_start, offset - run_start)) + in_run = False + else: + run_start = offset + in_run = True + + # Internal transitions: + # False -> True starts a corruption range. + # True -> False closes a corruption range. 
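+            #
+            # Example: for diff = [F, T, T, F], comparing diff[1:] with
+            # diff[:-1] yields transitions [1, 3]: index 1 opens a run,
+            # index 3 closes it.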
+ transitions = np.flatnonzero(diff[1:] != diff[:-1]) + 1 + + for transition in transitions: + pos = offset + int(transition) + + if in_run: + append((run_start, pos - run_start)) + in_run = False + else: + run_start = pos + in_run = True + + # Release mmap-backed NumPy views before closing mmap objects. + del ref_chunk, cor_chunk, diff, transitions + + if in_run: + append((run_start, ref_size - run_start)) + return ranges From 946515872dbca023b05c8f70ce8265e3374f776d Mon Sep 17 00:00:00 2001 From: Rusty Johnson Date: Wed, 13 May 2026 11:34:17 -0700 Subject: [PATCH 4/5] rsjohnson: Use threads --- rounds/3_dna/solution.py | 195 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 184 insertions(+), 11 deletions(-) diff --git a/rounds/3_dna/solution.py b/rounds/3_dna/solution.py index 8b917da..70e4353 100644 --- a/rounds/3_dna/solution.py +++ b/rounds/3_dna/solution.py @@ -1,17 +1,190 @@ -"""Your Round 3 solution — DNA sequence matcher. +from __future__ import annotations -**Edit this file.** It currently delegates to ``baseline.py`` so everything -passes out of the box. Replace the body of ``find_matches`` with your -own faster implementation. -""" +import os +from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait +from os import PathLike +from typing import Iterator, Union -from .baseline import find_matches as _baseline +Pathish = Union[str, bytes, PathLike[str], PathLike[bytes]] -def find_matches(fasta_path: str, pattern: bytes) -> list[tuple[str, list[int]]]: - """Find every FASTA record whose sequence contains ``pattern``. +Record = tuple[int, str, bytearray] +SearchResult = tuple[int, str, list[int]] - Returns ``[(record_id, [positions...]), ...]`` in file order. + +def _iter_fasta_records(fasta_path: Pathish) -> Iterator[Record]: + """ + Yield FASTA records as: + + (record_index, record_id, sequence) + + The sequence is accumulated as bytes, with literal spaces removed to match + the baseline behavior. + """ + + record_id: str | None = None + sequence = bytearray() + index = 0 + + with open(fasta_path, "rb") as f: + for raw_line in f: + if raw_line[:1] == b">": + if record_id is not None: + yield index, record_id, sequence + index += 1 + + record_id = raw_line[1:].strip().decode("ascii") + sequence = bytearray() + continue + + # Ignore preamble before the first FASTA header. + if record_id is None: + continue + + line = raw_line.rstrip(b"\r\n") + + # Match the baseline's `.replace(" ", "")`. + if b" " in line: + line = line.replace(b" ", b"") + + sequence.extend(line) + + if record_id is not None: + yield index, record_id, sequence + + +def _find_overlapping_positions(sequence: bytearray, pattern: bytes) -> list[int]: + """ + Find all overlapping occurrences of pattern in sequence. + + Example: + sequence = b"AAAA" + pattern = b"AA" + result = [0, 1, 2] + """ + + # Preserve baseline behavior: + # an empty pattern matches every position from 0 through len(sequence). + if not pattern: + return list(range(len(sequence) + 1)) + + positions: list[int] = [] + append = positions.append + find = sequence.find + + start = 0 + + while True: + pos = find(pattern, start) + if pos < 0: + return positions + + append(pos) + start = pos + 1 + + +def _search_batch(batch: list[Record], pattern: bytes) -> list[SearchResult]: + """ + Worker function. + + Each worker receives a batch of records to reduce ThreadPoolExecutor + scheduling overhead for FASTA files with many small records. """ - # TODO: remove this delegation and write your own implementation here. 
- return _baseline(fasta_path, pattern) + + return [ + (index, record_id, _find_overlapping_positions(sequence, pattern)) + for index, record_id, sequence in batch + ] + + +def find_matches( + fasta_path: Pathish, + pattern: bytes, + *, + max_workers: int | None = None, + max_pending_batches: int | None = None, + batch_records: int = 64, + batch_bytes: int = 8 << 20, # 8 MiB of sequence data +) -> list[tuple[str, list[int]]]: + """ + Find every FASTA record whose sequence contains `pattern`. + + Returns: + [(record_id, [positions...]), ...] + + Threaded design: + - main thread parses the FASTA file + - worker threads search records in parallel + - main thread collects results and emits them in original file order + + This is designed for free-threaded Python. On normal GIL-enabled CPython, + CPU-bound speedup may be much smaller. + """ + + pattern = bytes(pattern) + + if max_workers is None: + max_workers = os.cpu_count() or 1 + if max_workers < 1: + raise ValueError("max_workers must be positive") + + if max_pending_batches is None: + max_pending_batches = max_workers * 2 + if max_pending_batches < 1: + raise ValueError("max_pending_batches must be positive") + + if batch_records < 1: + raise ValueError("batch_records must be positive") + if batch_bytes < 1: + raise ValueError("batch_bytes must be positive") + + matches: list[tuple[str, list[int]]] = [] + + # Completed records waiting to be emitted in file order. + ready: dict[int, tuple[str, list[int]]] = {} + + pending: set[Future[list[SearchResult]]] = set() + next_to_emit = 0 + + def collect(done: set[Future[list[SearchResult]]]) -> None: + nonlocal next_to_emit + + for future in done: + for index, record_id, positions in future.result(): + ready[index] = (record_id, positions) + + # Emit only when the next file-order record is available. + while next_to_emit in ready: + record_id, positions = ready.pop(next_to_emit) + + if positions: + matches.append((record_id, positions)) + + next_to_emit += 1 + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + batch: list[Record] = [] + batch_size = 0 + + for record in _iter_fasta_records(fasta_path): + batch.append(record) + batch_size += len(record[2]) + + if len(batch) >= batch_records or batch_size >= batch_bytes: + pending.add(executor.submit(_search_batch, batch, pattern)) + batch = [] + batch_size = 0 + + # Backpressure: do not let the parser enqueue the whole file. 
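+            # wait(..., FIRST_COMPLETED) blocks until at least one batch
+            # finishes, so at most max_pending_batches futures are ever
+            # outstanding while the file is still being parsed.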
+ if len(pending) >= max_pending_batches: + done, pending = wait(pending, return_when=FIRST_COMPLETED) + collect(done) + + if batch: + pending.add(executor.submit(_search_batch, batch, pattern)) + + while pending: + done, pending = wait(pending, return_when=FIRST_COMPLETED) + collect(done) + + return matches From 5b07a1b98e722f3793261318460075768ac2c8d0 Mon Sep 17 00:00:00 2001 From: Rusty Johnson Date: Wed, 13 May 2026 11:56:37 -0700 Subject: [PATCH 5/5] rsjohnson3: Speedup --- rounds/3_dna/solution.py | 209 +++++++++++++++++++++++++-------------- 1 file changed, 134 insertions(+), 75 deletions(-) diff --git a/rounds/3_dna/solution.py b/rounds/3_dna/solution.py index 70e4353..3ed20ad 100644 --- a/rounds/3_dna/solution.py +++ b/rounds/3_dna/solution.py @@ -1,5 +1,6 @@ from __future__ import annotations +import mmap import os from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait from os import PathLike @@ -8,54 +9,67 @@ Pathish = Union[str, bytes, PathLike[str], PathLike[bytes]] -Record = tuple[int, str, bytearray] +# (record_index, record_start_offset, record_end_offset) +Span = tuple[int, int, int] + +# (record_index, record_id, match_positions) SearchResult = tuple[int, str, list[int]] +# Baseline behavior removes spaces and newlines from sequence text. +# In binary mode we also remove '\r' to match text-mode universal newlines. +_DELETE_SEQUENCE_BYTES = b" \r\n" -def _iter_fasta_records(fasta_path: Pathish) -> Iterator[Record]: - """ - Yield FASTA records as: - (record_index, record_id, sequence) +def _default_worker_count() -> int: + # Python 3.13+ may expose process_cpu_count(), which respects CPU limits. + process_cpu_count = getattr(os, "process_cpu_count", None) + + if process_cpu_count is not None: + count = process_cpu_count() + else: + count = os.cpu_count() + + return count or 1 + - The sequence is accumulated as bytes, with literal spaces removed to match - the baseline behavior. +def _iter_record_spans(mm: mmap.mmap, size: int) -> Iterator[Span]: """ + Yield FASTA record byte ranges. - record_id: str | None = None - sequence = bytearray() - index = 0 + Assumes valid FASTA-style records where headers begin with '>' at the start + of a line. This is faster than splitting the whole file on b'>'. + """ - with open(fasta_path, "rb") as f: - for raw_line in f: - if raw_line[:1] == b">": - if record_id is not None: - yield index, record_id, sequence - index += 1 + if size == 0: + return - record_id = raw_line[1:].strip().decode("ascii") - sequence = bytearray() - continue + if mm[:1] == b">": + start = 0 + else: + marker = mm.find(b"\n>") + if marker < 0: + return + start = marker + 1 - # Ignore preamble before the first FASTA header. - if record_id is None: - continue + index = 0 - line = raw_line.rstrip(b"\r\n") + while start < size: + next_marker = mm.find(b"\n>", start + 1) + end = size if next_marker < 0 else next_marker - # Match the baseline's `.replace(" ", "")`. - if b" " in line: - line = line.replace(b" ", b"") + yield index, start, end - sequence.extend(line) + index += 1 - if record_id is not None: - yield index, record_id, sequence + if next_marker < 0: + break + start = next_marker + 1 -def _find_overlapping_positions(sequence: bytearray, pattern: bytes) -> list[int]: + +def _find_overlapping_positions(sequence: bytes, pattern: bytes) -> list[int]: """ - Find all overlapping occurrences of pattern in sequence. + Return every overlapping occurrence of pattern in sequence. 
 
     Example:
         sequence = b"AAAA"
@@ -63,8 +77,7 @@ def _find_overlapping_positions(sequence: bytearray, pattern: bytes) -> list[int
         result = [0, 1, 2]
     """
 
-    # Preserve baseline behavior:
-    # an empty pattern matches every position from 0 through len(sequence).
+    # Match the baseline's empty-pattern behavior.
     if not pattern:
         return list(range(len(sequence) + 1))
 
@@ -76,6 +89,7 @@ def _find_overlapping_positions(sequence: bytearray, pattern: bytes) -> list[int
 
     while True:
         pos = find(pattern, start)
+
         if pos < 0:
             return positions
 
@@ -83,18 +97,40 @@ def _find_overlapping_positions(sequence: bytearray, pattern: bytes) -> list[int
     start = pos + 1
 
 
-def _search_batch(batch: list[Record], pattern: bytes) -> list[SearchResult]:
+def _search_batch(
+    mm: mmap.mmap,
+    spans: list[Span],
+    pattern: bytes,
+) -> list[SearchResult]:
     """
     Worker function.
 
-    Each worker receives a batch of records to reduce ThreadPoolExecutor
-    scheduling overhead for FASTA files with many small records.
+    Each worker processes a batch of records. Batching is important for a file
+    with ~10k sequences because submitting 10k individual futures is wasteful.
     """
 
-    return [
-        (index, record_id, _find_overlapping_positions(sequence, pattern))
-        for index, record_id, sequence in batch
-    ]
+    results: list[SearchResult] = []
+    append_result = results.append
+    delete_bytes = _DELETE_SEQUENCE_BYTES
+
+    for index, start, end in spans:
+        header_end = mm.find(b"\n", start, end)
+
+        if header_end < 0:
+            # Header-only record.
+            record_id = mm[start + 1 : end].strip().decode("ascii")
+            sequence = b""
+        else:
+            record_id = mm[start + 1 : header_end].strip().decode("ascii")
+
+            # bytes.translate runs in CPython's C layer; it strips line
+            # breaks and spaces from the sequence in a single pass.
+            sequence = mm[header_end + 1 : end].translate(None, delete_bytes)
+
+        positions = _find_overlapping_positions(sequence, pattern)
+        append_result((index, record_id, positions))
+
+    return results
 
 
 def find_matches(
@@ -102,9 +138,9 @@ def find_matches(
     pattern: bytes,
     *,
     max_workers: int | None = None,
+    batch_records: int = 128,
+    batch_bytes: int = 8 << 20,  # 8 MiB
     max_pending_batches: int | None = None,
-    batch_records: int = 64,
-    batch_bytes: int = 8 << 20,  # 8 MiB of sequence data
 ) -> list[tuple[str, list[int]]]:
     """
     Find every FASTA record whose sequence contains `pattern`.
@@ -112,38 +148,48 @@ def find_matches(
     Returns:
         [(record_id, [positions...]), ...]
 
-    Threaded design:
-    - main thread parses the FASTA file
-    - worker threads search records in parallel
-    - main thread collects results and emits them in original file order
+    Tuned for roughly:
+    - 512 MB input
+    - ~10,145 records
+    - free-threaded CPython
 
-    This is designed for free-threaded Python. On normal GIL-enabled CPython,
-    CPU-bound speedup may be much smaller.
+    The defaults create roughly 60-90 tasks for an input of that size,
+    rather than one tiny task per record.
     """
 
     pattern = bytes(pattern)
 
+    # Preserve the baseline's assumption that the pattern is ASCII text.
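+    # The decoded value is discarded: the call only validates, raising
+    # UnicodeDecodeError up front for a non-ASCII pattern.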
+ pattern.decode("ascii") + if max_workers is None: - max_workers = os.cpu_count() or 1 + max_workers = _default_worker_count() + if max_workers < 1: raise ValueError("max_workers must be positive") - if max_pending_batches is None: - max_pending_batches = max_workers * 2 - if max_pending_batches < 1: - raise ValueError("max_pending_batches must be positive") - if batch_records < 1: raise ValueError("batch_records must be positive") + if batch_bytes < 1: raise ValueError("batch_bytes must be positive") + if max_pending_batches is None: + max_pending_batches = max_workers * 4 + + if max_pending_batches < 1: + raise ValueError("max_pending_batches must be positive") + + size = os.path.getsize(fasta_path) + + if size == 0: + return [] + matches: list[tuple[str, list[int]]] = [] # Completed records waiting to be emitted in file order. ready: dict[int, tuple[str, list[int]]] = {} - pending: set[Future[list[SearchResult]]] = set() next_to_emit = 0 def collect(done: set[Future[list[SearchResult]]]) -> None: @@ -153,7 +199,7 @@ def collect(done: set[Future[list[SearchResult]]]) -> None: for index, record_id, positions in future.result(): ready[index] = (record_id, positions) - # Emit only when the next file-order record is available. + # Preserve file order even when worker batches complete out of order. while next_to_emit in ready: record_id, positions = ready.pop(next_to_emit) @@ -162,29 +208,42 @@ def collect(done: set[Future[list[SearchResult]]]) -> None: next_to_emit += 1 - with ThreadPoolExecutor(max_workers=max_workers) as executor: - batch: list[Record] = [] - batch_size = 0 - - for record in _iter_fasta_records(fasta_path): - batch.append(record) - batch_size += len(record[2]) + with open(fasta_path, "rb") as file: + with mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ) as mm: + with ThreadPoolExecutor(max_workers=max_workers) as executor: + pending: set[Future[list[SearchResult]]] = set() - if len(batch) >= batch_records or batch_size >= batch_bytes: - pending.add(executor.submit(_search_batch, batch, pattern)) - batch = [] + batch: list[Span] = [] batch_size = 0 - # Backpressure: do not let the parser enqueue the whole file. - if len(pending) >= max_pending_batches: - done, pending = wait(pending, return_when=FIRST_COMPLETED) - collect(done) + for span in _iter_record_spans(mm, size): + _, start, end = span + + batch.append(span) + batch_size += end - start - if batch: - pending.add(executor.submit(_search_batch, batch, pattern)) + if len(batch) >= batch_records or batch_size >= batch_bytes: + pending.add(executor.submit(_search_batch, mm, batch, pattern)) - while pending: - done, pending = wait(pending, return_when=FIRST_COMPLETED) - collect(done) + batch = [] + batch_size = 0 + + # Backpressure. Avoid queueing unbounded work. + if len(pending) >= max_pending_batches: + done, pending = wait( + pending, + return_when=FIRST_COMPLETED, + ) + collect(done) + + if batch: + pending.add(executor.submit(_search_batch, mm, batch, pattern)) + + while pending: + done, pending = wait( + pending, + return_when=FIRST_COMPLETED, + ) + collect(done) return matches
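+
+
+if __name__ == "__main__":
+    # Ad-hoc usage sketch, not part of the tutorial harness: scan a FASTA
+    # file named on the command line for an arbitrary example pattern.
+    import sys
+
+    for record_id, positions in find_matches(sys.argv[1], b"ACGT"):
+        print(f"{record_id}: {len(positions)} match(es)")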