cpedb.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. #!/usr/bin/env python3
  2. import xml.etree.ElementTree as ET
  3. from xml.etree.ElementTree import Element, SubElement
  4. import gzip
  5. import os
  6. import pickle
  7. import requests
  8. import time
  9. from xml.dom import minidom
  10. VALID_REFS = ['VENDOR', 'VERSION', 'CHANGE_LOG', 'PRODUCT', 'PROJECT', 'ADVISORY']
  11. CPEDB_URL = "https://static.nvd.nist.gov/feeds/xml/cpe/dictionary/official-cpe-dictionary_v2.3.xml.gz"
  12. ns = {
  13. '': 'http://cpe.mitre.org/dictionary/2.0',
  14. 'cpe-23': 'http://scap.nist.gov/schema/cpe-extension/2.3',
  15. 'xml': 'http://www.w3.org/XML/1998/namespace'
  16. }
  17. class CPE:
  18. def __init__(self, cpe_str, titles, refs):
  19. self.cpe_str = cpe_str
  20. self.titles = titles
  21. self.references = refs
  22. self.cpe_cur_ver = "".join(self.cpe_str.split(":")[5:6])
  23. def update_xml_dict(self):
  24. ET.register_namespace('', 'http://cpe.mitre.org/dictionary/2.0')
  25. cpes = Element('cpe-list')
  26. cpes.set('xmlns:cpe-23', "http://scap.nist.gov/schema/cpe-extension/2.3")
  27. cpes.set('xmlns:ns6', "http://scap.nist.gov/schema/scap-core/0.1")
  28. cpes.set('xmlns:scap-core', "http://scap.nist.gov/schema/scap-core/0.3")
  29. cpes.set('xmlns:config', "http://scap.nist.gov/schema/configuration/0.1")
  30. cpes.set('xmlns:xsi', "http://www.w3.org/2001/XMLSchema-instance")
  31. cpes.set('xmlns:meta', "http://scap.nist.gov/schema/cpe-dictionary-metadata/0.2")
  32. cpes.set('xsi:schemaLocation', " ".join(["http://scap.nist.gov/schema/cpe-extension/2.3",
  33. "https://scap.nist.gov/schema/cpe/2.3/cpe-dictionary-extension_2.3.xsd",
  34. "http://cpe.mitre.org/dictionary/2.0",
  35. "https://scap.nist.gov/schema/cpe/2.3/cpe-dictionary_2.3.xsd",
  36. "http://scap.nist.gov/schema/cpe-dictionary-metadata/0.2",
  37. "https://scap.nist.gov/schema/cpe/2.1/cpe-dictionary-metadata_0.2.xsd",
  38. "http://scap.nist.gov/schema/scap-core/0.3",
  39. "https://scap.nist.gov/schema/nvd/scap-core_0.3.xsd",
  40. "http://scap.nist.gov/schema/configuration/0.1",
  41. "https://scap.nist.gov/schema/nvd/configuration_0.1.xsd",
  42. "http://scap.nist.gov/schema/scap-core/0.1",
  43. "https://scap.nist.gov/schema/nvd/scap-core_0.1.xsd"]))
  44. item = SubElement(cpes, 'cpe-item')
  45. cpe_short_name = CPE.short_name(self.cpe_str)
  46. cpe_new_ver = CPE.version_update(self.cpe_str)
  47. item.set('name', 'cpe:/' + cpe_short_name)
  48. self.titles[0].text.replace(self.cpe_cur_ver, cpe_new_ver)
  49. for title in self.titles:
  50. item.append(title)
  51. if self.references:
  52. item.append(self.references)
  53. cpe23item = SubElement(item, 'cpe-23:cpe23-item')
  54. cpe23item.set('name', self.cpe_str)
  55. # Generate the XML as a string
  56. xmlstr = ET.tostring(cpes)
  57. # And use minidom to pretty print the XML
  58. return minidom.parseString(xmlstr).toprettyxml(encoding="utf-8").decode("utf-8")
  59. @staticmethod
  60. def version(cpe):
  61. return cpe.split(":")[5]
  62. @staticmethod
  63. def product(cpe):
  64. return cpe.split(":")[4]
  65. @staticmethod
  66. def short_name(cpe):
  67. return ":".join(cpe.split(":")[2:6])
  68. @staticmethod
  69. def version_update(cpe):
  70. return ":".join(cpe.split(":")[5:6])
  71. @staticmethod
  72. def no_version(cpe):
  73. return ":".join(cpe.split(":")[:5])
  74. class CPEDB:
  75. def __init__(self, nvd_path):
  76. self.all_cpes = dict()
  77. self.all_cpes_no_version = dict()
  78. self.nvd_path = nvd_path
  79. def gen_cached_cpedb(self, cpedb, cache_all_cpes, cache_all_cpes_no_version):
  80. print("CPE: Unzipping xml manifest...")
  81. nist_cpe_file = gzip.GzipFile(fileobj=open(cpedb, 'rb'))
  82. print("CPE: Converting xml manifest to dict...")
  83. tree = ET.parse(nist_cpe_file)
  84. all_cpedb = tree.getroot()
  85. self.parse_dict(all_cpedb)
  86. print("CPE: Caching dictionary")
  87. cpes_file = open(cache_all_cpes, 'wb')
  88. pickle.dump(self.all_cpes, cpes_file)
  89. cpes_file.close()
  90. cpes_file = open(cache_all_cpes_no_version, 'wb')
  91. pickle.dump(self.all_cpes_no_version, cpes_file)
  92. cpes_file.close()
  93. def get_xml_dict(self):
  94. print("CPE: Setting up NIST dictionary")
  95. if not os.path.exists(os.path.join(self.nvd_path, "cpe")):
  96. os.makedirs(os.path.join(self.nvd_path, "cpe"))
  97. cpe_dict_local = os.path.join(self.nvd_path, "cpe", os.path.basename(CPEDB_URL))
  98. if not os.path.exists(cpe_dict_local) or os.stat(cpe_dict_local).st_mtime < time.time() - 86400:
  99. print("CPE: Fetching xml manifest from [" + CPEDB_URL + "]")
  100. cpe_dict = requests.get(CPEDB_URL)
  101. open(cpe_dict_local, "wb").write(cpe_dict.content)
  102. cache_all_cpes = os.path.join(self.nvd_path, "cpe", "all_cpes.pkl")
  103. cache_all_cpes_no_version = os.path.join(self.nvd_path, "cpe", "all_cpes_no_version.pkl")
  104. if not os.path.exists(cache_all_cpes) or \
  105. not os.path.exists(cache_all_cpes_no_version) or \
  106. os.stat(cache_all_cpes).st_mtime < os.stat(cpe_dict_local).st_mtime or \
  107. os.stat(cache_all_cpes_no_version).st_mtime < os.stat(cpe_dict_local).st_mtime:
  108. self.gen_cached_cpedb(cpe_dict_local,
  109. cache_all_cpes,
  110. cache_all_cpes_no_version)
  111. print("CPE: Loading CACHED dictionary")
  112. cpe_file = open(cache_all_cpes, 'rb')
  113. self.all_cpes = pickle.load(cpe_file)
  114. cpe_file.close()
  115. cpe_file = open(cache_all_cpes_no_version, 'rb')
  116. self.all_cpes_no_version = pickle.load(cpe_file)
  117. cpe_file.close()
  118. def parse_dict(self, all_cpedb):
  119. # Cycle through the dict and build two dict to be used for custom
  120. # lookups of partial and complete CPE objects
  121. # The objects are then used to create new proposed XML updates if
  122. # if is determined one is required
  123. # Out of the different language titles, select English
  124. for cpe in all_cpedb.findall(".//{http://cpe.mitre.org/dictionary/2.0}cpe-item"):
  125. cpe_titles = []
  126. for title in cpe.findall('.//{http://cpe.mitre.org/dictionary/2.0}title[@xml:lang="en-US"]', ns):
  127. title.tail = None
  128. cpe_titles.append(title)
  129. # Some older CPE don't include references, if they do, make
  130. # sure we handle the case of one ref needing to be packed
  131. # in a list
  132. cpe_ref = cpe.find(".//{http://cpe.mitre.org/dictionary/2.0}references")
  133. if cpe_ref:
  134. for ref in cpe_ref.findall(".//{http://cpe.mitre.org/dictionary/2.0}reference"):
  135. ref.tail = None
  136. ref.text = ref.text.upper()
  137. if ref.text not in VALID_REFS:
  138. ref.text = ref.text + "-- UPDATE this entry, here are some examples and just one word should be used -- " + ' '.join(VALID_REFS) # noqa E501
  139. cpe_ref.tail = None
  140. cpe_ref.text = None
  141. cpe_str = cpe.find(".//{http://scap.nist.gov/schema/cpe-extension/2.3}cpe23-item").get('name')
  142. item = CPE(cpe_str, cpe_titles, cpe_ref)
  143. cpe_str_no_version = CPE.no_version(cpe_str)
  144. # This dict must have a unique key for every CPE version
  145. # which allows matching to the specific obj data of that
  146. # NIST dict entry
  147. self.all_cpes.update({cpe_str: item})
  148. # This dict has one entry for every CPE (w/o version) to allow
  149. # partial match (no valid version) check (the obj is saved and
  150. # used as seed for suggested xml updates. By updating the same
  151. # non-version'd entry, it assumes the last update here is the
  152. # latest version in the NIST dict)
  153. self.all_cpes_no_version.update({cpe_str_no_version: item})
  154. def find_partial(self, cpe_str):
  155. cpe_str_no_version = CPE.no_version(cpe_str)
  156. if cpe_str_no_version in self.all_cpes_no_version:
  157. return cpe_str_no_version
  158. def find_partial_obj(self, cpe_str):
  159. cpe_str_no_version = CPE.no_version(cpe_str)
  160. if cpe_str_no_version in self.all_cpes_no_version:
  161. return self.all_cpes_no_version[cpe_str_no_version]
  162. def find_partial_latest_version(self, cpe_str_partial):
  163. cpe_obj = self.find_partial_obj(cpe_str_partial)
  164. return cpe_obj.cpe_cur_ver
  165. def find(self, cpe_str):
  166. if self.find_partial(cpe_str):
  167. if cpe_str in self.all_cpes:
  168. return cpe_str
  169. def gen_update_xml(self, cpe_str):
  170. cpe = self.find_partial_obj(cpe_str)
  171. return cpe.update_xml_dict()