Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from django.db import migrations
from django.db.models import Q


class Migration(migrations.Migration):
dependencies = [
("vulnerabilities", "0121_advisoryv2_is_latest_alter_advisoryv2_advisory_id_and_more"),
]

def drop_malformed_advisory_v2(apps, _):
AdvisoryV2 = apps.get_model("vulnerabilities", "AdvisoryV2")
AdvisoryAlias = apps.get_model("vulnerabilities", "AdvisoryAlias")

valid_alias_prefix = [
"cve-", "osv-", "xsa-", "vsv", "zbx-", "zf2", "vu#", "gms-", "usn-",
"sw-", "ss-", "ts-", "osvdb-", "ysa-", "se-core-", "pysec-", "alpine-",
"dw2", "go-", "mal-", "zdi-can", "asa-", "ezsa-", "ghsl-", "ghsa-",
"talos-", "srcclr-sid-", "bit-", "gnutls-", "rustsec-", "snyk-",
"temp-", "TYPO3-", "wnpa-sec-", "sa-core-", "skcsirt-", "flow-", "gsd-"
]
query = Q()
for alias_prefix in valid_alias_prefix:
query |= Q(alias__istartswith=alias_prefix)

malformed_aliases = AdvisoryAlias.objects.exclude(query)
AdvisoryV2.objects.filter(aliases__in=malformed_aliases).delete()
malformed_aliases.delete()

operations = [
migrations.RunPython(drop_malformed_advisory_v2, reverse_code=migrations.RunPython.noop),
]
56 changes: 54 additions & 2 deletions vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import json
import logging
import re
from pathlib import Path
from typing import Any
from typing import Iterable
Expand All @@ -28,6 +29,7 @@
from vulnerabilities.references import XsaReferenceV2
from vulnerabilities.references import ZbxReferenceV2
from vulnerabilities.utils import get_advisory_url
from vulnerabilities.utils import is_cve
from vulnerabilities.utils import load_json


Expand Down Expand Up @@ -163,8 +165,10 @@ def load_advisories(
# fixed_vulns is a list of strings and each string is a space-separated
# list of aliases and CVES
for vuln_ids in fixed_vulns:
aliases = vuln_ids.strip().split(" ")
vuln_id = aliases[0]
vuln_id, aliases = parse_vuln_ids(vuln_ids, logger=logger)

if not vuln_id:
continue

references = []
if vuln_id.startswith("XSA"):
Expand Down Expand Up @@ -248,3 +252,51 @@ def load_advisories(
url=url,
original_advisory_text=json.dumps(pkg_infos, indent=2, ensure_ascii=False),
)


PARENTHESES_RE = re.compile(r"\(.*?\)")


def parse_vuln_ids(vuln_ids_string, logger=print):
"""
Parses a raw vulnerability ids, removes parentheses and returns the vuln_id and a list of all valid aliases.
"""
vuln_ids = PARENTHESES_RE.sub("", vuln_ids_string)
if not vuln_ids_string:
return None, []

cleaned_vuln_ids = []
for alias in vuln_ids.split():
clean_alias = alias.replace("_", "-").replace(".patch", "")
cleaned_vuln_ids.append(clean_alias)

aliases = []
valid_prefixes = (
"XSA-",
"GHSL-",
"TALOS-",
"RUSTSEC-",
"GHSA-",
"GNUTLS-",
"VSV",
"ZDI-CAN-",
"DW",
"YSA-",
"ZBX-",
"ALPINE-",
"TS-",
"wnpa-sec-",
)
for alias in cleaned_vuln_ids:
if alias and (
(alias.startswith("CVE-") and is_cve(alias)) or alias.startswith(valid_prefixes)
):
aliases.append(alias)
else:
logger(f"Malformed aliases found: {alias}")

if not aliases:
return None, []

vuln_id = aliases[0]
return vuln_id, aliases
5 changes: 3 additions & 2 deletions vulnerabilities/pipelines/v2_importers/fireeye_importer_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,16 @@ def parse_advisory_data(raw_data, file_path, base_path) -> AdvisoryDataV2:
cve_refs = md_dict.get("## CVE Reference") or []
cve_ids = md_dict.get("## CVE ID") or []
cleaned_cve_ids = []
for line in cve_ids:
for line in cve_ids + cve_refs:
found_cves = find_all_cve(line)
cleaned_cve_ids.extend(found_cves)

references = md_dict.get("## References") or []
cwe_data = md_dict.get("## Common Weakness Enumeration") or []

advisory_id = file_path.stem
aliases = dedupe([cve.strip() for cve in cleaned_cve_ids + cve_refs])
aliases = dedupe([cve.strip() for cve in cleaned_cve_ids])

aliases = [aliase for aliase in aliases if aliase != advisory_id]
advisory_url = get_advisory_url(
file=file_path,
Expand Down
7 changes: 6 additions & 1 deletion vulnerabilities/pipelines/v2_importers/istio_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.utils import get_advisory_url
from vulnerabilities.utils import is_cve
from vulnerabilities.utils import split_markdown_front_matter

is_release = re.compile(r"^[\d.]+$", re.IGNORECASE).match
Expand Down Expand Up @@ -88,6 +89,7 @@ def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
data.get("releases", []), GolangVersion
)
cves = data.get("cves", [])
aliases = [cve for cve in cves if is_cve(cve)]

affected_packages = []
if semver_constraints:
Expand All @@ -107,6 +109,9 @@ def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
)

title = data.get("title") or ""
if not title.startswith("ISTIO-"):
self.log(f"Invalid advisory_id: {title}")

summary = data.get("description") or ""
references = []
if title:
Expand All @@ -119,7 +124,7 @@ def collect_advisories(self) -> Iterable[AdvisoryDataV2]:

yield AdvisoryDataV2(
advisory_id=title,
aliases=cves,
aliases=aliases,
summary=summary,
affected_packages=affected_packages,
references=references,
Expand Down
47 changes: 39 additions & 8 deletions vulnerabilities/pipelines/v2_importers/mattermost_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.utils import fetch_response
from vulnerabilities.utils import is_cve

MM_REPO = {
"Mattermost Mobile Apps": "mattermost-mobile",
Expand Down Expand Up @@ -62,11 +63,17 @@ def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
return

for advisory in data:
vuln_id = advisory.get("issue_id")
if not vuln_id or not vuln_id.startswith("MMSA-"):
self.log(f"Skipping advisory with missing issue_id. {vuln_id}")
issue_id = advisory.get("issue_id") or ""
cve_id = advisory.get("cve_id") or ""

advisory_id, aliases = parse_vuln_ids(issue_id, cve_id)

if not advisory_id:
self.log(
f"Skipping advisory with missing advisory_id. issue_id:{issue_id} cve_id:{cve_id}"
)
continue
cve_id = advisory.get("cve_id")

details = advisory.get("details")

platform = advisory.get("platform")
Expand All @@ -78,7 +85,7 @@ def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
affected_packages = []
severity = advisory.get("severity")
if not package_name:
self.log(f"Unknown platform '{platform}' in advisory '{vuln_id}'.")
self.log(f"Unknown platform '{platform}' in advisory '{advisory_id}'.")

else:
package = PackageURL(
Expand All @@ -105,7 +112,7 @@ def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
)
except Exception as e:
self.log(
f"Error processing fixed versions '{fixed_versions}' for advisory '{vuln_id}': {e}"
f"Error processing fixed versions '{fixed_versions}' for advisory '{advisory_id}': {e}"
)

severities = []
Expand All @@ -118,12 +125,36 @@ def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
)

yield AdvisoryDataV2(
advisory_id=vuln_id,
aliases=[cve_id],
advisory_id=advisory_id,
aliases=aliases,
summary=details,
references=[reference],
affected_packages=affected_packages,
severities=severities,
url=self.url,
original_advisory_text=json.dumps(advisory, indent=2, ensure_ascii=False),
)


def parse_vuln_ids(issue_id, cve_id):
"""
Parses a raw issue_id, cve_id, validate and returns the advisory id and a list of all valid aliases.
"""
advisory_id = None
aliases = []

cve_id = cve_id.strip()
issue_id = issue_id.strip()

for vuln_id in issue_id.split(","):
vuln_id = vuln_id.strip()
if vuln_id.startswith("MMSA-") or vuln_id.startswith("CVE-"):
aliases.append(vuln_id)

if cve_id and is_cve(cve_id):
aliases.append(cve_id)

if aliases:
advisory_id = aliases.pop(0)

return advisory_id, aliases
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import pytest

from vulnerabilities.pipelines.v2_importers.alpine_linux_importer import load_advisories
from vulnerabilities.pipelines.v2_importers.alpine_linux_importer import parse_vuln_ids
from vulnerabilities.pipelines.v2_importers.alpine_linux_importer import process_record
from vulnerabilities.tests import util_tests
from vulnerabilities.tests.pipelines import TestLogger
Expand Down Expand Up @@ -95,3 +96,82 @@ def test_load_advisories_package_with_invalid_alpine_version(test_case):
}
result = list(load_advisories(package, "v3.11", "main", archs=[], url="", logger=logger.write))
assert result != []


@pytest.mark.parametrize(
"raw_input, expected_vuln_id, expected_aliases",
[
("CVE-2022-42332 XSA-427", "CVE-2022-42332", ["CVE-2022-42332", "XSA-427"]),
(
"CVE-2022-42333 CVE-2022-43334 XSA-428",
"CVE-2022-42333",
["CVE-2022-42333", "CVE-2022-43334", "XSA-428"],
),
(
"CVE-2020-11501 GNUTLS-SA-2020-03-31 CVE-2020-11501",
"CVE-2020-11501",
["CVE-2020-11501", "GNUTLS-SA-2020-03-31", "CVE-2020-11501"],
),
("CVE_2019-2426", "CVE-2019-2426", ["CVE-2019-2426"]),
(
"CVE-2024-22195 GHSA-h5c8-rqwp-cp95",
"CVE-2024-22195",
["CVE-2024-22195", "GHSA-h5c8-rqwp-cp95"],
),
("CVE-2023-44441 ZDI-CAN-22093", "CVE-2023-44441", ["CVE-2023-44441", "ZDI-CAN-22093"]),
("CVE-2022-45059 VSV00010", "CVE-2022-45059", ["CVE-2022-45059", "VSV00010"]),
("CVE-2021-35940.patch", "CVE-2021-35940", ["CVE-2021-35940"]),
("XSA-207", "XSA-207", ["XSA-207"]),
("ALPINE-13661", "ALPINE-13661", ["ALPINE-13661"]),
("GHSA-vv2x-vrpj-qqpq", "GHSA-vv2x-vrpj-qqpq", ["GHSA-vv2x-vrpj-qqpq"]),
("CVE N/A ZBX-11023", "ZBX-11023", ["ZBX-11023"]),
("CVE-2017-2616 (+ regression fix)", "CVE-2017-2616", ["CVE-2017-2616"]),
(
"CVE-2020-14342 (Not affected, requires --with-systemd)",
"CVE-2020-14342",
["CVE-2020-14342"],
),
("CVE-2017-16808 (AoE)", "CVE-2017-16808", ["CVE-2017-16808"]),
("CVE-2018-14468 (FrameRelay)", "CVE-2018-14468", ["CVE-2018-14468"]),
("CVE-2018-14469 (IKEv1)", "CVE-2018-14469", ["CVE-2018-14469"]),
("CVE-2018-14470 (BABEL)", "CVE-2018-14470", ["CVE-2018-14470"]),
("CVE-2018-14466 (AFS/RX)", "CVE-2018-14466", ["CVE-2018-14466"]),
("CVE-2018-14461 (LDP)", "CVE-2018-14461", ["CVE-2018-14461"]),
("CVE-2018-14462 (ICMP)", "CVE-2018-14462", ["CVE-2018-14462"]),
("CVE-2018-14465 (RSVP)", "CVE-2018-14465", ["CVE-2018-14465"]),
("CVE-2018-14881 (BGP)", "CVE-2018-14881", ["CVE-2018-14881"]),
("CVE-2018-14464 (LMP)", "CVE-2018-14464", ["CVE-2018-14464"]),
("CVE-2018-14463 (VRRP)", "CVE-2018-14463", ["CVE-2018-14463"]),
("CVE-2018-14467 (BGP)", "CVE-2018-14467", ["CVE-2018-14467"]),
(
"CVE-2018-10103 (SMB - partially fixed, but SMB printing disabled)",
"CVE-2018-10103",
["CVE-2018-10103"],
),
(
"CVE-2018-10105 (SMB - too unreliably reproduced, SMB printing disabled)",
"CVE-2018-10105",
["CVE-2018-10105"],
),
("CVE-2018-14880 (OSPF6)", "CVE-2018-14880", ["CVE-2018-14880"]),
("CVE-2018-16451 (SMB)", "CVE-2018-16451", ["CVE-2018-16451"]),
("CVE-2018-14882 (RPL)", "CVE-2018-14882", ["CVE-2018-14882"]),
("CVE-2018-16227 (802.11)", "CVE-2018-16227", ["CVE-2018-16227"]),
("CVE-2018-16229 (DCCP)", "CVE-2018-16229", ["CVE-2018-16229"]),
("CVE-2018-16301 (was fixed in libpcap)", "CVE-2018-16301", ["CVE-2018-16301"]),
("CVE-2018-16230 (BGP)", "CVE-2018-16230", ["CVE-2018-16230"]),
("CVE-2018-16452 (SMB)", "CVE-2018-16452", ["CVE-2018-16452"]),
("CVE-2018-16300 (BGP)", "CVE-2018-16300", ["CVE-2018-16300"]),
("CVE-2018-16228 (HNCP)", "CVE-2018-16228", ["CVE-2018-16228"]),
("CVE-2019-15166 (LMP)", "CVE-2019-15166", ["CVE-2019-15166"]),
("CVE-2019-15167 (VRRP)", "CVE-2019-15167", ["CVE-2019-15167"]),
("CVE-????-????? TS-2024-005", "TS-2024-005", ["TS-2024-005"]),
("CVE-????-????? TS-2024-005", "TS-2024-005", ["TS-2024-005"]),
("CVE-2018-14879 (tcpdump -V)", "CVE-2018-14879", ["CVE-2018-14879"]),
("CVE-46838", None, []), # invalid CVE
],
)
def test_parse_vuln_ids(raw_input, expected_vuln_id, expected_aliases):
vuln_id, aliases = parse_vuln_ids(raw_input)
assert vuln_id == expected_vuln_id
assert aliases == expected_aliases
Loading