cve.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. #!/usr/bin/env python
  2. # Copyright (C) 2009 by Thomas Petazzoni <thomas.petazzoni@free-electrons.com>
  3. # Copyright (C) 2020 by Gregory CLEMENT <gregory.clement@bootlin.com>
  4. #
  5. # This program is free software; you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation; either version 2 of the License, or
  8. # (at your option) any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. # General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program; if not, write to the Free Software
  17. # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  18. import datetime
  19. import os
  20. import requests # URL checking
  21. import distutils.version
  22. import time
  23. import gzip
  24. import sys
  25. try:
  26. import ijson
  27. except ImportError:
  28. sys.stderr.write("You need ijson to parse NVD for CVE check\n")
  29. exit(1)
  30. sys.path.append('utils/')
  31. NVD_START_YEAR = 2002
  32. NVD_JSON_VERSION = "1.0"
  33. NVD_BASE_URL = "https://nvd.nist.gov/feeds/json/cve/" + NVD_JSON_VERSION
  34. class CVE:
  35. """An accessor class for CVE Items in NVD files"""
  36. CVE_AFFECTS = 1
  37. CVE_DOESNT_AFFECT = 2
  38. CVE_UNKNOWN = 3
  39. def __init__(self, nvd_cve):
  40. """Initialize a CVE from its NVD JSON representation"""
  41. self.nvd_cve = nvd_cve
  42. @staticmethod
  43. def download_nvd_year(nvd_path, year):
  44. metaf = "nvdcve-%s-%s.meta" % (NVD_JSON_VERSION, year)
  45. path_metaf = os.path.join(nvd_path, metaf)
  46. jsonf_gz = "nvdcve-%s-%s.json.gz" % (NVD_JSON_VERSION, year)
  47. path_jsonf_gz = os.path.join(nvd_path, jsonf_gz)
  48. # If the database file is less than a day old, we assume the NVD data
  49. # locally available is recent enough.
  50. if os.path.exists(path_jsonf_gz) and os.stat(path_jsonf_gz).st_mtime >= time.time() - 86400:
  51. return path_jsonf_gz
  52. # If not, we download the meta file
  53. url = "%s/%s" % (NVD_BASE_URL, metaf)
  54. print("Getting %s" % url)
  55. page_meta = requests.get(url)
  56. page_meta.raise_for_status()
  57. # If the meta file already existed, we compare the existing
  58. # one with the data newly downloaded. If they are different,
  59. # we need to re-download the database.
  60. # If the database does not exist locally, we need to redownload it in
  61. # any case.
  62. if os.path.exists(path_metaf) and os.path.exists(path_jsonf_gz):
  63. meta_known = open(path_metaf, "r").read()
  64. if page_meta.text == meta_known:
  65. return path_jsonf_gz
  66. # Grab the compressed JSON NVD, and write files to disk
  67. url = "%s/%s" % (NVD_BASE_URL, jsonf_gz)
  68. print("Getting %s" % url)
  69. page_json = requests.get(url)
  70. page_json.raise_for_status()
  71. open(path_jsonf_gz, "wb").write(page_json.content)
  72. open(path_metaf, "w").write(page_meta.text)
  73. return path_jsonf_gz
  74. @classmethod
  75. def read_nvd_dir(cls, nvd_dir):
  76. """
  77. Iterate over all the CVEs contained in NIST Vulnerability Database
  78. feeds since NVD_START_YEAR. If the files are missing or outdated in
  79. nvd_dir, a fresh copy will be downloaded, and kept in .json.gz
  80. """
  81. for year in range(NVD_START_YEAR, datetime.datetime.now().year + 1):
  82. filename = CVE.download_nvd_year(nvd_dir, year)
  83. try:
  84. content = ijson.items(gzip.GzipFile(filename), 'CVE_Items.item')
  85. except: # noqa: E722
  86. print("ERROR: cannot read %s. Please remove the file then rerun this script" % filename)
  87. raise
  88. for cve in content:
  89. yield cls(cve['cve'])
  90. def each_product(self):
  91. """Iterate over each product section of this cve"""
  92. for vendor in self.nvd_cve['affects']['vendor']['vendor_data']:
  93. for product in vendor['product']['product_data']:
  94. yield product
  95. @property
  96. def identifier(self):
  97. """The CVE unique identifier"""
  98. return self.nvd_cve['CVE_data_meta']['ID']
  99. @property
  100. def pkg_names(self):
  101. """The set of package names referred by this CVE definition"""
  102. return set(p['product_name'] for p in self.each_product())
  103. def affects(self, br_pkg):
  104. """
  105. True if the Buildroot Package object passed as argument is affected
  106. by this CVE.
  107. """
  108. if br_pkg.is_cve_ignored(self.identifier):
  109. return self.CVE_DOESNT_AFFECT
  110. for product in self.each_product():
  111. if product['product_name'] != br_pkg.name:
  112. continue
  113. for v in product['version']['version_data']:
  114. if v["version_affected"] == "=":
  115. if v["version_value"] == "-":
  116. return self.CVE_AFFECTS
  117. elif br_pkg.current_version == v["version_value"]:
  118. return self.CVE_AFFECTS
  119. elif v["version_affected"] == "<=":
  120. pkg_version = distutils.version.LooseVersion(br_pkg.current_version)
  121. if not hasattr(pkg_version, "version"):
  122. print("Cannot parse package '%s' version '%s'" % (br_pkg.name, br_pkg.current_version))
  123. continue
  124. cve_affected_version = distutils.version.LooseVersion(v["version_value"])
  125. if not hasattr(cve_affected_version, "version"):
  126. print("Cannot parse CVE affected version '%s'" % v["version_value"])
  127. continue
  128. try:
  129. affected = pkg_version <= cve_affected_version
  130. except TypeError:
  131. return self.CVE_UNKNOWN
  132. if affected:
  133. return self.CVE_AFFECTS
  134. else:
  135. return self.CVE_DOESNT_AFFECT
  136. else:
  137. print("version_affected: %s" % v['version_affected'])
  138. return self.CVE_DOESNT_AFFECT