Optimize vulnerabilities view by TG1999 · Pull Request #1728 · aboutcode-org/vulnerablecode · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Optimize vulnerabilities view #1728

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion vulnerabilities/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def to_dict(self):
def from_dict(cls, ref: dict):
return cls(
reference_id=ref["reference_id"],
reference_type=ref["reference_type"],
reference_type=ref.get("reference_type") or "",
url=ref["url"],
severities=[
VulnerabilitySeverity.from_dict(severity) for severity in ref["severities"]
Expand Down
144 changes: 141 additions & 3 deletions vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,23 @@
# See https://aboutcode.org for more information about nexB OSS projects.
#

import csv
import hashlib
import json
import logging
import xml.etree.ElementTree as ET
from contextlib import suppress
from functools import cached_property
from itertools import groupby
from operator import attrgetter
from typing import Union

from cvss.exceptions import CVSS2MalformedError
from cvss.exceptions import CVSS3MalformedError
from cvss.exceptions import CVSS4MalformedError
from cwe2.database import Database
from cwe2.mappings import xml_database_path
from cwe2.weakness import Weakness as DBWeakness
from django.contrib.auth import get_user_model
from django.contrib.auth.models import UserManager
from django.core import exceptions
Expand All @@ -41,8 +50,8 @@
from univers.version_range import AlpineLinuxVersionRange
from univers.versions import Version

from aboutcode import hashid
from vulnerabilities import utils
from vulnerabilities.severity_systems import EPSS
from vulnerabilities.severity_systems import SCORING_SYSTEMS
from vulnerabilities.utils import normalize_purl
from vulnerabilities.utils import purl_to_dict
Expand Down Expand Up @@ -371,6 +380,127 @@ def get_related_purls(self):
"""
return [p.package_url for p in self.packages.distinct().all()]

def aggregate_fixed_and_affected_packages(self):
    """
    Return a 3-tuple ``(fixed_by_packages, affected_packages, matches)``.

    - ``fixed_by_packages``: queryset of the non-ghost fixing packages, ordered
      by type, namespace, name, qualifiers and subpath.
    - ``affected_packages``: queryset of all affected packages.
    - ``matches``: a list with one dict per affected package, holding the keys
      ``"affected_package"`` (the package itself) and
      ``"matched_fixed_by_packages"`` (the ``purl`` of every fixing package
      that has the same type, namespace, name, qualifiers and subpath and a
      strictly greater version).
    """
    # Function-scope import kept from the original.
    # NOTE(review): presumably this avoids a circular import -- confirm.
    from vulnerabilities.utils import get_purl_version_class

    sorted_fixed_by_packages = self.fixed_by_packages.filter(is_ghost=False).order_by(
        "type", "namespace", "name", "qualifiers", "subpath"
    )

    # ``calculate_version_rank`` is read only for its side effect; the value
    # is discarded.
    # NOTE(review): presumably it (re)computes stored version ranks -- confirm.
    if sorted_fixed_by_packages:
        sorted_fixed_by_packages.first().calculate_version_rank

    sorted_affected_packages = self.affected_packages.all()

    if sorted_affected_packages:
        sorted_affected_packages.first().calculate_version_rank

    package_identity = attrgetter("type", "namespace", "name", "qualifiers", "subpath")

    # Group with a plain dict rather than ``itertools.groupby``: groupby only
    # merges *consecutive* equal keys, so turning its output into a dict keeps
    # just the last run of a key whenever the database ordering does not place
    # equal keys next to each other. Accumulating per key has no such
    # dependency on the ordering.
    grouped_fixed_by_packages = {}
    for fixed_by_package in sorted_fixed_by_packages:
        grouped_fixed_by_packages.setdefault(
            package_identity(fixed_by_package), []
        ).append(fixed_by_package)

    # Parsed fixing versions per identity key, filled on first use so that a
    # group is parsed once instead of once per affected package, and so that
    # groups without any affected package are never parsed.
    parsed_fixed_versions = {}

    all_affected_fixed_by_matches = []

    for affected_package in sorted_affected_packages:
        key = package_identity(affected_package)

        affected_version_class = get_purl_version_class(affected_package)
        affected_version = affected_version_class(affected_package.version)

        if key not in parsed_fixed_versions:
            parsed_fixed_versions[key] = [
                (
                    get_purl_version_class(fixed_by_package)(fixed_by_package.version),
                    fixed_by_package,
                )
                for fixed_by_package in grouped_fixed_by_packages.get(key, [])
            ]

        # Only fixes with a strictly greater version than the affected one count.
        matched_fixed_by_packages = [
            fixed_by_package.purl
            for fixed_version, fixed_by_package in parsed_fixed_versions[key]
            if fixed_version > affected_version
        ]

        all_affected_fixed_by_matches.append(
            {
                "affected_package": affected_package,
                "matched_fixed_by_packages": matched_fixed_by_packages,
            }
        )
    return sorted_fixed_by_packages, sorted_affected_packages, all_affected_fixed_by_matches

def get_severity_vectors_and_values(self):
    """
    Return a ``(severity_vectors, severity_values)`` tuple for this object.

    Severities whose scoring system is EPSS are excluded from both results.

    - ``severity_vectors``: list of the truthy results of calling ``.get()``
      on the matching ``SCORING_SYSTEMS`` entry with each severity's
      ``scoring_elements``. Severities whose elements raise a CVSS malformed
      error or ``NotImplementedError`` are logged and skipped.
    - ``severity_values``: set of the non-null, non-empty ``value`` strings.
    """
    severity_vectors = []

    # Exclude EPSS scoring system
    base_severities = self.severities.exclude(scoring_system=EPSS.identifier)

    # Severities with scoring_elements set and a scoring_system known to SCORING_SYSTEMS
    valid_scoring_severities = base_severities.filter(
        scoring_elements__isnull=False, scoring_system__in=SCORING_SYSTEMS.keys()
    )

    for severity in valid_scoring_severities:
        try:
            vector_values = SCORING_SYSTEMS[severity.scoring_system].get(
                severity.scoring_elements
            )
        except (
            CVSS2MalformedError,
            CVSS3MalformedError,
            CVSS4MalformedError,
            NotImplementedError,
        ) as e:
            # Log through the module-level ``logger`` used elsewhere in this
            # file instead of the root logger, with lazy %-style arguments.
            logger.error("CVSSMalformedError for %s: %s", severity.scoring_elements, e)
        else:
            if vector_values:
                severity_vectors.append(vector_values)

    valid_value_severities = base_severities.filter(value__isnull=False).exclude(value="")

    severity_values = set(valid_value_severities.values_list("value", flat=True))

    return severity_vectors, severity_values


def get_cwes(self):
    """
    Yield a cwe2 ``Weakness`` for each row of every file in ``self.cwe_files``,
    then for each entry of two sections of the cwe2 XML database.
    """
    for csv_file in self.cwe_files:
        # Rewind first: the file object may already have been read.
        csv_file.seek(0)
        for record in csv.DictReader(csv_file):
            # Every column except the last one is passed positionally.
            columns = list(record.values())
            yield DBWeakness(*columns[:-1])

    xml_root = ET.parse(xml_database_path).getroot()
    # Root children 1 and 2: Categories, Views
    for section_index in (1, 2):
        for entry in xml_root[section_index]:
            attributes = entry.attrib
            yield DBWeakness(
                attributes["ID"],
                attributes.get("Name"),
                None,
                attributes.get("Status"),
                # Text of the entry's first child element.
                entry[0].text,
            )


# Monkeypatch: attach get_cwes() as a method on the Database class imported
# from cwe2.database, so that Database instances can call ``db.get_cwes()``.
Database.get_cwes = get_cwes


class Weakness(models.Model):
"""
Expand All @@ -379,7 +509,15 @@ class Weakness(models.Model):

cwe_id = models.IntegerField(help_text="CWE id")
vulnerabilities = models.ManyToManyField(Vulnerability, related_name="weaknesses")
db = Database()

cwe_by_id = {}

def get_cwe(self, cwe_id):
    """
    Return the weakness object cached under ``cwe_id``.

    ``cwe_id`` may be an int or a string; it is normalised with ``str()``
    because the cache is keyed by ``str(weakness.cwe_id)``.

    The cache is filled on first use from ``Database().get_cwes()``. The
    ``cwe_by_id`` dict is mutated in place rather than rebound, so a
    class-level ``cwe_by_id`` acts as a cache shared by all instances.

    Raise ``KeyError`` when no weakness is known for ``cwe_id``.
    """
    if not self.cwe_by_id:
        db = Database()
        for weakness in db.get_cwes():
            self.cwe_by_id[str(weakness.cwe_id)] = weakness
    # Normalise the lookup key the same way the cache keys were built.
    return self.cwe_by_id[str(cwe_id)]

@property
def cwe(self):
Expand All @@ -391,7 +529,7 @@ def weakness(self):
Return a queryset of Weakness for this vulnerability.
"""
try:
weakness = self.db.get(self.cwe_id)
weakness = self.get_cwe(str(self.cwe_id))
return weakness
except Exception as e:
logger.warning(f"Could not find CWE {self.cwe_id}: {e}")
Expand Down
Loading
0