diff options
author | Emilio Pozuelo Monfort <pochu@debian.org> | 2021-11-10 12:29:40 +0100 |
---|---|---|
committer | Emilio Pozuelo Monfort <pochu@debian.org> | 2021-11-10 16:18:33 +0100 |
commit | 1b37aaa9f28ed4d667e01b61cf3494a42971e722 (patch) | |
tree | 6f40cf4555d480acfbd91f207f903d721b1a1259 /bin/lts-missing-uploads | |
parent | 2b30069b643ef4a570226190fbeb74710a8c4994 (diff) |
bin/lts-missing-uploads: drop the .py extension
Diffstat (limited to 'bin/lts-missing-uploads')
-rwxr-xr-x | bin/lts-missing-uploads | 157 |
1 files changed, 157 insertions, 0 deletions
diff --git a/bin/lts-missing-uploads b/bin/lts-missing-uploads new file mode 100755 index 0000000000..eb84a234df --- /dev/null +++ b/bin/lts-missing-uploads @@ -0,0 +1,157 @@ +#!/usr/bin/env python3 +# +# Copyright 2016 Chris Lamb <lamby@debian.org> +# +# This file is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This file is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this file. If not, see <https://www.gnu.org/licenses/>. + +import re +import sys +import gzip +import datetime +import eventlet +import requests +import dateutil.relativedelta + +from debian.deb822 import Sources +from debian.debian_support import Version + +class LTSMissingUploads(object): + MONTHS = 6 + SOURCES = ['http://security.debian.org/dists/stretch/updates/{}/source/Sources.gz'.format(component) + for component in ('main', 'contrib', 'non-free')] + + re_line = re.compile( + r'(?P<suffix>msg\d+.html).*\[DLA (?P<dla>[\d-]+)\] (?P<source>[^\s]+) security update.*' + ) + re_version = re.compile(r'^Version.*: (?P<version>.*)') + + def __init__(self): + self.pool = eventlet.GreenPool(10) + self.session = requests.session() + + def main(self, *args): + self.info("Getting last {} month(s) of LTS annoucements", self.MONTHS) + + dlas = {} + def download(x): + self.info("{source}: parsing announcement from {url} ...", **x) + x.update(self.get_dla(x['url'])[0]) + dlas[x['source']] = x + + for idx in range(self.MONTHS): + dt = datetime.datetime.utcnow().replace(day=1) - \ + dateutil.relativedelta.relativedelta(months=idx) + + self.info( + "Getting announcements for {}/{:02} ...", + dt.year, + dt.month, + ) + + # Prefer later DLAs with reversed(..) + for x in reversed(self.get_dlas(dt.year, dt.month)): + if x['source'] not in dlas: + self.pool.spawn_n(download, x) + self.pool.waitall() + + if not dlas: + return 0 + + sources = self.get_sources() + + for source, dla in sorted(dlas.items()): + try: + dla_version = Version(dla['version']) + except ValueError: + self.warn("{}: DLA-{} announced with invalid version: {} <{}>", + source, + dla['dla'], + dla['version'], + dla['url'], + ) + continue + + archive_version = Version(sources[source]) + + if dla_version <= archive_version: + continue + + self.warn("{}: DLA-{} announced version {} but LTS has {} <{}>", + source, + dla['dla'], + dla_version, + archive_version, + dla['url'], + ) + + return 0 + + def get_dlas(self, year, month): + url = 'https://lists.debian.org/debian-lts-announce/{}/{:02}/'.format( + year, + month, + ) + + result = self.parse(url, self.re_line) + + # Prepend URL as the indices have relative URIs + for x in result: + x['url'] = '{}{}'.format(url, x['suffix']) + + return result + + def get_dla(self, url): + return self.parse(url, self.re_version) + + def get_sources(self): + pkgver = {} + for src in self.SOURCES: + self.info("Downloading Sources from {} ...", src) + + response = self.session.get(src) + response.raise_for_status() + + val = gzip.decompress(response.content).decode('utf-8') + + for x in Sources.iter_paragraphs(val): + pkgver[x['Package']] = x['Version'] + + return pkgver + + def parse(self, url, pattern): + result = [] + + for x in self.session.get(url).content.splitlines(): + m = pattern.search(x.decode('utf8')) + + if m is not None: + result.append(m.groupdict()) + + return result + + ## + + def warn(self, msg, *args, **kwargs): + print("W: " + msg.format(*args, **kwargs), file=sys.stderr) + + def info(self, msg, *args, **kwargs): + print("I: " + msg.format(*args, **kwargs), file=sys.stderr) + +if __name__ == '__main__': + eventlet.monkey_patch(socket=True) + + try: + sys.exit(LTSMissingUploads().main(*sys.argv[1:])) + except KeyboardInterrupt: + sys.exit(1) |