cve.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. #!/usr/bin/env python3
  2. # Copyright (C) 2009 by Thomas Petazzoni <thomas.petazzoni@free-electrons.com>
  3. # Copyright (C) 2020 by Gregory CLEMENT <gregory.clement@bootlin.com>
  4. #
  5. # This program is free software; you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation; either version 2 of the License, or
  8. # (at your option) any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. # General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program; if not, write to the Free Software
  17. # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  18. import datetime
  19. import os
  20. import requests # URL checking
  21. import distutils.version
  22. import time
  23. import subprocess
  24. import sys
  25. import operator
  # ijson is a hard requirement of this module: it is used to parse the NVD
  # JSON feeds. When the selected backend is the pure-python one, try to
  # switch to the yajl2_cffi backend and only warn if it is unavailable.
  try:
      import ijson
      # backend is a module in < 2.5, a string in >= 2.5
      if 'python' in getattr(ijson.backend, '__name__', ijson.backend):
          try:
              import ijson.backends.yajl2_cffi as ijson
          except ImportError:
              sys.stderr.write('Warning: Using slow ijson python backend\n')
  except ImportError:
      sys.stderr.write("You need ijson to parse NVD for CVE check\n")
      # sys.exit() rather than the bare exit() helper: the latter is injected
      # by the 'site' module and is not guaranteed to be defined.
      sys.exit(1)
  # Add the relative 'utils/' directory to the module search path.
  sys.path.append('utils/')

  # First year for which a yearly NVD feed is fetched.
  NVD_START_YEAR = 1999
  # Base URL from which the CVE-<year>.meta and CVE-<year>.json.xz files are
  # downloaded.
  NVD_BASE_URL = "https://github.com/fkie-cad/nvd-json-data-feeds/releases/latest/download"

  # Dispatch table: version comparison symbol -> function from the operator
  # module implementing that comparison.
  ops = {
      symbol: getattr(operator, func_name)
      for symbol, func_name in (
          ('>=', 'ge'),
          ('>', 'gt'),
          ('<=', 'le'),
          ('<', 'lt'),
          ('=', 'eq'),
      )
  }
  def cpe_matches(cpe1, cpe2):
      """
      Check if two CPE IDs match each other.

      Both IDs are split on ":" and compared field by field. A pair of
      fields is compatible when either side is a wildcard ("*" or "-") or
      when both sides are equal. Fields beyond the length of the shorter ID
      are not compared.

      Returns True when every compared pair of fields is compatible, False
      otherwise.
      """
      wildcards = ("*", "-")
      # all() stops at the first incompatible pair instead of collecting
      # every mismatch just to test whether there is any.
      return all(
          field1 in wildcards or field2 in wildcards or field1 == field2
          for field1, field2 in zip(cpe1.split(":"), cpe2.split(":"))
      )
  def cpe_product(cpe):
      """Return the fifth ":"-separated field of a CPE ID (the product)."""
      fields = cpe.split(':')
      return fields[4]
  def cpe_version(cpe):
      """Return the sixth ":"-separated field of a CPE ID (the version)."""
      fields = cpe.split(':')
      return fields[5]
  class CVE:
      """An accessor class for CVE Items in NVD files"""

      # Possible return values of affects()
      CVE_AFFECTS = 1
      CVE_DOESNT_AFFECT = 2
      CVE_UNKNOWN = 3

      def __init__(self, nvd_cve):
          """Initialize a CVE from its NVD JSON representation"""
          self.nvd_cve = nvd_cve

      @staticmethod
      def download_nvd_year(nvd_path, year):
          """
          Make sure the compressed NVD feed of a given year is available in
          nvd_path, downloading it from NVD_BASE_URL when needed.

          nvd_path: directory holding the CVE-<year>.meta and
                    CVE-<year>.json.xz files
          year:     year of the feed to fetch

          Returns the path of the CVE-<year>.json.xz file. HTTP error
          statuses are turned into exceptions by raise_for_status().
          """
          metaf = "CVE-%s.meta" % year
          path_metaf = os.path.join(nvd_path, metaf)
          jsonf_xz = "CVE-%s.json.xz" % year
          path_jsonf_xz = os.path.join(nvd_path, jsonf_xz)

          # If the database file is less than a day old, we assume the NVD data
          # locally available is recent enough.
          if os.path.exists(path_jsonf_xz) and os.stat(path_jsonf_xz).st_mtime >= time.time() - 86400:
              return path_jsonf_xz

          # If not, we download the meta file
          url = "%s/%s" % (NVD_BASE_URL, metaf)
          print("Getting %s" % url)
          page_meta = requests.get(url)
          page_meta.raise_for_status()

          # If the meta file already existed, we compare the existing
          # one with the data newly downloaded. If they are different,
          # we need to re-download the database.
          # If the database does not exist locally, we need to redownload it in
          # any case.
          if os.path.exists(path_metaf) and os.path.exists(path_jsonf_xz):
              with open(path_metaf, "r") as meta_file:
                  meta_known = meta_file.read()
              if page_meta.text == meta_known:
                  return path_jsonf_xz

          # Grab the compressed JSON NVD, and write files to disk
          url = "%s/%s" % (NVD_BASE_URL, jsonf_xz)
          print("Getting %s" % url)
          page_json = requests.get(url)
          page_json.raise_for_status()
          # Context managers make sure both files are flushed and closed
          # before the path is handed back to the caller.
          with open(path_jsonf_xz, "wb") as json_file:
              json_file.write(page_json.content)
          with open(path_metaf, "w") as meta_file:
              meta_file.write(page_meta.text)
          return path_jsonf_xz

      @staticmethod
      def sort_id(cve_ids):
          """
          Return the given CVE identifiers as a new list sorted numerically
          by year, then by sequence number ("CVE-<year>-<number>").
          """
          def cve_key(cve_id):
              year, id_ = cve_id.split('-')[1:]
              return (int(year), int(id_))
          return sorted(cve_ids, key=cve_key)

      @classmethod
      def read_nvd_dir(cls, nvd_dir):
          """
          Iterate over all the CVEs contained in the yearly NVD feeds from
          NVD_START_YEAR up to the current year. If the files are missing or
          outdated in nvd_dir, a fresh copy will be downloaded, and kept as
          CVE-<year>.json.xz. Yields one instance of this class per CVE item.
          """
          for year in range(NVD_START_YEAR, datetime.datetime.now().year + 1):
              filename = CVE.download_nvd_year(nvd_dir, year)
              try:
                  uncompressed = subprocess.check_output(["xz", "-d", "-c", filename])
                  content = ijson.items(uncompressed, 'cve_items.item')
              except Exception:
                  # Only real errors get the hint below; an interruption such
                  # as Ctrl-C propagates without it.
                  print("ERROR: cannot read %s. Please remove the file then rerun this script" % filename)
                  raise
              for cve in content:
                  yield cls(cve)

      def each_product(self):
          """Iterate over each product section of this cve"""
          # NOTE(review): this reads a 'cve' -> 'affects' -> 'vendor' layout,
          # whereas identifier reads a top-level 'id' key; confirm the feed in
          # use still provides this section.
          for vendor in self.nvd_cve['cve']['affects']['vendor']['vendor_data']:
              yield from vendor['product']['product_data']

      def parse_node(self, node):
          """
          Parse the node inside the configurations section to extract the
          cpe information useful to know if a product is affected by
          the CVE.

          Yields one dict per cpe entry with the keys 'id' (the full CPE
          criteria string), 'v_start', 'op_start', 'v_end' and 'op_end'.
          Version bounds and operators that do not apply are empty strings.
          """
          # The node containing the cpe entries matching the CVE can also
          # contain sub-nodes, so we need to manage it.
          for child in node.get('children', ()):
              yield from self.parse_node(child)

          for cpe in node.get('cpeMatch', ()):
              # NOTE(review): 'return' ends the iteration over this node at the
              # first non-vulnerable entry (and at the first '-' product
              # below), so later entries of the same node are never yielded.
              # Confirm that 'continue' is not what is wanted.
              if not cpe['vulnerable']:
                  return
              product = cpe_product(cpe['criteria'])
              version = cpe_version(cpe['criteria'])
              # ignore when product is '-', which means N/A
              if product == '-':
                  return
              op_start = ''
              op_end = ''
              v_start = ''
              v_end = ''

              if version != '*' and version != '-':
                  # Version is defined, this is a '=' match
                  op_start = '='
                  v_start = version
              else:
                  # Parse start version, end version and operators
                  if 'versionStartIncluding' in cpe:
                      op_start = '>='
                      v_start = cpe['versionStartIncluding']

                  if 'versionStartExcluding' in cpe:
                      op_start = '>'
                      v_start = cpe['versionStartExcluding']

                  if 'versionEndIncluding' in cpe:
                      op_end = '<='
                      v_end = cpe['versionEndIncluding']

                  if 'versionEndExcluding' in cpe:
                      op_end = '<'
                      v_end = cpe['versionEndExcluding']

              yield {
                  'id': cpe['criteria'],
                  'v_start': v_start,
                  'op_start': op_start,
                  'v_end': v_end,
                  'op_end': op_end
              }

      def each_cpe(self):
          """
          Iterate over the parsed cpe entries (see parse_node) of every node
          of every configuration of this CVE. Yields nothing when the CVE
          has no 'configurations' key.
          """
          for nodes in self.nvd_cve.get('configurations', []):
              for node in nodes['nodes']:
                  yield from self.parse_node(node)

      @property
      def identifier(self):
          """The CVE unique identifier"""
          return self.nvd_cve['id']

      @property
      def affected_products(self):
          """The set of CPE products referred by this CVE definition"""
          return {cpe_product(p['id']) for p in self.each_cpe()}

      def affects(self, name, version, cve_ignore_list, cpeid=None):
          """
          Tell whether a package is affected by this CVE.

          name:            package name, used to build a CPE ID when cpeid
                           is not given
          version:         package version
          cve_ignore_list: CVE identifiers to be reported as not affecting
          cpeid:           optional CPE ID of the package; when given, its
                           version field is used instead of version

          Returns CVE_AFFECTS, CVE_DOESNT_AFFECT, or CVE_UNKNOWN when the
          versions cannot be compared.
          """
          if self.identifier in cve_ignore_list:
              return self.CVE_DOESNT_AFFECT

          pkg_version = distutils.version.LooseVersion(version)
          if not hasattr(pkg_version, "version"):
              print("Cannot parse package '%s' version '%s'" % (name, version))
              pkg_version = None

          # if we don't have a cpeid, build one based on name and version
          if not cpeid:
              cpeid = "cpe:2.3:*:*:%s:%s:*:*:*:*:*:*:*" % (name, version)
          # if we have a cpeid, use its version instead of the package
          # version, as they might be different due to
          # <pkg>_CPE_ID_VERSION
          else:
              pkg_version = distutils.version.LooseVersion(cpe_version(cpeid))

          for cpe in self.each_cpe():
              if not cpe_matches(cpe['id'], cpeid):
                  continue
              # No version bound at all: every version is affected
              if not cpe['v_start'] and not cpe['v_end']:
                  return self.CVE_AFFECTS
              if not pkg_version:
                  continue

              if cpe['v_start']:
                  try:
                      cve_affected_version = distutils.version.LooseVersion(cpe['v_start'])
                      inrange = ops.get(cpe['op_start'])(pkg_version, cve_affected_version)
                  except TypeError:
                      return self.CVE_UNKNOWN

                  # current package version is before v_start, so we're
                  # not affected by the CVE
                  if not inrange:
                      continue

              if cpe['v_end']:
                  try:
                      cve_affected_version = distutils.version.LooseVersion(cpe['v_end'])
                      inrange = ops.get(cpe['op_end'])(pkg_version, cve_affected_version)
                  except TypeError:
                      return self.CVE_UNKNOWN

                  # current package version is after v_end, so we're
                  # not affected by the CVE
                  if not inrange:
                      continue

              # We're in the version range affected by this CVE
              return self.CVE_AFFECTS

          return self.CVE_DOESNT_AFFECT