diff --git a/vulnerabilities/importers/elixir_security.py b/vulnerabilities/importers/elixir_security.py
index 3fe0ec15b..92143b754 100644
--- a/vulnerabilities/importers/elixir_security.py
+++ b/vulnerabilities/importers/elixir_security.py
@@ -6,14 +6,18 @@
 # See https://github.com/aboutcode-org/vulnerablecode for support or download.
 # See https://aboutcode.org for more information about nexB OSS projects.
 #
-import urllib.parse as urlparse
+import logging
+import os
+import tempfile
 from pathlib import Path
 from typing import Set
 
+import requests
 from dateutil import parser as dateparser
 from packageurl import PackageURL
 from univers.version_constraint import VersionConstraint
 from univers.version_range import HexVersionRange
+from univers.versions import SemverVersion
 
 from vulnerabilities.importer import AdvisoryData
 from vulnerabilities.importer import AffectedPackage
@@ -30,7 +34,22 @@ class ElixirSecurityImporter(Importer):
     spdx_license_expression = "CC0-1.0"
     importer_name = "Elixir Security Importer"
 
+    def __init__(self, purl=None, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.purl = purl
+        if self.purl:
+            if self.purl.type != "hex":
+                logging.warning(
+                    f"Warning: PURL type {self.purl.type} is not 'hex', may not match any advisories"
+                )
+
     def advisory_data(self) -> Set[AdvisoryData]:
+        if not self.purl:
+            return self._batch_advisory_data()
+
+        return self._package_first_advisory_data()
+
+    def _batch_advisory_data(self) -> Set[AdvisoryData]:
         try:
             self.clone(self.repo_url)
             base_path = Path(self.vcs_response.dest_dir)
@@ -41,8 +60,77 @@ def advisory_data(self) -> Set[AdvisoryData]:
             if self.vcs_response:
                 self.vcs_response.delete()
 
-    def process_file(self, file, base_path):
-        relative_path = str(file.relative_to(base_path)).strip("/")
+    def _package_first_advisory_data(self) -> Set[AdvisoryData]:
+        if self.purl.type != "hex":
+            logging.warning(
+                f"PURL type {self.purl.type} is not supported by Elixir Security importer"
+            )
+            return []
+
+        package_name = self.purl.name
+
+        try:
+            directory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/packages/{package_name}"
+            response = requests.get(directory_url, timeout=30)
+
+            if response.status_code != 200:
+                logging.info(f"No advisories found for {package_name} in Elixir Security Database")
+                return []
+
+            yaml_files = [file["path"] for file in response.json() if file["name"].endswith(".yml")]
+
+            for file_path in yaml_files:
+                content_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/{file_path}"
+                content_response = requests.get(
+                    content_url, headers={"Accept": "application/vnd.github.v3.raw"}, timeout=30
+                )
+
+                if content_response.status_code != 200:
+                    logging.warning(f"Failed to fetch file content for {file_path}")
+                    continue
+
+                # Create a temporary file to store the content
+                with tempfile.NamedTemporaryFile(mode="w+", delete=False) as temp_file:
+                    temp_file.write(content_response.text)
+                    temp_path = temp_file.name
+
+                try:
+                    for advisory in self.process_file(temp_path, Path(""), file_path=file_path):
+                        if self.purl.version and not self._advisory_affects_version(advisory):
+                            continue
+
+                        yield advisory
+                finally:
+                    if os.path.exists(temp_path):
+                        os.remove(temp_path)
+
+        except Exception as e:
+            logging.error(f"Error fetching advisories for {self.purl}: {str(e)}")
+            return []
+
+    def _advisory_affects_version(self, advisory: AdvisoryData) -> bool:
+        if not self.purl.version:
+            return True
+
+        for affected_package in advisory.affected_packages:
+            if affected_package.affected_version_range:
+                try:
+                    purl_version = SemverVersion(self.purl.version)
+
+                    if purl_version in affected_package.affected_version_range:
+                        return True
+                except Exception as e:
+                    logging.warning(f"Failed to parse version {self.purl.version}: {str(e)}")
+                    return True
+
+        return False
+
+    def process_file(self, file, base_path, file_path=None):
+        if file_path:
+            relative_path = file_path
+        else:
+            relative_path = str(Path(file).relative_to(base_path)).strip("/")
+
         advisory_url = (
             f"https://github.com/dependabot/elixir-security-advisories/blob/master/{relative_path}"
         )
diff --git a/vulnerabilities/pipelines/v2_importers/elixir_security_importer.py b/vulnerabilities/pipelines/v2_importers/elixir_security_importer.py
index 384a2dafb..805941fdf 100644
--- a/vulnerabilities/pipelines/v2_importers/elixir_security_importer.py
+++ b/vulnerabilities/pipelines/v2_importers/elixir_security_importer.py
@@ -7,14 +7,18 @@
 # See https://aboutcode.org for more information about nexB OSS projects.
 #
 
+import os
+import tempfile
 from pathlib import Path
 from typing import Iterable
 
+import requests
 from dateutil import parser as dateparser
 from fetchcode.vcs import fetch_via_vcs
 from packageurl import PackageURL
 from univers.version_constraint import VersionConstraint
 from univers.version_range import HexVersionRange
+from univers.versions import SemverVersion
 
 from vulnerabilities.importer import AdvisoryData
 from vulnerabilities.importer import AffectedPackage
@@ -37,25 +41,65 @@ class ElixirSecurityImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
     repo_url = "git+https://github.com/dependabot/elixir-security-advisories"
     unfurl_version_ranges = True
 
+    is_batch_run = True
+
+    def __init__(self, *args, purl=None, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.purl = purl
+        if self.purl:
+            self.is_batch_run = False
+            if self.purl.type != "hex":
+                self.log(
+                    f"Warning: PURL type {self.purl.type} is not 'hex', may not match any advisories"
+                )
+
     @classmethod
     def steps(cls):
+        if not cls.is_batch_run:
+            return (cls.collect_and_store_advisories,)
         return (cls.clone, cls.collect_and_store_advisories, cls.clean_downloads)
 
     def clean_downloads(self):
-        if self.vcs_response:
+        if self.is_batch_run and self.vcs_response:
             self.log(f"Removing cloned repository")
             self.vcs_response.delete()
 
     def clone(self):
-        self.log(f"Cloning `{self.repo_url}`")
-        self.vcs_response = fetch_via_vcs(self.repo_url)
+        if self.is_batch_run:
+            self.log(f"Cloning `{self.repo_url}`")
+            self.vcs_response = fetch_via_vcs(self.repo_url)
 
     def advisories_count(self) -> int:
+        if not self.is_batch_run:
+            return self._count_package_advisories()
+
         base_path = Path(self.vcs_response.dest_dir)
         count = len(list((base_path / "packages").glob("**/*.yml")))
         return count
 
+    def _count_package_advisories(self) -> int:
+        if self.purl.type != "hex":
+            return 0
+
+        try:
+            directory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/packages/{self.purl.name}"
+            response = requests.get(directory_url, timeout=30)
+
+            if response.status_code != 200:
+                return 0
+
+            yaml_files = [file for file in response.json() if file["name"].endswith(".yml")]
+            return len(yaml_files)
+        except Exception:
+            return 0
+
     def collect_advisories(self) -> Iterable[AdvisoryData]:
+        if not self.is_batch_run:
+            return self._collect_package_advisories()
+
+        return self._collect_batch_advisories()
+
+    def _collect_batch_advisories(self) -> Iterable[AdvisoryData]:
         try:
             base_path = Path(self.vcs_response.dest_dir)
             vuln = base_path / "packages"
@@ -65,11 +109,84 @@ def collect_advisories(self) -> Iterable[AdvisoryData]:
             if self.vcs_response:
                 self.vcs_response.delete()
 
-    def process_file(self, file, base_path) -> Iterable[AdvisoryData]:
-        relative_path = str(file.relative_to(base_path)).strip("/")
-        path_segments = str(file).split("/")
-        # use the last two segments as the advisory ID
-        advisory_id = "/".join(path_segments[-2:]).replace(".yml", "")
+    def _collect_package_advisories(self) -> Iterable[AdvisoryData]:
+        if self.purl.type != "hex":
+            self.log(f"PURL type {self.purl.type} is not supported by Elixir Security importer")
+            return []
+
+        package_name = self.purl.name
+
+        try:
+            directory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/packages/{package_name}"
+            response = requests.get(directory_url, timeout=30)
+
+            if response.status_code != 200:
+                self.log(f"No advisories found for {package_name} in Elixir Security Database")
+                return []
+
+            yaml_files = [file["path"] for file in response.json() if file["name"].endswith(".yml")]
+
+            for file_path in yaml_files:
+                content_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/{file_path}"
+                content_response = requests.get(
+                    content_url, headers={"Accept": "application/vnd.github.v3.raw"}, timeout=30
+                )
+
+                if content_response.status_code != 200:
+                    self.log(f"Failed to fetch file content for {file_path}")
+                    continue
+
+                # Create a temporary file to store the content
+                with tempfile.NamedTemporaryFile(mode="w+", delete=False) as temp_file:
+                    temp_file.write(content_response.text)
+                    temp_path = temp_file.name
+
+                try:
+                    for advisory in self.process_file(
+                        Path(temp_path), Path(""), file_path=file_path
+                    ):
+                        if self.purl.version and not self._advisory_affects_version(advisory):
+                            continue
+
+                        yield advisory
+                finally:
+                    if os.path.exists(temp_path):
+                        os.remove(temp_path)
+
+        except Exception as e:
+            self.log(f"Error fetching advisories for {self.purl}: {str(e)}")
+            return []
+
+    def _advisory_affects_version(self, advisory: AdvisoryData) -> bool:
+        if not self.purl.version:
+            return True
+
+        for affected_package in advisory.affected_packages:
+            if affected_package.affected_version_range:
+                try:
+                    purl_version = SemverVersion(self.purl.version)
+
+                    if purl_version in affected_package.affected_version_range:
+                        return True
+                except Exception as e:
+                    self.log(f"Failed to parse version {self.purl.version}: {str(e)}")
+                    return True
+
+        return False
+
+    def process_file(self, file, base_path, file_path=None) -> Iterable[AdvisoryData]:
+        if file_path:
+            relative_path = file_path
+            advisory_id = (
+                file_path.replace(".yml", "").split("/")[-2]
+                + "/"
+                + file_path.replace(".yml", "").split("/")[-1]
+            )
+        else:
+            relative_path = str(file.relative_to(base_path)).strip("/")
+            path_segments = str(file).split("/")
+            advisory_id = "/".join(path_segments[-2:]).replace(".yml", "")
+
         advisory_url = (
             f"https://github.com/dependabot/elixir-security-advisories/blob/master/{relative_path}"
         )
diff --git a/vulnerabilities/tests/test_elixir_security.py b/vulnerabilities/tests/test_elixir_security.py
index 2531ed695..f4c59f0f5 100644
--- a/vulnerabilities/tests/test_elixir_security.py
+++ b/vulnerabilities/tests/test_elixir_security.py
@@ -10,8 +10,11 @@
 import json
 import os
 from pathlib import Path
+from unittest.mock import Mock
 from unittest.mock import patch
 
+from packageurl import PackageURL
+
 from vulnerabilities.importer import AdvisoryData
 from vulnerabilities.importers.elixir_security import ElixirSecurityImporter
 from vulnerabilities.improvers.default import DefaultImprover
@@ -59,3 +62,105 @@ def test_elixir_improver(mock_response):
         result.extend(inference)
     expected_file = os.path.join(TEST_DIR, f"elixir-improver-expected.json")
     util_tests.check_results_against_json(result, expected_file)
+
+
+@patch("requests.get")
+def test_elixir_package_first_mode_success(mock_get):
+    directory_response = Mock()
+    directory_response.status_code = 200
+    directory_response.json.return_value = [
+        {"name": "test_file.yml", "path": "packages/coherence/test_file.yml"}
+    ]
+
+    test_file_path = os.path.join(TEST_DIR, "test_file.yml")
+    with open(test_file_path, "r") as f:
+        test_content = f.read()
+
+    content_response = Mock()
+    content_response.status_code = 200
+    content_response.text = test_content
+
+    mock_get.side_effect = [directory_response, content_response]
+
+    purl = PackageURL(type="hex", name="coherence")
+    importer = ElixirSecurityImporter(purl=purl)
+
+    advisories = list(importer.advisory_data())
+
+    assert len(advisories) == 1
+    advisory = advisories[0]
+    assert "CVE-2018-20301" in advisory.aliases
+    assert advisory.summary == 'The Coherence library has "Mass Assignment"-like vulnerabilities.'
+    assert len(advisory.affected_packages) == 1
+    assert advisory.affected_packages[0].package.name == "coherence"
+
+
+@patch("requests.get")
+def test_elixir_package_first_mode_with_version_filter(mock_get):
+    directory_response = Mock()
+    directory_response.status_code = 200
+    directory_response.json.return_value = [
+        {"name": "test_file.yml", "path": "packages/coherence/test_file.yml"}
+    ]
+
+    test_file_path = os.path.join(TEST_DIR, "test_file.yml")
+    with open(test_file_path, "r") as f:
+        test_content = f.read()
+
+    content_response = Mock()
+    content_response.status_code = 200
+    content_response.text = test_content
+
+    mock_get.side_effect = [directory_response, content_response]
+
+    purl = PackageURL(type="hex", name="coherence", version="0.5.1")
+    importer = ElixirSecurityImporter(purl=purl)
+    advisories = list(importer.advisory_data())
+    assert len(advisories) == 1
+
+    mock_get.side_effect = [directory_response, content_response]
+    purl = PackageURL(type="hex", name="coherence", version="0.5.2")
+    importer = ElixirSecurityImporter(purl=purl)
+    advisories = list(importer.advisory_data())
+    assert len(advisories) == 0
+
+
+@patch("requests.get")
+def test_elixir_package_first_mode_no_advisories(mock_get):
+    mock_response = Mock()
+    mock_response.status_code = 404
+    mock_get.return_value = mock_response
+
+    purl = PackageURL(type="hex", name="nonexistent-package")
+    importer = ElixirSecurityImporter(purl=purl)
+
+    advisories = list(importer.advisory_data())
+    assert len(advisories) == 0
+
+
+@patch("requests.get")
+def test_elixir_package_first_mode_api_error(mock_get):
+    directory_response = Mock()
+    directory_response.status_code = 200
+    directory_response.json.return_value = [
+        {"name": "test_file.yml", "path": "packages/coherence/test_file.yml"}
+    ]
+
+    content_response = Mock()
+    content_response.status_code = 500
+
+    mock_get.side_effect = [directory_response, content_response]
+
+    purl = PackageURL(type="hex", name="coherence")
+    importer = ElixirSecurityImporter(purl=purl)
+
+    advisories = list(importer.advisory_data())
+    assert len(advisories) == 0
+
+
+def test_elixir_package_first_mode_non_hex_purl():
+    purl = PackageURL(type="npm", name="some-package")
+    importer = ElixirSecurityImporter(purl=purl)
+
+    advisories = list(importer.advisory_data())
+    assert len(advisories) == 0