diff --git a/docs/adr/0003-deps-bq-table-selection.md b/docs/adr/0003-deps-bq-table-selection.md index 84a222105d..78247575dd 100644 --- a/docs/adr/0003-deps-bq-table-selection.md +++ b/docs/adr/0003-deps-bq-table-selection.md @@ -1,4 +1,4 @@ -# ADR-0003: Use DependencyGraphEdgesLatest for deps ingestion; defer DependenciesLatest until NUGET needed +# ADR-0003: Use DependencyGraphEdgesLatest for deps ingestion; defer DependenciesLatest until NUGET or GO needed **Date**: 2026-05-29 **Status**: accepted @@ -14,9 +14,9 @@ NPM + MAVEN only. ## Decision Use `DependencyGraphEdgesLatest` (Option A) with `From.Name = Name AND From.Version = Version` as the depth-1 filter. Switch to `DependenciesLatest` -(Option B) only when NUGET ingestion is required, since Option A does not -support NUGET. At that point evaluate whether to migrate all ecosystems or -add NUGET via Option B only. +(Option B) only when NUGET or GO ingestion is required, since Option A does not +support either ecosystem. At that point evaluate whether to migrate all ecosystems +or add GO/NUGET via Option B only. ## Alternatives Considered @@ -27,7 +27,7 @@ add NUGET via Option B only. e.g. `^1.2.3`); only exposes resolved version - **Why not**: NPM and MAVEN are the only ecosystems needed now. Option A retains `version_constraint` which has security feature value. Cost delta is - acceptable for the current scope. Re-evaluate when NUGET is required. + acceptable for the current scope. Re-evaluate when NUGET or GO is required. ## Consequences @@ -36,11 +36,15 @@ add NUGET via Option B only. - No migration risk — simpler to stay on the table already coded ### Negative -- NUGET not supported; must revisit when NUGET ingestion is added +- Only NPM, MAVEN, PYPI, CARGO are present in `DependencyGraphEdgesLatest`. + GO and NUGET are absent entirely — confirmed via BQ query on the live table + (`SELECT System, COUNT(*) … GROUP BY System` returns exactly those 4). + GO uses Minimal Version Selection (no graph resolution by deps.dev); + NUGET is simply not covered. Must revisit when either ecosystem is added. - Full bootstrap ~$1,291 vs ~$494 for NPM+MAVEN (Option B cheaper) - Weekly incremental ~$26 vs ~$7 (Option B cheaper) ### Risks -- If NUGET is added before a re-evaluation, the team may miss that Option B +- If NUGET or GO is added before a re-evaluation, the team may miss that Option B is required. Mitigation: this ADR and the note in `personal/osspckgs-bq-cost-report.txt` flag the trigger condition. diff --git a/docs/adr/README.md b/docs/adr/README.md index 7a4c84f09d..9bb2045dca 100644 --- a/docs/adr/README.md +++ b/docs/adr/README.md @@ -9,7 +9,7 @@ Use the `/adr` skill in Claude Code to record new ADRs or query past decisions. | ADR | Title | Status | Date | | --------------------------------------------------- | ---------------------------------------- | ------ | ---------- | | [ADR-0001](./0001-oss-packages-design-decisions.md) | OSS packages — design decisions (living) | living | 2026-05-27 | -| [ADR-0003](./0003-deps-bq-table-selection.md) | Use DependencyGraphEdgesLatest for deps ingestion | accepted | 2026-05-29 | +| [ADR-0003](./0003-deps-bq-table-selection.md) | Use DependencyGraphEdgesLatest for deps ingestion; defer DependenciesLatest until NUGET or GO needed | accepted | 2026-05-29 | ## Why ADRs? diff --git a/scripts/cli b/scripts/cli index 0c9ce03ded..84b678f83e 100755 --- a/scripts/cli +++ b/scripts/cli @@ -255,6 +255,55 @@ function select_services() { echo "Selected services: $selected_services" } +function monitor_workflow() { + local REPOSITORY="$1" + local WORKFLOW_FILE="$2" + local EXISTING_IDS="${3:-}" + + yell "Waiting for workflow run to appear..." + + local RUN_ID="" + local ATTEMPTS=0 + + while [[ -z "$RUN_ID" && $ATTEMPTS -lt 24 ]]; do + sleep 5 + local CANDIDATE + CANDIDATE=$(gh run list \ + --repo "$REPOSITORY" \ + --workflow "$WORKFLOW_FILE" \ + --limit 1 \ + --json databaseId \ + --jq '.[0].databaseId // empty' 2>/dev/null || echo "") + + if [[ -n "$CANDIDATE" ]] && ! echo "$EXISTING_IDS" | grep -q "^${CANDIDATE}$"; then + RUN_ID="$CANDIDATE" + fi + + ATTEMPTS=$((ATTEMPTS + 1)) + done + + if [[ -z "$RUN_ID" ]]; then + error "Could not find new workflow run after $((ATTEMPTS * 5)) seconds." + return 1 + fi + + say "Monitoring run #$RUN_ID — https://github.com/$REPOSITORY/actions/runs/$RUN_ID" + nl + + gh run watch "$RUN_ID" --repo "$REPOSITORY" || true + + local CONCLUSION + CONCLUSION=$(gh run view "$RUN_ID" --repo "$REPOSITORY" --json conclusion --jq '.conclusion') + + nl + if [[ "$CONCLUSION" == "success" ]]; then + say "✓ Deployment succeeded!" + else + error "✗ Deployment failed: $CONCLUSION" + return 1 + fi +} + function deploy_staging() { REPOSITORY="CrowdDotDev/crowd.dev" WORKFLOW_FILE="lf-oracle-staging-deploy.yaml" @@ -264,7 +313,11 @@ function deploy_staging() { echo "${SELECTED_SERVICES[*]}" )" + local EXISTING_IDS + EXISTING_IDS=$(gh run list --repo "$REPOSITORY" --workflow "$WORKFLOW_FILE" --limit 10 --json databaseId --jq '.[].databaseId' 2>/dev/null || echo "") + gh workflow run $WORKFLOW_FILE --repo $REPOSITORY --ref $CURRENT_BRANCH -f services="$SERVICES" + monitor_workflow "$REPOSITORY" "$WORKFLOW_FILE" "$EXISTING_IDS" } function deploy_production() { @@ -276,7 +329,11 @@ function deploy_production() { echo "${SELECTED_SERVICES[*]}" )" + local EXISTING_IDS + EXISTING_IDS=$(gh run list --repo "$REPOSITORY" --workflow "$WORKFLOW_FILE" --limit 10 --json databaseId --jq '.[].databaseId' 2>/dev/null || echo "") + gh workflow run $WORKFLOW_FILE --repo $REPOSITORY --ref $CURRENT_BRANCH -f services="$SERVICES" + monitor_workflow "$REPOSITORY" "$WORKFLOW_FILE" "$EXISTING_IDS" } function reset_selected_services() { diff --git a/services/apps/packages_worker/package.json b/services/apps/packages_worker/package.json index 04a6f35efd..f030b1af59 100644 --- a/services/apps/packages_worker/package.json +++ b/services/apps/packages_worker/package.json @@ -39,6 +39,8 @@ "backfill:stewardship:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=stewardship-backfill LOG_LEVEL=info tsx src/bin/stewardship-backfill.ts", "monitor:osspckgs": "SERVICE=bq-dataset-ingest tsx src/scripts/monitorOsspckgs.ts", "monitor:osspckgs:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=bq-dataset-ingest tsx src/scripts/monitorOsspckgs.ts", + "dedup-package-deps": "SERVICE=bq-dataset-ingest tsx src/scripts/dedupPackageDeps.ts", + "dedup-package-deps:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=bq-dataset-ingest tsx src/scripts/dedupPackageDeps.ts", "lint": "npx eslint --ext .ts src --max-warnings=0", "format": "npx prettier --write \"src/**/*.ts\"", "format-check": "npx prettier --check .", diff --git a/services/apps/packages_worker/src/criticality/activities.ts b/services/apps/packages_worker/src/criticality/activities.ts index 1f1ef1a1d8..dc0c18d1ec 100644 --- a/services/apps/packages_worker/src/criticality/activities.ts +++ b/services/apps/packages_worker/src/criticality/activities.ts @@ -78,6 +78,7 @@ export async function rankPackages(): Promise<{ scoredRows: number; rankedRows: const jobId = existing?.id ?? (await createIngestJob(qx, 'ranking', 'ranking', null)) try { + await markJobStatus(qx, jobId, 'merging') const [result] = await qx.select(`SELECT * FROM rank_packages()`) const scoredRows = Number(result.scored_rows ?? 0) const rankedRows = Number(result.ranked_rows ?? 0) diff --git a/services/apps/packages_worker/src/deps-dev/activities/bqExportToGcs.ts b/services/apps/packages_worker/src/deps-dev/activities/bqExportToGcs.ts index da5c89c2cc..a594295af0 100644 --- a/services/apps/packages_worker/src/deps-dev/activities/bqExportToGcs.ts +++ b/services/apps/packages_worker/src/deps-dev/activities/bqExportToGcs.ts @@ -24,6 +24,7 @@ export interface BqExportToGcsInput { maxBytesGb: number reuseExports?: boolean exportName?: string + ecosystems?: string[] } export interface BqExportToGcsOutput { @@ -34,7 +35,17 @@ export interface BqExportToGcsOutput { } export async function bqExportToGcs(input: BqExportToGcsInput): Promise { - const { jobKind, sql, runId, syncMode, snapshotAt, maxBytesGb, reuseExports, exportName } = input + const { + jobKind, + sql, + runId, + syncMode, + snapshotAt, + maxBytesGb, + reuseExports, + exportName, + ecosystems, + } = input // Named exports use a stable GCS path independent of runId so they survive across bootstrap runs. const namedGcsPrefix = exportName @@ -93,7 +104,10 @@ export async function bqExportToGcs(input: BqExportToGcsInput): Promise = await qx.select(NON_CONSTRAINT_INDEXES_SQL) const existingDefs = existing.map((r: { indexdef: string }) => r.indexdef.toLowerCase()) + const toRebuild = SECONDARY_INDEXES.filter( + (idx) => !existingDefs.some((def: string) => def.includes(`(${idx.columns})`)), + ) const rebuilt: string[] = [] - for (const idx of SECONDARY_INDEXES) { - const alreadyExists = existingDefs.some((def: string) => def.includes(`(${idx.columns})`)) - if (alreadyExists) { - log.info({ columns: idx.columns }, 'Index already exists, skipping') - continue - } - log.info({ columns: idx.columns }, 'Creating index on package_dependencies') - await qx.result(idx.createSql) - rebuilt.push(idx.columns) - } + // Build indexes in parallel — each on its own connection so they run concurrently. + await Promise.all( + toRebuild.map(async (idx) => { + const conn = await getPackagesDb() + log.info({ columns: idx.columns }, 'Creating index on package_dependencies') + await conn.result(idx.createSql) + rebuilt.push(idx.columns) + }), + ) // Remove cross-chunk duplicates before rebuilding the UNIQUE constraint. // DISTINCT ON deduplicates within a single chunk, but the same (root, dep) pair can appear in @@ -118,27 +120,43 @@ export async function rebuildPackageDepsIndexes(): Promise<{ // Run per partition (depends_on_id % 64 = p) so each iteration prunes to one of the 64 // partitions instead of scanning all 1.15B rows at once. Same total rows read; each pass // fits in work_mem and avoids cross-partition sort. + // Run in parallel batches of DEDUP_CONCURRENCY to cut wall-clock from ~10h to ~1-2h. const NUM_PARTITIONS = 64 + const DEDUP_CONCURRENCY = 8 let totalDedupDeleted = 0 - for (let p = 0; p < NUM_PARTITIONS; p++) { - const result = await qx.result(` - DELETE FROM package_dependencies pd - USING ( - SELECT id, depends_on_id - FROM ( - SELECT id, depends_on_id, - ROW_NUMBER() OVER ( - PARTITION BY version_id, depends_on_id, dependency_kind - ORDER BY id - ) AS rn - FROM package_dependencies - WHERE depends_on_id % ${NUM_PARTITIONS} = ${p} - ) sub - WHERE rn > 1 - ) dupes - WHERE pd.id = dupes.id AND pd.depends_on_id = dupes.depends_on_id - `) - totalDedupDeleted += result + for (let batch = 0; batch < NUM_PARTITIONS; batch += DEDUP_CONCURRENCY) { + const partitions = Array.from( + { length: Math.min(DEDUP_CONCURRENCY, NUM_PARTITIONS - batch) }, + (_, i) => batch + i, + ) + const counts = await Promise.all( + partitions.map(async (p) => { + const conn = await getPackagesDb() + // work_mem (not maintenance_work_mem) controls sort memory for window functions. + // 2GB avoids disk spill during the ROW_NUMBER() sort on each partition. + return conn.tx(async (tx) => { + await tx.result(`SET LOCAL work_mem = '2GB'`) + return tx.result(` + DELETE FROM package_dependencies pd + USING ( + SELECT id, depends_on_id + FROM ( + SELECT id, depends_on_id, + ROW_NUMBER() OVER ( + PARTITION BY version_id, depends_on_id, dependency_kind + ORDER BY id + ) AS rn + FROM package_dependencies + WHERE depends_on_id % ${NUM_PARTITIONS} = ${p} + ) sub + WHERE rn > 1 + ) dupes + WHERE pd.id = dupes.id AND pd.depends_on_id = dupes.depends_on_id + `) + }) + }), + ) + totalDedupDeleted += counts.reduce((sum, n) => sum + n, 0) } log.info( { rowsDeleted: totalDedupDeleted }, diff --git a/services/apps/packages_worker/src/deps-dev/activities/manageVersionsIndexes.ts b/services/apps/packages_worker/src/deps-dev/activities/manageVersionsIndexes.ts index 0d9eda343a..239385e609 100644 --- a/services/apps/packages_worker/src/deps-dev/activities/manageVersionsIndexes.ts +++ b/services/apps/packages_worker/src/deps-dev/activities/manageVersionsIndexes.ts @@ -86,20 +86,25 @@ export async function rebuildVersionsIndexes(): Promise<{ const existing: Array<{ indexdef: string }> = await qx.select(NON_CONSTRAINT_INDEXES_SQL) const existingDefs = existing.map((r) => r.indexdef.toLowerCase()) + const toRebuild = SECONDARY_INDEXES.filter( + (idx) => !existingDefs.some((def) => def.includes(idx.matchString ?? `(${idx.columns})`)), + ) const rebuilt: string[] = [] - for (const idx of SECONDARY_INDEXES) { - const alreadyExists = existingDefs.some((def) => - def.includes(idx.matchString ?? `(${idx.columns})`), - ) - if (alreadyExists) { - log.info({ columns: idx.columns }, 'Index already exists, skipping') - continue - } - log.info({ columns: idx.columns }, 'Creating index on versions') - await qx.result(idx.createSql) - rebuilt.push(idx.columns) - } + // Build indexes in parallel — each on its own connection so they run concurrently. + // maintenance_work_mem per connection: with 32 partitions and default 64MB, PG spills to + // disk on every partition; 2GB lets the sort fit in RAM and cuts build time dramatically. + await Promise.all( + toRebuild.map(async (idx) => { + const conn = await getPackagesDb() + await conn.tx(async (t) => { + await t.result(`SET LOCAL maintenance_work_mem = '2GB'`) + log.info({ columns: idx.columns }, 'Creating index on versions') + await t.result(idx.createSql) + }) + rebuilt.push(idx.columns) + }), + ) // Remove cross-chunk duplicates before rebuilding the UNIQUE constraint. // versions is HASH-partitioned by package_id (32 partitions). Loop over each partition table diff --git a/services/apps/packages_worker/src/deps-dev/activities/setJobStep.ts b/services/apps/packages_worker/src/deps-dev/activities/setJobStep.ts new file mode 100644 index 0000000000..e54ad673ce --- /dev/null +++ b/services/apps/packages_worker/src/deps-dev/activities/setJobStep.ts @@ -0,0 +1,8 @@ +import { mergeJobTableRowCounts } from '@crowd/data-access-layer' + +import { getPackagesDb } from '../../db' + +export async function setJobStep(input: { jobId: number; step: string }): Promise { + const qx = await getPackagesDb() + await mergeJobTableRowCounts(qx, input.jobId, { 'meta:step': input.step }) +} diff --git a/services/apps/packages_worker/src/deps-dev/queries/depsSql.ts b/services/apps/packages_worker/src/deps-dev/queries/depsSql.ts index 1ef88bf138..b713ed343c 100644 --- a/services/apps/packages_worker/src/deps-dev/queries/depsSql.ts +++ b/services/apps/packages_worker/src/deps-dev/queries/depsSql.ts @@ -1,6 +1,49 @@ -// ADR-0003 Option A: DependencyGraphEdgesLatest — has version_constraint; no NUGET support. -export function buildDepsFullSqlA(systems: string): string { - return ` +// NPM, MAVEN, PYPI, CARGO are in DependencyGraphEdgesLatest / DependenciesLatest. +// GO uses GoRequirementsLatest (DirectDependencies, no resolved to_version). +// NUGET uses NuGetRequirementsLatest (DependencyGroups → Dependencies, no resolved to_version). +// Confirmed via BQ query 2026-06-17: DependencyGraphEdgesLatest and DependenciesLatest +// both contain exactly {NPM, MAVEN, PYPI, CARGO} — GO and NUGET absent from both. + +const EDGE_SYSTEMS = new Set(['NPM', 'MAVEN', 'PYPI', 'CARGO']) + +export const DEPS_DEFAULT_ECOSYSTEMS = ['NPM', 'GO', 'MAVEN', 'PYPI', 'NUGET', 'CARGO'] + +// --- Full SQL helpers --- + +const GO_FULL_PART = ` +SELECT + 'go' AS ecosystem, + g.Name AS root_name, + g.Version AS root_version, + d.Name AS to_name, + CAST(NULL AS STRING) AS to_version, + d.Requirement AS version_constraint +FROM \`bigquery-public-data.deps_dev_v1.GoRequirementsLatest\` g, +UNNEST(g.DirectDependencies) AS d` + +// NuGet groups deps by TargetFramework — flatten all groups, dedup handled downstream +// by DISTINCT ON in MERGE_SQL_FULL and ON CONFLICT in MERGE_SQL. +const NUGET_FULL_PART = ` +SELECT + 'nuget' AS ecosystem, + n.Name AS root_name, + n.Version AS root_version, + dep.Name AS to_name, + CAST(NULL AS STRING) AS to_version, + dep.Requirement AS version_constraint +FROM \`bigquery-public-data.deps_dev_v1.NuGetRequirementsLatest\` n, +UNNEST(n.DependencyGroups) AS grp, +UNNEST(grp.Dependencies) AS dep` + +// ADR-0003 Option A: DependencyGraphEdgesLatest for NPM/MAVEN/PYPI/CARGO — has version_constraint. +// GO + NUGET always come from their ecosystem-specific tables regardless of option. +export function buildDepsFullSqlA(ecosystems: string[]): string { + const parts: string[] = [] + + const edgeSystems = ecosystems.filter((s) => EDGE_SYSTEMS.has(s)) + if (edgeSystems.length > 0) { + const filter = edgeSystems.map((s) => `'${s}'`).join(', ') + parts.push(` SELECT LOWER(e.System) AS ecosystem, e.Name AS root_name, @@ -9,19 +52,64 @@ SELECT e.To.Version AS to_version, e.Requirement AS version_constraint FROM \`bigquery-public-data.deps_dev_v1.DependencyGraphEdgesLatest\` e -WHERE e.System IN (${systems}) +WHERE e.System IN (${filter}) AND e.From.Name = e.Name - AND e.From.Version = e.Version -` + AND e.From.Version = e.Version`) + } + + if (ecosystems.includes('GO')) parts.push(GO_FULL_PART) + if (ecosystems.includes('NUGET')) parts.push(NUGET_FULL_PART) + + return parts.join('\nUNION ALL\n') +} + +// ADR-0003 Option B: DependenciesLatest for NPM/MAVEN/PYPI/CARGO — cheaper, no version_constraint. +// GO + NUGET same as Option A (ecosystem-specific tables, version_constraint available). +export function buildDepsFullSqlB(ecosystems: string[]): string { + const parts: string[] = [] + + const depsSystems = ecosystems.filter((s) => EDGE_SYSTEMS.has(s)) + if (depsSystems.length > 0) { + const filter = depsSystems.map((s) => `'${s}'`).join(', ') + parts.push(` +SELECT + LOWER(d.System) AS ecosystem, + d.Name AS root_name, + d.Version AS root_version, + d.Dependency.Name AS to_name, + d.Dependency.Version AS to_version, + CAST(NULL AS STRING) AS version_constraint +FROM \`bigquery-public-data.deps_dev_v1.DependenciesLatest\` d +WHERE d.System IN (${filter}) + AND d.MinimumDepth = 1`) + } + + if (ecosystems.includes('GO')) parts.push(GO_FULL_PART) + if (ecosystems.includes('NUGET')) parts.push(NUGET_FULL_PART) + + return parts.join('\nUNION ALL\n') } +// --- Incremental SQL helpers --- +// Uses base tables (GoRequirements, NuGetRequirements, DependencyGraphEdges) with SnapshotAt filter. +// All CTEs combined in one WITH clause so UNION ALL can reference them freely. + export function buildDepsIncrementalSqlA( today: string, watermark: string, - systems: string, + ecosystems: string[], ): string { - return ` -WITH today AS ( + const edgeSystems = ecosystems.filter((s) => EDGE_SYSTEMS.has(s)) + const includeGo = ecosystems.includes('GO') + const includeNuget = ecosystems.includes('NUGET') + + const ctes: string[] = [] + const selects: string[] = [] + + if (edgeSystems.length > 0) { + const filter = edgeSystems.map((s) => `'${s}'`).join(', ') + ctes.push( + `today_edges AS ( SELECT LOWER(e.System) AS ecosystem, e.Name AS root_name, @@ -32,50 +120,118 @@ WITH today AS ( FROM \`bigquery-public-data.deps_dev_v1.DependencyGraphEdges\` e WHERE e.SnapshotAt >= TIMESTAMP('${today}') AND e.SnapshotAt < TIMESTAMP(DATE_ADD(DATE '${today}', INTERVAL 1 DAY)) - AND e.System IN (${systems}) + AND e.System IN (${filter}) AND e.From.Name = e.Name AND e.From.Version = e.Version -), -last_watermark AS ( +)`, + `watermark_edges AS ( SELECT e.System, e.Name, e.Version, e.To.Name AS to_name, e.To.Version AS to_version FROM \`bigquery-public-data.deps_dev_v1.DependencyGraphEdges\` e WHERE e.SnapshotAt >= TIMESTAMP('${watermark}') AND e.SnapshotAt < TIMESTAMP(DATE_ADD(DATE '${watermark}', INTERVAL 1 DAY)) - AND e.System IN (${systems}) + AND e.System IN (${filter}) AND e.From.Name = e.Name AND e.From.Version = e.Version GROUP BY e.System, e.Name, e.Version, e.To.Name, e.To.Version -) -SELECT t.* -FROM today t -LEFT JOIN last_watermark l +)`, + ) + selects.push( + `SELECT t.* +FROM today_edges t +LEFT JOIN watermark_edges l ON LOWER(l.System) = t.ecosystem AND l.Name = t.root_name AND l.Version = t.root_version AND l.to_name = t.to_name AND l.to_version = t.to_version -WHERE l.to_name IS NULL -` -} +WHERE l.to_name IS NULL`, + ) + } -// ADR-0003 Option B: DependenciesLatest — cheaper (~65%), covers NUGET, but no version_constraint. -export function buildDepsFullSqlB(systems: string): string { - return ` -SELECT - LOWER(d.System) AS ecosystem, - d.Name AS root_name, - d.Version AS root_version, - d.Dependency.Name AS to_name, - d.Dependency.Version AS to_version, - CAST(NULL AS STRING) AS version_constraint -FROM \`bigquery-public-data.deps_dev_v1.DependenciesLatest\` d -WHERE d.System IN (${systems}) - AND d.MinimumDepth = 1 -` + if (includeGo) { + ctes.push( + `today_go AS ( + SELECT + 'go' AS ecosystem, + g.Name AS root_name, + g.Version AS root_version, + d.Name AS to_name, + CAST(NULL AS STRING) AS to_version, + d.Requirement AS version_constraint + FROM \`bigquery-public-data.deps_dev_v1.GoRequirements\` g, + UNNEST(g.DirectDependencies) AS d + WHERE g.SnapshotAt >= TIMESTAMP('${today}') + AND g.SnapshotAt < TIMESTAMP(DATE_ADD(DATE '${today}', INTERVAL 1 DAY)) +)`, + `watermark_go AS ( + SELECT g.Name, g.Version, d.Name AS to_name, d.Requirement + FROM \`bigquery-public-data.deps_dev_v1.GoRequirements\` g, + UNNEST(g.DirectDependencies) AS d + WHERE g.SnapshotAt >= TIMESTAMP('${watermark}') + AND g.SnapshotAt < TIMESTAMP(DATE_ADD(DATE '${watermark}', INTERVAL 1 DAY)) + GROUP BY g.Name, g.Version, d.Name, d.Requirement +)`, + ) + selects.push( + `SELECT t.* +FROM today_go t +LEFT JOIN watermark_go l + ON l.Name = t.root_name AND l.Version = t.root_version AND l.to_name = t.to_name + AND l.Requirement = t.version_constraint +WHERE l.to_name IS NULL`, + ) + } + + if (includeNuget) { + ctes.push( + `today_nuget AS ( + SELECT + 'nuget' AS ecosystem, + n.Name AS root_name, + n.Version AS root_version, + dep.Name AS to_name, + CAST(NULL AS STRING) AS to_version, + dep.Requirement AS version_constraint + FROM \`bigquery-public-data.deps_dev_v1.NuGetRequirements\` n, + UNNEST(n.DependencyGroups) AS grp, + UNNEST(grp.Dependencies) AS dep + WHERE n.SnapshotAt >= TIMESTAMP('${today}') + AND n.SnapshotAt < TIMESTAMP(DATE_ADD(DATE '${today}', INTERVAL 1 DAY)) +)`, + `watermark_nuget AS ( + SELECT n.Name, n.Version, dep.Name AS to_name, dep.Requirement + FROM \`bigquery-public-data.deps_dev_v1.NuGetRequirements\` n, + UNNEST(n.DependencyGroups) AS grp, + UNNEST(grp.Dependencies) AS dep + WHERE n.SnapshotAt >= TIMESTAMP('${watermark}') + AND n.SnapshotAt < TIMESTAMP(DATE_ADD(DATE '${watermark}', INTERVAL 1 DAY)) + GROUP BY n.Name, n.Version, dep.Name, dep.Requirement +)`, + ) + selects.push( + `SELECT t.* +FROM today_nuget t +LEFT JOIN watermark_nuget l + ON l.Name = t.root_name AND l.Version = t.root_version AND l.to_name = t.to_name + AND l.Requirement = t.version_constraint +WHERE l.to_name IS NULL`, + ) + } + + return `WITH\n${ctes.join(',\n')}\n${selects.join('\nUNION ALL\n')}` } export function buildDepsIncrementalSqlB( today: string, watermark: string, - systems: string, + ecosystems: string[], ): string { - return ` -WITH today AS ( + const depsSystems = ecosystems.filter((s) => EDGE_SYSTEMS.has(s)) + const includeGo = ecosystems.includes('GO') + const includeNuget = ecosystems.includes('NUGET') + + const ctes: string[] = [] + const selects: string[] = [] + + if (depsSystems.length > 0) { + const filter = depsSystems.map((s) => `'${s}'`).join(', ') + ctes.push( + `today_deps AS ( SELECT LOWER(d.System) AS ecosystem, d.Name AS root_name, @@ -86,38 +242,113 @@ WITH today AS ( FROM \`bigquery-public-data.deps_dev_v1.Dependencies\` d WHERE d.SnapshotAt >= TIMESTAMP('${today}') AND d.SnapshotAt < TIMESTAMP(DATE_ADD(DATE '${today}', INTERVAL 1 DAY)) - AND d.System IN (${systems}) + AND d.System IN (${filter}) AND d.MinimumDepth = 1 -), -last_watermark AS ( +)`, + `watermark_deps AS ( SELECT d.System, d.Name, d.Version, d.Dependency.Name AS to_name, d.Dependency.Version AS to_version FROM \`bigquery-public-data.deps_dev_v1.Dependencies\` d WHERE d.SnapshotAt >= TIMESTAMP('${watermark}') AND d.SnapshotAt < TIMESTAMP(DATE_ADD(DATE '${watermark}', INTERVAL 1 DAY)) - AND d.System IN (${systems}) + AND d.System IN (${filter}) AND d.MinimumDepth = 1 GROUP BY d.System, d.Name, d.Version, d.Dependency.Name, d.Dependency.Version -) -SELECT t.* -FROM today t -LEFT JOIN last_watermark l +)`, + ) + selects.push( + `SELECT t.* +FROM today_deps t +LEFT JOIN watermark_deps l ON LOWER(l.System) = t.ecosystem AND l.Name = t.root_name AND l.Version = t.root_version AND l.to_name = t.to_name AND l.to_version = t.to_version -WHERE l.to_name IS NULL -` +WHERE l.to_name IS NULL`, + ) + } + + if (includeGo) { + ctes.push( + `today_go AS ( + SELECT + 'go' AS ecosystem, + g.Name AS root_name, + g.Version AS root_version, + d.Name AS to_name, + CAST(NULL AS STRING) AS to_version, + d.Requirement AS version_constraint + FROM \`bigquery-public-data.deps_dev_v1.GoRequirements\` g, + UNNEST(g.DirectDependencies) AS d + WHERE g.SnapshotAt >= TIMESTAMP('${today}') + AND g.SnapshotAt < TIMESTAMP(DATE_ADD(DATE '${today}', INTERVAL 1 DAY)) +)`, + `watermark_go AS ( + SELECT g.Name, g.Version, d.Name AS to_name, d.Requirement + FROM \`bigquery-public-data.deps_dev_v1.GoRequirements\` g, + UNNEST(g.DirectDependencies) AS d + WHERE g.SnapshotAt >= TIMESTAMP('${watermark}') + AND g.SnapshotAt < TIMESTAMP(DATE_ADD(DATE '${watermark}', INTERVAL 1 DAY)) + GROUP BY g.Name, g.Version, d.Name, d.Requirement +)`, + ) + selects.push( + `SELECT t.* +FROM today_go t +LEFT JOIN watermark_go l + ON l.Name = t.root_name AND l.Version = t.root_version AND l.to_name = t.to_name + AND l.Requirement = t.version_constraint +WHERE l.to_name IS NULL`, + ) + } + + if (includeNuget) { + ctes.push( + `today_nuget AS ( + SELECT + 'nuget' AS ecosystem, + n.Name AS root_name, + n.Version AS root_version, + dep.Name AS to_name, + CAST(NULL AS STRING) AS to_version, + dep.Requirement AS version_constraint + FROM \`bigquery-public-data.deps_dev_v1.NuGetRequirements\` n, + UNNEST(n.DependencyGroups) AS grp, + UNNEST(grp.Dependencies) AS dep + WHERE n.SnapshotAt >= TIMESTAMP('${today}') + AND n.SnapshotAt < TIMESTAMP(DATE_ADD(DATE '${today}', INTERVAL 1 DAY)) +)`, + `watermark_nuget AS ( + SELECT n.Name, n.Version, dep.Name AS to_name, dep.Requirement + FROM \`bigquery-public-data.deps_dev_v1.NuGetRequirements\` n, + UNNEST(n.DependencyGroups) AS grp, + UNNEST(grp.Dependencies) AS dep + WHERE n.SnapshotAt >= TIMESTAMP('${watermark}') + AND n.SnapshotAt < TIMESTAMP(DATE_ADD(DATE '${watermark}', INTERVAL 1 DAY)) + GROUP BY n.Name, n.Version, dep.Name, dep.Requirement +)`, + ) + selects.push( + `SELECT t.* +FROM today_nuget t +LEFT JOIN watermark_nuget l + ON l.Name = t.root_name AND l.Version = t.root_version AND l.to_name = t.to_name + AND l.Requirement = t.version_constraint +WHERE l.to_name IS NULL`, + ) + } + + return `WITH\n${ctes.join(',\n')}\n${selects.join('\nUNION ALL\n')}` } -export function buildDepsFullSql(systems: string, tableOption: 'A' | 'B' = 'A'): string { - return tableOption === 'B' ? buildDepsFullSqlB(systems) : buildDepsFullSqlA(systems) +export function buildDepsFullSql(ecosystems: string[], tableOption: 'A' | 'B' = 'A'): string { + return tableOption === 'B' ? buildDepsFullSqlB(ecosystems) : buildDepsFullSqlA(ecosystems) } export function buildDepsIncrementalSql( today: string, watermark: string, - systems: string, + ecosystems: string[], tableOption: 'A' | 'B' = 'A', ): string { return tableOption === 'B' - ? buildDepsIncrementalSqlB(today, watermark, systems) - : buildDepsIncrementalSqlA(today, watermark, systems) + ? buildDepsIncrementalSqlB(today, watermark, ecosystems) + : buildDepsIncrementalSqlA(today, watermark, ecosystems) } diff --git a/services/apps/packages_worker/src/deps-dev/workflows/bootstrapOsspckgs.ts b/services/apps/packages_worker/src/deps-dev/workflows/bootstrapOsspckgs.ts index d185b4fc8d..0258fa40e7 100644 --- a/services/apps/packages_worker/src/deps-dev/workflows/bootstrapOsspckgs.ts +++ b/services/apps/packages_worker/src/deps-dev/workflows/bootstrapOsspckgs.ts @@ -63,6 +63,7 @@ export async function bootstrapOsspckgs(opts: { depsTableOption?: 'A' | 'B' exportName?: string snapshotDate?: string // YYYY-MM-DD — override BQ snapshot resolution for all partition-filtered kinds + fillConstraints?: boolean // re-export full deps BQ data, upsert version_constraint where NULL }): Promise { // B3: deterministic timestamps — workflowInfo().startTime is replay-stable; new Date() is not. const start = workflowInfo().startTime @@ -221,6 +222,7 @@ export async function bootstrapOsspckgs(opts: { reuseExports: opts.reuseExports, depsTableOption: opts.depsTableOption, exportName: opts.exportName, + fillConstraints: opts.fillConstraints, }, ], }) diff --git a/services/apps/packages_worker/src/deps-dev/workflows/ingestAdvisories.ts b/services/apps/packages_worker/src/deps-dev/workflows/ingestAdvisories.ts index 4669eba8ef..543e79b118 100644 --- a/services/apps/packages_worker/src/deps-dev/workflows/ingestAdvisories.ts +++ b/services/apps/packages_worker/src/deps-dev/workflows/ingestAdvisories.ts @@ -135,9 +135,10 @@ export async function ingestAdvisories(opts: { runId: opts.runId, syncMode: opts.syncMode, snapshotAt: opts.today, - maxBytesGb: 10, + maxBytesGb: 20, reuseExports: opts.reuseExports, exportName: opts.exportName, + ecosystems: opts.ecosystems, }) const { fileNames: advFileNames, rowCounts: advRowCounts } = await listParquetFiles({ @@ -211,6 +212,7 @@ export async function ingestAdvisories(opts: { maxBytesGb: 1500, reuseExports: opts.reuseExports, exportName: opts.exportName, + ecosystems: opts.ecosystems, }) const { fileNames: pkgFileNames, rowCounts: pkgRowCounts } = await listParquetFiles({ diff --git a/services/apps/packages_worker/src/deps-dev/workflows/ingestDependencies.ts b/services/apps/packages_worker/src/deps-dev/workflows/ingestDependencies.ts index cb19d55485..0309c079bb 100644 --- a/services/apps/packages_worker/src/deps-dev/workflows/ingestDependencies.ts +++ b/services/apps/packages_worker/src/deps-dev/workflows/ingestDependencies.ts @@ -1,8 +1,11 @@ import { proxyActivities } from '@temporalio/workflow' import type * as depsDevActivities from '../activities' -import { buildDepsFullSql, buildDepsIncrementalSql } from '../queries/depsSql' -import { toSystemsFilter } from '../queries/systems' +import { + DEPS_DEFAULT_ECOSYSTEMS, + buildDepsFullSql, + buildDepsIncrementalSql, +} from '../queries/depsSql' const { bqExportToGcs } = proxyActivities({ startToCloseTimeout: '2 hours', @@ -33,8 +36,8 @@ const { createVersionsLookup } = proxyActivities({ const { dropPackageDepsIndexes, rebuildPackageDepsIndexes } = proxyActivities< typeof depsDevActivities >({ - // Index builds on 1B+ rows can take hours — long timeout required. - startToCloseTimeout: '12 hours', + // Index builds + parallel dedup on 1B+ rows — 24h covers worst-case sequential retry. + startToCloseTimeout: '24 hours', retry: { maximumAttempts: 2, initialInterval: '1 minute' }, }) @@ -42,10 +45,15 @@ const { dropPackageDepsConstraints, rebuildPackageDepsConstraints } = proxyActiv typeof depsDevActivities >({ // FK validation on 1B+ rows can take hours. - startToCloseTimeout: '12 hours', + startToCloseTimeout: '24 hours', retry: { maximumAttempts: 2, initialInterval: '1 minute' }, }) +const { setJobStep } = proxyActivities({ + startToCloseTimeout: '30 seconds', + retry: { maximumAttempts: 3 }, +}) + const STAGING_TABLE = 'staging.osspckgs_deps_raw' const STAGING_DDL = ` @@ -135,6 +143,45 @@ LEFT JOIN versions dv ON dv.package_id = pd.id AND dv.number = sp.to_version ORDER BY pv.id, pd.id, sp.to_version DESC NULLS LAST ` +// Fill-constraints variant: UNIQUE constraint stays in place (not dropped), so ON CONFLICT is valid. +// Upserts version_constraint only for rows where it is currently NULL — safe to run against a table +// already populated by --deps-table-b (which sets version_constraint = NULL for all rows). +// DISTINCT ON matches MERGE_SQL_FULL to resolve duplicate (root, dep) pairs from BQ before the upsert. +const MERGE_SQL_FILL_CONSTRAINTS = ` +INSERT INTO package_dependencies ( + package_id, version_id, depends_on_id, depends_on_version_id, + version_constraint, dependency_kind, is_optional, created_at, updated_at +) +SELECT DISTINCT ON (pv.id, pd.id) + pv.package_id, pv.id, pd.id, dv.id, + sp.version_constraint, 'direct', FALSE, NOW(), NOW() +FROM staging.osspckgs_deps_raw sp +JOIN staging.osspckgs_versions_lookup pv ON pv.ecosystem = sp.ecosystem + AND pv.ns = CASE + WHEN sp.ecosystem = 'maven' THEN SPLIT_PART(sp.root_name, ':', 1) + WHEN sp.root_name LIKE '@%/%' THEN SPLIT_PART(sp.root_name, '/', 1) + ELSE '' END + AND pv.name = CASE + WHEN sp.ecosystem = 'maven' THEN SPLIT_PART(sp.root_name, ':', 2) + WHEN sp.root_name LIKE '@%/%' THEN SPLIT_PART(sp.root_name, '/', 2) + ELSE sp.root_name END + AND pv.number = sp.root_version +JOIN packages pd ON pd.ecosystem = sp.ecosystem + AND COALESCE(pd.namespace, '') = CASE + WHEN sp.ecosystem = 'maven' THEN SPLIT_PART(sp.to_name, ':', 1) + WHEN sp.to_name LIKE '@%/%' THEN SPLIT_PART(sp.to_name, '/', 1) + ELSE '' END + AND pd.name = CASE + WHEN sp.ecosystem = 'maven' THEN SPLIT_PART(sp.to_name, ':', 2) + WHEN sp.to_name LIKE '@%/%' THEN SPLIT_PART(sp.to_name, '/', 2) + ELSE sp.to_name END +LEFT JOIN versions dv ON dv.package_id = pd.id AND dv.number = sp.to_version +ORDER BY pv.id, pd.id, sp.to_version DESC NULLS LAST +ON CONFLICT (version_id, depends_on_id, dependency_kind) DO UPDATE + SET version_constraint = EXCLUDED.version_constraint + WHERE package_dependencies.version_constraint IS NULL +` + // SET LOCAL scopes settings to this transaction only. // synchronous_commit=off skips WAL flush wait — safe for plain INSERT on full loads. // max_parallel_workers_per_gather parallelises the SELECT side of INSERT...SELECT. @@ -166,13 +213,17 @@ export async function ingestDependencies(opts: { reuseExports?: boolean depsTableOption?: 'A' | 'B' exportName?: string + fillConstraints?: boolean // re-export full BQ data, upsert version_constraint only where NULL }): Promise<{ rowCountBq: number }> { - const systems = toSystemsFilter(opts.ecosystems) - const tableOption = opts.depsTableOption ?? 'A' + const ecosystems = opts.ecosystems ?? DEPS_DEFAULT_ECOSYSTEMS + const isFill = opts.fillConstraints === true + // Fill mode forces Option A — Option B selects NULL for version_constraint, making the fill a no-op. + const tableOption = isFill ? 'A' : (opts.depsTableOption ?? 'A') + // Fill mode always uses full SQL — needs all rows to find which have NULL version_constraint in DB. const sql = - opts.syncMode === 'full' - ? buildDepsFullSql(systems, tableOption) - : buildDepsIncrementalSql(opts.today, opts.watermark ?? '', systems, tableOption) + opts.syncMode === 'full' || isFill + ? buildDepsFullSql(ecosystems, tableOption) + : buildDepsIncrementalSql(opts.today, opts.watermark ?? '', ecosystems, tableOption) const exportResult = await bqExportToGcs({ jobKind: 'package_dependencies', @@ -180,9 +231,10 @@ export async function ingestDependencies(opts: { runId: opts.runId, syncMode: opts.syncMode, snapshotAt: opts.today, - maxBytesGb: 10000, + maxBytesGb: opts.syncMode === 'full' || isFill ? 25000 : 10000, reuseExports: opts.reuseExports, exportName: opts.exportName, + ecosystems, }) const { fileNames, rowCounts } = await listParquetFiles({ gcsPrefix: exportResult.gcsPrefix }) @@ -199,10 +251,13 @@ export async function ingestDependencies(opts: { return { rowCountBq: exportResult.rowCount } } - await createVersionsLookup({ ecosystems: opts.ecosystems }) + await setJobStep({ jobId: exportResult.jobId, step: 'creating_lookup' }) + await createVersionsLookup({ ecosystems }) - if (opts.syncMode === 'full') { + if (opts.syncMode === 'full' && !isFill) { + await setJobStep({ jobId: exportResult.jobId, step: 'drop_constraints' }) await dropPackageDepsConstraints() + await setJobStep({ jobId: exportResult.jobId, step: 'drop_indexes' }) await dropPackageDepsIndexes() } @@ -219,7 +274,9 @@ export async function ingestDependencies(opts: { for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++) { const start = chunkIndex * filesPerChunk const chunk = fileNames.slice(start, start + filesPerChunk) - const isFinal = chunkIndex === totalChunks - 1 + const isLastChunk = chunkIndex === totalChunks - 1 + // For full-load (non-fill), don't mark done here — rebuild runs after the chunk loop + const isFinal = isLastChunk && !(opts.syncMode === 'full' && !isFill) const { rowsLoaded } = await gcsParquetToStaging({ jobId: exportResult.jobId, @@ -236,7 +293,11 @@ export async function ingestDependencies(opts: { const { rowsAffected, tableRowCounts } = await mergeStagingToTable({ jobId: exportResult.jobId, prepareSql: MERGE_PREPARE_SQL, - mergeSql: opts.syncMode === 'full' ? MERGE_SQL_FULL : MERGE_SQL, + mergeSql: isFill + ? MERGE_SQL_FILL_CONSTRAINTS + : opts.syncMode === 'full' + ? MERGE_SQL_FULL + : MERGE_SQL, tableNames: 'package_dependencies', isFinal, priorRowsAffected, @@ -252,18 +313,31 @@ export async function ingestDependencies(opts: { } } - if (opts.syncMode === 'full') { + if (opts.syncMode === 'full' && !isFill) { + await setJobStep({ jobId: exportResult.jobId, step: 'rebuild_indexes' }) await rebuildPackageDepsIndexes() + await setJobStep({ jobId: exportResult.jobId, step: 'rebuild_constraints' }) await rebuildPackageDepsConstraints() + // Finalize after rebuild — marks done with correct finishedAt + await mergeStagingToTable({ + jobId: exportResult.jobId, + mergeSql: [], + tableNames: [], + isFinal: true, + priorRowsAffected, + priorTableRowCounts, + }) } } catch (err) { - if (opts.syncMode === 'full') { + if (opts.syncMode === 'full' && !isFill) { try { + await setJobStep({ jobId: exportResult.jobId, step: 'rebuild_indexes' }) await rebuildPackageDepsIndexes() } catch (_) { /* best-effort */ } try { + await setJobStep({ jobId: exportResult.jobId, step: 'rebuild_constraints' }) await rebuildPackageDepsConstraints() } catch (_) { /* best-effort */ diff --git a/services/apps/packages_worker/src/deps-dev/workflows/ingestDependentCounts.ts b/services/apps/packages_worker/src/deps-dev/workflows/ingestDependentCounts.ts index 030ef45869..e9df9b9ad8 100644 --- a/services/apps/packages_worker/src/deps-dev/workflows/ingestDependentCounts.ts +++ b/services/apps/packages_worker/src/deps-dev/workflows/ingestDependentCounts.ts @@ -29,6 +29,11 @@ const { checkDependentCountsGuard } = proxyActivities( retry: { maximumAttempts: 3 }, }) +const { setJobStep } = proxyActivities({ + startToCloseTimeout: '30 seconds', + retry: { maximumAttempts: 3 }, +}) + const STAGING_TABLE = 'staging.osspckgs_dependent_counts_raw' // Two-statement DDL: DROP before CREATE so an existing table with the old dependent_packages_count @@ -86,6 +91,7 @@ export async function ingestDependentCounts(opts: { const totalFiles = fileNames.length const totalRows = totalFiles > 0 ? rowCounts.reduce((a, b) => a + b, 0) : 0 + await setJobStep({ jobId: exportResult.jobId, step: 'guard_check' }) const guard = await checkDependentCountsGuard({ currentRowCount: totalRows, snapshotDate: opts.snapshotDate, diff --git a/services/apps/packages_worker/src/deps-dev/workflows/ingestPackages.ts b/services/apps/packages_worker/src/deps-dev/workflows/ingestPackages.ts index e87a0674bf..083ec0c44c 100644 --- a/services/apps/packages_worker/src/deps-dev/workflows/ingestPackages.ts +++ b/services/apps/packages_worker/src/deps-dev/workflows/ingestPackages.ts @@ -110,6 +110,7 @@ export async function ingestPackages(opts: { maxBytesGb: opts.syncMode === 'full' ? 6000 : 400, reuseExports: opts.reuseExports, exportName: opts.exportName, + ecosystems: opts.ecosystems, }) const { fileNames, rowCounts } = await listParquetFiles({ gcsPrefix: exportResult.gcsPrefix }) diff --git a/services/apps/packages_worker/src/deps-dev/workflows/ingestRepos.ts b/services/apps/packages_worker/src/deps-dev/workflows/ingestRepos.ts index c5a8cb7a6e..f09f6ff3ba 100644 --- a/services/apps/packages_worker/src/deps-dev/workflows/ingestRepos.ts +++ b/services/apps/packages_worker/src/deps-dev/workflows/ingestRepos.ts @@ -117,6 +117,7 @@ export async function ingestRepos(opts: { maxBytesGb: 2000, reuseExports: opts.reuseExports, exportName: opts.exportName, + ecosystems: opts.ecosystems, }) const { fileNames: repoFileNames, rowCounts: repoRowCounts } = await listParquetFiles({ @@ -187,6 +188,7 @@ export async function ingestRepos(opts: { maxBytesGb: 2000, reuseExports: opts.reuseExports, exportName: opts.exportName, + ecosystems: opts.ecosystems, }) const { fileNames: pkgRepoFileNames, rowCounts: pkgRepoRowCounts } = await listParquetFiles({ diff --git a/services/apps/packages_worker/src/deps-dev/workflows/ingestVersions.ts b/services/apps/packages_worker/src/deps-dev/workflows/ingestVersions.ts index 57debeb1f8..af0cd716cb 100644 --- a/services/apps/packages_worker/src/deps-dev/workflows/ingestVersions.ts +++ b/services/apps/packages_worker/src/deps-dev/workflows/ingestVersions.ts @@ -39,6 +39,11 @@ const { dropVersionsConstraints, rebuildVersionsConstraints } = proxyActivities< retry: { maximumAttempts: 2, initialInterval: '1 minute' }, }) +const { setJobStep } = proxyActivities({ + startToCloseTimeout: '30 seconds', + retry: { maximumAttempts: 3 }, +}) + const STAGING_TABLE = 'staging.osspckgs_versions_raw' const STAGING_DDL = ` @@ -126,9 +131,10 @@ export async function ingestVersions(opts: { runId: opts.runId, syncMode: opts.syncMode, snapshotAt: opts.today, - maxBytesGb: 400, + maxBytesGb: opts.syncMode === 'full' ? 800 : 400, reuseExports: opts.reuseExports, exportName: opts.exportName, + ecosystems: opts.ecosystems, }) const { fileNames, rowCounts } = await listParquetFiles({ gcsPrefix: exportResult.gcsPrefix }) @@ -145,7 +151,9 @@ export async function ingestVersions(opts: { } if (opts.syncMode === 'full') { + await setJobStep({ jobId: exportResult.jobId, step: 'drop_constraints' }) await dropVersionsConstraints() + await setJobStep({ jobId: exportResult.jobId, step: 'drop_indexes' }) await dropVersionsIndexes() } @@ -163,7 +171,9 @@ export async function ingestVersions(opts: { for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++) { const start = chunkIndex * filesPerChunk const chunk = fileNames.slice(start, start + filesPerChunk) - const isFinal = chunkIndex === totalChunks - 1 + const isLastChunk = chunkIndex === totalChunks - 1 + // For full-load, don't mark done here — rebuild runs after the chunk loop + const isFinal = isLastChunk && opts.syncMode !== 'full' const { rowsLoaded } = await gcsParquetToStaging({ jobId: exportResult.jobId, @@ -198,17 +208,30 @@ export async function ingestVersions(opts: { } if (opts.syncMode === 'full') { + await setJobStep({ jobId: exportResult.jobId, step: 'rebuild_indexes' }) await rebuildVersionsIndexes() + await setJobStep({ jobId: exportResult.jobId, step: 'rebuild_constraints' }) await rebuildVersionsConstraints() + // Finalize after rebuild — marks done with correct finishedAt + await mergeStagingToTable({ + jobId: exportResult.jobId, + mergeSql: [], + tableNames: [], + isFinal: true, + priorRowsAffected, + priorTableRowCounts, + }) } } catch (err) { if (opts.syncMode === 'full') { try { + await setJobStep({ jobId: exportResult.jobId, step: 'rebuild_indexes' }) await rebuildVersionsIndexes() } catch (_) { /* best-effort */ } try { + await setJobStep({ jobId: exportResult.jobId, step: 'rebuild_constraints' }) await rebuildVersionsConstraints() } catch (_) { /* best-effort */ diff --git a/services/apps/packages_worker/src/scorecard/workflows/ingestScorecard.ts b/services/apps/packages_worker/src/scorecard/workflows/ingestScorecard.ts index de4b2773a2..8c56fc96eb 100644 --- a/services/apps/packages_worker/src/scorecard/workflows/ingestScorecard.ts +++ b/services/apps/packages_worker/src/scorecard/workflows/ingestScorecard.ts @@ -38,21 +38,26 @@ CREATE UNLOGGED TABLE IF NOT EXISTS staging.osspckgs_scorecard_repos_raw ( // Cast to timestamptz happens in merge SQL. const SCORECARD_REPOS_PG_COLUMNS = ['repo_url', 'score', 'scanned_at'] +// Two-CTE pattern prevents deadlocks with concurrent repos UPDATE transactions +// (e.g. github-repos-enricher). `locked` materialises candidates then acquires +// row-level locks in ascending repos.id order — consistent ordering eliminates +// the circular-wait condition. The outer UPDATE touches already-locked rows only. const SCORECARD_REPOS_MERGE_SQL = ` -UPDATE repos r -SET scorecard_score = CASE +WITH candidates AS ( + SELECT + r.id, + CASE WHEN s.score IS NULL OR s.score = 'NaN'::float8 OR s.score = 'Infinity'::float8 OR s.score = '-Infinity'::float8 THEN NULL ELSE s.score::numeric(3,1) - END, - scorecard_last_run_at = s.scanned_at::timestamptz, - updated_at = NOW() -FROM (SELECT * FROM staging.osspckgs_scorecard_repos_raw ORDER BY repo_url) s -WHERE r.url = s.repo_url - AND ( + END AS new_score, + s.scanned_at::timestamptz AS new_scanned_at + FROM staging.osspckgs_scorecard_repos_raw s + JOIN repos r ON r.url = s.repo_url + WHERE r.scorecard_score IS DISTINCT FROM CASE WHEN s.score IS NULL OR s.score = 'NaN'::float8 @@ -62,7 +67,16 @@ WHERE r.url = s.repo_url ELSE s.score::numeric(3,1) END OR r.scorecard_last_run_at IS DISTINCT FROM s.scanned_at::timestamptz - ) +), +locked AS ( + SELECT * FROM candidates ORDER BY id FOR UPDATE +) +UPDATE repos r +SET scorecard_score = locked.new_score, + scorecard_last_run_at = locked.new_scanned_at, + updated_at = NOW() +FROM locked +WHERE r.id = locked.id ` const SCORECARD_CHECKS_STAGING_TABLE = 'staging.osspckgs_scorecard_checks_raw' diff --git a/services/apps/packages_worker/src/scripts/dedupPackageDeps.ts b/services/apps/packages_worker/src/scripts/dedupPackageDeps.ts new file mode 100644 index 0000000000..64501037b1 --- /dev/null +++ b/services/apps/packages_worker/src/scripts/dedupPackageDeps.ts @@ -0,0 +1,114 @@ +#!/usr/bin/env tsx + +/** + * Dedup cross-chunk duplicate rows from package_dependencies, then rebuild + * the UNIQUE constraint (version_id, depends_on_id, dependency_kind). + * + * Run after a full-load where the constraint was dropped before bulk INSERT. + * + * Usage: + * pnpm dedup-package-deps [--concurrency ] [--dry-run] + * + * --concurrency Partitions to process in parallel (default: 8). + * --dry-run Count duplicates without deleting. + */ +import { getServiceChildLogger } from '@crowd/logging' + +import { getPackagesDb } from '../db' + +const log = getServiceChildLogger('dedupPackageDeps') + +const NUM_PARTITIONS = 64 + +async function processPartition(p: number, dryRun: boolean): Promise { + log.info({ partition: p }, dryRun ? 'counting partition' : 'deduping partition') + const conn = await getPackagesDb() + const count = await conn.tx(async (tx) => { + await tx.result(`SET LOCAL work_mem = '2GB'`) + if (dryRun) { + const row = await tx.selectOne(` + SELECT COUNT(*) AS cnt + FROM ( + SELECT ROW_NUMBER() OVER ( + PARTITION BY version_id, depends_on_id, dependency_kind + ORDER BY id + ) AS rn + FROM package_dependencies + WHERE depends_on_id % ${NUM_PARTITIONS} = ${p} + ) sub + WHERE rn > 1 + `) + return Number(row.cnt) + } + return tx.result(` + DELETE FROM package_dependencies pd + USING ( + SELECT id, depends_on_id FROM ( + SELECT id, depends_on_id, + ROW_NUMBER() OVER ( + PARTITION BY version_id, depends_on_id, dependency_kind + ORDER BY id + ) AS rn + FROM package_dependencies + WHERE depends_on_id % ${NUM_PARTITIONS} = ${p} + ) sub + WHERE rn > 1 + ) dupes + WHERE pd.id = dupes.id AND pd.depends_on_id = dupes.depends_on_id + `) + }) + log.info({ partition: p, count }, dryRun ? 'partition counted' : 'partition done') + return count +} + +async function main(): Promise { + const args = process.argv.slice(2) + const concurrencyIdx = args.indexOf('--concurrency') + const concurrency = concurrencyIdx !== -1 ? Number(args[concurrencyIdx + 1]) : 8 + if (!Number.isInteger(concurrency) || concurrency <= 0) { + throw new Error(`--concurrency must be a positive integer, got: ${args[concurrencyIdx + 1]}`) + } + const dryRun = args.includes('--dry-run') + + if (dryRun) log.info('DRY RUN — counting only') + log.info({ concurrency, numPartitions: NUM_PARTITIONS }, 'Starting dedup') + + let total = 0 + for (let batch = 0; batch < NUM_PARTITIONS; batch += concurrency) { + const partitions = Array.from( + { length: Math.min(concurrency, NUM_PARTITIONS - batch) }, + (_, i) => batch + i, + ) + const counts = await Promise.all(partitions.map((p) => processPartition(p, dryRun))) + total += counts.reduce((s, n) => s + n, 0) + log.info({ partitions, total }, dryRun ? 'batch counted' : 'batch done') + } + + log.info({ total }, dryRun ? 'duplicates found' : 'dedup complete') + + if (dryRun) return + + const qx = await getPackagesDb() + const existing = await qx.selectOneOrNone(` + SELECT conname FROM pg_constraint c + JOIN pg_class t ON t.oid = c.conrelid + WHERE t.relname = 'package_dependencies' AND c.contype = 'u' + LIMIT 1 + `) + if (existing) { + log.info({ constraint: existing.conname }, 'UNIQUE constraint already exists') + } else { + log.info('Rebuilding UNIQUE constraint') + await qx.result( + `ALTER TABLE package_dependencies ADD UNIQUE (version_id, depends_on_id, dependency_kind)`, + ) + log.info('UNIQUE constraint rebuilt') + } +} + +main() + .then(() => process.exit(0)) + .catch((err) => { + log.error(err, 'Fatal error') + process.exit(1) + }) diff --git a/services/apps/packages_worker/src/scripts/exportToBucket.ts b/services/apps/packages_worker/src/scripts/exportToBucket.ts index 62d60b987d..60c4054f97 100644 --- a/services/apps/packages_worker/src/scripts/exportToBucket.ts +++ b/services/apps/packages_worker/src/scripts/exportToBucket.ts @@ -10,7 +10,7 @@ import { extractBqStats } from '../deps-dev/bqStats' import { GCS_BUCKET, bigquery, bucket } from '../deps-dev/config' import { ADVISORIES_SQL, buildAdvisoryPackagesSql } from '../deps-dev/queries/advisoriesSql' import { buildDependentCountsSql } from '../deps-dev/queries/dependentCountsSql' -import { buildDepsFullSql } from '../deps-dev/queries/depsSql' +import { DEPS_DEFAULT_ECOSYSTEMS, buildDepsFullSql } from '../deps-dev/queries/depsSql' import { buildPackageReposSql } from '../deps-dev/queries/packageReposSql' import { buildPackagesFullSql } from '../deps-dev/queries/packagesSql' import { buildReposSql } from '../deps-dev/queries/reposSql' @@ -292,7 +292,7 @@ async function main(): Promise { const SQL_BY_PART: Record = { packages: buildPackagesFullSql(systems), versions: buildVersionsFullSql(systems), - deps: buildDepsFullSql(systems, depsTableOption), + deps: buildDepsFullSql(ecosystems ?? DEPS_DEFAULT_ECOSYSTEMS, depsTableOption), repos: buildReposSql(reposSnapshotDate ?? today, systems), package_repos: buildPackageReposSql(reposSnapshotDate ?? today, systems), counts: buildDependentCountsSql(countsSnapshotDate ?? today), diff --git a/services/apps/packages_worker/src/scripts/monitorOsspckgs.ts b/services/apps/packages_worker/src/scripts/monitorOsspckgs.ts index 8df57a4622..e2e325daaf 100644 --- a/services/apps/packages_worker/src/scripts/monitorOsspckgs.ts +++ b/services/apps/packages_worker/src/scripts/monitorOsspckgs.ts @@ -110,10 +110,12 @@ const STATUS_ICON = { cleaned: '–', } -function statusStr(status: string) { - const c = STATUS_COLOR[status as keyof typeof STATUS_COLOR] ?? '' - const i = STATUS_ICON[status as keyof typeof STATUS_ICON] ?? '?' - return `${c}${i} ${status}${A.reset}` +function statusStr(status: string, step?: string | null, stuck?: boolean) { + const c = stuck ? A.yellow : (STATUS_COLOR[status as keyof typeof STATUS_COLOR] ?? '') + const i = stuck ? '⚠' : (STATUS_ICON[status as keyof typeof STATUS_ICON] ?? '?') + const label = + step && !['done', 'failed', 'cleaned'].includes(status) ? `${status}·${step}` : status + return `${c}${i} ${label}${A.reset}` } function fmtNum(n: unknown) { @@ -197,10 +199,11 @@ function padCell(s: string, len: number, bg: string) { // eslint-disable-next-line no-control-regex const visible = s.replace(/\x1b\[[0-9;]*m/g, '') if (visible.length > len) { - // Walk raw string, skip escape sequences, cut at len-1 visible chars then add … + // Walk raw string, skip escape sequences, cut at len-2 visible chars then add "… " + // (space after ellipsis ensures visual gap between columns even when truncated) let vis = 0 let i = 0 - while (i < s.length && vis < len - 1) { + while (i < s.length && vis < len - 2) { if (s[i] === '\x1b') { const end = s.indexOf('m', i) i = end !== -1 ? end + 1 : i + 1 @@ -209,7 +212,7 @@ function padCell(s: string, len: number, bg: string) { i++ } } - return s.slice(0, i) + A.reset + bg + '…' + return s.slice(0, i) + A.reset + bg + '… ' } const pad = Math.max(0, len - visible.length) return s + bg + ' '.repeat(pad) @@ -265,10 +268,16 @@ async function fetchTableCounts(): Promise> { const KNOWN_ECOSYSTEMS = ['npm', 'go', 'maven', 'pypi', 'nuget', 'cargo'] -// Parses ecosystem names from gcs_prefix or export_name. -// Looks for -- or -/ patterns so "go" doesn't match inside "cargo". +// Reads ecosystems from table_row_counts['meta:ecosystems'] (new jobs) or falls back to +// parsing gcs_prefix/export_name for jobs created before the meta key was added. // eslint-disable-next-line @typescript-eslint/no-explicit-any function extractEcosystem(job: any) { + const trc = job.table_row_counts ?? {} + const meta = trc['meta:ecosystems'] + if (Array.isArray(meta) && meta.length > 0) { + return meta.map((e: string) => e.toLowerCase()).join(',') + } + // Fallback: parse GCS path for old rows const src = [job.gcs_prefix ?? '', job.export_name ?? ''].join(' ').toLowerCase() const found = KNOWN_ECOSYSTEMS.filter((e) => new RegExp(`(^|[-/,])${e}([-/,]|$)`).test(src)) return found.length > 0 ? found.join(',') : null @@ -277,18 +286,18 @@ function extractEcosystem(job: any) { // ── Layout ──────────────────────────────────────────────────────────────────── const COL = { - id: 12, + id: 18, kind: 20, - eco: 10, - status: 18, + eco: 15, + status: 26, mode: 6, bq: 10, - cost: 10, + cost: 8, files: 24, - staging: 12, + staging: 10, pg: 14, table: 18, - elapsed: 12, + elapsed: 10, chunk: 26, total: 24, } @@ -393,7 +402,17 @@ function tableRow(job: any, selected: boolean) { const startedTime = job.started_at ? new Date(job.started_at as string).toISOString().slice(11, 16) : '—' - const idCell = `${String(job.id)} ${A.dim}${startedTime}${A.reset}${bg}` + const isFinished = ['done', 'failed', 'cleaned'].includes(job.status) + const jobDuration = + isFinished && job.started_at && job.finished_at + ? fmtEtaStr( + new Date(job.finished_at as string).getTime() - + new Date(job.started_at as string).getTime(), + ) + : null + const idCell = jobDuration + ? `${String(job.id)} ${A.dim}${startedTime} ${jobDuration}${A.reset}${bg}` + : `${String(job.id)} ${A.dim}${startedTime}${A.reset}${bg}` return ( bg + @@ -408,7 +427,17 @@ function tableRow(job: any, selected: boolean) { COL.eco, bg, ) + - padCell(statusStr(job.status), COL.status, bg) + + padCell( + statusStr( + job.status, + typeof (job.table_row_counts ?? {})['meta:step'] === 'string' + ? (job.table_row_counts['meta:step'] as string) + : null, + stuckIds.has(job.id as number), + ), + COL.status, + bg, + ) + padCell(fmtMode(job.sync_mode), COL.mode, bg) + padCell(fmtCompact(job.row_count_bq), COL.bq, bg) + padCell(fmtUsd(job.bq_bytes_billed), COL.cost, bg) + @@ -436,8 +465,13 @@ function renderDetail(job: any, cols: number) { ? A.dim + String(job.provisional_snapshot_at).slice(0, 10) + ' (provisional)' + A.reset : A.dim + '—' + A.reset + const trcStep = + typeof (job.table_row_counts ?? {})['meta:step'] === 'string' + ? (job.table_row_counts['meta:step'] as string) + : null + const isStuckJob = stuckIds.has(job.id as number) lines.push( - ` ${A.bold}Job #${job.id} — ${job.job_kind}${A.reset} ${statusStr(job.status)} ${A.dim}${job.sync_mode}${A.reset}`, + ` ${A.bold}Job #${job.id} — ${job.job_kind}${A.reset} ${statusStr(job.status, trcStep, isStuckJob)} ${A.dim}${job.sync_mode}${A.reset}`, ) lines.push(` ${sep}`) lines.push(` ${A.dim}snapshot:${A.reset} ${snapshotDate}`) @@ -507,7 +541,11 @@ function renderDetail(job: any, cols: number) { const ref = stagRows || bqRows // prefer staging as denominator for final rows const finalKeys = Object.keys(trc).filter( - (k) => !k.startsWith('bq:') && !k.startsWith('staging:') && !k.startsWith('progress:'), + (k) => + !k.startsWith('bq:') && + !k.startsWith('staging:') && + !k.startsWith('progress:') && + !k.startsWith('meta:'), ) if (finalKeys.length > 0) { for (const k of finalKeys) { @@ -578,7 +616,33 @@ function renderDetail(job: any, cols: number) { let jobs: any[] = [] let tableCounts: Record = {} let watermarks: Record = {} +let stuckIds: Set = new Set() + +// A job is "stuck" when it's still in a non-terminal state but a newer job of the same kind +// has already completed — meaning the workflow died without ever finishing this job. +// eslint-disable-next-line @typescript-eslint/no-explicit-any +function computeStuckIds(jobList: any[]): Set { + const latestDone = new Map() + for (const job of jobList) { + if (['done', 'cleaned'].includes(job.status) && job.started_at) { + const t = new Date(job.started_at as string).getTime() + if (!latestDone.has(job.job_kind) || t > (latestDone.get(job.job_kind) ?? 0)) { + latestDone.set(job.job_kind, t) + } + } + } + const result = new Set() + for (const job of jobList) { + if (['done', 'failed', 'cleaned', 'pending'].includes(job.status)) continue + const latest = latestDone.get(job.job_kind) + if (latest && job.started_at && new Date(job.started_at as string).getTime() < latest) { + result.add(job.id as number) + } + } + return result +} let selected = 0 +let scrollOffset = 0 let detailOpen = false let lastRefresh: string | null = null let error: string | null = null @@ -654,7 +718,7 @@ function computeChunkEta(job: any) { const hist = chunkMergeHistory.get(job.id) if (!hist || hist.mergeStart == null) return null - let rateRowsPerMs + let rateRowsPerMs: number if (hist.completedChunks.length > 0) { // Exponential recency weighting: chunk i gets weight 2^i (oldest=0, newest=n-1). // Most recent chunk contributes ~50% of the rate; history stabilises it. @@ -774,13 +838,19 @@ function render() { const detailHeight = detailOpen ? Math.min(detailLines.length + 2, Math.floor(rows * 0.55)) : 0 const listHeight = rows - listStart - detailHeight - 1 + // Keep scrollOffset in bounds so selected row is always visible + if (selected < scrollOffset) scrollOffset = selected + if (selected >= scrollOffset + listHeight) scrollOffset = selected - listHeight + 1 + scrollOffset = Math.max(0, Math.min(scrollOffset, Math.max(0, jobs.length - listHeight))) + const listEnd = listStart + listHeight for (let i = 0; i < listHeight; i++) { - const job = jobs[i] + const jobIndex = scrollOffset + i + const job = jobs[jobIndex] if (!job) { writeln(listStart + i, 1, '') } else { - writeln(listStart + i, 1, tableRow(job, i === selected)) + writeln(listStart + i, 1, tableRow(job, jobIndex === selected)) } } @@ -813,6 +883,8 @@ async function refresh() { tableCounts = newTableCounts watermarks = newWatermarks if (selected >= jobs.length) selected = Math.max(0, jobs.length - 1) + scrollOffset = Math.max(0, Math.min(scrollOffset, Math.max(0, jobs.length - 1))) + stuckIds = computeStuckIds(jobs) lastRefresh = new Date().toLocaleTimeString() error = null } catch (e) { diff --git a/services/apps/packages_worker/src/scripts/triggerBootstrap.ts b/services/apps/packages_worker/src/scripts/triggerBootstrap.ts index 1732862d35..e688cb42a5 100644 --- a/services/apps/packages_worker/src/scripts/triggerBootstrap.ts +++ b/services/apps/packages_worker/src/scripts/triggerBootstrap.ts @@ -37,6 +37,10 @@ Options: --deps-table-b Use DependenciesLatest instead of DependencyGraphEdgesLatest for deps. Cheaper (~$4.69 vs $12.67 for CARGO) but loses version_constraint. ADR-0003 Option B. Good for local testing. + --fill-constraints Re-export full BQ deps data and upsert version_constraint for + rows where it is currently NULL. Use after a --deps-table-b run + to backfill missing version_constraint values without re-inserting + rows. Only affects package_dependencies. --help Show this help Examples: @@ -63,6 +67,7 @@ async function main(): Promise { const reuseExports = args.includes('--reuse-exports') const depsTableOption: 'A' | 'B' = args.includes('--deps-table-b') ? 'B' : 'A' + const fillConstraints = args.includes('--fill-constraints') const exportNameIdx = args.indexOf('--export-name') if ( @@ -148,7 +153,18 @@ async function main(): Promise { const handle = await client.workflow.start(bootstrapOsspckgs, { taskQueue: 'bq-dataset-ingest', workflowId, - args: [{ mode, ecosystems, kinds, reuseExports, depsTableOption, exportName, snapshotDate }], + args: [ + { + mode, + ecosystems, + kinds, + reuseExports, + depsTableOption, + exportName, + snapshotDate, + fillConstraints, + }, + ], }) const flags = [ @@ -157,6 +173,7 @@ async function main(): Promise { reuseExports ? '--reuse-exports' : '', depsTableOption === 'B' ? '--deps-table-b' : '', exportName ? `--export-name ${exportName}` : '', + fillConstraints ? '--fill-constraints' : '', ].filter(Boolean) console.log( `Started workflow ${handle.workflowId}${ecosystems ? ` (ecosystems: ${ecosystems.join(', ')})` : ''}${flags.length ? ` [${flags.join(' ')}]` : ''}`, diff --git a/services/libs/data-access-layer/src/osspckgs/ingestJobs.ts b/services/libs/data-access-layer/src/osspckgs/ingestJobs.ts index ad7305cc3b..9606035e23 100644 --- a/services/libs/data-access-layer/src/osspckgs/ingestJobs.ts +++ b/services/libs/data-access-layer/src/osspckgs/ingestJobs.ts @@ -44,7 +44,7 @@ export interface MarkJobStatusFields { rowCountBq?: number rowCountStaging?: number rowCountPg?: number - tableRowCounts?: Record + tableRowCounts?: Record bqBytesBilled?: number bqJobId?: string bqStats?: BqStats @@ -302,3 +302,18 @@ export async function markJobStatus( await qx.result(`UPDATE osspckgs_ingest_jobs SET ${sets.join(', ')} WHERE id = $(jobId)`, params) } + +// Merges key-value pairs into table_row_counts without changing status. +// Used to write meta:step and similar display-only tracking keys mid-job. +export async function mergeJobTableRowCounts( + qx: QueryExecutor, + jobId: number, + kv: Record, +): Promise { + await qx.result( + `UPDATE osspckgs_ingest_jobs + SET table_row_counts = COALESCE(table_row_counts, '{}'::jsonb) || $(kv)::jsonb + WHERE id = $(jobId)`, + { jobId, kv }, + ) +}