22 commits
f6249a2 fix: trying to prevent a deadlock (themarolt, Jun 17, 2026)
eafcb53 fix: using proper tables for nuget and go (themarolt, Jun 17, 2026)
9b77640 fix: flag to update constraints (themarolt, Jun 17, 2026)
9fd28f7 chore: improvements to monitoring (themarolt, Jun 17, 2026)
584cf90 chore: more states (themarolt, Jun 17, 2026)
c4e31ac fix: use the same env as in deployment (themarolt, Jun 17, 2026)
ba96498 fix: better deploy script and monitor fix (themarolt, Jun 17, 2026)
954a751 fix: monitor column spacing fix (themarolt, Jun 17, 2026)
1f3fda0 fix: dont be done prematurely (themarolt, Jun 17, 2026)
fd18287 fix: index creation improvements (themarolt, Jun 18, 2026)
755a317 chore: script to fix unique index on packages_dependents (themarolt, Jun 18, 2026)
73ad88b fix: better index creation (themarolt, Jun 19, 2026)
078ccae Merge branch 'main' into fix/bq-depsdev-deadlock (themarolt, Jun 19, 2026)
aac2ccd Merge remote-tracking branch 'origin/main' into fix/bq-depsdev-deadlock (themarolt, Jun 19, 2026)
0f33398 fix: comments (themarolt, Jun 19, 2026)
719ce84 Merge remote-tracking branch 'origin/main' into fix/bq-depsdev-deadlock (themarolt, Jun 19, 2026)
e2ca951 fix: comments (themarolt, Jun 19, 2026)
826b42e Merge remote-tracking branch 'origin/main' into fix/bq-depsdev-deadlock (themarolt, Jun 19, 2026)
d369171 fix: comments (themarolt, Jun 19, 2026)
61b4b31 Merge remote-tracking branch 'origin/main' into fix/bq-depsdev-deadlock (themarolt, Jun 19, 2026)
f56818a fix: comments (themarolt, Jun 19, 2026)
26f0680 fix: comments (themarolt, Jun 19, 2026)
18 changes: 11 additions & 7 deletions docs/adr/0003-deps-bq-table-selection.md
@@ -1,4 +1,4 @@
# ADR-0003: Use DependencyGraphEdgesLatest for deps ingestion; defer DependenciesLatest until NUGET needed
# ADR-0003: Use DependencyGraphEdgesLatest for deps ingestion; defer DependenciesLatest until NUGET or GO needed

**Date**: 2026-05-29
**Status**: accepted
@@ -14,9 +14,9 @@ NPM + MAVEN only.
## Decision
Use `DependencyGraphEdgesLatest` (Option A) with `From.Name = Name AND
From.Version = Version` as the depth-1 filter. Switch to `DependenciesLatest`
(Option B) only when NUGET ingestion is required, since Option A does not
support NUGET. At that point evaluate whether to migrate all ecosystems or
add NUGET via Option B only.
(Option B) only when NUGET or GO ingestion is required, since Option A does not
support either ecosystem. At that point evaluate whether to migrate all ecosystems
or add GO/NUGET via Option B only.

## Alternatives Considered

@@ -27,7 +27,7 @@ add NUGET via Option B only.
e.g. `^1.2.3`); only exposes resolved version
- **Why not**: NPM and MAVEN are the only ecosystems needed now. Option A
retains `version_constraint` which has security feature value. Cost delta is
acceptable for the current scope. Re-evaluate when NUGET is required.
acceptable for the current scope. Re-evaluate when NUGET or GO is required.

## Consequences

@@ -36,11 +36,15 @@ add NUGET via Option B only.
- No migration risk — simpler to stay on the table already coded

### Negative
- NUGET not supported; must revisit when NUGET ingestion is added
- Only NPM, MAVEN, PYPI, CARGO are present in `DependencyGraphEdgesLatest`.
GO and NUGET are absent entirely — confirmed via BQ query on the live table
(`SELECT System, COUNT(*) … GROUP BY System` returns exactly those 4).
GO uses Minimal Version Selection (no graph resolution by deps.dev);
NUGET is simply not covered. Must revisit when either ecosystem is added.
- Full bootstrap ~$1,291 vs ~$494 for NPM+MAVEN (Option B cheaper)
- Weekly incremental ~$26 vs ~$7 (Option B cheaper)

### Risks
- If NUGET is added before a re-evaluation, the team may miss that Option B
- If NUGET or GO is added before a re-evaluation, the team may miss that Option B
is required. Mitigation: this ADR and the note in
`personal/osspckgs-bq-cost-report.txt` flag the trigger condition.
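
The trigger condition described in the ADR could also be checked in code rather than left to documentation alone. The following is a minimal sketch, not part of this PR: `GRAPH_EDGE_ECOSYSTEMS` and `ecosystemsRequiringOptionB` are hypothetical names, and the ecosystem list is taken from the BQ query result quoted under "Negative" above.

```typescript
// Hypothetical guard, not part of this PR.
// Ecosystems the ADR reports as present in DependencyGraphEdgesLatest.
const GRAPH_EDGE_ECOSYSTEMS = new Set<string>(['NPM', 'MAVEN', 'PYPI', 'CARGO'])

// Returns the requested ecosystems that Option A cannot serve,
// i.e. the ones that should trigger a re-evaluation of Option B.
function ecosystemsRequiringOptionB(requested: string[]): string[] {
  return requested
    .map((ecosystem) => ecosystem.toUpperCase())
    .filter((ecosystem) => !GRAPH_EDGE_ECOSYSTEMS.has(ecosystem))
}

console.log(ecosystemsRequiringOptionB(['npm', 'maven'])) // []
console.log(ecosystemsRequiringOptionB(['NPM', 'GO', 'nuget'])) // [ 'GO', 'NUGET' ]
```

A non-empty result means the ADR's re-evaluation is due; the sketch deliberately does not decide whether to migrate all ecosystems or only the unsupported ones, since the ADR leaves that open.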
2 changes: 1 addition & 1 deletion docs/adr/README.md
@@ -9,7 +9,7 @@ Use the `/adr` skill in Claude Code to record new ADRs or query past decisions.
| ADR | Title | Status | Date |
| --------------------------------------------------- | ---------------------------------------- | ------ | ---------- |
| [ADR-0001](./0001-oss-packages-design-decisions.md) | OSS packages — design decisions (living) | living | 2026-05-27 |
| [ADR-0003](./0003-deps-bq-table-selection.md) | Use DependencyGraphEdgesLatest for deps ingestion | accepted | 2026-05-29 |
| [ADR-0003](./0003-deps-bq-table-selection.md) | Use DependencyGraphEdgesLatest for deps ingestion; defer DependenciesLatest until NUGET or GO needed | accepted | 2026-05-29 |

## Why ADRs?

57 changes: 57 additions & 0 deletions scripts/cli
@@ -255,6 +255,55 @@ function select_services() {
echo "Selected services: $selected_services"
}

function monitor_workflow() {
local REPOSITORY="$1"
local WORKFLOW_FILE="$2"
local EXISTING_IDS="${3:-}"

yell "Waiting for workflow run to appear..."

local RUN_ID=""
local ATTEMPTS=0

while [[ -z "$RUN_ID" && $ATTEMPTS -lt 24 ]]; do
sleep 5
local CANDIDATE
CANDIDATE=$(gh run list \
--repo "$REPOSITORY" \
--workflow "$WORKFLOW_FILE" \
--limit 1 \
--json databaseId \
--jq '.[0].databaseId // empty' 2>/dev/null || echo "")

if [[ -n "$CANDIDATE" ]] && ! echo "$EXISTING_IDS" | grep -q "^${CANDIDATE}$"; then
RUN_ID="$CANDIDATE"
fi

ATTEMPTS=$((ATTEMPTS + 1))
done

if [[ -z "$RUN_ID" ]]; then
error "Could not find new workflow run after $((ATTEMPTS * 5)) seconds."
return 1
fi

say "Monitoring run #$RUN_ID — https://github.com/$REPOSITORY/actions/runs/$RUN_ID"
nl

gh run watch "$RUN_ID" --repo "$REPOSITORY" || true

local CONCLUSION
CONCLUSION=$(gh run view "$RUN_ID" --repo "$REPOSITORY" --json conclusion --jq '.conclusion')

nl
if [[ "$CONCLUSION" == "success" ]]; then
say "✓ Deployment succeeded!"
else
error "✗ Deployment failed: $CONCLUSION"
return 1
fi
}
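
The run-detection step in `monitor_workflow` relies on comparing the newest run ID against a snapshot of IDs taken before the dispatch. As a rough illustration only, the same check is restated below in TypeScript; `isNewRun` is a hypothetical name, and the sketch assumes the snapshot is a newline-separated list of IDs, as produced by the `--jq '.[].databaseId'` call in the deploy functions.

```typescript
// Illustrative restatement of the shell check; `isNewRun` is a hypothetical name.
function isNewRun(existingIds: string, candidate: string): boolean {
  // Mirrors [[ -n "$CANDIDATE" ]]: an empty candidate is never a new run.
  if (candidate === '') return false
  // One ID per line; whole-line comparison, like grep -q "^${CANDIDATE}$".
  const seen = new Set(existingIds.split('\n').filter((line) => line !== ''))
  return !seen.has(candidate)
}

// The polling loop gives up after 24 attempts with a 5 second sleep each.
const MAX_ATTEMPTS = 24
const POLL_SECONDS = 5
const timeoutSeconds = MAX_ATTEMPTS * POLL_SECONDS // 120

console.log(isNewRun('101\n102\n103', '104')) // true
console.log(isNewRun('101\n102\n103', '102')) // false
console.log(timeoutSeconds) // 120
```

Because the comparison is on whole lines, an ID that is only a prefix of an existing one (for example `101` against `1010`) still counts as new.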

function deploy_staging() {
REPOSITORY="CrowdDotDev/crowd.dev"
WORKFLOW_FILE="lf-oracle-staging-deploy.yaml"
@@ -264,7 +313,11 @@ function deploy_staging() {
echo "${SELECTED_SERVICES[*]}"
)"

local EXISTING_IDS
EXISTING_IDS=$(gh run list --repo "$REPOSITORY" --workflow "$WORKFLOW_FILE" --limit 10 --json databaseId --jq '.[].databaseId' 2>/dev/null || echo "")

gh workflow run $WORKFLOW_FILE --repo $REPOSITORY --ref $CURRENT_BRANCH -f services="$SERVICES"
monitor_workflow "$REPOSITORY" "$WORKFLOW_FILE" "$EXISTING_IDS"
}

function deploy_production() {
@@ -276,7 +329,11 @@ function deploy_production() {
echo "${SELECTED_SERVICES[*]}"
)"

local EXISTING_IDS
EXISTING_IDS=$(gh run list --repo "$REPOSITORY" --workflow "$WORKFLOW_FILE" --limit 10 --json databaseId --jq '.[].databaseId' 2>/dev/null || echo "")

gh workflow run $WORKFLOW_FILE --repo $REPOSITORY --ref $CURRENT_BRANCH -f services="$SERVICES"
monitor_workflow "$REPOSITORY" "$WORKFLOW_FILE" "$EXISTING_IDS"
}

function reset_selected_services() {
2 changes: 2 additions & 0 deletions services/apps/packages_worker/package.json
@@ -39,6 +39,8 @@
"backfill:stewardship:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=stewardship-backfill LOG_LEVEL=info tsx src/bin/stewardship-backfill.ts",
"monitor:osspckgs": "SERVICE=bq-dataset-ingest tsx src/scripts/monitorOsspckgs.ts",
"monitor:osspckgs:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=bq-dataset-ingest tsx src/scripts/monitorOsspckgs.ts",
"dedup-package-deps": "SERVICE=bq-dataset-ingest tsx src/scripts/dedupPackageDeps.ts",
"dedup-package-deps:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=bq-dataset-ingest tsx src/scripts/dedupPackageDeps.ts",
"lint": "npx eslint --ext .ts src --max-warnings=0",
"format": "npx prettier --write \"src/**/*.ts\"",
"format-check": "npx prettier --check .",
@@ -78,6 +78,7 @@ export async function rankPackages(): Promise<{ scoredRows: number; rankedRows:

const jobId = existing?.id ?? (await createIngestJob(qx, 'ranking', 'ranking', null))
try {
await markJobStatus(qx, jobId, 'merging')
const [result] = await qx.select(`SELECT * FROM rank_packages()`)
const scoredRows = Number(result.scored_rows ?? 0)
const rankedRows = Number(result.ranked_rows ?? 0)
@@ -24,6 +24,7 @@ export interface BqExportToGcsInput {
maxBytesGb: number
reuseExports?: boolean
exportName?: string
ecosystems?: string[]
}

export interface BqExportToGcsOutput {
@@ -34,7 +35,17 @@ export interface BqExportToGcsOutput {
}

export async function bqExportToGcs(input: BqExportToGcsInput): Promise<BqExportToGcsOutput> {
const { jobKind, sql, runId, syncMode, snapshotAt, maxBytesGb, reuseExports, exportName } = input
const {
jobKind,
sql,
runId,
syncMode,
snapshotAt,
maxBytesGb,
reuseExports,
exportName,
ecosystems,
} = input

// Named exports use a stable GCS path independent of runId so they survive across bootstrap runs.
const namedGcsPrefix = exportName
@@ -93,7 +104,10 @@ export async function bqExportToGcs(input: BqExportToGcsInput): Promise<BqExport
gcsPrefix: namedPrefix,
rowCountBq: 0,
bqBytesBilled: 0,
tableRowCounts: { 'bq:export': 0 },
tableRowCounts: {
'bq:export': 0,
...(ecosystems ? { 'meta:ecosystems': ecosystems } : {}),
},
})
return { gcsPrefix: namedPrefix, rowCount: 0, bqBytesBilled: 0, jobId }
}
@@ -177,8 +191,10 @@ export async function bqExportToGcs(input: BqExportToGcsInput): Promise<BqExport
const provisionalDate = snapshotAt ? new Date(snapshotAt) : null
const jobId = await createIngestJob(qx, jobKind, syncMode, provisionalDate, exportName)

// H7: mark exporting before we start the BQ job
await markJobStatus(qx, jobId, 'exporting')
// H7: mark exporting before we start the BQ job; store ecosystems filter in table_row_counts JSONB.
await markJobStatus(qx, jobId, 'exporting', {
...(ecosystems ? { tableRowCounts: { 'meta:ecosystems': ecosystems } } : {}),
})
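
Both call sites above use a conditional spread so that the `meta:ecosystems` key is only written when an ecosystems filter was passed. A standalone sketch of that pattern follows; `buildRowCounts` and the `RowCounts` type are hypothetical and exist only for this illustration.

```typescript
// Hypothetical helper illustrating the conditional-spread pattern from the hunks above.
type RowCounts = Record<string, number | string[]>

function buildRowCounts(ecosystems?: string[]): RowCounts {
  return {
    'bq:export': 0,
    // Spreads one extra key when `ecosystems` is defined, otherwise spreads nothing.
    ...(ecosystems ? { 'meta:ecosystems': ecosystems } : {}),
  }
}

console.log(JSON.stringify(buildRowCounts()))
console.log(JSON.stringify(buildRowCounts(['NPM', 'MAVEN'])))
console.log(JSON.stringify(buildRowCounts([])))
```

Note that an empty array is truthy in JavaScript, so passing `[]` still stores `"meta:ecosystems": []`; only `undefined` omits the key.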

// B9: wrap in SELECT * FROM (...) so QUALIFY / top-level set ops don't break EXPORT DATA syntax.
// CREATE TEMP TABLE first so BQ materializes the result before exporting — direct EXPORT DATA
@@ -1,4 +1,5 @@
export * from './bqExportToGcs'
export * from './setJobStep'
export * from './createVersionsLookup'
export * from './managePackageDepsConstraints'
export * from './managePackageDepsIndexes'
@@ -96,18 +96,20 @@ export async function rebuildPackageDepsIndexes(): Promise<{
const existing: Array<{ indexdef: string }> = await qx.select(NON_CONSTRAINT_INDEXES_SQL)
const existingDefs = existing.map((r: { indexdef: string }) => r.indexdef.toLowerCase())

const toRebuild = SECONDARY_INDEXES.filter(
(idx) => !existingDefs.some((def: string) => def.includes(`(${idx.columns})`)),
)
const rebuilt: string[] = []

for (const idx of SECONDARY_INDEXES) {
const alreadyExists = existingDefs.some((def: string) => def.includes(`(${idx.columns})`))
if (alreadyExists) {
log.info({ columns: idx.columns }, 'Index already exists, skipping')
continue
}
log.info({ columns: idx.columns }, 'Creating index on package_dependencies')
await qx.result(idx.createSql)
rebuilt.push(idx.columns)
}
// Build indexes in parallel — each on its own connection so they run concurrently.
await Promise.all(
toRebuild.map(async (idx) => {
const conn = await getPackagesDb()
log.info({ columns: idx.columns }, 'Creating index on package_dependencies')
await conn.result(idx.createSql)
rebuilt.push(idx.columns)
}),
)

// Remove cross-chunk duplicates before rebuilding the UNIQUE constraint.
// DISTINCT ON deduplicates within a single chunk, but the same (root, dep) pair can appear in
@@ -118,27 +120,43 @@ export async function rebuildPackageDepsIndexes(): Promise<{
// Run per partition (depends_on_id % 64 = p) so each iteration prunes to one of the 64
// partitions instead of scanning all 1.15B rows at once. Same total rows read; each pass
// fits in work_mem and avoids cross-partition sort.
// Run in parallel batches of DEDUP_CONCURRENCY to cut wall-clock from ~10h to ~1-2h.
const NUM_PARTITIONS = 64
const DEDUP_CONCURRENCY = 8
let totalDedupDeleted = 0
for (let p = 0; p < NUM_PARTITIONS; p++) {
const result = await qx.result(`
DELETE FROM package_dependencies pd
USING (
SELECT id, depends_on_id
FROM (
SELECT id, depends_on_id,
ROW_NUMBER() OVER (
PARTITION BY version_id, depends_on_id, dependency_kind
ORDER BY id
) AS rn
FROM package_dependencies
WHERE depends_on_id % ${NUM_PARTITIONS} = ${p}
) sub
WHERE rn > 1
) dupes
WHERE pd.id = dupes.id AND pd.depends_on_id = dupes.depends_on_id
`)
totalDedupDeleted += result
for (let batch = 0; batch < NUM_PARTITIONS; batch += DEDUP_CONCURRENCY) {
const partitions = Array.from(
{ length: Math.min(DEDUP_CONCURRENCY, NUM_PARTITIONS - batch) },
(_, i) => batch + i,
)
const counts = await Promise.all(
partitions.map(async (p) => {
const conn = await getPackagesDb()
// work_mem (not maintenance_work_mem) controls sort memory for window functions.
// 2GB avoids disk spill during the ROW_NUMBER() sort on each partition.
return conn.tx(async (tx) => {
await tx.result(`SET LOCAL work_mem = '2GB'`)
return tx.result(`
DELETE FROM package_dependencies pd
USING (
SELECT id, depends_on_id
FROM (
SELECT id, depends_on_id,
ROW_NUMBER() OVER (
PARTITION BY version_id, depends_on_id, dependency_kind
ORDER BY id
) AS rn
FROM package_dependencies
WHERE depends_on_id % ${NUM_PARTITIONS} = ${p}
) sub
WHERE rn > 1
) dupes
WHERE pd.id = dupes.id AND pd.depends_on_id = dupes.depends_on_id
`)
})
}),
)
totalDedupDeleted += counts.reduce((sum, n) => sum + n, 0)
}
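
The loop above processes the 64 partitions in fixed-size batches of `DEDUP_CONCURRENCY`, awaiting each batch before starting the next. The sketch below isolates that control flow from the database work so it can be run on its own; `partitionBatches` and `runInBatches` are hypothetical names, and the `worker` callback stands in for the per-partition `DELETE`.

```typescript
// Hypothetical helpers isolating the batching logic; no database access involved.

// Splits partition numbers 0..numPartitions-1 into consecutive batches of at most `concurrency`.
function partitionBatches(numPartitions: number, concurrency: number): number[][] {
  const batches: number[][] = []
  for (let start = 0; start < numPartitions; start += concurrency) {
    const size = Math.min(concurrency, numPartitions - start)
    batches.push(Array.from({ length: size }, (_, i) => start + i))
  }
  return batches
}

// Runs `worker` for every partition, one batch at a time, and sums the returned counts.
async function runInBatches(
  numPartitions: number,
  concurrency: number,
  worker: (partition: number) => Promise<number>,
): Promise<number> {
  let total = 0
  for (const batch of partitionBatches(numPartitions, concurrency)) {
    const counts = await Promise.all(batch.map((p) => worker(p)))
    total += counts.reduce((sum, n) => sum + n, 0)
  }
  return total
}

console.log(partitionBatches(64, 8).length) // 8
console.log(partitionBatches(10, 4)) // [ [ 0, 1, 2, 3 ], [ 4, 5, 6, 7 ], [ 8, 9 ] ]
```

With 64 partitions and a concurrency of 8 this yields 8 batches. One property of fixed batches is that each batch waits for its slowest partition before the next batch starts, so at the tail of a batch fewer than 8 statements may be running.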
log.info(
{ rowsDeleted: totalDedupDeleted },
@@ -86,20 +86,25 @@ export async function rebuildVersionsIndexes(): Promise<{
const existing: Array<{ indexdef: string }> = await qx.select(NON_CONSTRAINT_INDEXES_SQL)
const existingDefs = existing.map((r) => r.indexdef.toLowerCase())

const toRebuild = SECONDARY_INDEXES.filter(
(idx) => !existingDefs.some((def) => def.includes(idx.matchString ?? `(${idx.columns})`)),
)
const rebuilt: string[] = []

for (const idx of SECONDARY_INDEXES) {
const alreadyExists = existingDefs.some((def) =>
def.includes(idx.matchString ?? `(${idx.columns})`),
)
if (alreadyExists) {
log.info({ columns: idx.columns }, 'Index already exists, skipping')
continue
}
log.info({ columns: idx.columns }, 'Creating index on versions')
await qx.result(idx.createSql)
rebuilt.push(idx.columns)
}
// Build indexes in parallel — each on its own connection so they run concurrently.
// maintenance_work_mem per connection: with 32 partitions and default 64MB, PG spills to
// disk on every partition; 2GB lets the sort fit in RAM and cuts build time dramatically.
await Promise.all(
toRebuild.map(async (idx) => {
const conn = await getPackagesDb()
await conn.tx(async (t) => {
await t.result(`SET LOCAL maintenance_work_mem = '2GB'`)
log.info({ columns: idx.columns }, 'Creating index on versions')
await t.result(idx.createSql)
})
rebuilt.push(idx.columns)
}),
)
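
The `toRebuild` filter decides which indexes are missing by substring-matching each expected definition against the lowercased `indexdef` strings. A self-contained sketch of that matching is shown below; the `SecondaryIndex` shape is inferred from the fields used in the diff, and the index names and SQL in the example data are made up for illustration.

```typescript
// Shape inferred from the fields referenced in the diff (columns, matchString, createSql).
interface SecondaryIndex {
  columns: string
  matchString?: string
  createSql: string
}

// Returns the wanted indexes whose column list (or explicit matchString)
// does not appear in any existing index definition.
function indexesToRebuild(existingDefs: string[], wanted: SecondaryIndex[]): SecondaryIndex[] {
  const defs = existingDefs.map((def) => def.toLowerCase())
  return wanted.filter(
    (idx) => !defs.some((def) => def.includes(idx.matchString ?? `(${idx.columns})`)),
  )
}

// Example data is hypothetical; definitions follow the general form of pg_indexes.indexdef.
const existing = ['CREATE INDEX ix_versions_pkg ON public.versions USING btree (package_id)']
const wanted: SecondaryIndex[] = [
  { columns: 'package_id', createSql: 'CREATE INDEX ... (package_id)' },
  { columns: 'package_id, number', createSql: 'CREATE INDEX ... (package_id, number)' },
]

console.log(indexesToRebuild(existing, wanted).map((idx) => idx.columns)) // [ 'package_id, number' ]
```

Since the existing definitions are lowercased before comparison, `columns` and `matchString` values need to be lowercase as well, or they will never match and the index will be rebuilt on every run.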

// Remove cross-chunk duplicates before rebuilding the UNIQUE constraint.
// versions is HASH-partitioned by package_id (32 partitions). Loop over each partition table
@@ -0,0 +1,8 @@
import { mergeJobTableRowCounts } from '@crowd/data-access-layer'

import { getPackagesDb } from '../../db'

export async function setJobStep(input: { jobId: number; step: string }): Promise<void> {
const qx = await getPackagesDb()
await mergeJobTableRowCounts(qx, input.jobId, { 'meta:step': input.step })
}