Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
- unreleased (feat/warc-range-sources)
+ new `repackage` command: extract a WARC subset from pluggable sources (cdx / sql{athena,duckdb} / csv), reading source WARCs over HTTP or S3 and writing verbatim
+ `--processes N` multi-process fetching (one event loop per core); range jobs are sharded by WARC filename and the shards are merged into a single `<prefix>.warc.gz` with one warcinfo record (server-side on S3 / streamed locally). `--keep-shards` keeps the shards
+ one writer per process (removed the multi-writer-per-process fan-out); `--parallel_readers` sets async readers per process
+ optional uvloop event loop via `CDXT_UVLOOP=1`
+ see docs/notes/warc-fetcher-performance.md for benchmarks and tuning

- 0.9.38
+ deprecated support for py3.7 and py.3.8
+ added support for py3.13 and py3.14
Expand Down
98 changes: 49 additions & 49 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -305,68 +305,68 @@ Filtering throughput depends on your machine. For reference,
on an AWS EC2 c5n.xlarge instance filtering all 300 CDX files
from CC-MAIN-2024-30 takes ~1.4 hours with 100k URLs in the whitelist.

## WARC extraction using CDX files
## WARC extraction / repackaging

You can extract parts of WARC files using the cdxt command line script.
The WARC extraction can read CDX files from local and remote file
systems, like S3 buckets. Multiple CDX files can be defined
using a glob pattern. For downloading WARC parts from HTTP or S3, you can
define the download prefix, e.g., `s3://commoncrawl` for S3 download.
The `cdxt repackage` command extracts a subset of WARC records (selected by a
columnar/CDX index) and writes them into one or more new WARC files. Records are
copied **verbatim** (no recompression). It can read the source WARCs over HTTP or,
much faster in-region, directly from S3 (`--warc-download-prefix=s3://commoncrawl`),
and write the output to the local filesystem or S3.

```
$ cdxt -v --cc warc_by_cdx \
<path_to_cdx> [--cdx-glob <glob pattern, e.g., "*.gz">] \
--prefix <output prefix> \
--warc-download-prefix=<warc download prefix, e.g., s3://commoncrawl> \
--creator <name and contact of creator> \
--operator <name and contact of creator> \
[--implementation <fsspec or aiobot3, defaults to fsspec>]
[--write-paths-as-resource-records <one or more paths for resource records>]
[--write-paths-as-resource-records-metadata <one or more paths for metadata of resource records>]
```
The records to extract come from a pluggable source, chosen with `--target-source`:

By default, we use a [fsspec](https://filesystem-spec.readthedocs.io/en/latest/index.html)
implementation to write and read to local or remote file systems.
For better throughput for S3 read/write, we have also a specific implementation
using [aioboto3](https://github.com/terricain/aioboto3) that you can enable with
the `--implementation=aioboto3` argument. With aioboto3, we achieved ~ 80 requests / second
on an AWS EC2 c5n.xlarge instance.
- `cdx` — CDX index file(s) (`--cdx-path`, `--cdx-glob`);
- `sql` — the CC columnar index via `--engine athena` or `--engine duckdb`
(filter with `--hostnames` / `--domains`, or raw `--query` / `--query-file`);
- `csv` — a range-jobs CSV (`--csv-path`) of `warc_filename,warc_record_offset,warc_record_length`.

You can add one or multiple files with metadata as resource records to
the extracted WARC. For instance, this is useful to maintain the CDX filter
inputs, e.g., the whitelist list. To do this, you need to provide the
corresponding file paths as arguments `--write-paths-as-resource-records=s3:///my-s3-bucket/path/to/my-url-whitelist.txt`
and `--write-paths-as-resource-records-metadata=s3:///my-s3-bucket/path/to/metadata.json`.
The metadata file is optional and can have the following optional fields:
You can materialize the selected ranges to a CSV without fetching
(`--range-jobs-output FILE --no-fetch`) and fetch them later with `--target-source csv`.

```json
{
"warc_content_type": "str",
"uri": "str",
"http_headers": {"k": "v"},
"warc_headers_dict": {"k": "v"}
}
```
### Multi-core fetching and a single output file

This in one example for a metadata JSON file:
The fetcher is asyncio-based: many concurrent range reads, but a single event loop
runs on one CPU core. For large jobs in-region the limiter is request-rate on that
core. Use `--processes N` (set it to the vCPU count) to run N worker processes, each
with its own event loop; `--parallel_readers R` sets the async readers per process.

The range jobs are sharded by WARC filename across the processes, each process writes
one shard, and the shards are **merged into a single `<prefix>.warc.gz`** (server-side
on S3, or streamed locally) with a single `warcinfo` record. Pass `--keep-shards` to
keep the per-process shards instead.

```json
{
"uri": "filter_cdx.gz",
"warc_content_type": "application/cdx",
}
```
$ CDXT_UVLOOP=1 cdxt -v repackage \
--target-source csv --csv-path range-jobs.csv \
--prefix s3://my-bucket/path/homepages \
--warc-download-prefix=s3://commoncrawl \
--processes 4 --parallel_readers 48 \
--creator "..." --operator "..." --is-part-of "CC-MAIN-2026-21"
# -> writes a single s3://my-bucket/path/homepages.warc.gz
```

Setting `CDXT_UVLOOP=1` uses the [uvloop](https://github.com/MagicStack/uvloop) event
loop (install `uvloop` separately) for a small single-core speedup.

On an AWS EC2 c5n.xlarge (4 vCPU) in us-east-1, reading from `s3://commoncrawl`, a
~1 GiB / ~35k-record homepages job runs at ~1100 records/s (~450/s per core, scaling
~linearly with `--processes`). See
[docs/notes/warc-fetcher-performance.md](docs/notes/warc-fetcher-performance.md) for the
full analysis and tuning guidance.

### Metadata records

The full WARC extraction command could look like this:
You can add one or more files as WARC `metadata` records at the top of the output
(after the `warcinfo` record) with `--write-paths-as-metadata-records`. This is useful
to carry, e.g., the filter/whitelist inputs alongside the extracted records:

```
$ cdxt -v --cc warc_by_cdx \
s3://my-s3-bucket/filtered-cdxs --cdx-glob "*.gz" \
--prefix /local/path/filtered-warcs/ \
$ cdxt -v repackage \
--target-source cdx s3://my-bucket/filtered-cdxs --cdx-glob "*.gz" \
--prefix /local/path/filtered-warcs \
--warc-download-prefix=s3://commoncrawl \
--creator foo --operator bob \
--write-paths-as-resource-records=s3:///my-s3-bucket/path/to/my-url-whitelist.txt \
--write-paths-as-resource-records-metadata=s3:///my-s3-bucket/path/to/metadata.json
--write-paths-as-metadata-records /path/to/url-whitelist.txt /path/to/metadata.json
```

## TODO
Expand Down
14 changes: 7 additions & 7 deletions cdx_toolkit/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from cdx_toolkit.filter_cdx.command import run_filter_cdx
from cdx_toolkit.filter_cdx.args import add_filter_cdx_args

from cdx_toolkit.filter_warc.command import run_warcer_by_cdx
from cdx_toolkit.filter_warc.args import add_warcer_by_cdx_args
from cdx_toolkit.filter_warc.command import run_repackage
from cdx_toolkit.filter_warc.args import add_repackage_args


LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -124,12 +124,12 @@ def main(args=None):
warc.add_argument('url')
warc.set_defaults(func=warcer)

warc_by_cdx = subparsers.add_parser(
'warc_by_cdx',
help='iterate over capture content based on an CDX index file, creating a warc'
repackage = subparsers.add_parser(
'repackage',
help='repackage WARC ranges from a CDX/SQL/CSV source into a new WARC'
)
add_warcer_by_cdx_args(warc_by_cdx)
warc_by_cdx.set_defaults(func=run_warcer_by_cdx)
add_repackage_args(repackage)
repackage.set_defaults(func=run_repackage)

filter_cdx = subparsers.add_parser('filter_cdx', help='Filter CDX files based on SURT prefixes whitelist')
add_filter_cdx_args(filter_cdx)
Expand Down
125 changes: 99 additions & 26 deletions cdx_toolkit/filter_warc/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,59 +5,118 @@
logger = logging.getLogger(__name__)


def add_warcer_by_cdx_args(parser: argparse.ArgumentParser):
def add_repackage_args(parser: argparse.ArgumentParser):
# --- CDX source ---
parser.add_argument(
'--cdx-path',
type=str,
default=None,
help='Path to CDX index file (local or remote, e.g. S3). Required if target source is set to `cdx`.',
help='Path to CDX index file (local or remote, e.g. S3). Used when --target-source cdx.',
)
parser.add_argument(
'--cdx-glob',
type=str,
default=None,
help='a glob pattern for read from multiple CDX indices',
)
# --- SQL source (--target-source sql --engine athena|duckdb) ---
parser.add_argument(
'--athena-hostnames',
'--engine',
type=str,
nargs="+",
default=None,
help=('Hostnames to filter for via Athena (whitelist). Use this OR --athena-query/'
'--athena-query-file (mutually exclusive) when target source is `athena`.'),
choices=['athena', 'duckdb'],
help='SQL engine for the columnar index. Required when --target-source sql.',
)
parser.add_argument(
'--athena-query',
'--hostnames',
type=str,
nargs='+',
default=None,
help=('Raw Athena SQL to run instead of the hostname-based query (power users). The query '
'must SELECT the columns warc_filename, warc_record_offset, warc_record_length. '
'Mutually exclusive with --athena-hostnames and --athena-query-file.'),
help=('Exact hostnames (url_host_name, e.g. www.example.com) to filter for via the SQL '
'index. Combine with --domains; mutually exclusive with --query/--query-file. '
'Combine with the global --crawl to restrict the scan to specific crawls '
'(strongly recommended for cost).'),
)
parser.add_argument(
'--athena-query-file',
'--domains',
type=str,
nargs='+',
default=None,
help='Path to a file containing the raw Athena SQL (alternative to --athena-query).',
help=('Registered domains (url_host_registered_domain, e.g. example.com) to filter for via '
'the SQL index; also matches subdomains. Combine with --hostnames; mutually exclusive '
'with --query/--query-file.'),
)
parser.add_argument(
'--query',
type=str,
default=None,
help=('Raw SQL to run instead of the hostname-based query (power users). Must SELECT the '
'columns warc_filename, warc_record_offset, warc_record_length. Engine-specific '
'dialect. Mutually exclusive with --hostnames and --query-file.'),
)
parser.add_argument(
'--query-file',
type=str,
default=None,
help='Path to a file containing the raw SQL (alternative to --query).',
)
parser.add_argument(
'--athena-database',
type=str,
default=None,
help='Athena database. Required if target source is set to `athena`.',
help='Athena database (engine=athena). Defaults to `ccindex`.',
)
parser.add_argument(
'--athena-s3-output',
type=str,
default=None,
help='Athena S3 output location. Required if target source is set to `athena`.',
help='Athena S3 output location (engine=athena). Required for engine=athena.',
)
parser.add_argument(
'--confirm-athena-cost',
'--duckdb-index-path',
type=str,
default='s3://commoncrawl/cc-index/table/cc-main/warc/',
help='Base S3 path to the CC columnar index parquet (engine=duckdb).',
)
parser.add_argument(
'--confirm-cost',
action='store_true',
help=('Skip the Athena cost-confirmation prompt and run even unpartitioned / large-scan '
help=('Skip the cost-confirmation prompt and run even unpartitioned / large-scan SQL '
'queries. Athena bills per TB scanned; restrict with --crawl to reduce cost.'),
)
# --- CSV source ---
parser.add_argument(
'--csv-path',
type=str,
default=None,
help='Path to a range-jobs CSV/TSV (local or remote). Used when --target-source csv.',
)
# --- Range-jobs materialization (any source) ---
parser.add_argument(
'--range-jobs-output',
type=str,
default=None,
help='If set, write each generated RangeJob to this CSV (filename,offset,length by default).',
)
parser.add_argument(
'--no-fetch',
action='store_true',
help='Only generate range jobs (write --range-jobs-output); skip fetching/writing WARCs.',
)
parser.add_argument(
'--csv-self-contained',
action='store_true',
help='Write full URLs (warc_url,...) to --range-jobs-output instead of relative filenames.',
)
parser.add_argument(
'--no-sort-ranges',
action='store_true',
help=('Do not sort range jobs by (warc_filename, warc_record_offset) before fetching. '
'Sorting (the default) groups records of the same WARC file with ascending offsets '
'for better S3 range-read locality; it adds an ORDER BY to a guided SQL query and '
'buffers a CSV source in memory. Disable for an already-sorted or very large CSV, or '
'to preserve a raw query/CSV order.'),
)
parser.add_argument('--prefix', default='TEST', help='prefix for the output warc filename')
parser.add_argument(
'--subprefix',
Expand Down Expand Up @@ -99,22 +158,35 @@ def add_warcer_by_cdx_args(parser: argparse.ArgumentParser):
help='Paths to multiple files. File content is written to as a metadata record to each the WARC file',
)
parser.add_argument(
'--parallel',
'--processes',
type=int,
default=1,
help='Number of parallel workers for reading and writing WARC records (default: 1, sequential processing)',
help=('Number of worker processes for fetching (default: 1). A single asyncio loop '
'saturates one CPU core on many small range reads; set this to the vCPU count '
'to use all cores. The range jobs are sharded by WARC filename across processes '
'and the per-process output shards are merged into a single <prefix>.warc.gz '
'(one warcinfo record). Each process uses --parallel_readers async readers.'),
)
parser.add_argument(
'--parallel_readers',
'--keep-shards',
action='store_true',
help='In multi-process mode, keep the intermediate per-process shard WARCs instead of '
'deleting them after the merge.',
)
parser.add_argument(
'--parallel',
type=int,
default=None,
help='Number of parallel workers for reading WARC records (default: same as `parallel`)',
default=1,
help='Number of async readers per process for fetching WARC records (default: 1). Each '
'process has a single writer; combine with --processes to use multiple cores.',
)
parser.add_argument(
'--parallel_writers',
'--parallel_readers',
type=int,
default=None,
help='Number of parallel workers for writing WARC records (default: same as `parallel`)',
help='Number of async readers per process for reading WARC records (default: same as '
'`parallel`). Each process has a single writer; use --processes for multi-core '
'scaling and output sharding.',
)
parser.add_argument(
'--log_every_n',
Expand All @@ -126,8 +198,9 @@ def add_warcer_by_cdx_args(parser: argparse.ArgumentParser):
'--target-source',
action='store',
default='cdx',
help=('Source from that the filter targets are loaded (available options: `cdx`, `athena`; '
'defaults to `cdx`). For `athena`, use the global --crawl to restrict the scan to '
'specific crawls (strongly recommended; Athena bills per TB scanned).'),
choices=['cdx', 'sql', 'csv'],
help=('Where range jobs come from: `cdx` (index files), `sql` (columnar index via '
'--engine athena|duckdb), or `csv` (a range-jobs CSV). Defaults to `cdx`. For `sql`, '
'use the global --crawl to restrict the scan to specific crawls (recommended for cost).'),
)
return parser
Loading