12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112 |
- #!/usr/bin/env python3
- # Copyright (C) 2009 by Thomas Petazzoni <thomas.petazzoni@free-electrons.com>
- #
- # This program is free software; you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation; either version 2 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program; if not, write to the Free Software
- # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- import aiohttp
- import argparse
- import asyncio
- import datetime
- import fnmatch
- import os
- from collections import defaultdict
- import re
- import subprocess
- import requests # NVD database download
- import json
- import ijson
- import distutils.version
- import time
- import gzip
- import sys
- sys.path.append('utils/')
- from getdeveloperlib import parse_developers # noqa: E402
# Earliest year for which NVD JSON feeds are published/downloaded.
NVD_START_YEAR = 2002
# NVD JSON feed schema version used to build feed file names and URLs.
NVD_JSON_VERSION = "1.0"
NVD_BASE_URL = "https://nvd.nist.gov/feeds/json/cve/" + NVD_JSON_VERSION

# Matches "$(eval $(<infra>-package))" lines in package .mk files,
# capturing the infrastructure name (e.g. "generic", "cmake").
INFRA_RE = re.compile(r"\$\(eval \$\(([a-z-]*)-package\)\)")
# Matches a line consisting only of an http(s) URL (package Config.in help).
URL_RE = re.compile(r"\s*https?://\S*\s*$")

# Outcome of the release-monitoring.org lookup for a package.
RM_API_STATUS_ERROR = 1
RM_API_STATUS_FOUND_BY_DISTRO = 2
RM_API_STATUS_FOUND_BY_PATTERN = 3
RM_API_STATUS_NOT_FOUND = 4

# Outcome of matching a CVE entry against a package version.
CVE_AFFECTS = 1
CVE_DOESNT_AFFECT = 2
CVE_UNKNOWN = 3
class Defconfig:
    """A Buildroot defconfig, with the developers responsible for it."""

    def __init__(self, name, path):
        self.name = name
        self.path = path
        # Populated later by set_developers()
        self.developers = None

    def set_developers(self, developers):
        """
        Fills in the .developers field with the names of the developers
        that claim this defconfig's path in the DEVELOPERS file.
        """
        names = []
        for developer in developers:
            if developer.hasfile(self.path):
                names.append(developer.name)
        self.developers = names
def get_defconfig_list():
    """
    Builds the list of Buildroot defconfigs, returning a list of Defconfig
    objects.
    """
    defconfigs = []
    for entry in os.listdir('configs'):
        if not entry.endswith('_defconfig'):
            continue
        name = entry[:-len('_defconfig')]
        defconfigs.append(Defconfig(name, os.path.join('configs', entry)))
    return defconfigs
class Package:
    """A Buildroot package (.mk file) and all its computed health checks."""

    # Class-level metadata, filled in once by package_init_make_info()
    # from a single "make printvars" invocation.
    all_licenses = dict()
    all_license_files = list()
    all_versions = dict()
    all_ignored_cves = dict()

    # This is the list of all possible checks. Add new checks to this list so
    # a tool that post-processes the json output knows the checks before
    # iterating over the packages.
    status_checks = ['cve', 'developers', 'hash', 'license',
                     'license-files', 'patches', 'pkg-check', 'url', 'version']

    def __init__(self, name, path):
        self.name = name
        self.path = path
        self.pkg_path = os.path.dirname(path)
        self.infras = None
        self.license = None
        self.has_license = False
        self.has_license_files = False
        self.has_hash = False
        self.patch_files = []
        self.warnings = 0
        self.current_version = None
        self.url = None
        self.url_worker = None
        self.cves = list()
        self.latest_version = {'status': RM_API_STATUS_ERROR, 'version': None, 'id': None}
        self.status = {}

    def pkgvar(self):
        """Return the make variable prefix for this package (e.g. FOO_BAR)."""
        return self.name.upper().replace("-", "_")

    def set_url(self):
        """
        Fills in the .url and .status['url'] fields, from the first
        http(s)-only line found in a Config.* file of the package.
        """
        self.status['url'] = ("warning", "no Config.in")
        pkgdir = os.path.dirname(self.path)
        for filename in os.listdir(pkgdir):
            if not fnmatch.fnmatch(filename, 'Config.*'):
                continue
            # with-statement closes the file on every exit path, instead
            # of the original manual close()s before each return
            with open(os.path.join(pkgdir, filename), "r") as fp:
                for config_line in fp:
                    if URL_RE.match(config_line):
                        self.url = config_line.strip()
                        self.status['url'] = ("ok", "found")
                        return
            # No URL in this Config.* file; another one may still have it
            self.status['url'] = ("error", "missing")

    @property
    def patch_count(self):
        """Number of patch files shipped with the package."""
        return len(self.patch_files)

    @property
    def has_valid_infra(self):
        """True unless the package has no infra at all or is virtual."""
        try:
            if self.infras[0][1] == 'virtual':
                return False
        except IndexError:
            return False
        return True

    def set_infra(self):
        """
        Fills in the .infras field, a list of (host|target, infra-name)
        tuples parsed from the $(eval $(...-package)) lines of the .mk file.
        """
        self.infras = list()
        with open(self.path, 'r') as f:
            for line in f:
                match = INFRA_RE.match(line)
                if not match:
                    continue
                infra = match.group(1)
                if infra.startswith("host-"):
                    self.infras.append(("host", infra[5:]))
                else:
                    self.infras.append(("target", infra))

    def set_license(self):
        """
        Fills in the .status['license'] and .status['license-files'] fields
        """
        if not self.has_valid_infra:
            self.status['license'] = ("na", "no valid package infra")
            self.status['license-files'] = ("na", "no valid package infra")
            return
        var = self.pkgvar()
        self.status['license'] = ("error", "missing")
        self.status['license-files'] = ("error", "missing")
        if var in self.all_licenses:
            self.license = self.all_licenses[var]
            self.status['license'] = ("ok", "found")
        if var in self.all_license_files:
            self.status['license-files'] = ("ok", "found")

    def set_hash_info(self):
        """
        Fills in the .status['hash'] field
        """
        if not self.has_valid_infra:
            self.status['hash'] = ("na", "no valid package infra")
            # NOTE(review): a stray 'hash-license' status key was also set
            # here in the original; dropped, as it is not declared in
            # status_checks and looks like a copy/paste leftover.
            return
        hashpath = self.path.replace(".mk", ".hash")
        if os.path.exists(hashpath):
            self.status['hash'] = ("ok", "found")
        else:
            self.status['hash'] = ("error", "missing")

    def set_patch_count(self):
        """
        Fills in the .patch_files, .patch_count and .status['patches'] fields
        """
        if not self.has_valid_infra:
            self.status['patches'] = ("na", "no valid package infra")
            return
        self.patch_files = []
        for _, _, files in os.walk(os.path.dirname(self.path)):
            # Accumulate patches from every subdirectory; the original
            # reassigned the list at each iteration, so only the last
            # visited directory's patches were counted.
            self.patch_files += fnmatch.filter(files, '*.patch')
        if self.patch_count == 0:
            self.status['patches'] = ("ok", "no patches")
        elif self.patch_count < 5:
            self.status['patches'] = ("warning", "some patches")
        else:
            self.status['patches'] = ("error", "lots of patches")

    def set_current_version(self):
        """
        Fills in the .current_version field
        """
        var = self.pkgvar()
        if var in self.all_versions:
            self.current_version = self.all_versions[var]

    def set_check_package_warnings(self):
        """
        Fills in the .warnings and .status['pkg-check'] fields by running
        ./utils/check-package on all the package files.
        """
        cmd = ["./utils/check-package"]
        pkgdir = os.path.dirname(self.path)
        self.status['pkg-check'] = ("error", "Missing")
        for root, dirs, files in os.walk(pkgdir):
            for f in files:
                if f.endswith((".mk", ".hash")) or f in ("Config.in", "Config.in.host"):
                    cmd.append(os.path.join(root, f))
        # check-package writes its summary ("N warnings generated") on stderr
        o = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[1]
        for line in o.splitlines():
            m = re.match("^([0-9]*) warnings generated", line.decode())
            if m:
                self.warnings = int(m.group(1))
                if self.warnings == 0:
                    self.status['pkg-check'] = ("ok", "no warnings")
                else:
                    self.status['pkg-check'] = ("error", "{} warnings".format(self.warnings))
                return

    def is_cve_ignored(self, cve):
        """
        Tells if the CVE is ignored by the package
        """
        return cve in self.all_ignored_cves.get(self.pkgvar(), [])

    def set_developers(self, developers):
        """
        Fills in the .developers and .status['developers'] field
        """
        self.developers = [
            dev.name
            for dev in developers
            if dev.hasfile(self.path)
        ]
        if self.developers:
            self.status['developers'] = ("ok", "{} developers".format(len(self.developers)))
        else:
            self.status['developers'] = ("warning", "no developers")

    def is_status_ok(self, name):
        """True when the check *name* resulted in an "ok" status."""
        return self.status[name][0] == 'ok'

    def __eq__(self, other):
        return self.path == other.path

    def __lt__(self, other):
        return self.path < other.path

    def __str__(self):
        return "%s (path='%s', license='%s', license_files='%s', hash='%s', patches=%d)" % \
            (self.name, self.path, self.is_status_ok('license'),
             self.is_status_ok('license-files'), self.status['hash'], self.patch_count)
class CVE:
    """An accessor class for CVE Items in NVD files"""

    def __init__(self, nvd_cve):
        """Initialize a CVE from its NVD JSON representation"""
        self.nvd_cve = nvd_cve

    @staticmethod
    def download_nvd_year(nvd_path, year):
        """
        Return the path of the locally cached, gzipped NVD feed for *year*,
        downloading or refreshing it when the local copy is missing or stale.
        """
        metaf = "nvdcve-%s-%s.meta" % (NVD_JSON_VERSION, year)
        path_metaf = os.path.join(nvd_path, metaf)
        jsonf_gz = "nvdcve-%s-%s.json.gz" % (NVD_JSON_VERSION, year)
        path_jsonf_gz = os.path.join(nvd_path, jsonf_gz)

        # If the database file is less than a day old, we assume the NVD data
        # locally available is recent enough.
        if os.path.exists(path_jsonf_gz) and os.stat(path_jsonf_gz).st_mtime >= time.time() - 86400:
            return path_jsonf_gz

        # If not, we download the meta file
        url = "%s/%s" % (NVD_BASE_URL, metaf)
        print("Getting %s" % url)
        page_meta = requests.get(url)
        page_meta.raise_for_status()

        # If the meta file already existed, we compare the existing
        # one with the data newly downloaded. If they are different,
        # we need to re-download the database.
        # If the database does not exist locally, we need to redownload it in
        # any case.
        if os.path.exists(path_metaf) and os.path.exists(path_jsonf_gz):
            # with-statement instead of a bare open().read(), so the file
            # handle is closed deterministically
            with open(path_metaf, "r") as f:
                meta_known = f.read()
            if page_meta.text == meta_known:
                return path_jsonf_gz

        # Grab the compressed JSON NVD, and write files to disk
        url = "%s/%s" % (NVD_BASE_URL, jsonf_gz)
        print("Getting %s" % url)
        page_json = requests.get(url)
        page_json.raise_for_status()
        with open(path_jsonf_gz, "wb") as f:
            f.write(page_json.content)
        with open(path_metaf, "w") as f:
            f.write(page_meta.text)
        return path_jsonf_gz

    @classmethod
    def read_nvd_dir(cls, nvd_dir):
        """
        Iterate over all the CVEs contained in NIST Vulnerability Database
        feeds since NVD_START_YEAR. If the files are missing or outdated in
        nvd_dir, a fresh copy will be downloaded, and kept in .json.gz
        """
        for year in range(NVD_START_YEAR, datetime.datetime.now().year + 1):
            filename = CVE.download_nvd_year(nvd_dir, year)
            try:
                content = ijson.items(gzip.GzipFile(filename), 'CVE_Items.item')
            except:  # noqa: E722
                # Deliberately broad: report which file is corrupt, then
                # re-raise so the run fails loudly.
                print("ERROR: cannot read %s. Please remove the file then rerun this script" % filename)
                raise
            for cve in content:
                yield cls(cve['cve'])

    def each_product(self):
        """Iterate over each product section of this cve"""
        for vendor in self.nvd_cve['affects']['vendor']['vendor_data']:
            for product in vendor['product']['product_data']:
                yield product

    @property
    def identifier(self):
        """The CVE unique identifier"""
        return self.nvd_cve['CVE_data_meta']['ID']

    @property
    def pkg_names(self):
        """The set of package names referred by this CVE definition"""
        return set(p['product_name'] for p in self.each_product())

    def affects(self, br_pkg):
        """
        True if the Buildroot Package object passed as argument is affected
        by this CVE.
        """
        if br_pkg.is_cve_ignored(self.identifier):
            return CVE_DOESNT_AFFECT

        for product in self.each_product():
            if product['product_name'] != br_pkg.name:
                continue
            for v in product['version']['version_data']:
                if v["version_affected"] == "=":
                    if br_pkg.current_version == v["version_value"]:
                        return CVE_AFFECTS
                elif v["version_affected"] == "<=":
                    pkg_version = distutils.version.LooseVersion(br_pkg.current_version)
                    if not hasattr(pkg_version, "version"):
                        print("Cannot parse package '%s' version '%s'" % (br_pkg.name, br_pkg.current_version))
                        continue
                    cve_affected_version = distutils.version.LooseVersion(v["version_value"])
                    if not hasattr(cve_affected_version, "version"):
                        print("Cannot parse CVE affected version '%s'" % v["version_value"])
                        continue
                    try:
                        affected = pkg_version <= cve_affected_version
                    except TypeError:
                        # LooseVersion cannot always compare mixed
                        # alphanumeric components
                        return CVE_UNKNOWN
                    if affected:
                        return CVE_AFFECTS
                    else:
                        # NOTE(review): returns after the first parseable
                        # "<=" entry, skipping any further version_data
                        # entries — confirm this is intended.
                        return CVE_DOESNT_AFFECT
                else:
                    print("version_affected: %s" % v['version_affected'])
        return CVE_DOESNT_AFFECT
def get_pkglist(npackages, package_list):
    """
    Builds the list of Buildroot packages, returning a list of Package
    objects. Only the .name and .path fields of the Package object are
    initialized.

    npackages: limit to N packages
    package_list: limit to those packages in this list
    """
    WALK_USEFUL_SUBDIRS = ["boot", "linux", "package", "toolchain"]
    WALK_EXCLUDES = ["boot/common.mk",
                     "linux/linux-ext-.*.mk",
                     "package/freescale-imx/freescale-imx.mk",
                     "package/gcc/gcc.mk",
                     "package/gstreamer/gstreamer.mk",
                     "package/gstreamer1/gstreamer1.mk",
                     "package/gtk2-themes/gtk2-themes.mk",
                     "package/matchbox/matchbox.mk",
                     "package/opengl/opengl.mk",
                     "package/qt5/qt5.mk",
                     "package/x11r7/x11r7.mk",
                     "package/doc-asciidoc.mk",
                     "package/pkg-.*.mk",
                     "package/nvidia-tegra23/nvidia-tegra23.mk",
                     "toolchain/toolchain-external/pkg-toolchain-external.mk",
                     "toolchain/toolchain-external/toolchain-external.mk",
                     "toolchain/toolchain.mk",
                     "toolchain/helpers.mk",
                     "toolchain/toolchain-wrapper.mk"]
    packages = []
    count = 0
    for root, dirs, files in os.walk("."):
        path_parts = root.split("/")
        # Only look inside the top-level directories that contain packages
        if len(path_parts) < 2 or path_parts[1] not in WALK_USEFUL_SUBDIRS:
            continue
        for fname in files:
            if not fname.endswith(".mk"):
                continue
            # Strip ending ".mk"
            pkgname = fname[:-3]
            if package_list and pkgname not in package_list:
                continue
            pkgpath = os.path.join(root, fname)
            # pkgpath[2:] strips the initial './'
            if any(re.match(excl, pkgpath[2:]) for excl in WALK_EXCLUDES):
                continue
            packages.append(Package(pkgname, pkgpath))
            count += 1
            if npackages and count == npackages:
                return packages
    return packages
def package_init_make_info():
    """
    Fetch the license, license-files, version and ignored-CVEs metadata
    for every package in one "make printvars" invocation, and record it in
    the Package class-level dicts.
    """
    # Fetch all variables at once
    variables = subprocess.check_output(["make", "BR2_HAVE_DOT_CONFIG=y", "-s", "printvars",
                                         "VARS=%_LICENSE %_LICENSE_FILES %_VERSION %_IGNORE_CVES"])
    variable_list = variables.decode().splitlines()

    # We process first the host package VERSION, and then the target
    # package VERSION. This means that if a package exists in both
    # target and host variants, with different values (eg. version
    # numbers (unlikely)), we'll report the target one.
    variable_list = [x[5:] for x in variable_list if x.startswith("HOST_")] + \
        [x for x in variable_list if not x.startswith("HOST_")]

    for line in variable_list:
        # Get variable name and value; split on the first '=' only, as
        # values (license strings, URLs, ...) may themselves contain '='.
        pkgvar, value = line.split("=", 1)

        # Strip the suffix according to the variable
        if pkgvar.endswith("_LICENSE"):
            # If value is "unknown", no license details available
            if value == "unknown":
                continue
            pkgvar = pkgvar[:-8]
            Package.all_licenses[pkgvar] = value
        elif pkgvar.endswith("_LICENSE_FILES"):
            if pkgvar.endswith("_MANIFEST_LICENSE_FILES"):
                continue
            pkgvar = pkgvar[:-14]
            Package.all_license_files.append(pkgvar)
        elif pkgvar.endswith("_VERSION"):
            if pkgvar.endswith("_DL_VERSION"):
                continue
            pkgvar = pkgvar[:-8]
            Package.all_versions[pkgvar] = value
        elif pkgvar.endswith("_IGNORE_CVES"):
            pkgvar = pkgvar[:-12]
            Package.all_ignored_cves[pkgvar] = value.split()
# Global counter of completed URL checks, used only for progress output.
check_url_count = 0


async def check_url_status(session, pkg, npkgs, retry=True):
    """
    Perform a GET on pkg.url and record the result in pkg.status['url'].

    A response status >= 400 marks the URL invalid. On a client error or
    timeout the check is retried once (retry is True on the first attempt),
    then the URL is marked "invalid (err)". Progress is printed after
    every completed check.
    """
    global check_url_count
    try:
        async with session.get(pkg.url) as resp:
            if resp.status >= 400:
                pkg.status['url'] = ("error", "invalid {}".format(resp.status))
                check_url_count += 1
                print("[%04d/%04d] %s" % (check_url_count, npkgs, pkg.name))
                return
    except (aiohttp.ClientError, asyncio.TimeoutError):
        if retry:
            # One retry with the same session before giving up
            return await check_url_status(session, pkg, npkgs, retry=False)
        else:
            pkg.status['url'] = ("error", "invalid (err)")
            check_url_count += 1
            print("[%04d/%04d] %s" % (check_url_count, npkgs, pkg.name))
            return
    # Reached only when the GET succeeded with a status < 400
    pkg.status['url'] = ("ok", "valid")
    check_url_count += 1
    print("[%04d/%04d] %s" % (check_url_count, npkgs, pkg.name))
async def check_package_urls(packages):
    """
    Concurrently validate the upstream URL of every package whose URL was
    found (status 'ok'), updating each pkg.status['url'] in place.
    """
    tasks = []
    connector = aiohttp.TCPConnector(limit_per_host=5)
    async with aiohttp.ClientSession(connector=connector, trust_env=True) as sess:
        packages = [p for p in packages if p.status['url'][0] == 'ok']
        for pkg in packages:
            tasks.append(check_url_status(sess, pkg, len(packages)))
        # asyncio.wait() raises ValueError when given an empty set of
        # tasks; skip the wait when no package has a URL to check.
        if tasks:
            await asyncio.wait(tasks)
def check_package_latest_version_set_status(pkg, status, version, identifier):
    """
    Record a release-monitoring.org lookup result in pkg.latest_version
    and derive the human-readable pkg.status['version'] entry from it.
    """
    pkg.latest_version = {
        "status": status,
        "version": version,
        "id": identifier,
    }
    if status == RM_API_STATUS_ERROR:
        pkg.status['version'] = ('warning', "Release Monitoring API error")
    elif status == RM_API_STATUS_NOT_FOUND:
        pkg.status['version'] = ('warning', "Package not found on Release Monitoring")

    if version is None:
        pkg.status['version'] = ('warning', "No upstream version available on Release Monitoring")
    elif version != pkg.current_version:
        pkg.status['version'] = ('error', "The newer version {} is available upstream".format(version))
    else:
        pkg.status['version'] = ('ok', 'up-to-date')
async def check_package_get_latest_version_by_distro(session, pkg, retry=True):
    """
    Query release-monitoring.org through the Buildroot distro mapping.
    Returns True when the package was found, False otherwise. Retries
    once on a network error.
    """
    # Single '/' before "api": the original had a stray '//', which is
    # inconsistent with the by-pattern URL used below.
    url = "https://release-monitoring.org/api/project/Buildroot/%s" % pkg.name
    try:
        async with session.get(url) as resp:
            if resp.status != 200:
                return False
            data = await resp.json()
            version = data.get('version')
            check_package_latest_version_set_status(pkg,
                                                    RM_API_STATUS_FOUND_BY_DISTRO,
                                                    version,
                                                    data['id'])
            return True
    except (aiohttp.ClientError, asyncio.TimeoutError):
        if retry:
            return await check_package_get_latest_version_by_distro(session, pkg, retry=False)
        return False
async def check_package_get_latest_version_by_guess(session, pkg, retry=True):
    """
    Query release-monitoring.org by name pattern. Returns True when a
    project with a matching name and a known version was found. Retries
    once on a network error.
    """
    url = "https://release-monitoring.org/api/projects/?pattern=%s" % pkg.name
    try:
        async with session.get(url) as resp:
            if resp.status != 200:
                return False
            data = await resp.json()
            # filter projects that have the right name and a version defined
            projects = [p for p in data['projects'] if p['name'] == pkg.name and 'version' in p]
            projects.sort(key=lambda x: x['id'])
            if len(projects) > 0:
                # This lookup goes by pattern, so report FOUND_BY_PATTERN.
                # The original wrongly reported FOUND_BY_DISTRO here, which
                # skewed the rmo-mapping statistics and left
                # RM_API_STATUS_FOUND_BY_PATTERN unused.
                check_package_latest_version_set_status(pkg,
                                                        RM_API_STATUS_FOUND_BY_PATTERN,
                                                        projects[0]['version'],
                                                        projects[0]['id'])
                return True
    except (aiohttp.ClientError, asyncio.TimeoutError):
        if retry:
            return await check_package_get_latest_version_by_guess(session, pkg, retry=False)
        return False
# Global counter of completed latest-version lookups, for progress output.
check_latest_count = 0


async def check_package_latest_version_get(session, pkg, npkgs):
    """
    Resolve the latest upstream version of one package: first through the
    Buildroot distro mapping, then by name pattern, finally giving up with
    RM_API_STATUS_NOT_FOUND. Prints progress after the lookup completes.
    """
    global check_latest_count

    def progress():
        global check_latest_count
        check_latest_count += 1
        print("[%04d/%04d] %s" % (check_latest_count, npkgs, pkg.name))

    if await check_package_get_latest_version_by_distro(session, pkg):
        progress()
        return
    if await check_package_get_latest_version_by_guess(session, pkg):
        progress()
        return
    check_package_latest_version_set_status(pkg,
                                            RM_API_STATUS_NOT_FOUND,
                                            None, None)
    progress()
async def check_package_latest_version(packages):
    """
    Fills in the .latest_version field of all Package objects

    This field is a dict and has the following keys:

    - status: one of RM_API_STATUS_ERROR,
      RM_API_STATUS_FOUND_BY_DISTRO, RM_API_STATUS_FOUND_BY_PATTERN,
      RM_API_STATUS_NOT_FOUND
    - version: string containing the latest version known by
      release-monitoring.org for this package
    - id: string containing the id of the project corresponding to this
      package, as known by release-monitoring.org
    """
    for pkg in [p for p in packages if not p.has_valid_infra]:
        pkg.status['version'] = ("na", "no valid package infra")

    tasks = []
    connector = aiohttp.TCPConnector(limit_per_host=5)
    async with aiohttp.ClientSession(connector=connector, trust_env=True) as sess:
        packages = [p for p in packages if p.has_valid_infra]
        for pkg in packages:
            tasks.append(check_package_latest_version_get(sess, pkg, len(packages)))
        # asyncio.wait() raises ValueError on an empty task set; skip the
        # wait when every package was filtered out.
        if tasks:
            await asyncio.wait(tasks)
def check_package_cves(nvd_path, packages):
    """
    Append the identifier of every CVE affecting a package to that
    package's .cves list. *packages* maps package names to Package objects;
    NVD feeds are cached under *nvd_path* (created if needed).
    """
    os.makedirs(nvd_path, exist_ok=True)

    for cve in CVE.read_nvd_dir(nvd_path):
        for name in cve.pkg_names:
            if name not in packages:
                continue
            if cve.affects(packages[name]) == CVE_AFFECTS:
                packages[name].cves.append(cve.identifier)
def calculate_stats(packages):
    """Aggregate per-package check results into a flat counter dict."""
    stats = defaultdict(int)
    stats['packages'] = len(packages)
    for pkg in packages:
        # If packages have multiple infra, take the first one. For the
        # vast majority of packages, the target and host infra are the
        # same. There are very few packages that use a different infra
        # for the host and target variants.
        if pkg.infras:
            stats["infra-%s" % pkg.infras[0][1]] += 1
        else:
            stats["infra-unknown"] += 1

        stats["license" if pkg.is_status_ok('license') else "no-license"] += 1
        stats["license-files" if pkg.is_status_ok('license-files') else "no-license-files"] += 1
        stats["hash" if pkg.is_status_ok('hash') else "no-hash"] += 1

        if pkg.latest_version['status'] == RM_API_STATUS_FOUND_BY_DISTRO:
            stats["rmo-mapping"] += 1
        else:
            stats["rmo-no-mapping"] += 1

        if not pkg.latest_version['version']:
            stats["version-unknown"] += 1
        elif pkg.latest_version['version'] == pkg.current_version:
            stats["version-uptodate"] += 1
        else:
            stats["version-not-uptodate"] += 1

        stats["patches"] += pkg.patch_count
        stats["total-cves"] += len(pkg.cves)
        if pkg.cves:
            stats["pkg-cves"] += 1
    return stats
# Opening boilerplate of the HTML report. The sorttable script makes the
# package table sortable by clicking on its column headers; the CSS classes
# below colour-code each check result cell.
html_header = """
<head>
<script src=\"https://www.kryogenix.org/code/browser/sorttable/sorttable.js\"></script>
<style type=\"text/css\">
table {
width: 100%;
}
td {
border: 1px solid black;
}
td.centered {
text-align: center;
}
td.wrong {
background: #ff9a69;
}
td.correct {
background: #d2ffc4;
}
td.nopatches {
background: #d2ffc4;
}
td.somepatches {
background: #ffd870;
}
td.lotsofpatches {
background: #ff9a69;
}
td.good_url {
background: #d2ffc4;
}
td.missing_url {
background: #ffd870;
}
td.invalid_url {
background: #ff9a69;
}
td.version-good {
background: #d2ffc4;
}
td.version-needs-update {
background: #ff9a69;
}
td.version-unknown {
background: #ffd870;
}
td.version-error {
background: #ccc;
}
</style>
<title>Statistics of Buildroot packages</title>
</head>
<a href=\"#results\">Results</a><br/>
<p id=\"sortable_hint\"></p>
"""

# Closing boilerplate; the inline script fills in the sorting hint only when
# sorttable loaded successfully.
html_footer = """
</body>
<script>
if (typeof sorttable === \"object\") {
document.getElementById(\"sortable_hint\").innerHTML =
\"hint: the table can be sorted by clicking the column headers\"
}
</script>
</html>
"""
def infra_str(infra_list):
    """Render a package's (host|target, infra) list as an HTML snippet."""
    if not infra_list:
        return "Unknown"
    first = infra_list[0]
    if len(infra_list) == 1:
        return "<b>%s</b><br/>%s" % (first[1], first[0])
    second = infra_list[1]
    if first[1] == second[1]:
        # Same infra name for both variants (e.g. target + host)
        return "<b>%s</b><br/>%s + %s" % (first[1], first[0], second[0])
    return "<b>%s</b> (%s)<br/><b>%s</b> (%s)" % (
        first[1], first[0], second[1], second[0])
def boolean_str(b):
    """Render a truthy value as "Yes" and a falsy one as "No"."""
    return "Yes" if b else "No"
def dump_html_pkg(f, pkg):
    """Write one HTML table row describing *pkg* into file object *f*."""
    f.write(" <tr>\n")
    f.write(" <td>%s</td>\n" % pkg.path[2:])

    # Patch count
    td_class = ["centered"]
    if pkg.patch_count == 0:
        td_class.append("nopatches")
    elif pkg.patch_count < 5:
        td_class.append("somepatches")
    else:
        td_class.append("lotsofpatches")
    f.write(" <td class=\"%s\">%s</td>\n" %
            (" ".join(td_class), str(pkg.patch_count)))

    # Infrastructure
    infra = infra_str(pkg.infras)
    td_class = ["centered"]
    if infra == "Unknown":
        td_class.append("wrong")
    else:
        td_class.append("correct")
    # Reuse the already-computed string instead of calling infra_str() twice
    f.write(" <td class=\"%s\">%s</td>\n" %
            (" ".join(td_class), infra))

    # License
    td_class = ["centered"]
    if pkg.is_status_ok('license'):
        td_class.append("correct")
    else:
        td_class.append("wrong")
    f.write(" <td class=\"%s\">%s</td>\n" %
            (" ".join(td_class), boolean_str(pkg.is_status_ok('license'))))

    # License files
    td_class = ["centered"]
    if pkg.is_status_ok('license-files'):
        td_class.append("correct")
    else:
        td_class.append("wrong")
    f.write(" <td class=\"%s\">%s</td>\n" %
            (" ".join(td_class), boolean_str(pkg.is_status_ok('license-files'))))

    # Hash
    td_class = ["centered"]
    if pkg.is_status_ok('hash'):
        td_class.append("correct")
    else:
        td_class.append("wrong")
    f.write(" <td class=\"%s\">%s</td>\n" %
            (" ".join(td_class), boolean_str(pkg.is_status_ok('hash'))))

    # Current version; guard against packages with no version at all
    # (pkg.current_version is None), which would crash len()
    if pkg.current_version is not None and len(pkg.current_version) > 20:
        current_version = pkg.current_version[:20] + "..."
    else:
        current_version = pkg.current_version
    f.write(" <td class=\"centered\">%s</td>\n" % current_version)

    # Latest version
    # Reset the cell classes: previously the classes of the hash cell
    # leaked into this one because td_class was not reinitialized.
    td_class = ["centered"]
    if pkg.latest_version['status'] == RM_API_STATUS_ERROR:
        td_class.append("version-error")
    if pkg.latest_version['version'] is None:
        td_class.append("version-unknown")
    elif pkg.latest_version['version'] != pkg.current_version:
        td_class.append("version-needs-update")
    else:
        td_class.append("version-good")

    if pkg.latest_version['status'] == RM_API_STATUS_ERROR:
        latest_version_text = "<b>Error</b>"
    elif pkg.latest_version['status'] == RM_API_STATUS_NOT_FOUND:
        latest_version_text = "<b>Not found</b>"
    else:
        if pkg.latest_version['version'] is None:
            latest_version_text = "<b>Found, but no version</b>"
        else:
            latest_version_text = "<a href=\"https://release-monitoring.org/project/%s\"><b>%s</b></a>" % \
                (pkg.latest_version['id'], str(pkg.latest_version['version']))
        latest_version_text += "<br/>"
        if pkg.latest_version['status'] == RM_API_STATUS_FOUND_BY_DISTRO:
            latest_version_text += "found by <a href=\"https://release-monitoring.org/distro/Buildroot/\">distro</a>"
        else:
            latest_version_text += "found by guess"
    f.write(" <td class=\"%s\">%s</td>\n" %
            (" ".join(td_class), latest_version_text))

    # Warnings
    td_class = ["centered"]
    if pkg.warnings == 0:
        td_class.append("correct")
    else:
        td_class.append("wrong")
    f.write(" <td class=\"%s\">%d</td>\n" %
            (" ".join(td_class), pkg.warnings))

    # URL status
    td_class = ["centered"]
    url_str = pkg.status['url'][1]
    if pkg.status['url'][0] in ("error", "warning"):
        td_class.append("missing_url")
        if pkg.status['url'][0] == "error":
            td_class.append("invalid_url")
            url_str = "<a href=%s>%s</a>" % (pkg.url, pkg.status['url'][1])
    else:
        td_class.append("good_url")
        url_str = "<a href=%s>Link</a>" % pkg.url
    f.write(" <td class=\"%s\">%s</td>\n" %
            (" ".join(td_class), url_str))

    # CVEs
    td_class = ["centered"]
    if len(pkg.cves) == 0:
        td_class.append("correct")
    else:
        td_class.append("wrong")
    f.write(" <td class=\"%s\">\n" % " ".join(td_class))
    for cve in pkg.cves:
        # Close the <a> tag (it was left unclosed in the original)
        f.write(" <a href=\"https://security-tracker.debian.org/tracker/%s\">%s</a><br/>\n" % (cve, cve))
    f.write(" </td>\n")

    f.write(" </tr>\n")
def dump_html_all_pkgs(f, packages):
    """Write the per-package HTML table: header row, then one row per
    package in sorted (path) order."""
    f.write("""
<table class=\"sortable\">
<tr>
<td>Package</td>
<td class=\"centered\">Patch count</td>
<td class=\"centered\">Infrastructure</td>
<td class=\"centered\">License</td>
<td class=\"centered\">License files</td>
<td class=\"centered\">Hash file</td>
<td class=\"centered\">Current version</td>
<td class=\"centered\">Latest version</td>
<td class=\"centered\">Warnings</td>
<td class=\"centered\">Upstream URL</td>
<td class=\"centered\">CVEs</td>
</tr>
""")
    # Package defines __lt__ on .path, so sorted() orders rows by path
    for pkg in sorted(packages):
        dump_html_pkg(f, pkg)
    f.write("</table>")
def dump_html_stats(f, stats):
    """Write the summary statistics table of the HTML report."""
    f.write("<a id=\"results\"></a>\n")
    f.write("<table>\n")
    for infra in [key[6:] for key in stats.keys() if key.startswith("infra-")]:
        f.write(" <tr><td>Packages using the <i>%s</i> infrastructure</td><td>%s</td></tr>\n" %
                (infra, stats["infra-%s" % infra]))
    # (row template, stats key) pairs, emitted in report order
    rows = [
        (" <tr><td>Packages having license information</td><td>%s</td></tr>\n", "license"),
        (" <tr><td>Packages not having license information</td><td>%s</td></tr>\n", "no-license"),
        (" <tr><td>Packages having license files information</td><td>%s</td></tr>\n", "license-files"),
        (" <tr><td>Packages not having license files information</td><td>%s</td></tr>\n", "no-license-files"),
        (" <tr><td>Packages having a hash file</td><td>%s</td></tr>\n", "hash"),
        (" <tr><td>Packages not having a hash file</td><td>%s</td></tr>\n", "no-hash"),
        (" <tr><td>Total number of patches</td><td>%s</td></tr>\n", "patches"),
        ("<tr><td>Packages having a mapping on <i>release-monitoring.org</i></td><td>%s</td></tr>\n", "rmo-mapping"),
        ("<tr><td>Packages lacking a mapping on <i>release-monitoring.org</i></td><td>%s</td></tr>\n", "rmo-no-mapping"),
        ("<tr><td>Packages that are up-to-date</td><td>%s</td></tr>\n", "version-uptodate"),
        ("<tr><td>Packages that are not up-to-date</td><td>%s</td></tr>\n", "version-not-uptodate"),
        ("<tr><td>Packages with no known upstream version</td><td>%s</td></tr>\n", "version-unknown"),
        ("<tr><td>Packages affected by CVEs</td><td>%s</td></tr>\n", "pkg-cves"),
        ("<tr><td>Total number of CVEs affecting all packages</td><td>%s</td></tr>\n", "total-cves"),
    ]
    for template, key in rows:
        f.write(template % stats[key])
    f.write("</table>\n")
def dump_html_gen_info(f, date, commit):
    """Append the generation timestamp and git commit to the report.

    Example: Updated on Mon Feb 19 08:12:08 CET 2018, git commit aa77030b...
    """
    line = "<p><i>Updated on %s, git commit %s</i></p>\n" % (str(date), commit)
    f.write(line)
def dump_html(packages, stats, date, commit, output):
    """Write the complete HTML report to *output*.

    Renders the file-level header, the per-package table, the statistics
    table, the generation info line, and the file-level footer, in order.
    """
    with open(output, 'w') as report:
        report.write(html_header)
        dump_html_all_pkgs(report, packages)
        dump_html_stats(report, stats)
        dump_html_gen_info(report, date, commit)
        report.write(html_footer)
def dump_json(packages, defconfigs, stats, date, commit, output):
    """Serialize packages, defconfigs and statistics as JSON to *output*.

    Packages and defconfigs are keyed by name; bookkeeping-only package
    fields are dropped; per-infrastructure stats are folded into a nested
    'infra' dict.
    """
    # Represent packages as a dictionary keyed by name instead of a list,
    # excluding local fields that do not carry real data.
    excluded_fields = ['url_worker', 'name']
    pkgs = {}
    for package in packages:
        pkgs[package.name] = {field: value
                              for field, value in package.__dict__.items()
                              if field not in excluded_fields}
    defconfig_dict = {d.name: dict(d.__dict__) for d in defconfigs}
    # Aggregate the "infra-*" entries into a single nested dict.
    statistics = {name: value for name, value in stats.items()
                  if not name.startswith('infra-')}
    statistics['infra'] = {name[6:]: value for name, value in stats.items()
                           if name.startswith('infra-')}
    # The actual structure to dump, with commit and date attached.
    final = {
        'packages': pkgs,
        'stats': statistics,
        'defconfigs': defconfig_dict,
        'package_status_checks': Package.status_checks,
        'commit': commit,
        'date': str(date),
    }
    with open(output, 'w') as out:
        json.dump(final, out, indent=2, separators=(',', ': '))
        out.write('\n')
def resolvepath(path):
    """Expand a leading '~' and return the absolute form of *path*."""
    expanded = os.path.expanduser(path)
    return os.path.abspath(expanded)
def parse_args():
    """Parse the command-line arguments.

    At least one of --html/--json is required; -n and -p are mutually
    exclusive. Returns the argparse Namespace.
    """
    parser = argparse.ArgumentParser()
    output_group = parser.add_argument_group('output', 'Output file(s)')
    output_group.add_argument('--html', dest='html', type=resolvepath,
                              help='HTML output file')
    output_group.add_argument('--json', dest='json', type=resolvepath,
                              help='JSON output file')
    selection = parser.add_mutually_exclusive_group()
    selection.add_argument('-n', dest='npackages', type=int, action='store',
                           help='Number of packages')
    selection.add_argument('-p', dest='packages', action='store',
                           help='List of packages (comma separated)')
    parser.add_argument('--nvd-path', dest='nvd_path',
                        help='Path to the local NVD database', type=resolvepath)
    args = parser.parse_args()
    if not (args.html or args.json):
        parser.error('at least one of --html or --json (or both) is required')
    return args
def __main__():
    """Entry point: gather package/defconfig data and emit the reports."""
    args = parse_args()
    package_list = args.packages.split(",") if args.packages else None
    # NOTE(review): datetime.utcnow() is deprecated in recent Pythons;
    # kept as-is to preserve the report's date format.
    date = datetime.datetime.utcnow()
    commit = subprocess.check_output(['git', 'rev-parse',
                                      'HEAD']).splitlines()[0].decode()
    print("Build package list ...")
    packages = get_pkglist(args.npackages, package_list)
    print("Getting developers ...")
    developers = parse_developers()
    print("Build defconfig list ...")
    defconfigs = get_defconfig_list()
    for defconfig in defconfigs:
        defconfig.set_developers(developers)
    print("Getting package make info ...")
    package_init_make_info()
    print("Getting package details ...")
    for package in packages:
        package.set_infra()
        package.set_license()
        package.set_hash_info()
        package.set_patch_count()
        package.set_check_package_warnings()
        package.set_current_version()
        package.set_url()
        package.set_developers(developers)
    print("Checking URL status")
    loop = asyncio.get_event_loop()
    loop.run_until_complete(check_package_urls(packages))
    print("Getting latest versions ...")
    loop = asyncio.get_event_loop()
    loop.run_until_complete(check_package_latest_version(packages))
    if args.nvd_path:
        print("Checking packages CVEs")
        check_package_cves(args.nvd_path, {p.name: p for p in packages})
    print("Calculate stats")
    stats = calculate_stats(packages)
    if args.html:
        print("Write HTML")
        dump_html(packages, stats, date, commit, args.html)
    if args.json:
        print("Write JSON")
        dump_json(packages, defconfigs, stats, date, commit, args.json)
# Guard the entry point so importing this module (e.g. for testing or
# reuse of its helpers) does not run the whole reporting pipeline.
if __name__ == "__main__":
    __main__()
|