summaryrefslogtreecommitdiffstats
path: root/bin
diff options
context:
space:
mode:
authorEmilio Pozuelo Monfort <pochu@debian.org>2023-04-10 17:13:12 +0200
committerEmilio Pozuelo Monfort <pochu@debian.org>2023-04-27 11:19:32 +0200
commita4c51f4130569e0e7309a180b78d6b022c5cb6ab (patch)
tree8a7977f0f253cb0dab4ea1d48129a16ef1213619 /bin
parent3800e37a04c078c57a3a0e260c08e903bb24338e (diff)
process-cve-records: new script to parse MITRE CVE 5.0 records
This replaces the other part of bin/updatelist, but using the new CVE JSON 5.0 format. Closes #17, #18.
Diffstat (limited to 'bin')
-rwxr-xr-xbin/process-cve-records155
1 files changed, 155 insertions, 0 deletions
diff --git a/bin/process-cve-records b/bin/process-cve-records
new file mode 100755
index 0000000000..135e19c27e
--- /dev/null
+++ b/bin/process-cve-records
@@ -0,0 +1,155 @@
+#!/usr/bin/python3
+#
+# Parse MITRE JSON 5.0 records and update data/CVE/list
+#
+# See https://github.com/CVEProject/cve-schema
+# and https://github.com/CVEProject/cvelistV5
+#
+# Copyright © 2023 Emilio Pozuelo Monfort <pochu@debian.org>
+
+import io
+import json
+import os
+import sys
+import zipfile
+
+import requests
+
+import setup_paths # noqa
+from sectracker import parsers
+
+CVE_ZIPFILE = 'https://github.com/CVEProject/cvelistV5/archive/refs/heads/main.zip'
+
+debug_enabled = False
+
+def debug(m):
+ if debug_enabled:
+ print(m)
+
+
+def get_annotation(annotations, ann_type):
+ for ann in annotations:
+ if isinstance(ann, ann_type):
+ return ann
+
+
+def is_published(record):
+ return record['cveMetadata']['state'] == 'PUBLISHED'
+
+
+def is_reserved(record):
+ return record['cveMetadata']['state'] == 'RESERVED'
+
+
+def is_rejected(record):
+ return record['cveMetadata']['state'] == 'REJECTED'
+
+
+def parse_record(record, cve):
+ # remove all flags, and add the current one if needed
+ ann = get_annotation(cve.annotations, parsers.FlagAnnotation)
+ if ann:
+ cve.annotations.remove(ann)
+
+ if is_published(record):
+ # no flag for published records
+ pass
+ elif is_reserved(record):
+ ann = parsers.FlagAnnotation(0, 'RESERVED')
+ cve.annotations.insert(0, ann)
+ elif is_rejected(record):
+ ann = parsers.FlagAnnotation(0, 'REJECTED')
+ cve.annotations.insert(0, ann)
+
+ if len(cve.header.description) == 0 \
+ and not is_rejected(record) and not is_reserved(record):
+ desc = [desc['value']
+ for desc in record['containers']['cna']['descriptions']
+ if desc['lang'].startswith('en')]
+ if desc:
+ desc = desc[0]
+ if desc and len(desc) > 70:
+ # for some reason descriptions contain new lines
+ desc = desc.replace('\n', ' ')
+ desc = desc[:70] + ' ...'
+ cve.header.description = f"({desc})"
+
+ if not is_reserved(record) and not is_rejected(record) \
+ and not get_annotation(cve.annotations, parsers.StringAnnotation):
+ ann = parsers.StringAnnotation(0, 'TODO', 'check')
+ cve.annotations.append(ann)
+
+
+def process_record_file(f):
+ global cve_dir
+ global cves
+
+ record = json.load(f)
+ cve_id = record['cveMetadata']['cveId']
+
+ try:
+ cve = cve_dir[cve_id]
+ except KeyError:
+ header = parsers.Header(0, cve_id, '')
+ cve = parsers.Bug('', header, list())
+ cves.insert(0, cve)
+ parse_record(record, cve)
+
+
+def process_record_filename(record_file):
+ with open(record_file) as f:
+ process_record_file(f)
+
+
+def process_record_dir(record_dir):
+ for year_dir in os.listdir(record_dir):
+ for record_file in os.listdir(year_dir):
+ debug("processing record " + record_file)
+ process_record_filename(record_file)
+ debug("record processed")
+
+
+def process_zip_file(zip_file):
+ z = zipfile.ZipFile(zip_file)
+ for fname in z.namelist():
+ if os.path.basename(fname).startswith('CVE-'):
+ f = z.open(fname)
+ debug("processing record " + fname)
+ process_record_file(f)
+ debug("record processed")
+
+
+def download_zip_file():
+ debug("downloading zip file...")
+ r = requests.get(CVE_ZIPFILE)
+ debug(f"downloaded, status {r.status_code}")
+ b = io.BytesIO(r.content)
+ process_zip_file(b)
+
+
+main_list = os.path.dirname(__file__) + '/../data/CVE/list'
+
+debug("reading cve file")
+cves = parsers.cvelist(main_list)
+debug("finished reading cve file")
+
+cve_dir = { cve.header.name: cve for cve in cves }
+
+if len(sys.argv) == 1:
+ # no argument, we download the CVE db
+ download_zip_file()
+elif sys.argv[1].endswith('.json'):
+ record_file = sys.argv[1]
+ debug("processing record " + record_file)
+ process_record_filename(record_file)
+ debug("record processed")
+elif sys.argv[1].endswith('.zip'):
+ zip_file = sys.argv[1]
+ process_zip_file(zip_file)
+else:
+ record_dir = sys.argv[1]
+ process_record_dir(record_dir)
+
+# write CVE file back
+with open(main_list, 'w') as f:
+ parsers.writecvelist(cves, f)

© 2014-2024 Faster IT GmbH | imprint | privacy policy