summaryrefslogtreecommitdiffstats
path: root/bin/process-cve-records
blob: 9f30c7fcf92a4efe0de6012ec9099a2aae142ed6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#!/usr/bin/python3
#
# Parse MITRE JSON 5.0 records and update data/CVE/list
#
# See https://github.com/CVEProject/cve-schema
# and https://github.com/CVEProject/cvelistV5
#
# Copyright © 2023 Emilio Pozuelo Monfort <pochu@debian.org>

import argparse
import io
import json
import os
import zipfile

import requests

import setup_paths  # noqa
from sectracker import parsers

CVE_ZIPFILE = 'https://github.com/CVEProject/cvelistV5/archive/refs/heads/main.zip'

def debug(m):
    """Print message *m*, but only when verbose output was requested.

    The decision is taken from the module-level ``args`` namespace
    (its ``verbose`` attribute).
    """
    if not args.verbose:
        return
    print(m)


def get_annotation(annotations, ann_type):
    """Return the first item of *annotations* that is an instance of *ann_type*.

    Returns None when no item matches.
    """
    matching = (entry for entry in annotations if isinstance(entry, ann_type))
    return next(matching, None)


def is_published(record):
    """Tell whether the record's ``cveMetadata`` state is 'PUBLISHED'."""
    state = record['cveMetadata']['state']
    return state == 'PUBLISHED'


def is_reserved(record):
    """Tell whether the record's ``cveMetadata`` state is 'RESERVED'."""
    state = record['cveMetadata']['state']
    return state == 'RESERVED'


def is_rejected(record):
    """Tell whether the record's ``cveMetadata`` state is 'REJECTED'."""
    state = record['cveMetadata']['state']
    return state == 'REJECTED'


def _short_description(record):
    """Return a one-line, ASCII-only summary of the record's description.

    The first CNA description whose language tag starts with 'en' is used.
    Returns None when the record has no such description.
    """
    english = [entry['value']
               for entry in record['containers']['cna']['descriptions']
               if entry['lang'].startswith('en')]
    if not english:
        return None

    # for some reason descriptions may contain new lines
    text = english[0].replace('\n', ' ')

    # and even non-printable characters such as \xa0 (&nbsp;): those are
    # dropped, while printable non-ASCII characters are replaced by their
    # escaped, ASCII-only representation.
    text = "".join(c if ord(c) < 128 else ascii(c).strip('\'')
                   for c in text if c.isprintable())

    # and some contain leading spaces
    text = text.strip()

    if len(text) > 70:
        text = text[:70] + ' ...'

    return text


def parse_record(record, cve):
    """Update the tracker entry *cve* in place from the CVE JSON *record*.

    - Existing flag annotations are removed; a RESERVED or REJECTED flag is
      inserted at the front of the annotations when the record is in that
      state.
    - For reserved/rejected records, a description starting with '(' is
      cleared. For other records the description is set to the parenthesized
      short English description; if the record has no English description
      the current description is left untouched.
    - Records that are neither reserved nor rejected and whose entry has no
      string or package annotation get a 'TODO: check' annotation appended.
    """
    reserved = is_reserved(record)
    rejected = is_rejected(record)

    # remove all flags, and add the current one if needed
    stale_flags = [ann for ann in cve.annotations
                   if isinstance(ann, parsers.FlagAnnotation)]
    for ann in stale_flags:
        cve.annotations.remove(ann)

    # no flag for published records
    if reserved:
        cve.annotations.insert(0, parsers.FlagAnnotation(0, 'RESERVED'))
    elif rejected:
        cve.annotations.insert(0, parsers.FlagAnnotation(0, 'REJECTED'))

    if reserved or rejected:
        if cve.header.description.startswith('('):
            cve.header.description = ''
    else:
        desc = _short_description(record)
        # Without an English description there is nothing sensible to write;
        # formatting the empty match list would produce the bogus "([])".
        if desc is not None:
            cve.header.description = f"({desc})"

    needs_todo = (
        not reserved
        and not rejected
        and not get_annotation(cve.annotations, parsers.StringAnnotation)
        and not get_annotation(cve.annotations, parsers.PackageAnnotation)
    )
    if needs_todo:
        cve.annotations.append(parsers.StringAnnotation(0, 'TODO', 'check'))


def process_record_file(f):
    """Load one CVE JSON record from the open file *f* and apply it.

    The record's ``cveId`` is looked up in the module-level ``cve_dir``
    index. When it is missing, a new entry with an empty description and
    no annotations is created and inserted at the front of the module-level
    ``cves`` list. Either way the entry is then updated by parse_record().
    """
    record = json.load(f)
    cve_id = record['cveMetadata']['cveId']

    cve = cve_dir.get(cve_id)
    if cve is None:
        cve = parsers.Bug('', parsers.Header(0, cve_id, ''), [])
        cves.insert(0, cve)

    parse_record(record, cve)

def process_record_filename(record_file):
    """Process the CVE JSON record stored at path *record_file*."""
    # Open in binary mode so that decoding does not depend on the locale's
    # default encoding: json.load() accepts binary files and detects the
    # UTF-8/16/32 encoding itself. process_zip_file() hands over binary
    # streams as well.
    with open(record_file, 'rb') as f:
        process_record_file(f)


def process_record_dir(record_dir):
    """Process every CVE record file found anywhere below *record_dir*.

    The tree is walked recursively, so both a flat ``<year>/<file>`` layout
    and deeper nesting are handled. Only files named ``CVE-*.json`` are
    treated as records; anything else is skipped. Directories and files are
    visited in sorted order so that runs are reproducible.

    Raises OSError if *record_dir* (or a directory below it) cannot be
    listed.
    """
    def fail(error):
        # os.walk() swallows listing errors unless told otherwise
        raise error

    for dirpath, dirnames, filenames in os.walk(record_dir, onerror=fail):
        dirnames.sort()
        for name in sorted(filenames):
            if not (name.startswith('CVE-') and name.endswith('.json')):
                continue
            # listdir/walk only yield bare names: build the real path
            record_file = os.path.join(dirpath, name)
            debug("processing record " + record_file)
            process_record_filename(record_file)
            debug("record processed")


def process_zip_file(zip_file):
    """Process every CVE record contained in the zip archive *zip_file*.

    *zip_file* is whatever zipfile.ZipFile accepts (a path or a file-like
    object). Only members whose base name starts with 'CVE-' are treated as
    records. The archive and each member stream are closed when done.
    """
    with zipfile.ZipFile(zip_file) as z:
        for fname in z.namelist():
            if not os.path.basename(fname).startswith('CVE-'):
                continue
            debug("processing record " + fname)
            with z.open(fname) as f:
                process_record_file(f)
            debug("record processed")


def download_zip_file():
    """Download the archive at CVE_ZIPFILE and process the records in it.

    Raises requests.HTTPError when the server answers with an error status,
    and a requests timeout exception when the server stops responding.
    """
    debug("downloading zip file...")
    # Without a timeout requests may block forever on a stalled connection.
    r = requests.get(CVE_ZIPFILE, timeout=60)
    debug(f"downloaded, status {r.status_code}")
    # Fail clearly on HTTP errors instead of feeding an error page to the
    # zip parser.
    r.raise_for_status()
    process_zip_file(io.BytesIO(r.content))

# Default work dir: the parent of the directory containing this script.
# __file__ is made absolute first, since a relative __file__ (script started
# from its own directory) would otherwise reduce to an empty string.
default_workdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Command-line interface: verbosity switch, work dir override and an
# optional positional argument naming the record source.
parser = argparse.ArgumentParser(
    description='Update CVE list with MITRE CVE records',
)
parser.add_argument(
    '-v', '--verbose',
    action="store_true",
    help='enable verbose messages',
)
parser.add_argument(
    '--work-dir',
    default=default_workdir,
    help='path to security-tracker repo (default: relative to the script)',
)
parser.add_argument(
    'file',
    nargs='?',
    help='file to process, or download records from MITRE if not specified',
)
args = parser.parse_args()

# Path of the CVE list inside the work dir. os.path.join keeps an empty
# work dir relative instead of turning it into the absolute '/data/CVE/list'.
main_list = os.path.join(args.work_dir, 'data', 'CVE', 'list')

debug("reading cve file")
cves = parsers.cvelist(main_list)
debug("finished reading cve file")

# index of the parsed entries, keyed by the name in their header
cve_dir = {cve.header.name: cve for cve in cves}

# Choose the record source from the optional positional argument: a single
# .json record, a .zip archive, or otherwise a directory of records.
if args.file and args.file.endswith('.json'):
    debug("processing record " + args.file)
    process_record_filename(args.file)
    debug("record processed")
elif args.file and args.file.endswith('.zip'):
    process_zip_file(args.file)
elif args.file:
    process_record_dir(args.file)
else:
    # no argument, we download the CVE db
    download_zip_file()

# write CVE file back
with open(main_list, 'w') as f:
    parsers.writecvelist(cves, f)

© 2014-2024 Faster IT GmbH | imprint | privacy policy