diff --git a/vulnerabilities/migrations/0121_advisoryv2_is_latest_alter_advisoryv2_advisory_id_and_more.py b/vulnerabilities/migrations/0121_advisoryv2_is_latest_alter_advisoryv2_advisory_id_and_more.py
new file mode 100644
index 000000000..be1db5016
--- /dev/null
+++ b/vulnerabilities/migrations/0121_advisoryv2_is_latest_alter_advisoryv2_advisory_id_and_more.py
@@ -0,0 +1,75 @@
+# Generated by Django 5.2.11 on 2026-04-13 19:05
+
+from django.db import migrations
+from django.db import models
+from django.db.models import F
+
+
+class Migration(migrations.Migration):
+    def add_is_latest_on_existing_advisory(apps, schema_editor):
+        """
+        Flag one existing AdvisoryV2 row per AVID as the latest version.
+
+        The flagged row is the one with the most recent ``date_collected``
+        (NULL dates sort last); ties are broken by the highest ``id``.
+        """
+        Advisory = apps.get_model("vulnerabilities", "AdvisoryV2")
+
+        print("\nUpdating is_latest on existing V2 Advisory.")
+        # distinct("avid") keeps the first row of each AVID group in the
+        # ordering below. NOTE(review): distinct(*fields) needs PostgreSQL.
+        latest_qs = Advisory.objects.order_by(
+            "avid",
+            F("date_collected").desc(nulls_last=True),
+            "-id",
+        ).distinct("avid")
+
+        Advisory.objects.filter(id__in=latest_qs.values("id")).update(is_latest=True)
+
+    dependencies = [
+        ("vulnerabilities", "0120_impactedpackage_last_range_unfurl_at_and_more"),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name="advisoryv2",
+            name="is_latest",
+            field=models.BooleanField(
+                db_index=True,
+                default=False,
+                help_text="Indicates whether this is the latest version of the advisory identified by its AVID.",
+            ),
+        ),
+        migrations.AlterField(
+            model_name="advisoryv2",
+            name="advisory_id",
+            field=models.CharField(
+                db_index=True,
+                help_text="An advisory is a unique vulnerability identifier in some database, such as PYSEC-2020-2233",
+                max_length=500,
+            ),
+        ),
+        migrations.AlterField(
+            model_name="advisoryv2",
+            name="datasource_id",
+            field=models.CharField(
+                db_index=True,
+                help_text="Unique ID for the datasource used for this advisory .e.g.: nginx_importer_v2",
+                max_length=200,
+            ),
+        ),
+        migrations.AddConstraint(
+            model_name="advisoryv2",
+            constraint=models.UniqueConstraint(
+                condition=models.Q(("is_latest", True)),
+                fields=("avid",),
+                name="unique_latest_per_avid",
+            ),
+        ),
+        # Backfill runs last: when the constraint is added every row still has the
+        # default is_latest=False, so existing data cannot violate it.
+        migrations.RunPython(
+            code=add_is_latest_on_existing_advisory,
+            reverse_code=migrations.RunPython.noop,
+        ),
+    ]
diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py
index e00f067c5..896da7c76 100644
--- a/vulnerabilities/models.py
+++ b/vulnerabilities/models.py
@@ -2886,11 +2886,7 @@ def latest_for_avid(self, avid: str):
         )
 
     def latest_per_avid(self):
-        return self.order_by(
-            "avid",
-            F("date_collected").desc(nulls_last=True),
-            "-id",
-        ).distinct("avid")
+        return self.filter(is_latest=True)
 
     def latest_for_avids(self, avids):
         return self.filter(avid__in=avids).latest_per_avid()
@@ -3007,6 +3003,7 @@ class AdvisoryV2(models.Model):
         max_length=200,
         blank=False,
         null=False,
+        db_index=True,
         help_text="Unique ID for the datasource used for this advisory ."
         "e.g.: nginx_importer_v2",
     )
@@ -3016,6 +3013,7 @@ class AdvisoryV2(models.Model):
         blank=False,
         null=False,
         unique=False,
+        db_index=True,
         help_text="An advisory is a unique vulnerability identifier in some database, "
         "such as PYSEC-2020-2233",
     )
@@ -3090,6 +3088,14 @@ class AdvisoryV2(models.Model):
         help_text="UTC Date on which the advisory was collected",
     )
 
+    is_latest = models.BooleanField(
+        default=False,
+        blank=False,
+        null=False,
+        db_index=True,
+        help_text="Indicates whether this is the latest version of the advisory identified by its AVID.",
+    )
+
     original_advisory_text = models.TextField(
         blank=True,
         null=True,
@@ -3142,6 +3148,11 @@ class AdvisoryV2(models.Model):
     class Meta:
         unique_together = ["datasource_id", "advisory_id", "unique_content_id"]
         ordering = ["datasource_id", "advisory_id", "date_published", "unique_content_id"]
+        constraints = [
+            models.UniqueConstraint(
+                fields=["avid"], condition=Q(is_latest=True), name="unique_latest_per_avid"
+            )
+        ]
         indexes = [
             models.Index(
                 fields=["avid", "-date_collected", "-id"],
diff --git a/vulnerabilities/pipes/advisory.py b/vulnerabilities/pipes/advisory.py
index bcdd95075..9250f2679 100644
--- a/vulnerabilities/pipes/advisory.py +++ b/vulnerabilities/pipes/advisory.py @@ -334,6 +334,13 @@ def insert_advisory_v2( if not created: return advisory_obj + AdvisoryV2.objects.filter( + avid=f"{pipeline_id}/{advisory.advisory_id}", + is_latest=True, + ).update(is_latest=False) + advisory_obj.is_latest = True + advisory_obj.save() + aliases = get_or_create_advisory_aliases(aliases=advisory.aliases) references = get_or_create_advisory_references(references=advisory.references) severities = get_or_create_advisory_severities(severities=advisory.severities) diff --git a/vulnerabilities/tests/pipelines/v2_improvers/test_collect_ssvc_trees.py b/vulnerabilities/tests/pipelines/v2_improvers/test_collect_ssvc_trees.py index fa6719311..ad4a6bcb6 100644 --- a/vulnerabilities/tests/pipelines/v2_improvers/test_collect_ssvc_trees.py +++ b/vulnerabilities/tests/pipelines/v2_improvers/test_collect_ssvc_trees.py @@ -36,6 +36,7 @@ def vulnrichment_advisory(db): url="https://example.com/advisory/TEST-2024-0001", unique_content_id="unique-1234", date_collected=datetime.now(), + is_latest=True, ) @@ -59,6 +60,7 @@ def related_advisory(db): url="https://example.com/related/TEST-2024-0001", unique_content_id="unique-5678", date_collected=datetime.now(), + is_latest=True, ) diff --git a/vulnerabilities/tests/pipelines/v2_improvers/test_compute_package_risk_v2.py b/vulnerabilities/tests/pipelines/v2_improvers/test_compute_package_risk_v2.py index db6ffd5d3..305abf429 100644 --- a/vulnerabilities/tests/pipelines/v2_improvers/test_compute_package_risk_v2.py +++ b/vulnerabilities/tests/pipelines/v2_improvers/test_compute_package_risk_v2.py @@ -34,6 +34,7 @@ def test_simple_risk_pipeline(): unique_content_id="ajkef", url="https://test.com", date_collected=datetime.now(), + is_latest=True, ) adv.save() diff --git a/vulnerabilities/tests/pipelines/v2_improvers/test_relate_severities.py b/vulnerabilities/tests/pipelines/v2_improvers/test_relate_severities.py index 2dadbc679..27cf1f849 100644 
--- a/vulnerabilities/tests/pipelines/v2_improvers/test_relate_severities.py +++ b/vulnerabilities/tests/pipelines/v2_improvers/test_relate_severities.py @@ -25,6 +25,7 @@ def test_relate_severities_by_advisory_id(): unique_content_id="ab1", url="https://example.com/advisory/CVE-2024-0001", date_collected="2024-01-01", + is_latest=True, ) severity_advisory = AdvisoryV2.objects.create( @@ -34,6 +35,7 @@ def test_relate_severities_by_advisory_id(): unique_content_id="ab2", url="https://example.com/epss/CVE-2024-0001", date_collected="2024-01-02", + is_latest=True, ) severity_advisory.severities.create( scoring_system=EPSS.identifier, @@ -59,6 +61,7 @@ def test_relate_severities_via_alias(): unique_content_id="ab3", url="https://example.com/advisory/CVE-2024-0002", date_collected="2024-01-01", + is_latest=True, ) base.aliases.create(alias="CVE-2024-ALIAS") @@ -70,6 +73,7 @@ def test_relate_severities_via_alias(): unique_content_id="ab4", url="https://example.com/epss/CVE-2024-ALIAS", date_collected="2024-01-02", + is_latest=True, ) severity_advisory.severities.create( scoring_system=EPSS.identifier, @@ -91,6 +95,7 @@ def test_no_self_relation_created(): url="https://example.com/advisory/CVE-2024-0003", date_collected="2024-01-03", avid="epss/CVE-2024-0003", + is_latest=True, ) advisory.severities.create( scoring_system=EPSS.identifier, @@ -112,6 +117,7 @@ def test_unsupported_severity_system_is_ignored(): url="https://example.com/advisory/CVE-2024-0004", date_collected="2024-01-01", avid="nvd/CVE-2024-0004", + is_latest=True, ) severity_advisory = AdvisoryV2.objects.create( @@ -121,6 +127,7 @@ def test_unsupported_severity_system_is_ignored(): url="https://example.com/epss/CVE-2024-0004", date_collected="2024-01-02", avid="epss/CVE-2024-0004", + is_latest=True, ) severity_advisory.severities.create( scoring_system="UNKNOWN_SYSTEM", @@ -142,6 +149,7 @@ def test_pipeline_is_idempotent(): url="https://example.com/advisory/CVE-2024-0005", date_collected="2024-01-01", 
avid="nvd/CVE-2024-0005",
+        is_latest=True,
     )
 
     severity = AdvisoryV2.objects.create(
@@ -150,6 +158,7 @@ def test_pipeline_is_idempotent():
         unique_content_id="ab9",
         url="https://example.com/epss/CVE-2024-0005",
         date_collected="2024-01-02",
+        is_latest=True,
         avid="epss/CVE-2024-0005",
     )
     severity.severities.create(
diff --git a/vulnerabilities/tests/pipes/test_advisory.py b/vulnerabilities/tests/pipes/test_advisory.py
index 8710b2ea4..67c073b61 100644
--- a/vulnerabilities/tests/pipes/test_advisory.py
+++ b/vulnerabilities/tests/pipes/test_advisory.py
@@ -18,12 +18,15 @@
 
 from vulnerabilities import models
 from vulnerabilities.importer import AdvisoryData
+from vulnerabilities.importer import AdvisoryDataV2
 from vulnerabilities.importer import AffectedPackage
+from vulnerabilities.importer import AffectedPackageV2
 from vulnerabilities.importer import PackageCommitPatchData
 from vulnerabilities.importer import Reference
 from vulnerabilities.models import AdvisoryAlias
 from vulnerabilities.models import AdvisoryReference
 from vulnerabilities.models import AdvisorySeverity
+from vulnerabilities.models import AdvisoryV2
 from vulnerabilities.models import AdvisoryWeakness
 from vulnerabilities.models import PackageCommitPatch
 from vulnerabilities.pipes.advisory import get_or_create_advisory_aliases
@@ -33,6 +36,8 @@
 from vulnerabilities.pipes.advisory import get_or_create_advisory_weaknesses
 from vulnerabilities.pipes.advisory import get_or_create_aliases
 from vulnerabilities.pipes.advisory import import_advisory
+from vulnerabilities.pipes.advisory import insert_advisory_v2
+from vulnerabilities.tests.pipelines import TestLogger
 from vulnerabilities.utils import compute_content_id
 
 
@@ -257,3 +262,85 @@ def test_get_or_create_advisory_commit(advisory_commit):
     assert isinstance(commit, PackageCommitPatch)
     assert commit.commit_hash in [c.commit_hash for c in advisory_commit]
     assert commit.vcs_url in [c.vcs_url for c in advisory_commit]
+
+
+class TestLatestAdvisoryV2(TestCase):
+    """Check that insert_advisory_v2 keeps a single is_latest row per AVID."""
+
+    def setUp(self):
+        self.logger = TestLogger()
+        # Two versions of one advisory: same advisory_id, different content.
+        self.advisory1 = AdvisoryDataV2(
+            summary="Test advisory old",
+            aliases=["CVE-2025-0001"],
+            references=[],
+            severities=[],
+            weaknesses=[],
+            affected_packages=[
+                AffectedPackageV2(
+                    package=PackageURL.from_string("pkg:npm/foobar"),
+                    affected_version_range=VersionRange.from_string("vers:npm/>3.2.1|<4.0.0"),
+                    fixed_version_range=VersionRange.from_string("vers:npm/4.0.0"),
+                    introduced_by_commit_patches=[],
+                    fixed_by_commit_patches=[],
+                ),
+            ],
+            patches=[],
+            advisory_id="GHSA-1234",
+            url="https://example.com/advisory",
+        )
+
+        self.advisory2 = AdvisoryDataV2(
+            summary="Test advisory new",
+            aliases=["CVE-2025-0001"],
+            references=[],
+            severities=[],
+            weaknesses=[],
+            affected_packages=[
+                AffectedPackageV2(
+                    package=PackageURL.from_string("pkg:npm/foobar"),
+                    affected_version_range=VersionRange.from_string("vers:npm/>3.2.1|<4.0.0"),
+                    fixed_version_range=VersionRange.from_string("vers:npm/4.0.0"),
+                    introduced_by_commit_patches=[],
+                    fixed_by_commit_patches=[],
+                ),
+                AffectedPackageV2(
+                    package=PackageURL.from_string("pkg:npm/foobar"),
+                    affected_version_range=None,
+                    fixed_version_range=None,
+                    introduced_by_commit_patches=[],
+                    fixed_by_commit_patches=[
+                        PackageCommitPatchData(
+                            vcs_url="https://foobar.vcs/",
+                            commit_hash="982f801f",
+                        ),
+                    ],
+                ),
+            ],
+            patches=[],
+            advisory_id="GHSA-1234",
+            url="https://example.com/advisory",
+        )
+
+        # Only the first version is stored up front.
+        insert_advisory_v2(
+            advisory=self.advisory1,
+            pipeline_id="test_pipeline_v2",
+            logger=self.logger.write,
+        )
+
+    def test_latest_advisory_update_on_advisory_insert(self):
+        adv_old = AdvisoryV2.objects.get(avid="test_pipeline_v2/GHSA-1234", is_latest=True)
+        insert_advisory_v2(
+            advisory=self.advisory2,
+            pipeline_id="test_pipeline_v2",
+            logger=self.logger.write,
+        )
+        adv_new = AdvisoryV2.objects.get(avid="test_pipeline_v2/GHSA-1234", is_latest=True)
+        self.assertEqual("Test advisory old", adv_old.summary)
+        self.assertEqual("Test advisory new", adv_new.summary)
+
+        # The superseded version must be a different row that lost the flag.
+        adv_old.refresh_from_db()
+        self.assertNotEqual(adv_old.pk, adv_new.pk)
+        self.assertFalse(adv_old.is_latest)
diff --git a/vulnerabilities/tests/test_api_v3.py b/vulnerabilities/tests/test_api_v3.py
index 36dd7fba1..137692abf 100644
--- a/vulnerabilities/tests/test_api_v3.py
+++ b/vulnerabilities/tests/test_api_v3.py
@@ -14,22 +14,26 @@
 from rest_framework.test import APITestCase
 from univers.version_range import PypiVersionRange
 
+from vulnerabilities.importer import AdvisoryDataV2
 from vulnerabilities.models import AdvisoryV2
 from vulnerabilities.models import PackageV2
 from vulnerabilities.pipes.advisory import insert_advisory_v2
+from vulnerabilities.tests.pipelines import TestLogger
 
 
 class APIV3TestCase(APITestCase):
     def setUp(self):
         from vulnerabilities.models import ImpactedPackage
 
-        self.advisory = AdvisoryV2.objects.create(
-            datasource_id="ghsa",
-            advisory_id="GHSA-1234",
-            avid="ghsa/GHSA-1234",
-            unique_content_id="f" * 64,
-            url="https://example.com/advisory",
-            date_collected="2025-07-01T00:00:00Z",
+        self.logger = TestLogger()
+        self.advisory = insert_advisory_v2(
+            advisory=AdvisoryDataV2(
+                summary="summary",
+                advisory_id="GHSA-1234",
+                url="https://example.com/advisory",
+            ),
+            pipeline_id="ghsa",
+            logger=self.logger.write,
         )
 
         self.package = PackageV2.objects.from_purl(purl="pkg:pypi/sample@1.0.0")
diff --git a/vulnerabilities/tests/test_data_migrations.py b/vulnerabilities/tests/test_data_migrations.py
index 8303c4003..c32abb83f 100644
--- a/vulnerabilities/tests/test_data_migrations.py
+++ b/vulnerabilities/tests/test_data_migrations.py
@@ -12,6 +12,7 @@
 from django.db import IntegrityError
 from django.db import connection
 from django.db.migrations.executor import MigrationExecutor
+from django.db.models import Count
 from django.test import TestCase
 from django.utils import timezone
 from packageurl import PackageURL
@@ -1031,3 +1032,59 @@ def test_m2m_relationships_work(self):
 
         self.assertIn(commit1, impacted.affecting_commits.all())
         self.assertIn(commit2, impacted.fixed_by_commits.all())
+
+
+class
TestLatestAdvisoryV2Migration(TestMigrations):
+    """Check that migration 0121 backfills is_latest on pre-existing AdvisoryV2 rows."""
+
+    app_name = "vulnerabilities"
+    migrate_from = "0120_impactedpackage_last_range_unfurl_at_and_more"
+    migrate_to = "0121_advisoryv2_is_latest_alter_advisoryv2_advisory_id_and_more"
+
+    def setUpBeforeMigration(self, apps):
+        AdvisoryV2 = apps.get_model("vulnerabilities", "AdvisoryV2")
+
+        AdvisoryV2.objects.create(
+            unique_content_id="content_id_old",
+            url="https://old.example.com",
+            summary="Old advisory",
+            advisory_id="test_adv",
+            avid="test_pipeline/test_adv",
+            datasource_id="test_pipeline",
+        )
+
+        AdvisoryV2.objects.create(
+            unique_content_id="content_id_old2",
+            url="https://old.example.com",
+            summary="Old 2 advisory",
+            advisory_id="test_adv",
+            avid="test_pipeline/test_adv",
+            datasource_id="test_pipeline",
+        )
+
+        AdvisoryV2.objects.create(
+            unique_content_id="content_id_new",
+            url="https://old.example.com",
+            summary="New advisory",
+            advisory_id="test_adv",
+            avid="test_pipeline/test_adv",
+            datasource_id="test_pipeline",
+        )
+
+    def test_no_duplicate_is_latest_for_avid(self):
+        # NOTE(review): assumes TestMigrations exposes the migrated state as self.apps.
+        AdvisoryV2 = self.apps.get_model("vulnerabilities", "AdvisoryV2")
+        duplicate = (
+            AdvisoryV2.objects.filter(is_latest=True)
+            .values("avid")
+            .annotate(cnt=Count("id"))
+            .filter(cnt__gt=1)
+        )
+
+        self.assertFalse(duplicate.exists())
+
+    def test_latest_is_actually_recent(self):
+        # Historical model from the migrated state, not the live app registry.
+        AdvisoryV2 = self.apps.get_model("vulnerabilities", "AdvisoryV2")
+        latest = AdvisoryV2.objects.get(avid="test_pipeline/test_adv", is_latest=True)
+        self.assertEqual("New advisory", latest.summary)
diff --git a/vulnerabilities/tests/test_same_avid_different_content_id.py b/vulnerabilities/tests/test_same_avid_different_content_id.py
index a366d1872..1dc6dd686 100644
--- a/vulnerabilities/tests/test_same_avid_different_content_id.py
+++ b/vulnerabilities/tests/test_same_avid_different_content_id.py
@@ -7,13 +7,14 @@
 # See https://aboutcode.org for more information about nexB OSS
projects. # -import uuid -from datetime import timedelta - import pytest -from django.utils.timezone import now +from vulnerabilities.importer import AdvisoryDataV2 from vulnerabilities.models import AdvisoryV2 +from vulnerabilities.pipes.advisory import insert_advisory_v2 +from vulnerabilities.tests.pipelines import TestLogger + +logger = TestLogger() @pytest.fixture @@ -22,45 +23,29 @@ def advisory_factory(db): Factory to create AdvisoryV2 objects with minimal required fields. """ - def _create(*, avid, advisory_id, collected_at): - return AdvisoryV2.objects.create( - datasource_id="test_source", - advisory_id=advisory_id, - avid=avid, - unique_content_id=str(uuid.uuid4()), - url="https://example.com/advisory", - date_collected=collected_at, + def _create(*, advisory_id, summary): + + return insert_advisory_v2( + advisory=AdvisoryDataV2( + summary=summary, + advisory_id=advisory_id, + url="https://example.com/advisory", + ), + pipeline_id="source", + logger=logger.write, ) return _create -@pytest.fixture -def timestamps(): - now_ts = now() - return { - "old": now_ts - timedelta(days=3), - "mid": now_ts - timedelta(days=1), - "new": now_ts, - } - - @pytest.mark.django_db def test_latest_for_avid_returns_latest_by_date_collected( - advisory_factory, timestamps, django_assert_num_queries + advisory_factory, django_assert_num_queries ): avid = "source/ADV-1" - older = advisory_factory( - avid=avid, - advisory_id="ADV-1", - collected_at=timestamps["old"], - ) - newer = advisory_factory( - avid=avid, - advisory_id="ADV-1", - collected_at=timestamps["new"], - ) + older = advisory_factory(advisory_id="ADV-1", summary="old advisory") + newer = advisory_factory(advisory_id="ADV-1", summary="new advisory") with django_assert_num_queries(1): result = AdvisoryV2.objects.latest_for_avid(avid) @@ -70,20 +55,11 @@ def test_latest_for_avid_returns_latest_by_date_collected( @pytest.mark.django_db -def test_latest_for_avid_tie_breaks_by_id(advisory_factory, timestamps, 
django_assert_num_queries): +def test_latest_for_avid_tie_breaks_by_id(advisory_factory, django_assert_num_queries): avid = "source/ADV-2" - ts = timestamps["mid"] - - first = advisory_factory( - avid=avid, - advisory_id="ADV-2", - collected_at=ts, - ) - second = advisory_factory( - avid=avid, - advisory_id="ADV-2", - collected_at=ts, - ) + + first = advisory_factory(advisory_id="ADV-2", summary="old advisory") + second = advisory_factory(advisory_id="ADV-2", summary="new advisory") with django_assert_num_queries(1): result = AdvisoryV2.objects.latest_for_avid(avid) @@ -92,25 +68,11 @@ def test_latest_for_avid_tie_breaks_by_id(advisory_factory, timestamps, django_a @pytest.mark.django_db -def test_latest_per_avid_returns_one_row_per_avid( - advisory_factory, timestamps, django_assert_num_queries -): - advisory_factory( - avid="source/A", - advisory_id="A", - collected_at=timestamps["old"], - ) - latest_a = advisory_factory( - avid="source/A", - advisory_id="A", - collected_at=timestamps["new"], - ) - - latest_b = advisory_factory( - avid="source/B", - advisory_id="B", - collected_at=timestamps["mid"], - ) +def test_latest_per_avid_returns_one_row_per_avid(advisory_factory, django_assert_num_queries): + advisory_factory(advisory_id="A", summary="old advisory") + latest_a = advisory_factory(advisory_id="A", summary="new advisory") + + latest_b = advisory_factory(advisory_id="B", summary="new advisory") with django_assert_num_queries(1): qs = AdvisoryV2.objects.latest_per_avid() @@ -122,19 +84,11 @@ def test_latest_per_avid_returns_one_row_per_avid( @pytest.mark.django_db -def test_latest_per_avid_excludes_older_versions(advisory_factory, timestamps): +def test_latest_per_avid_excludes_older_versions(advisory_factory): avid = "source/C" - older = advisory_factory( - avid=avid, - advisory_id="C", - collected_at=timestamps["old"], - ) - latest = advisory_factory( - avid=avid, - advisory_id="C", - collected_at=timestamps["new"], - ) + older = 
advisory_factory(advisory_id="C", summary="old advisory") + latest = advisory_factory(advisory_id="C", summary="new advisory") results = list(AdvisoryV2.objects.latest_per_avid()) @@ -144,30 +98,14 @@ def test_latest_per_avid_excludes_older_versions(advisory_factory, timestamps): @pytest.mark.django_db def test_latest_for_avids_filters_and_collapses_correctly( - advisory_factory, timestamps, django_assert_num_queries + advisory_factory, django_assert_num_queries ): - advisory_factory( - avid="source/A", - advisory_id="A", - collected_at=timestamps["old"], - ) - latest_a = advisory_factory( - avid="source/A", - advisory_id="A", - collected_at=timestamps["new"], - ) - - latest_b = advisory_factory( - avid="source/B", - advisory_id="B", - collected_at=timestamps["mid"], - ) - - advisory_factory( - avid="source/C", - advisory_id="C", - collected_at=timestamps["new"], - ) + + advisory_factory(advisory_id="A", summary="old advisory") + latest_a = advisory_factory(advisory_id="A", summary="new advisory") + + advisory_factory(advisory_id="B", summary="old advisory") + latest_b = advisory_factory(advisory_id="B", summary="new advisory") with django_assert_num_queries(1): qs = AdvisoryV2.objects.latest_for_avids({"source/A", "source/B"})