8000 Feat: Add CVE support to Snyk datasource by shravankshenoy · Pull Request #1405 · aboutcode-org/vulnerablecode · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Feat: Add CVE support to Snyk datasource #1405

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
import logging
from typing import Iterable
from urllib.parse import quote
from urllib.parse import unquote_plus

import requests
from bs4 import BeautifulSoup
from packageurl import PackageURL

from vulntotal.validator import DataSource
from vulntotal.validator import InvalidCVEError
from vulntotal.validator import VendorData
from vulntotal.vulntotal_utils import snyk_constraints_satisfied

Expand Down Expand Up @@ -70,6 +72,38 @@ def datasource_advisory(self, purl) -> Iterable[VendorData]:
if advisory_html:
yield parse_html_advisory(advisory_html, snyk_id, affected, purl)

def datasource_advisory_from_cve(self, cve: str) -> Iterable[VendorData]:
"""
Fetch advisories from Snyk for a given CVE.

Parameters:
cve : CVE ID

Yields:
VendorData instance containing advisory information.
"""
if not cve.upper().startswith("CVE-"):
raise InvalidCVEError

package_list = generate_payload_from_cve(cve)
response = self.fetch(package_list)
self._raw_dump = [response]

# get list of vulnerabilities for cve id
vulns_list = parse_cve_advisory_html(response)

# for each vulnerability get fixed version from snyk_id_url, get affected version from package_advisory_url
for snyk_id, package_advisory_url in vulns_list.items():
package_advisories_list = self.fetch(package_advisory_url)
package_advisories = extract_html_json_advisories(package_advisories_list)
affected_versions = package_advisories[snyk_id]
advisory_payload = generate_advisory_payload(snyk_id)
advisory_html = self.fetch(advisory_payload)
self._raw_dump.append(advisory_html)
purl = generate_purl(package_advisory_url)
if advisory_html and purl:
yield parse_html_advisory(advisory_html, snyk_id, affected_versions, purl)

@classmethod
def supported_ecosystem(cls):
return {
Expand Down Expand Up @@ -132,6 +166,61 @@ def generate_package_advisory_url(purl):
)


def generate_purl(package_advisory_url):
"""
Generates purl from Package advisory url.

Parameters:
package_advisory_url: URL of the package on Snyk.

Returns:
A PackageURL instance representing the package
"""
package_advisory_url = unquote_plus(
package_advisory_url.replace("https://security.snyk.io/package/", "")
)
supported_ecosystems = {v: k for (k, v) in SnykDataSource.supported_ecosystem().items()}

package_url_split = package_advisory_url.split("/")
pkg_type = package_url_split[0]

pkg_name = None
namespace = None
qualifiers = {}

if pkg_type == "maven":
pkg_name = package_url_split[1].split(":")[1]
namespace = package_url_split[1].split(":")[0]

elif pkg_type == "composer":
pkg_name = package_url_split[-1]
namespace = package_url_split[-2]

elif pkg_type == "golang":
pkg_name = package_url_split[-1]
namespace = "/".join(package_url_split[1:-1])

elif pkg_type == "npm":
# handle scoped npm packages
if "@" in package_advisory_url:
namespace = package_url_split[-2]

pkg_name = package_url_split[-1]

elif pkg_type == "linux":
pkg_name = package_url_split[-1]
qualifiers["distro"] = package_url_split[1]

elif pkg_type in ("cocoapods", "hex", "nuget", "pip", "rubygems", "unmanaged"):
pkg_name = package_url_split[-1]

if pkg_type is None or pkg_name is None:
logger.error("Invalid package advisory url, package type or name is missing")
return

return PackageURL(type=supported_ecosystems[pkg_type], name=pkg_name, namespace=namespace)


def extract_html_json_advisories(package_advisories):
"""
Extract vulnerability information from HTML or JSON advisories.
Expand Down Expand Up @@ -204,9 +293,41 @@ def parse_html_advisory(advisory_html, snyk_id, affected, purl) -> VendorData:
)


def parse_cve_advisory_html(cve_advisory_html):
"""
Parse CVE HTML advisory from Snyk and extract list of vulnerabilities and corresponding packages for that CVE.

Parameters:
advisory_html: A string of HTML containing the vulnerabilities for given CVE.

Returns:
A dictionary with each item representing a vulnerability. Key of each item is the SNYK_ID and value is the package advisory url on snyk website
"""
cve_advisory_soup = BeautifulSoup(cve_advisory_html, "html.parser")
vulns_table = cve_advisory_soup.find("tbody", class_="vue--table__tbody")
if not vulns_table:
return None
vulns_rows = vulns_table.find_all("tr", class_="vue--table__row")
vulns_list = {}

for row in vulns_rows:
anchors = row.find_all("a", {"class": "vue--anchor"})
if len(anchors) != 2:
continue
snyk_id = anchors[0]["href"].split("/")[1]
package_advisory_url = f"https://security.snyk.io{anchors[1]['href']}"
vulns_list[snyk_id] = package_advisory_url

return vulns_list


def is_purl_in_affected(version, affected):
return any(snyk_constraints_satisfied(affected_range, version) for affected_range in affected)


def generate_advisory_payload(snyk_id):
return f"https://security.snyk.io/vuln/{snyk_id}"


def generate_payload_from_cve(cve_id):
return f"https://security.snyk.io/vuln?search={cve_id}"
Loading
0