diff --git a/vulnerabilities/migrations/0122_advisoryv2_remove_malformed_aliases_and_dvisory_id.py b/vulnerabilities/migrations/0122_advisoryv2_remove_malformed_aliases_and_dvisory_id.py new file mode 100644 index 000000000..816a4a523 --- /dev/null +++ b/vulnerabilities/migrations/0122_advisoryv2_remove_malformed_aliases_and_dvisory_id.py @@ -0,0 +1,31 @@ +from django.db import migrations +from django.db.models import Q + + +class Migration(migrations.Migration): + dependencies = [ + ("vulnerabilities", "0121_advisoryv2_is_latest_alter_advisoryv2_advisory_id_and_more"), + ] + + def drop_malformed_advisory_v2(apps, _): + AdvisoryV2 = apps.get_model("vulnerabilities", "AdvisoryV2") + AdvisoryAlias = apps.get_model("vulnerabilities", "AdvisoryAlias") + + valid_alias_prefix = [ + "cve-", "osv-", "xsa-", "vsv", "zbx-", "zf2", "vu#", "gms-", "usn-", + "sw-", "ss-", "ts-", "osvdb-", "ysa-", "se-core-", "pysec-", "alpine-", + "dw2", "go-", "mal-", "zdi-can", "asa-", "ezsa-", "ghsl-", "ghsa-", + "talos-", "srcclr-sid-", "bit-", "gnutls-", "rustsec-", "snyk-", + "temp-", "TYPO3-", "wnpa-sec-", "sa-core-", "skcsirt-", "flow-", "gsd-" + ] + query = Q() + for alias_prefix in valid_alias_prefix: + query |= Q(alias__istartswith=alias_prefix) + + malformed_aliases = AdvisoryAlias.objects.exclude(query) + AdvisoryV2.objects.filter(aliases__in=malformed_aliases).delete() + malformed_aliases.delete() + + operations = [ + migrations.RunPython(drop_malformed_advisory_v2, reverse_code=migrations.RunPython.noop), + ] diff --git a/vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py b/vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py index 4c176732e..a9ff749b3 100644 --- a/vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py +++ b/vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py @@ -9,6 +9,7 @@ import json import logging +import re from pathlib import Path from typing import Any from typing import Iterable @@ -28,6 +29,7 @@ from vulnerabilities.references import XsaReferenceV2 from vulnerabilities.references import ZbxReferenceV2 from vulnerabilities.utils import get_advisory_url +from vulnerabilities.utils import is_cve from vulnerabilities.utils import load_json @@ -163,8 +165,10 @@ def load_advisories( # fixed_vulns is a list of strings and each string is a space-separated # list of aliases and CVES for vuln_ids in fixed_vulns: - aliases = vuln_ids.strip().split(" ") - vuln_id = aliases[0] + vuln_id, aliases = parse_vuln_ids(vuln_ids, logger=logger) + + if not vuln_id: + continue references = [] if vuln_id.startswith("XSA"): @@ -248,3 +252,51 @@ def load_advisories( url=url, original_advisory_text=json.dumps(pkg_infos, indent=2, ensure_ascii=False), ) + + +PARENTHESES_RE = re.compile(r"\(.*?\)") + + +def parse_vuln_ids(vuln_ids_string, logger=print): + """ + Parses a raw vulnerability ids, removes parentheses and returns the vuln_id and a list of all valid aliases. + """ + vuln_ids = PARENTHESES_RE.sub("", vuln_ids_string) + if not vuln_ids_string: + return None, [] + + cleaned_vuln_ids = [] + for alias in vuln_ids.split(): + clean_alias = alias.replace("_", "-").replace(".patch", "") + cleaned_vuln_ids.append(clean_alias) + + aliases = [] + valid_prefixes = ( + "XSA-", + "GHSL-", + "TALOS-", + "RUSTSEC-", + "GHSA-", + "GNUTLS-", + "VSV", + "ZDI-CAN-", + "DW", + "YSA-", + "ZBX-", + "ALPINE-", + "TS-", + "wnpa-sec-", + ) + for alias in cleaned_vuln_ids: + if alias and ( + (alias.startswith("CVE-") and is_cve(alias)) or alias.startswith(valid_prefixes) + ): + aliases.append(alias) + else: + logger(f"Malformed aliases found: {alias}") + + if not aliases: + return None, [] + + vuln_id = aliases[0] + return vuln_id, aliases diff --git a/vulnerabilities/pipelines/v2_importers/fireeye_importer_v2.py b/vulnerabilities/pipelines/v2_importers/fireeye_importer_v2.py index b1283d132..3e9dc2c40 100644 --- a/vulnerabilities/pipelines/v2_importers/fireeye_importer_v2.py +++ b/vulnerabilities/pipelines/v2_importers/fireeye_importer_v2.py @@ -104,7 +104,7 @@ def parse_advisory_data(raw_data, file_path, base_path) -> AdvisoryDataV2: cve_refs = md_dict.get("## CVE Reference") or [] cve_ids = md_dict.get("## CVE ID") or [] cleaned_cve_ids = [] - for line in cve_ids: + for line in cve_ids + cve_refs: found_cves = find_all_cve(line) cleaned_cve_ids.extend(found_cves) @@ -112,7 +112,8 @@ def parse_advisory_data(raw_data, file_path, base_path) -> AdvisoryDataV2: cwe_data = md_dict.get("## Common Weakness Enumeration") or [] advisory_id = file_path.stem - aliases = dedupe([cve.strip() for cve in cleaned_cve_ids + cve_refs]) + aliases = dedupe([cve.strip() for cve in cleaned_cve_ids]) + aliases = [aliase for aliase in aliases if aliase != advisory_id] advisory_url = get_advisory_url( file=file_path, diff --git a/vulnerabilities/pipelines/v2_importers/istio_importer.py b/vulnerabilities/pipelines/v2_importers/istio_importer.py index ce7b7d342..019ea3d6a 100644 --- a/vulnerabilities/pipelines/v2_importers/istio_importer.py +++ b/vulnerabilities/pipelines/v2_importers/istio_importer.py @@ -28,6 +28,7 @@ from vulnerabilities.importer import ReferenceV2 from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2 from vulnerabilities.utils import get_advisory_url +from vulnerabilities.utils import is_cve from vulnerabilities.utils import split_markdown_front_matter is_release = re.compile(r"^[\d.]+$", re.IGNORECASE).match @@ -88,6 +89,7 @@ def collect_advisories(self) -> Iterable[AdvisoryDataV2]: data.get("releases", []), GolangVersion ) cves = data.get("cves", []) + aliases = [cve for cve in cves if is_cve(cve)] affected_packages = [] if semver_constraints: @@ -107,6 +109,9 @@ def collect_advisories(self) -> Iterable[AdvisoryDataV2]: ) title = data.get("title") or "" + if not title.startswith("ISTIO-"): + self.log(f"Invalid advisory_id: {title}") + summary = data.get("description") or "" references = [] if title: @@ -119,7 +124,7 @@ def collect_advisories(self) -> Iterable[AdvisoryDataV2]: yield AdvisoryDataV2( advisory_id=title, - aliases=cves, + aliases=aliases, summary=summary, affected_packages=affected_packages, references=references, diff --git a/vulnerabilities/pipelines/v2_importers/mattermost_importer.py b/vulnerabilities/pipelines/v2_importers/mattermost_importer.py index 852939fd8..7bfa7280e 100644 --- a/vulnerabilities/pipelines/v2_importers/mattermost_importer.py +++ b/vulnerabilities/pipelines/v2_importers/mattermost_importer.py @@ -20,6 +20,7 @@ from vulnerabilities.importer import VulnerabilitySeverity from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2 from vulnerabilities.utils import fetch_response +from vulnerabilities.utils import is_cve MM_REPO = { "Mattermost Mobile Apps": "mattermost-mobile", @@ -62,11 +63,17 @@ def collect_advisories(self) -> Iterable[AdvisoryDataV2]: return for advisory in data: - vuln_id = advisory.get("issue_id") - if not vuln_id or not vuln_id.startswith("MMSA-"): - self.log(f"Skipping advisory with missing issue_id. {vuln_id}") + issue_id = advisory.get("issue_id") or "" + cve_id = advisory.get("cve_id") or "" + + advisory_id, aliases = parse_vuln_ids(issue_id, cve_id) + + if not advisory_id: + self.log( + f"Skipping advisory with missing advisory_id. issue_id:{issue_id} cve_id:{cve_id}" + ) continue - cve_id = advisory.get("cve_id") + details = advisory.get("details") platform = advisory.get("platform") @@ -78,7 +85,7 @@ def collect_advisories(self) -> Iterable[AdvisoryDataV2]: affected_packages = [] severity = advisory.get("severity") if not package_name: - self.log(f"Unknown platform '{platform}' in advisory '{vuln_id}'.") + self.log(f"Unknown platform '{platform}' in advisory '{advisory_id}'.") else: package = PackageURL( @@ -105,7 +112,7 @@ def collect_advisories(self) -> Iterable[AdvisoryDataV2]: ) except Exception as e: self.log( - f"Error processing fixed versions '{fixed_versions}' for advisory '{vuln_id}': {e}" + f"Error processing fixed versions '{fixed_versions}' for advisory '{advisory_id}': {e}" ) severities = [] @@ -118,8 +125,8 @@ def collect_advisories(self) -> Iterable[AdvisoryDataV2]: ) yield AdvisoryDataV2( - advisory_id=vuln_id, - aliases=[cve_id], + advisory_id=advisory_id, + aliases=aliases, summary=details, references=[reference], affected_packages=affected_packages, @@ -127,3 +134,27 @@ def collect_advisories(self) -> Iterable[AdvisoryDataV2]: url=self.url, original_advisory_text=json.dumps(advisory, indent=2, ensure_ascii=False), ) + + +def parse_vuln_ids(issue_id, cve_id): + """ + Parses a raw issue_id, cve_id, validate and returns the advisory id and a list of all valid aliases. + """ + advisory_id = None + aliases = [] + + cve_id = cve_id.strip() + issue_id = issue_id.strip() + + for vuln_id in issue_id.split(","): + vuln_id = vuln_id.strip() + if vuln_id.startswith("MMSA-") or vuln_id.startswith("CVE-"): + aliases.append(vuln_id) + + if cve_id and is_cve(cve_id): + aliases.append(cve_id) + + if aliases: + advisory_id = aliases.pop(0) + + return advisory_id, aliases diff --git a/vulnerabilities/tests/pipelines/v2_importers/test_alpine_linux_importer_pipeline.py b/vulnerabilities/tests/pipelines/v2_importers/test_alpine_linux_importer_pipeline.py index f67e26f6d..cdf65067e 100644 --- a/vulnerabilities/tests/pipelines/v2_importers/test_alpine_linux_importer_pipeline.py +++ b/vulnerabilities/tests/pipelines/v2_importers/test_alpine_linux_importer_pipeline.py @@ -13,6 +13,7 @@ import pytest from vulnerabilities.pipelines.v2_importers.alpine_linux_importer import load_advisories +from vulnerabilities.pipelines.v2_importers.alpine_linux_importer import parse_vuln_ids from vulnerabilities.pipelines.v2_importers.alpine_linux_importer import process_record from vulnerabilities.tests import util_tests from vulnerabilities.tests.pipelines import TestLogger @@ -95,3 +96,82 @@ def test_load_advisories_package_with_invalid_alpine_version(test_case): } result = list(load_advisories(package, "v3.11", "main", archs=[], url="", logger=logger.write)) assert result != [] + + +@pytest.mark.parametrize( + "raw_input, expected_vuln_id, expected_aliases", + [ + ("CVE-2022-42332 XSA-427", "CVE-2022-42332", ["CVE-2022-42332", "XSA-427"]), + ( + "CVE-2022-42333 CVE-2022-43334 XSA-428", + "CVE-2022-42333", + ["CVE-2022-42333", "CVE-2022-43334", "XSA-428"], + ), + ( + "CVE-2020-11501 GNUTLS-SA-2020-03-31 CVE-2020-11501", + "CVE-2020-11501", + ["CVE-2020-11501", "GNUTLS-SA-2020-03-31", "CVE-2020-11501"], + ), + ("CVE_2019-2426", "CVE-2019-2426", ["CVE-2019-2426"]), + ( + "CVE-2024-22195 GHSA-h5c8-rqwp-cp95", + "CVE-2024-22195", + ["CVE-2024-22195", "GHSA-h5c8-rqwp-cp95"], + ), + ("CVE-2023-44441 ZDI-CAN-22093", "CVE-2023-44441", ["CVE-2023-44441", "ZDI-CAN-22093"]), + ("CVE-2022-45059 VSV00010", "CVE-2022-45059", ["CVE-2022-45059", "VSV00010"]), + ("CVE-2021-35940.patch", "CVE-2021-35940", ["CVE-2021-35940"]), + ("XSA-207", "XSA-207", ["XSA-207"]), + ("ALPINE-13661", "ALPINE-13661", ["ALPINE-13661"]), + ("GHSA-vv2x-vrpj-qqpq", "GHSA-vv2x-vrpj-qqpq", ["GHSA-vv2x-vrpj-qqpq"]), + ("CVE N/A ZBX-11023", "ZBX-11023", ["ZBX-11023"]), + ("CVE-2017-2616 (+ regression fix)", "CVE-2017-2616", ["CVE-2017-2616"]), + ( + "CVE-2020-14342 (Not affected, requires --with-systemd)", + "CVE-2020-14342", + ["CVE-2020-14342"], + ), + ("CVE-2017-16808 (AoE)", "CVE-2017-16808", ["CVE-2017-16808"]), + ("CVE-2018-14468 (FrameRelay)", "CVE-2018-14468", ["CVE-2018-14468"]), + ("CVE-2018-14469 (IKEv1)", "CVE-2018-14469", ["CVE-2018-14469"]), + ("CVE-2018-14470 (BABEL)", "CVE-2018-14470", ["CVE-2018-14470"]), + ("CVE-2018-14466 (AFS/RX)", "CVE-2018-14466", ["CVE-2018-14466"]), + ("CVE-2018-14461 (LDP)", "CVE-2018-14461", ["CVE-2018-14461"]), + ("CVE-2018-14462 (ICMP)", "CVE-2018-14462", ["CVE-2018-14462"]), + ("CVE-2018-14465 (RSVP)", "CVE-2018-14465", ["CVE-2018-14465"]), + ("CVE-2018-14881 (BGP)", "CVE-2018-14881", ["CVE-2018-14881"]), + ("CVE-2018-14464 (LMP)", "CVE-2018-14464", ["CVE-2018-14464"]), + ("CVE-2018-14463 (VRRP)", "CVE-2018-14463", ["CVE-2018-14463"]), + ("CVE-2018-14467 (BGP)", "CVE-2018-14467", ["CVE-2018-14467"]), + ( + "CVE-2018-10103 (SMB - partially fixed, but SMB printing disabled)", + "CVE-2018-10103", + ["CVE-2018-10103"], + ), + ( + "CVE-2018-10105 (SMB - too unreliably reproduced, SMB printing disabled)", + "CVE-2018-10105", + ["CVE-2018-10105"], + ), + ("CVE-2018-14880 (OSPF6)", "CVE-2018-14880", ["CVE-2018-14880"]), + ("CVE-2018-16451 (SMB)", "CVE-2018-16451", ["CVE-2018-16451"]), + ("CVE-2018-14882 (RPL)", "CVE-2018-14882", ["CVE-2018-14882"]), + ("CVE-2018-16227 (802.11)", "CVE-2018-16227", ["CVE-2018-16227"]), + ("CVE-2018-16229 (DCCP)", "CVE-2018-16229", ["CVE-2018-16229"]), + ("CVE-2018-16301 (was fixed in libpcap)", "CVE-2018-16301", ["CVE-2018-16301"]), + ("CVE-2018-16230 (BGP)", "CVE-2018-16230", ["CVE-2018-16230"]), + ("CVE-2018-16452 (SMB)", "CVE-2018-16452", ["CVE-2018-16452"]), + ("CVE-2018-16300 (BGP)", "CVE-2018-16300", ["CVE-2018-16300"]), + ("CVE-2018-16228 (HNCP)", "CVE-2018-16228", ["CVE-2018-16228"]), + ("CVE-2019-15166 (LMP)", "CVE-2019-15166", ["CVE-2019-15166"]), + ("CVE-2019-15167 (VRRP)", "CVE-2019-15167", ["CVE-2019-15167"]), + ("CVE-????-????? TS-2024-005", "TS-2024-005", ["TS-2024-005"]), + ("CVE-????-????? TS-2024-005", "TS-2024-005", ["TS-2024-005"]), + ("CVE-2018-14879 (tcpdump -V)", "CVE-2018-14879", ["CVE-2018-14879"]), + ("CVE-46838", None, []), # invalid CVE + ], +) +def test_parse_vuln_ids(raw_input, expected_vuln_id, expected_aliases): + vuln_id, aliases = parse_vuln_ids(raw_input) + assert vuln_id == expected_vuln_id + assert aliases == expected_aliases diff --git a/vulnerabilities/tests/pipelines/v2_importers/test_istio_importer_v2.py b/vulnerabilities/tests/pipelines/v2_importers/test_istio_importer_v2.py index 40fdf0812..eb815b5fe 100644 --- a/vulnerabilities/tests/pipelines/v2_importers/test_istio_importer_v2.py +++ b/vulnerabilities/tests/pipelines/v2_importers/test_istio_importer_v2.py @@ -19,7 +19,7 @@ @pytest.mark.django_db -def test_istio_advisory_parsing(): +def test_istio_advisory_parsing1(): sample_md = dedent( """\ --- @@ -79,3 +79,55 @@ def test_istio_advisory_parsing(): str(advisory.affected_packages[1].affected_version_range) == "vers:golang/>=1.0.0|<=1.0.8|>=1.1.0|<=1.1.9|>=1.2.0|<=1.2.1" ) + + +@pytest.mark.django_db +def test_istio_advisory_parsing2(): + sample_md = dedent( + """\ +--- +title: ISTIO-SECURITY-2021-004 +subtitle: Security Bulletin +description: Potential misuse of mTLS-only fields in AuthorizationPolicy with plain text traffic. +cves: [N/A] +cvss: "N/A" +vector: "" +releases: ["All releases 1.5 and later"] +publishdate: 2021-04-15 +keywords: [CVE] +skip_seealso: true +--- + +{{< security_bulletin >}} + +This is a security advisory for customers to check the authorization policy to make sure [mTLS (STRICT mode) is enabled](/docs/tasks/security/authentication/authn-policy/#globally-enabling-istio-mutual-tls-in-strict-mode) +when using [mTLS-only fields](/docs/concepts/security/#dependency-on-mutual-tls) in the authorization policy. +... + """ + ) + + with tempfile.TemporaryDirectory() as tmp_dir: + base_path = Path(tmp_dir) + advisory_dir = base_path / "content/en/news/security" + advisory_dir.mkdir(parents=True) + advisory_file = advisory_dir / "ISTIO-SECURITY-2021-004.md" + advisory_file.write_text(sample_md, encoding="utf-8") + + importer = IstioImporterPipeline() + importer.vcs_response = type( + "FakeVCS", (), {"dest_dir": tmp_dir, "delete": lambda x: None} + )() + + advisories = list(importer.collect_advisories()) + + assert len(advisories) == 1 + advisory = advisories[0] + + assert isinstance(advisory, AdvisoryDataV2) + assert advisory.advisory_id == "ISTIO-SECURITY-2021-004" + assert advisory.aliases == [] + assert advisory.summary.startswith( + "Potential misuse of mTLS-only fields in AuthorizationPolicy with plain text traffic" + ) + assert advisory.date_published.isoformat() == "2021-04-15T00:00:00+00:00" + assert advisory.url.endswith("ISTIO-SECURITY-2021-004.md") diff --git a/vulnerabilities/tests/pipelines/v2_importers/test_mattermost_importer_v2.py b/vulnerabilities/tests/pipelines/v2_importers/test_mattermost_importer_v2.py index f54deae28..06549b921 100644 --- a/vulnerabilities/tests/pipelines/v2_importers/test_mattermost_importer_v2.py +++ b/vulnerabilities/tests/pipelines/v2_importers/test_mattermost_importer_v2.py @@ -14,6 +14,7 @@ from vulnerabilities.importer import AdvisoryData from vulnerabilities.importer import AdvisoryDataV2 from vulnerabilities.pipelines.v2_importers.mattermost_importer import MattermostImporterPipeline +from vulnerabilities.pipelines.v2_importers.mattermost_importer import parse_vuln_ids @pytest.fixture @@ -212,3 +213,24 @@ def json(self): importer.collect_advisories() assert call_count["count"] == 1 + + +@pytest.mark.parametrize( + "issue_id, cve_id, expected_advisory_id, expected_aliases", + [ + ("MMSA-2026-00624", "CVE-2026-3590", "MMSA-2026-00624", ["CVE-2026-3590"]), + ("MMSA-2026-00605", "", "MMSA-2026-00605", []), + ("MMSA-2021-0055,CVE-2021-37859", "", "MMSA-2021-0055", ["CVE-2021-37859"]), + ("MMSA-2021-0055, CVE-2021-37859", "", "MMSA-2021-0055", ["CVE-2021-37859"]), + ("MMSA-2024-00344", "n/a", "MMSA-2024-00344", []), + ("5.17.2.4", "", None, []), + ("na", "", None, []), + ("N/A", "", None, []), + ("Issue Identifier", "", None, []), + ("", "", None, []), + ], +) +def test_parse_vuln_ids(issue_id, cve_id, expected_advisory_id, expected_aliases): + advisory_id, aliases = parse_vuln_ids(issue_id, cve_id) + assert advisory_id == expected_advisory_id + assert aliases == expected_aliases diff --git a/vulnerabilities/tests/test_data_migrations.py b/vulnerabilities/tests/test_data_migrations.py index c32abb83f..406ace381 100644 --- a/vulnerabilities/tests/test_data_migrations.py +++ b/vulnerabilities/tests/test_data_migrations.py @@ -6,6 +6,7 @@ # See https://github.com/aboutcode-org/vulnerablecode for support or download. # See https://aboutcode.org for more information about nexB OSS projects. # + from datetime import datetime from django.apps import apps @@ -1088,3 +1089,98 @@ def test_latest_is_actually_recent(self): latest = AdvisoryV2.objects.get(avid="test_pipeline/test_adv", is_latest=True) self.assertEqual("New advisory", latest.summary) + + +class TestMalformedAliasesAVIDMigration(TestMigrations): + app_name = "vulnerabilities" + migrate_from = "0121_advisoryv2_is_latest_alter_advisoryv2_advisory_id_and_more" + migrate_to = "0122_advisoryv2_remove_malformed_aliases_and_dvisory_id" + raw_alias_inputs = [ + ("CVE-2023-1111", True), + ("GHSA-abcd-1234", True), + ("", False), + ("(not", False), + ("applicable)", False), + ("(BABEL)", False), + ("(was", False), + ("--with-systemd)", False), + ("fixed", False), + ("printing", False), + ("(AFS/RX)", False), + ("unreliably", False), + ("(ICMP)", False), + ("CVE", False), + ("(Not", False), + ("(RSVP)", False), + ("libpcap)", False), + ("(SMB", False), + ("fix)", False), + ("(DCCP)", False), + ("(HNCP)", False), + ("(+", False), + ("(IKEv1)", False), + ("(FrameRelay)", False), + ("XPTI", False), + ("CVE_2019-2426", False), + ("(BGP)", False), + ("disabled)", False), + ("(RPL)", False), + ("regression", False), + ("actually", False), + ("(VRRP)", False), + ("-V)", False), + ("2025-48379", False), + ("fixed,", False), + ("(802.11)", False), + ("affected,", False), + ("SMB", False), + ("(OSPF6)", False), + ("too", False), + ("partially", False), + ("in", False), + ("(SMB)", False), + ("but", False), + ("-", False), + ("(LDP)", False), + ("reproduced,", False), + ("N/A", False), + ("(tcpdump", False), + ("requires", False), + ("(AoE)", False), + ("(LMP)", False), + (" CVE-2025-55070", False), + ("n/a", False), + ("No CVE assigned", False), + ("- CVE-2026-26365", False), + ] + + def setUpBeforeMigration(self, apps): + AdvisoryV2 = apps.get_model("vulnerabilities", "AdvisoryV2") + AdvisoryAlias = apps.get_model("vulnerabilities", "AdvisoryAlias") + + for i, (raw_input, _) in enumerate(self.raw_alias_inputs): + adv = AdvisoryV2.objects.create( + unique_content_id=f"content_{i}", + url="https://example.com", + summary=f"Advisory for {raw_input}", + advisory_id=raw_input, + avid=f"test_pipeline/{raw_input}", + datasource_id="test_pipeline", + ) + alias = AdvisoryAlias.objects.create(alias=raw_input) + adv.aliases.add(alias) + + def test_migration_processes_malformed_aliases(self): + AdvisoryV2 = self.apps.get_model("vulnerabilities", "AdvisoryV2") + AdvisoryAlias = self.apps.get_model("vulnerabilities", "AdvisoryAlias") + + for i, (raw_input, expected_to_survive) in enumerate(self.raw_alias_inputs): + adv_exists = AdvisoryV2.objects.filter(unique_content_id=f"content_{i}").exists() + alias_exists = AdvisoryAlias.objects.filter(alias=raw_input).exists() + + if expected_to_survive: + assert adv_exists == True + assert alias_exists == True + else: + assert adv_exists == False + assert alias_exists == False