Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,5 @@ scripts/
```

Each round's `data/` directory is generated locally and gitignored.
This is <your-name>'s PR
This is Bogomil's PR
118 changes: 112 additions & 6 deletions rounds/3_dna/solution.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,119 @@
own faster implementation.
"""

from .baseline import find_matches as _baseline
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
from typing import Iterable

def find_matches(fasta_path: str, pattern: bytes) -> list[tuple[str, list[int]]]:
"""Find every FASTA record whose sequence contains ``pattern``.

Returns ``[(record_id, [positions...]), ...]`` in file order.
def _search_record(
item: tuple[int, str, str],
pattern: str,
) -> tuple[int, str, list[int]] | None:
"""
# TODO: remove this delegation and write your own implementation here.
return _baseline(fasta_path, pattern)
Search one FASTA record.

Returns:
(original_index, record_id, positions)
or None if no matches.
"""
index, record_id, sequence = item

positions: list[int] = []
start = 0

while True:
pos = sequence.find(pattern, start)
if pos == -1:
break

positions.append(pos)

# advance by 1 so overlapping matches count
start = pos + 1

if positions:
return (index, record_id, positions)

return None


def _parse_fasta(path: str) -> Iterable[tuple[int, str, str]]:
    """Stream the records of a FASTA file one at a time.

    Blank lines are skipped and surrounding whitespace is stripped from
    every line. A line starting with ``>`` opens a new record; the text
    after the ``>`` (stripped) becomes its id. All following non-header
    lines are concatenated to form the record's sequence. Sequence lines
    that appear before the first header are discarded.

    Args:
        path: Location of the FASTA file to read.

    Yields:
        ``(record_index, record_id, sequence)`` tuples, with
        ``record_index`` counting records from 0 in file order.
    """
    with open(path, "r") as f:
        record_id = None
        seq_parts: list[str] = []
        index = 0

        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue

            if not line.startswith(">"):
                seq_parts.append(line)
                continue

            # A new header closes the record collected so far, if any.
            if record_id is not None:
                yield index, record_id, "".join(seq_parts)
                index += 1

            record_id = line[1:].strip()
            seq_parts = []

        # The last record has no following header to flush it.
        if record_id is not None:
            yield index, record_id, "".join(seq_parts)


def find_matches(
    fasta_path: str,
    pattern: bytes,
) -> list[tuple[str, list[int]]]:
    """Find every FASTA record whose sequence contains ``pattern``.

    Each record read from ``fasta_path`` is searched on a thread pool; the
    per-record search reports overlapping matches too.

    Args:
        fasta_path: Path of the FASTA file to scan.
        pattern: Pattern to look for; it is decoded as ASCII before use.

    Returns:
        ``[(record_id, [positions...]), ...]`` in file order, containing
        only the records that have at least one match.

    Raises:
        UnicodeDecodeError: If ``pattern`` contains non-ASCII bytes.
    """
    pattern_str = pattern.decode("ascii")

    # On a free-threaded CPython build the worker threads can search
    # records in parallel.
    with ThreadPoolExecutor() as pool:
        futures = [
            pool.submit(_search_record, record, pattern_str)
            for record in _parse_fasta(fasta_path)
        ]
        # Futures are read back in submission order, which is the order the
        # parser yielded the records, so the hits are already in file order
        # and no re-sort on the record index is needed.
        hits = [future.result() for future in futures]

    return [
        (record_id, positions)
        for _, record_id, positions in (hit for hit in hits if hit is not None)
    ]
Loading