From a4c51f4130569e0e7309a180b78d6b022c5cb6ab Mon Sep 17 00:00:00 2001 From: Emilio Pozuelo Monfort Date: Mon, 10 Apr 2023 17:13:12 +0200 Subject: process-cve-records: new script to parse MITRE CVE 5.0 records This replaces the other part of bin/updatelist, but using the new CVE JSON 5.0 format. Closes #17, #18. --- bin/process-cve-records | 155 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 155 insertions(+) create mode 100755 bin/process-cve-records (limited to 'bin') diff --git a/bin/process-cve-records b/bin/process-cve-records new file mode 100755 index 0000000000..135e19c27e --- /dev/null +++ b/bin/process-cve-records @@ -0,0 +1,155 @@ +#!/usr/bin/python3 +# +# Parse MITRE JSON 5.0 records and update data/CVE/list +# +# See https://github.com/CVEProject/cve-schema +# and https://github.com/CVEProject/cvelistV5 +# +# Copyright © 2023 Emilio Pozuelo Monfort + +import io +import json +import os +import sys +import zipfile + +import requests + +import setup_paths # noqa +from sectracker import parsers + +CVE_ZIPFILE = 'https://github.com/CVEProject/cvelistV5/archive/refs/heads/main.zip' + +debug_enabled = False + +def debug(m): + if debug_enabled: + print(m) + + +def get_annotation(annotations, ann_type): + for ann in annotations: + if isinstance(ann, ann_type): + return ann + + +def is_published(record): + return record['cveMetadata']['state'] == 'PUBLISHED' + + +def is_reserved(record): + return record['cveMetadata']['state'] == 'RESERVED' + + +def is_rejected(record): + return record['cveMetadata']['state'] == 'REJECTED' + + +def parse_record(record, cve): + # remove all flags, and add the current one if needed + ann = get_annotation(cve.annotations, parsers.FlagAnnotation) + if ann: + cve.annotations.remove(ann) + + if is_published(record): + # no flag for published records + pass + elif is_reserved(record): + ann = parsers.FlagAnnotation(0, 'RESERVED') + cve.annotations.insert(0, ann) + elif is_rejected(record): + ann = parsers.FlagAnnotation(0, 'REJECTED') + cve.annotations.insert(0, ann) + + if len(cve.header.description) == 0 \ + and not is_rejected(record) and not is_reserved(record): + desc = [desc['value'] + for desc in record['containers']['cna']['descriptions'] + if desc['lang'].startswith('en')] + if desc: + desc = desc[0] + if desc and len(desc) > 70: + # for some reason descriptions contain new lines + desc = desc.replace('\n', ' ') + desc = desc[:70] + ' ...' + cve.header.description = f"({desc})" + + if not is_reserved(record) and not is_rejected(record) \ + and not get_annotation(cve.annotations, parsers.StringAnnotation): + ann = parsers.StringAnnotation(0, 'TODO', 'check') + cve.annotations.append(ann) + + +def process_record_file(f): + global cve_dir + global cves + + record = json.load(f) + cve_id = record['cveMetadata']['cveId'] + + try: + cve = cve_dir[cve_id] + except KeyError: + header = parsers.Header(0, cve_id, '') + cve = parsers.Bug('', header, list()) + cves.insert(0, cve) + parse_record(record, cve) + + +def process_record_filename(record_file): + with open(record_file) as f: + process_record_file(f) + + +def process_record_dir(record_dir): + for year_dir in os.listdir(record_dir): + for record_file in os.listdir(year_dir): + debug("processing record " + record_file) + process_record_filename(record_file) + debug("record processed") + + +def process_zip_file(zip_file): + z = zipfile.ZipFile(zip_file) + for fname in z.namelist(): + if os.path.basename(fname).startswith('CVE-'): + f = z.open(fname) + debug("processing record " + fname) + process_record_file(f) + debug("record processed") + + +def download_zip_file(): + debug("downloading zip file...") + r = requests.get(CVE_ZIPFILE) + debug(f"downloaded, status {r.status_code}") + b = io.BytesIO(r.content) + process_zip_file(b) + + +main_list = os.path.dirname(__file__) + '/../data/CVE/list' + +debug("reading cve file") +cves = parsers.cvelist(main_list) +debug("finished reading cve file") + +cve_dir = { cve.header.name: cve for cve in cves } + +if len(sys.argv) == 1: + # no argument, we download the CVE db + download_zip_file() +elif sys.argv[1].endswith('.json'): + record_file = sys.argv[1] + debug("processing record " + record_file) + process_record_filename(record_file) + debug("record processed") +elif sys.argv[1].endswith('.zip'): + zip_file = sys.argv[1] + process_zip_file(zip_file) +else: + record_dir = sys.argv[1] + process_record_dir(record_dir) + +# write CVE file back +with open(main_list, 'w') as f: + parsers.writecvelist(cves, f) -- cgit v1.2.3