From 4b4c26c96534c175fc0bd493d2e1a3e21a9afffe Mon Sep 17 00:00:00 2001 From: Emilio Pozuelo Monfort Date: Wed, 21 Jun 2023 13:49:32 +0200 Subject: Rewrite check-new-issues in Python While at it, switch to the new MITRE CVE 5 API, as the previous API will be removed soon. Fixes #20 --- bin/check-new-issues | 1455 +++++++++++++++++++++++++------------------------- 1 file changed, 739 insertions(+), 716 deletions(-) (limited to 'bin') diff --git a/bin/check-new-issues b/bin/check-new-issues index 9660e18e14..fef315711f 100755 --- a/bin/check-new-issues +++ b/bin/check-new-issues @@ -1,16 +1,290 @@ -#!/usr/bin/perl - -use strict; -use File::Temp; -use Getopt::Std; -#use Smart::Comments; -use Term::ReadLine; - -my %opts; -getopts('ln:fhi:t:Tca:e:uUsDb', \%opts); - -sub print_commands { - print <<'EOF'; +#!/usr/bin/python3 +# +# Interactive command to iterate over new CVEs in order to triage +# them. +# +# Based on a previous Perl script written by Stefan Fritsch and others. +# +# Copyright © 2023 Emilio Pozuelo Monfort +# +# This file is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 2 of the License, or +# (at your option) any later version. +# +# This file is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + +import argparse +import collections +import json +import logging +import os +import re +import readline +import subprocess +import sys +import tempfile +import textwrap +import zipfile + +import requests + +import setup_paths # noqa +from sectracker import parsers +from bugs import temp_bug_name + +#logging.getLogger().setLevel("DEBUG") + +def debug(s): + if args.verbose: + print(s) + +def get_annotation(annotations, ann_type): + for ann in annotations: + if isinstance(ann, ann_type): + return ann + +def ann_is_todo_check(ann): + if isinstance(ann, parsers.StringAnnotation): + if ann.type == "TODO" and ann.description == "check": + return True + return False + +def read_packages_file(file): + packages = [] + + with open(file) as f: + for line in f: + if line.startswith('#'): + continue + + line = line.strip() + if line != "": + packages.append(line) + + return packages + +def read_wnpp_file(wnpp_file): + wnpp = {} + + wnpp_re = re.compile(r"^([\w.-]+): ((?:ITP|RFP) .+)$") + + with open(wnpp_file) as f: + for line in f: + m = wnpp_re.match(line) + if m: + package, bug = m.group(1, 2) + wnpp[package] = bug + + return wnpp + +def print_urls(cve_id): + cve = cve5s[cve_id] if cve_id in cve5s else None + + if cve: + cna = cve['containers']['cna'] + if 'references' in cna: + for ref in cna['references']: + print('Reference: ' + ref['url']) + print("") + +def get_cve5_description(cve_id): + cve = cve5s[cve_id] if cve_id in cve5s else None + desc = None + + if cve: + if 'descriptions' in cve['containers']['cna']: + desc = [desc['value'] + for desc in cve['containers']['cna']['descriptions'] + if desc['lang'].startswith('en')] + if desc: + desc = desc[0] + + # for some reason descriptions may contain new lines + desc = desc.replace('\n', ' ') + + # and some contain leading spaces + desc = desc.strip() + + return desc + +def save_datafile(cves, datafile): + debug(f'writing {len(cves)} to {datafile}') + with open(datafile, "w") as f: + parsers.writecvelist(cves, f) + +def print_cve(cve): + cvelist = [cve] + parsers.writecvelist(cvelist, sys.stdout) + +def read_cve5_file(f): + cve5s = {} + + z = zipfile.ZipFile(cve5_file) + for fname in z.namelist(): + if os.path.basename(fname).startswith('CVE-'): + f = z.open(fname) + debug("processing record " + fname) + record = json.load(f) + cve_id = record['cveMetadata']['cveId'] + cve5s[cve_id] = record + + return cve5s + +# returns the first Debian bug associated to the CVE, or None +def get_cve_bug(cve): + for ann in cve.annotations: + if isinstance(ann, parsers.PackageAnnotation): + for flag in ann.flags: + if isinstance(flag, parsers.PackageBugAnnotation): + return flag.bug + + return None + +def get_cve_name(cve): + if not 'XXXX' in cve.header.name: + return cve.header.name + + # for XXXX entries, we can't use the name in CVE/list as it will + # cause collitions, so we use the hashed name instead. + bug = get_cve_bug(cve) or 0 + desc = cve.header.description + + return temp_bug_name(bug, desc) + +def parse_cves(): + cvelist = parsers.cvelist(datafile) + + # we want to use a dict to easily lookup and replace a cve, but we need + # to keep order from the list above for when we write the keys back. + cves = collections.OrderedDict() + for cve in cvelist: + name = get_cve_name(cve) + + cves[name] = cve + + return cves + +def auto_nfu(name): + desc = get_cve5_description(name) + + wordpress_re = re.compile(r".*in\s+the\s+(.+)\s+(plugin|theme)\s+(?:[\w\d.]+\s+)?(?:(?:(?:before|through)\s+)?[\w\d.]+\s+)?for\s+[Ww]ord[Pp]ress.*") + m = wordpress_re.match(desc) + if m: + name, type = m.group(1, 2) + return f"{name} {type} for WordPress" + + nfu_re = re.compile(r".*\b(FS\s+.+?\s+Clone|Meinberg\s+LANTIME|Ecava\s+IntegraXor|Foxit\s+Reader|Cambium\s+Networks\s+.+?\s+firmware|Trend\s+Micro|(?:SAP|IBM|EMC|NetApp|Micro\sFocus).+?(?=tool|is|version|[\d(,])).*") + m = nfu_re.match(desc) + if m: + name = m.group(1) + name = name.strip() + return name + + return None + +apt_cache_cache = [] +apt_cache_cache_term = "" + +def apt_cache(term): + global apt_cache_cache_term + global apt_cache_cache + + if term == apt_cache_cache_term: + return apt_cache_cache + + cmd = subprocess.run(['apt-cache', 'search', term], text=True, capture_output=True) + apt_cache_cache = cmd.stdout.strip().split('\n') + apt_cache_cache_term = term + + return apt_cache_cache + +def read_embedded_copies(): + emb_file = "data/embedded-code-copies" + with open(emb_file) as f: + comment_section = True + code = None + pkg = None + + for line in f.readlines(): + if re.match(r'^---BEGIN', line): + comment_section = False + continue + + if not comment_section: + debug(line) + if m := re.match(r'^([\w][\w+-.]+)', line): + code = m.group(1).lower() + debug("code: " + code) + pkg = None + if code in embed_code: + syntax_error("Duplicate embedded code $code") + elif line.strip() == "": + code = None + pkg = None + debug("empty line, resetting") + elif m := re.match(r'^\s+(?:\[\w+\]\s+)?-\s+(\w[\w.-]+)', line): + pkg = m.group(1) + debug("pkg: " + pkg) + line = line.strip() + if code not in embed_code: + embed_code[code] = {} + if pkg not in embed_code[code]: + embed_code[code][pkg] = {} + embed_pkg[pkg] = {} + + embed_code[code][pkg][line] = True + embed_pkg[pkg][code] = True + elif re.match(r'^\s+(?:NOTE|TODO)', line): + # note should follow a pkg line, which should have already + # been processed + pass + else: + syntax_error(f"Cannot parse {line}") + +def syntax_error(s): + print("embedded-code-copies: " + s, file=sys.stderr) + sys.exit(1) + +def search_embed(text): + found = 0 + text = text.lower() + if text in embed_code: + print(f"{text} is embedded by: " + + " ".join(sorted(embed_code[text].keys()))) + found = 1 + + if text in embed_pkg: + print(f"{text} embeds: " + + " ".join(sorted(embed_pkg[text].keys()))) + found = 1 + + return found + +def wnpp_to_candidates(): + for pkg, line in wnpp.items(): + # there might be more than one bug, so only take the first + bugline = line.split('|')[0] + + type, bug = bugline.split(" ") + if re.match(r'^(?:RFP|ITP)$', type): + yield f"{pkg} (bug #{bug})" + +def print_stats(): + temp_cves = [e for e in cves.keys() if 'TEMP' in e] + + print(f"{len(cve5s)} CVEs", end="") + print(f", {len(temp_cves)} temp issues", end="") + if num_todo > 0: + print(f", {num_todo} todos", end="") + if num_missing_bug > 0: + print(f", {num_missing_bug} entries with missing bug reference", end="") + print("") + +def print_commands(): + print(''' * blank line to skip to next issue * .h to repeat this help output of the list of commands * .fname to do "apt-file search name" @@ -25,708 +299,457 @@ sub print_commands { * - package-entry to add an entry for "package" and launch an editor (e.g. - poppler ) * q to save and quit * CTRL-C to quit without saving - * everything else is inserted as product name for a NOT-FOR-US -EOF -} - -if ($opts{h}) { - print <<'EOF'; -downloads allitems.txt from cve.mitre.org and shows full description for each -"TODO: check" item (2003 and newer). Then - -- tries to guess product name and php filename and does - apt-cache and apt-file search -- waits for input: -EOF - print_commands; - print <<'EOF'; - -Use "git diff" and "git reset" as needed ;-) - -OPTIONS: [ -l [-n ] [-f] ] --l : just list issues --n : show max n lines of each description (default 2) --f : show full CVE/list entry as well --i regexp : use regexp to select todos (default: 'CVE-20(?:0[3-9]|[1-9][0-9])' ) --t regexp : use regexp to select todos (default: '^\s+TODO: check$' ) --T : same as -t '^\s+TODO: check' (note the missing $) --u : also show unfixed issues without bug reference --U : only show unfixed issues without bug reference instead of TODO items --c : only do syntax check of embedded-code-copies --e : use for embedded-code-copies, "-" for STDIN --a : If automatic apt-cache/apt-file search gives more than n results, - display only the count (default 10) --s : skip automatic apt-cache/apt-file searches, suggest the - command to run instead --D : skip the download operations --b : auto process entries (e.g. NFUs) - -EOF - - exit(0); -} - -# TODO/BUGS: -# - go back to previous issue / undo -# - handle entries with several TODO lines -# - handle claimed-by - - -my $basedir; -if (-e "security-tracker/data/CVE/list") { - $basedir="security-tracker"; -} elsif (-e "data/CVE/list") { - $basedir="."; -} elsif (-e "../data/CVE/list") { - $basedir=".."; -} - - -my $embed_code = {}; -my $embed_pkg = {}; -my $embed_errors; - -read_embedded_copies(); - -if ($opts{c}) { - exit($embed_errors); -} - - -my $datafile="$basedir/data/CVE/list"; -my $allitemsfile="gunzip -c $basedir/../allitems.txt.gz|"; -my $allitemsurl="https://cve.mitre.org/data/downloads/allitems.txt.gz"; -my $removedfile="$basedir/data/packages/removed-packages"; -my $wnppurl="https://qa.debian.org/data/bts/wnpp_rm"; -my $wnppfile="../wnpp_rm"; - -my $issue_regexp= $opts{i} || 'CVE-20(?:0[3-9]|[1-9][0-9])'; -my $todo_regexp= $opts{t} || ( $opts{T} ? '^\s+TODO: check' : '^\s+TODO: check$' ); -my $auto_display_limit = 10; -$auto_display_limit = $opts{a} if defined $opts{a}; - -my $editor= 'sensible-editor'; - -unless ($opts{D}) { - system "cd $basedir/.. ; wget -N $allitemsurl"; - system "cd $basedir/.. ; wget -N $wnppurl"; -} - -print "Reading data...\n"; - -my $entries=read_file($datafile, qr/^CVE/ ); -my $CVEs=read_file($allitemsfile, qr/^=+$/ ); -my $data; -my @todos; -my %afcache; -my $num_todo; -my $num_missing_bug; - -foreach my $cve (@{$CVEs}) { - $cve =~ /^Name:\s*(CVE\S+)/m or next; - my $name = $1; - - # cleanup the description - $cve =~ s/^Current Votes:.+candidate not yet[^\n]+\n{2,3}//ms; - $cve =~ s/^(?:Phase|Status|Category):[^\n]*\n//gms; - - $data->{$name}->{CVE}=\$cve; -} - -my %wnpp; -open(WNPP, $wnppfile) or die "could not open $wnppfile"; -while () { - next unless (m/^([\w.-]+): ((?:ITP|RFP) .+)$/); - $wnpp{lc $1} = $2; -} -close(WNPP); + * everything else is inserted as product name for a NOT-FOR-US''') + +parser = argparse.ArgumentParser(description="review new CVE entries") +parser.add_argument('-l', '--list', action='store_true', + help='Only list issues') +parser.add_argument('-f', '--full', action='store_true', + help='Show full CVE entries') +parser.add_argument('-u', '--unfixed', action='store_true', + help='Also process CVEs with unfixed issues and no bugs') +parser.add_argument('-U', '--only-unfixed', action='store_true', + help='Only process CVEs with unfixed issues and no bugs') +parser.add_argument('-a', '--auto', action='store_true', + help='Automatically process NOT-FOR-US entries') +parser.add_argument('-s', '--skip', action='store_true', + help='Skip automatic apt-cache/apt-file searches') +parser.add_argument('-D', '--no-download', action='store_true', + help='Skip downloading files') +parser.add_argument('-v', '--verbose', action='store_true', + help='Verbose mode') + +args = parser.parse_args() + + +embed_code = {} +embed_pkg = {} + +read_embedded_copies() + +cve5_file_url = 'https://github.com/CVEProject/cvelistV5/archive/refs/heads/main.zip' +cve5_file = 'mitre.zip' +datafile = "data/CVE/list" +removed_packages_file = "data/packages/removed-packages" +ignore_bug_file = "data/packages/ignored-debian-bug-packages" +wnppurl = "https://qa.debian.org/data/bts/wnpp_rm" +wnppfile = "../wnpp_rm" + +issue_re = re.compile(r'CVE-20(?:0[3-9]|[1-9][0-9])|TEMP') +auto_display_limit = 10 +#$auto_display_limit = $opts{a} if defined $opts{a} +editor = 'sensible-editor' + +if not args.no_download: + debug("downloading files...") + + r = requests.get(cve5_file_url) + with open(cve5_file, "wb") as f: + f.write(r.content) + + r = requests.get(wnppurl) + with open(wnppfile, "w") as f: + f.write(r.text) + +debug("reading data...") + +# We have CVE 5.0 JSON information coming from MITRE, we use cve5 for those +# We also have CVE information coming from our data/CVE/list, we use cve there +cves = parse_cves() +cve5s = read_cve5_file(cve5_file) + +todos = [] +afcache = {} +num_todo = 0 +num_missing_bug = 0 + +wnpp = read_wnpp_file(wnppfile) # packages that should be ignored by -u/-U -my @ignore_missing_bug_list = qw/linux-2.6 linux-2.6.24 - kfreebsd-source kfreebsd-5 kfreebsd-6 kfreebsd-7 - mozilla mozilla-firefox mozilla-thunderbird firefox - php4 - gnutls11 - /; -my %ignore_missing_bug; -if ($opts{u} || $opts{U}) { - push @ignore_missing_bug_list, read_removed_packages_file($removedfile); - $ignore_missing_bug{$_} = 1 for @ignore_missing_bug_list; -} - -my %seen_pkgs; - -foreach my $entry (@{$entries}) { - my $name; - if ( $entry =~ /^(CVE-....-\d{4,})/ ) { - $name=$1; - } - elsif ( $entry =~ /^(CVE-....-XXXX.*)\n/ ){ - $name=$1; - } - else { - die "invalid entry:\n$entry"; - } - if (!$opts{l} && $entry =~ /^\s+-\s+([^\s]+)/m ) { - my $pkg = $1; - my $fc = substr($pkg, 0, 1); - - $seen_pkgs{$fc} = {} - unless (exists($seen_pkgs{$fc})); - $seen_pkgs{$fc}{$pkg} = undef; - } - $data->{$name}->{entry}=\$entry; - if ($name =~ /$issue_regexp/) { - if (!$opts{U} && $entry =~ /$todo_regexp/m ) { - push @todos, $name; - $num_todo++; - } - elsif ( ($opts{u} || $opts{U}) - && $entry =~ /^\s+-\s+(\S+)\s+(.*)$/m - && ! exists $ignore_missing_bug{$1} - && $2 !~ /unimportant/ - && $entry !~ /-\s+$1\s.*?bug #/m - ) { - push @todos, $name; - $num_missing_bug++; - } - } -} - -print scalar(@{$CVEs}), " CVEs, ", - scalar(@{$entries}) - scalar(@{$CVEs}), " temp issues"; -print ", $num_todo todos matching /$todo_regexp/" if $num_todo; -print ", $num_missing_bug entries with missing bug reference" if $num_missing_bug; -print "\n"; - -if ((! $opts{l}) and (! $opts{b})) { - print "\nCommands:\n"; - print_commands; - print "\n"; -} - -if ($opts{l}) { - #list only - foreach my $todo (sort {$b <=> $a} @todos) { - my $desc=description($todo); - if ($desc) { - my $lines=$opts{n} || 2; - if ($desc =~ /((?:.*\n){1,$lines})/) { - $desc = $1; - $desc =~ s/^/ /mg; - if ($opts{f}) { - print ${$data->{$todo}->{entry}}, $desc; - } - else { - print "$todo:\n$desc"; - } - } - } - else { - print "${$data->{$todo}->{entry}}"; - } - } - exit 0; -} - -if ($opts{b}) { - # auto process - foreach my $todo (sort {$b <=> $a} @todos) { - if ($data->{$todo}->{CVE}) { - my $nfu_entry = auto_nfu($todo); - if ($nfu_entry) { - ${$data->{$todo}->{entry}} =~ - s/^\s*TODO: check/\tNOT-FOR-US: $nfu_entry/m ; - next; - } - } - } - save_datafile(); - exit 0; -} - -my $term = new Term::ReadLine 'check-new-issues'; -if ($term->ReadLine() eq 'Term::ReadLine::Stub') { - print "Install libterm-readline-gnu-perl to get readline support!\n"; -} - -my $attribs = $term->Attribs; - -my @completion_commands = qw(.f .c .w .m .r .g ! v e - .help q d); -$attribs->{completer_word_break_characters} = ' '; - -sub initial_completion { - my ($text, $line, $start, $end) = @_; - - $attribs->{attempted_completion_over} = 1; - - - # If first word then complete commands - if ($start == 0) { - $attribs->{completion_word} = \@completion_commands; - - # do not add useless blank spaces on completion - $attribs->{completion_suppress_append} = 1 - unless ($line eq '-'); - - return $term->completion_matches($text, - $attribs->{list_completion_function}); - } elsif ($line =~ /^-\s+(.)?(?:([^\s]+)\s+)?/) { - my ($fc, $pkg) = ($1, $2); - - if (length($fc) == 0) { - $attribs->{completion_suppress_append} = 1; - $attribs->{completion_word} = [ keys %seen_pkgs ]; - } elsif (length($pkg) != 0) { - $attribs->{completion_word} = [ qw( ) ]; - } elsif (exists($seen_pkgs{$fc})) { - $attribs->{completion_word} = [ keys %{$seen_pkgs{$fc}} ]; - } else { - $attribs->{completion_word} = []; - } - - return $term->completion_matches($text, - $attribs->{list_completion_function}); - } else { - return; - } -} - -$attribs->{attempted_completion_function} = \&initial_completion; - -foreach my $todo (sort {$b <=> $a} @todos) { - last unless present_issue($todo); -} -save_datafile(); - -sub save_datafile { - open(my $fh, ">", $datafile); - print $fh @{$entries}; - close($fh); -} - -sub present_issue { - my $name = shift; - my $quit = 0; - - print_full_entry($name); - - if ($data->{$name}->{CVE}) { - my $nfu_entry = auto_nfu($name); - if ($nfu_entry) { - ${$data->{$name}->{entry}} =~ - s/^\s*TODO: check/\tNOT-FOR-US: $nfu_entry/m ; - print "New entry auto set to set to:\n${$data->{$name}->{entry}}"; - return 1; - } - } - - auto_search($name); - - READ: while (my $r=$term->readline(">") ) { - chomp $r; - if ($r =~ /^\s*$/) { - last READ; - } - elsif ($r=~ /^\.c(.*)$/ ) { - my $s = $1; - $s =~ tr{a-zA-Z0-9_@-}{ }cs; - print "=== apt-cache search $s :\n"; - system("apt-cache search $s|less -FX"); - print "===\n"; - next READ; - } - elsif ($r=~ /^\.f(.*)$/ ) { - my $s = $1; - $s =~ s/^\s*(.*?)\s*$/$1/; - $s = quotemeta($s); - print "=== apt-file search $s:\n"; - system("apt-file search $s|less -FX"); - print "===\n"; - next READ; - } - elsif ($r=~ /^\.w(.*)$/ ) { - my $s = $1; - $s =~ s/^\s*(.*?)\s*$/$1/; - print "=== wnpp lookup for '$s':\n"; - search_wnpp($s); - print "===\n"; - next READ; - } - elsif ($r=~ /^\.m(.*)$/ ) { - my $s = $1; - $s =~ s/^\s+//; - $s =~ s/\s+$//; - print "references to $s in embedded-code-copies:\n"; - search_embed($s) or print "none\n"; - next READ; - } - elsif ($r=~ /^\.g(.+)$/ ) { - my $n = $1; - $n =~ s/^\s*(.*?)\s*$/$1/; - if (!exists($data->{$n})) { - print "unknown issue '$n'\n"; - next READ; - } - unless (present_issue($n)) { - $quit = 1; - last READ; - } - print "back at $name (you might want to type 'd')\n"; - next READ; - } - elsif ($r=~ /^\.h/i ) { - print_commands; - next READ; - } - elsif ($r=~ /^!(.+)$/ ) { - system($1); - print "exit status: $?\n"; - next READ; - } - elsif ($r=~ /^q\s?$/i ) { - $quit = 1; - last READ; - } - elsif ($r=~ /^[ve]\s?$/i ) { - my $newentry=edit_entry(${$data->{$name}->{entry}}); - if ( $newentry eq ${$data->{$name}->{entry}} ) { - print "Not changed.\n"; - next READ; - } - else { - ${$data->{$name}->{entry}}=$newentry; - print "New entry set to:\n$newentry"; - last READ; - } - } - elsif ($r=~ /^d\s?$/i ) { - print_full_entry($name); - next READ; - } - elsif ($r=~ /^(\-\s+.+)$/ ) { - my @comps=split /\s+/, $1; - push @comps, '' - unless (scalar(@comps)>2); - my $inputentry = join(' ', @comps); - - my $preventry=${$data->{$name}->{entry}}; - $preventry =~ - s/^\s+/\t$inputentry\n$&/m ; - - if ($comps[2] eq '') { - $preventry =~ - s/^\s*TODO: check\n//m ; - } - - my $newentry=edit_entry($preventry); - ${$data->{$name}->{entry}}=$newentry; - print "New entry set to:\n$newentry"; - last READ; - } - elsif ($r=~ /^\.r(.*)$/ ) { - my $tmp=new File::Temp(); - my $tmpname=$tmp->filename; - system("$basedir/bin/report-vuln $1 $name > $tmpname"); - system("$editor $tmpname"); - close($tmp); - next READ; - } - else { - ${$data->{$name}->{entry}} =~ - s/^\s*TODO: check/\tNOT-FOR-US: $r/m ; - print "New entry set to:\n${$data->{$name}->{entry}}"; - last READ; - } - } - - return (!$quit); -} - -sub print_full_entry { - my $name = shift; - - print ${$data->{$name}->{CVE}} if $data->{$name}->{CVE}; - print ${$data->{$name}->{entry}}; -} - -sub description { - my $name=shift; - - defined $data->{$name}->{CVE} or return ""; - - ${$data->{$name}->{CVE}} =~ /\n\n(.*\n)\n/s; - my $desc = $1; - $desc =~ s/\n\n+/\n/; - - return $desc; -} - -sub read_file -{ - my $file=shift; - my $re=shift; - - - open(my $fh, $file) or die "could not open $file"; - - my @data; - my $cur=""; - while (my $line=<$fh>) { - if ($line =~ $re and $cur) { - push @data, $cur; - $cur = ""; - } - $cur.=$line; - } - push @data, $cur if $cur; - - close($fh); - - - return \@data; -} - - -sub edit_entry { - my $entry=shift; - my $tmp=new File::Temp(); - my $tmpname=$tmp->filename; - print $tmp $entry; - close $tmp; - system "$editor $tmpname"; - - local $/; #slurp - open($tmp, $tmpname); - return <$tmp>; - -} - -sub wnpp_to_history { - my $pkg = shift; - - # there might be more than one bug, so only take the first - my ($bugline) = (split /[|]/, $wnpp{$pkg}, 2); - - my ($type, $bug) = split /\s+/, $bugline; - return unless ($type =~ /^(?:RFP|ITP)$/); - - $term->addhistory("- $pkg (bug #$bug)"); -} - -sub auto_nfu { - my $name=shift; - - my $desc=description($name); - $desc =~ s/[\s\n]+/ /g; - - if ($desc =~ m/in\s+the\s+(.+)\s+(plugin|theme)\s+(?:[\w\d.]+\s+)?(?:(?:(?:before|through)\s+)?[\w\d.]+\s+)?for\s+[Ww]ord[Pp]ress/) { - my ($name, $type) = ($1, $2); - return "$name $type for WordPress"; - } - if ($desc =~ m/\b(FS\s+.+?\s+Clone|Meinberg\s+LANTIME|Ecava\s+IntegraXor|Foxit\s+Reader|Cambium\s+Networks\s+.+?\s+firmware|Trend\s+Micro|(?:SAP|IBM|EMC|NetApp|Micro\sFocus).+?(?=tool|is|version|[\d(,]))/) { - my $name = $1; - $name =~ s/\s$//; - return $name; - } - return ''; -} - -sub auto_search { - my $name=shift; - - my $desc=description($name); - $desc =~ s/[\s\n]+/ /g; - - my $file; - my $prog; - if ( $desc =~ /^(\S+(?: [A-Z]\w*)*) \d/ ) { - $prog = $1; - } - elsif ( $desc =~ / in (\S+\.\S+) in (?:the )?(\S+) / ) { - $file = $1; - $prog = $2; - } - elsif ( $desc =~ / in (?:the )?(\S+) / ) { - $prog = $1; - } - if ($prog) { - unless ($opts{s}) { - my $prog_esc =$prog; - $prog_esc =~ tr{a-zA-Z0-9_@/-}{ }cs; - print "doing apt-cache search..."; - my @ac=apt_cache($prog_esc); - if (scalar @ac > $auto_display_limit || scalar @ac == 0) { - print "\r", scalar @ac, " results from apt-cache search $prog_esc\n"; - } - else { - print "\r=== apt-cache search $prog_esc:\n", @ac, "===\n"; - } - } else { - print "You probably want to .c$prog\n"; - } - - foreach my $p (split /\s+/, $prog) { - search_embed($p); - my @wr = search_wnpp($p); - if (scalar @wr > $auto_display_limit) { - print scalar @wr, " results from searching '$prog' in WNPP\n"; - } - else { - for my $we (@wr) { - print "$we: $wnpp{$we}\n"; - wnpp_to_history($we); - } - } - } - } - if ( $file =~ /^(?:index|default|login|search|admin)\.(?:php3?|asp|cgi|pl)$/i ) { - return; - } - if ( $file =~ /(php3?|asp|cgi|pl)$/ ) { - unless ($opts{s}) { - if (! exists $afcache{$file}) { - my $file_esc = quotemeta($file); - print "doing apt-file search..."; - $afcache{$file}=[`apt-file -i search $file_esc`]; - if (scalar @{$afcache{$file}} > $auto_display_limit) { - # replace with empty array to save mem - my $num = scalar @{$afcache{$file}}; - $afcache{$file} = []; - $afcache{$file}->[$num-1] = undef; - } - } - if (scalar @{$afcache{$file}} > $auto_display_limit || - scalar @{$afcache{$file}} == 0) { - print "\r", scalar @{$afcache{$file}}, - " results from apt-file -i search $file\n"; - } - else { - print "\r=== apt-file -i search $file:\n", @{$afcache{$file}}, "===\n"; - } - } else { - print "You probably want to .f$file\n"; - } - } -} - -{ - my @apt_cache_cache; - my $apt_cache_cache_term; - - sub apt_cache { - my $term = shift; - - if ($term eq $apt_cache_cache_term) { - return @apt_cache_cache; - } - - @apt_cache_cache = `apt-cache search $term`; - $apt_cache_cache_term = $term; - - return @apt_cache_cache; - } -} - -sub read_embedded_copies { - my $emb_file = $opts{e} || "$basedir/data/embedded-code-copies"; - open(my $fh, $emb_file); - - # skip comments - while (<$fh>) { - last if /^---BEGIN/; - } - - my ($code, $pkg); - while (my $line = <$fh>) { - if ($line =~ /^([\w][\w+-.]+)/) { - $code = lc($1); - $pkg = undef; - if (exists $embed_code->{$code}) { - syntax_error("Duplicate embedded code $code") - } - } - elsif ($line =~ /^\s*$/) { - $code = undef; - $pkg = undef; - } - elsif ($line =~ /^\s+(?:\[\w+\]\s+)?-\s+(\w[\w.-]+)/) { - $pkg = $1; - $line =~ s/^\s+//; - if ($embed_code->{$code}->{$pkg}) { - $embed_code->{$code}->{$pkg} .= $line; - } - else { - $embed_code->{$code}->{$pkg} = $line; - push @{$embed_pkg->{$pkg}}, $code; - } - } - elsif ($line =~ /^\s+(?:NOTE|TODO)/) { - $line =~ s/^\s+//; - if ($pkg) { - $embed_code->{$code}->{$pkg} .= $line; - } - } - else { - syntax_error("Cannot parse $line"); - } - } -} - -sub syntax_error { - $embed_errors=1; - print STDERR "embedded-code-copies:$.: @_\n"; -} - -sub search_embed { - my $text = shift; - my $found = 0; - $text = lc($text); - if (exists $embed_code->{$text}) { - print "$text is embedded by: ", - join(" ", sort keys %{$embed_code->{$text}}), - "\n"; - $found = 1; - } - if (exists $embed_pkg->{$text}) { - print "$text embeds: ", - join(" ", sort @{$embed_pkg->{$text}}), - "\n"; - $found = 1; - } - return $found; -} - -sub search_wnpp { - my $s = shift; - $s = lc $s; - - my @matches; - @matches = grep(/$s/, sort keys %wnpp); - - if (wantarray) { - return @matches; - } else { - foreach my $e (@matches) { - print "$e: $wnpp{$e}\n"; - } - return (length(@matches) > 0); - } -} - -sub read_removed_packages_file { - my $file = shift; - - open(my $fh, "<", $file) or die "could not open $file"; - my @packages; - my $line; - while (defined ($line = <$fh>)) { - chomp $line; - $line =~ s/^\s+//; - $line =~ s/\s+$//; - next if $line =~ /^$/; - next if $line =~ /^#/; - push @packages, $line; - } - return @packages; -} +ignore_missing_bugs = read_packages_file(removed_packages_file) +ignore_missing_bugs += read_packages_file(ignore_bug_file) +# TODO: move this to data/packages/{removed,ignored-debian-bug}-packages +ignore_missing_bugs += "linux-2.6 linux-2.6.24 \ + kfreebsd-source kfreebsd-5 kfreebsd-6 kfreebsd-7 \ + mozilla mozilla-firefox mozilla-thunderbird firefox \ + php4 gnutls11".split() + +seen_pkgs = {} + +for name, cve in cves.items(): + if not args.list: + for ann in cve.annotations: + if isinstance(ann, parsers.PackageAnnotation): + pkg = ann.package + seen_pkgs[pkg] = True + + if issue_re.match(name): + if not args.only_unfixed: + for ann in cve.annotations: + if ann_is_todo_check(ann): + todos.append(name) + num_todo += 1 + + if args.unfixed or args.only_unfixed: + for ann in cve.annotations: + if not isinstance(ann, parsers.PackageAnnotation): + continue + if ann.release != None and ann.release != "": + continue + if ann.package in ignore_missing_bugs: + continue + if ann.kind != "unfixed": + continue + + urgency = get_annotation(ann.flags, parsers.PackageUrgencyAnnotation) + if urgency and urgency.severity == "unimportant": + continue + + bug = get_annotation(ann.flags, parsers.PackageBugAnnotation) + if bug: + continue + + todos.append(name) + num_missing_bug += 1 + +print_stats() + +if not args.list and not args.auto: + print("") + print("Commands:") + print_commands() + print("") + +if args.list: + for todo in sorted(todos, reverse=True): + desc = get_cve5_description(todo) + if desc: + indent = " " + lines = textwrap.wrap(desc, initial_indent=indent, subsequent_indent=indent) + desc = "\n".join(lines[0:2]) + if args.full: + print_cve(cves[todo]) + print(f"{desc}") + else: + print(f"{todo}:\n{desc}") + else: + print_cve(cves[todo]) + + sys.exit(0) + +if args.auto: + # auto process + for todo in todos: + if todo in cve5s: + if nfu_entry := auto_nfu(todo): + set_cve_fnu(todo, nfu_entry) + + save_datafile(cves.values(), datafile) + sys.exit(0) + +def set_cve_nfu(name, desc): + cve = cves[name] + # remove todo: check annotation... + cve.annotations = [ann for ann in cve.annotations if not ann_is_todo_check(ann)] + # ... and add a NFU annotation + ann = parsers.StringAnnotation(0, "NOT-FOR-US", desc) + cve.annotations.append(ann) + +def print_full_entry(name): + print_urls(name) + if desc := get_cve5_description(name): + print(desc) + print_cve(cves[name]) + +def edit_entry(entry, extra_text=None): + _, filename = tempfile.mkstemp() + save_datafile([entry], filename) + + with open(filename) as f: + old_data = f.read() + + if extra_text is not None: + with open(filename, "a") as f: + f.write(extra_text) + + subprocess.run(f"{editor} {filename}", shell=True) + + with open(filename) as f: + new_data = f.read() + + newcves = parsers.cvelist(filename) + os.unlink(filename) + + debug(f"edit_entry: old_data\n{old_data}\n") + debug(f"edit_entry: new_data\n{new_data}\n") + + if old_data == new_data: + return None + + return newcves[0] + +def search_wnpp(s, wantarray=False): + s = s.lower() + + matches = [w for w in wnpp.keys() if s in w] + matches = sorted(matches) + + if wantarray: + return matches + + for e in matches: + print(f"{e}: {wnpp[e]}") + + return len(matches) > 0 + +def auto_search(name): + desc = get_cve5_description(name) or "" + desc = desc.strip() + #$desc =~ s/[\s\n]+/ /g; + + prog = None + file = None + + if m := re.match(r'^(\S+(?: [A-Z]\w*)*) \d', desc): + prog = m.group(1) + elif m := re.search(r' in (\S+\.\S+) in (?:the )?(\S+) ', desc): + file = m.group(1) + prog = m.group(2) + elif m := re.search(r' in (?:the )?(\S+) ', desc): + prog = m.group(1) + if prog: + debug("prog: " + prog) + if not args.skip: + prog_esc = prog + #$prog_esc =~ tr{a-zA-Z0-9_@/-}{ }cs; + ac = apt_cache(prog_esc) + if len(ac) > auto_display_limit or len(ac) == 0: + print(f"{len(ac)} results from apt-cache search {prog_esc}") + else: + print(f"=== apt-cache search {prog_esc}:") + for result in ac: + print(result) + print("===") + else: + print(f"You probably want to .c{prog}") + + for p in prog.split(): + search_embed(p) + wr = search_wnpp(p, wantarray=True) + if len(wr) > auto_display_limit: + print(f"{len(wr)} results from searching '{p}' in WNPP") + else: + for we in wr: + print(f"{we}: {wnpp[we]}") + + if file and re.match(r'^(?:index|default|login|search|admin)\.(?:php3?|asp|cgi|pl)$', file): + return + if file and re.search(r'(php3?|asp|cgi|pl)$', file): + if not args.skip: + if file not in afcache: + file_esc = file + #file_esc = quotemeta(file) + print(f"doing apt-file search {file_esc}") + cmd = subprocess.run(['apt-file', '-i', 'search', file_esc], text=True, capture_output=True) + afcache[file] = cmd.stdout.split('\n') + #if (scalar @{$afcache{$file}} > $auto_display_limit) { + # # replace with empty array to save mem + # my $num = scalar @{$afcache{$file}} + # afcache[file] = [] + if len(afcache[file]) > auto_display_limit or \ + len(afcache[file]) == 0: + print(f"{len(afcache[file])} results from apt-file -i search {file}") + else: + print(f"=== apt-file -i search {file}:") + for result in afcache[file]: + print(afcache[file]) + print("===") + else: + print(f"You probably want to .f{file}") + +def present_issue(name): + quit = False + + print_full_entry(name) + + if name in cve5s: + if nfu_entry := auto_nfu(name): + set_cve_nfu(name, nfu_entry) + print("New entry automatically set to NFU:") + entry = cves[name] + print_cve(entry) + return True + + auto_search(name) + + while True: + line = input("> ") + line = line.strip() + + if m := re.match(r"^\s*$", line): + break + elif m := re.match(r"^\.c(.*)$", line): + s = m.group(1).strip() + #$s =~ tr{a-zA-Z0-9_@-}{ }cs; + print(f"=== apt-cache search {s}") + subprocess.run(f"apt-cache search {s} | less -FX", shell=True) + print("===") + continue + elif m := re.match(r"^\.f(.*)$", line): + s = m.group(1).strip() + #s = quotemeta(s) + print(f"=== apt-file search {s}") + subprocess.run(f"apt-file search {s} | less -FX", shell=True) + print("===") + continue + elif m := re.match(r"^\.w(.*)$", line): + s = m.group(1).strip() + print(f"=== wnpp lookup for '{s}':") + search_wnpp(s) + print("===") + continue + elif m := re.match(r'^\.m(.*)$', line): + s = m.group(1).strip() + print(f"references to {s} in embedded-code-copies:") + search_embed(s) or print("none") + continue + elif m := re.match(r"^\.g(.+)$", line): + n = m.group(1).strip() + if n not in cves: + print(f"unknown issue '{n}'") + continue + + if present_issue(n): + quit = True + break + print(f"back at {name} (you might want to type 'd')") + continue + elif re.match("^\.h$", line): + print_commands() + continue + elif m := re.match(r"^!(.+)$", line): + cmd = m.group(1) + r = subprocess.run(cmd, shell=True) + print(f"exit status: {r.returncode}") + continue + elif re.match(r"^q\s?$", line): + quit = True + break + elif re.match(r"^[ve]\s?$", line): + entry = cves[name] + new_entry = edit_entry(entry) + if not new_entry: + print("Not changed.") + continue + else: + cves[name] = new_entry + print("New entry set to:") + print_cve(new_entry) + break + elif re.match(r"^d\s?$", line): + print_full_entry(name) + continue + elif m := re.match(r"^(\-\s+.+)$", line): + components = m.group(1).split() + if len(components) <= 2: + components.append('') + + extra_text = "\t" + " ".join(components) + old_entry = cves[name] + + if components[2] == '': + old_entry.annotations = [ann for ann in old_entry.annotations + if not ann_is_todo_check(ann)] + + new_entry = edit_entry(old_entry, extra_text=extra_text) + cves[name] = new_entry + print("New entry set to:") + print_cve(new_entry) + break + elif m := re.match(f'^\.r(.*)$', line): + pkg = m.group(1).strip() + _, tmpname = tempfile.mkstemp() + subprocess.run(f"bin/report-vuln {pkg} {name} > {tmpname}", shell=True) + subprocess.run(f"{editor} {tmpname}", shell=True) + #os.unlink(tmpname) + continue + else: + set_cve_nfu(name, line) + print("New entry set to:") + print_cve(cves[name]) + break + + return quit + +completion_commands = ".f .c .w .m .r .g ! v e - .help q d".split() + +def complete_line(text, state): + response = None + + origline = readline.get_line_buffer() + begin = readline.get_begidx() + end = readline.get_endidx() + being_completed = origline[begin:end] + words = origline.split() + + logging.debug('origline=%s', repr(origline)) + logging.debug('begin=%s', begin) + logging.debug('end=%s', end) + logging.debug('being_completed=%s', being_completed) + logging.debug('words=%s', words) + + if not words: + current_candidates = completion_commands + else: + try: + if begin == 0: + # first word + candidates = completion_commands + else: + # later word + first = words[0] + if first == '-': + # autocomplete - pkg entries + if len(words) == 1 or (len(words) == 2 and being_completed): + candidates = list(seen_pkgs.keys()) + candidates += list(wnpp_to_candidates()) + else: + candidates = ' '.split() + #else: + # XXX + + if being_completed: + # match options with portion of input + # being completed + current_candidates = [ w for w in candidates + if w.startswith(being_completed) ] + else: + # matching empty string so use all candidates + current_candidates = candidates + + logging.debug('candidates=%s', current_candidates) + + except (KeyError, IndexError) as err: + logging.error('completion error: %s', err) + current_candidates = [] + + try: + response = current_candidates[state] + if len(current_candidates) == 1: + response += " " + except IndexError: + response = None + logging.debug('complete(%s, %s) => %s', repr(text), state, response) + return response + +readline.set_completer(complete_line) +# we don't want '<' to be considered a delim as we use it as a word for +# e.g. '' +readline.set_completer_delims(' ') +readline.parse_and_bind('tab: complete') + +for todo in sorted(todos, reverse=True): + if present_issue(todo): + break + +save_datafile(cves.values(), datafile) -- cgit v1.2.3