summaryrefslogtreecommitdiffstats
path: root/bin/list-queue
blob: 167e9afe346e4b47d2bba808cc1054f3fca9e6a0 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/python
# list-queue -- list security-master queue contents
# Copyright (C) 2011 Florian Weimer <fw@deneb.enyo.de>
# 
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

# This script is intended to be run on security-master to get an
# unprocessed dump of the contents of the embargoed and unembargoed
# queues.
#
# The script reads .deb files.  A caching database is written to
# ~/.cache.


######################################################################
# Configuration

# Directories whose immediate entries are scanned for .deb files by
# readdirs(); subdirectories are not descended into.
DIRECTORIES = ('/org/security-master.debian.org/queue/embargoed',
               '/org/security-master.debian.org/queue/unembargoed')

# End Configuration
######################################################################

import sys
import os.path
def setup_path():
    """Put <base>/lib/python at the front of sys.path.

    <base> is the parent of the directory containing this script, as
    determined from sys.argv[0] after resolving symbolic links.
    """
    script = os.path.realpath(sys.argv[0])
    script_dir = os.path.dirname(script)
    base = os.path.dirname(script_dir)
    libdir = os.path.join(base, "lib", "python")
    sys.path.insert(0, libdir)
setup_path()

import sqlite3
import json

import debian_support

def createdb():
    """Open the package cache database, creating it if necessary.

    The database file is ~/.cache/secure-testing_list-debs.sqlite; the
    ~/.cache directory is created when it is missing.  The connection
    uses IMMEDIATE transactions and the database is switched to WAL
    journaling.

    The "package" table is created on first use.  An already existing
    table is left untouched (CREATE TABLE IF NOT EXISTS), so a cache
    file created earlier keeps the schema it was created with.

    Returns the open sqlite3 connection.
    """
    cache = os.path.expanduser("~/.cache")
    try:
        os.mkdir(cache)
    except OSError:
        # An already existing directory is fine (and avoids the
        # check-then-create race); anything else is a real error.
        if not os.path.isdir(cache):
            raise
    dbfile = os.path.join(cache, "secure-testing_list-debs.sqlite")
    db = sqlite3.connect(dbfile, isolation_level="IMMEDIATE")
    # Issued before the table is created: per the SQLite documentation
    # the page size can only be chosen while the database is still empty.
    db.execute("PRAGMA page_size = 4096")
    db.execute("PRAGMA journal_mode = WAL")
    # The mtime constraint checks mtime itself, not size a second time.
    db.execute("""CREATE TABLE IF NOT EXISTS package (
  path TEXT NOT NULL PRIMARY KEY,
  size INTEGER NOT NULL CHECK (size >= 0),
  mtime INTEGER NOT NULL CHECK (mtime >= 0),
  name TEXT NOT NULL,
  version TEXT NOT NULL,
  arch TEXT NOT NULL,
  source TEXT NOT NULL,
  source_version TEXT NOT NULL
)""")
    return db

def readdirs():
    """Scan the configured DIRECTORIES for .deb files.

    Entries whose name starts with a dot or does not end in ".deb" are
    ignored.  Returns a dict mapping the full path of each remaining
    entry to a (size, mtime) tuple taken from os.stat().
    """
    found = {}
    for directory in DIRECTORIES:
        for filename in os.listdir(directory):
            if not filename.endswith(".deb"):
                continue
            if filename.startswith("."):
                continue
            fullpath = os.path.join(directory, filename)
            info = os.stat(fullpath)
            found[fullpath] = (info.st_size, info.st_mtime)
    return found

def readpackages(db):
    """Load every row of the package table from db.

    Returns a dict mapping the path column to a (size, mtime, package)
    tuple, where package is a debian_support.BinaryPackage built from
    the remaining columns of the row.
    """
    cached = {}
    cursor = db.cursor()
    cursor.execute("SELECT * FROM package")
    for record in cursor:
        path, size, mtime = record[:3]
        package = debian_support.BinaryPackage(record[3:])
        cached[path] = (size, mtime, package)
    return cached

def updatepackages(db):
    """Updates the package table from the file system.

    Rows whose file is no longer on disk are deleted.  Files that are
    new, or whose (size, mtime) differs from the cached values, are
    inspected with debian_support.inspect_deb() and their rows are
    replaced.  The changes are not committed here; that is left to the
    caller.

    Returns the current list of package objects, in arbitrary order.
    """
    ondisk = readdirs()
    indb = readpackages(db)

    # Expire old entries.  They are dropped from indb as well, so that
    # packages whose files have disappeared are not part of the result.
    expired = [path for path in indb if path not in ondisk]
    db.executemany("DELETE FROM package WHERE path = ?",
                   ((path,) for path in expired))
    for path in expired:
        del indb[path]

    # Update the cache in indb and the database
    need_update = [(path, stat) for (path, stat) in ondisk.items()
                   if path not in indb or stat != tuple(indb[path][0:2])]
    # Remove any outdated row first so the INSERT below cannot collide
    # with the primary key.
    db.executemany("DELETE FROM package WHERE path = ?",
                   ((path,) for path, _ in need_update))
    def do_update():
        # Generator consumed by executemany(): each file is inspected
        # right before its row is inserted, and indb is refreshed
        # alongside.
        for (path, stat) in need_update:
            pkg = debian_support.inspect_deb(path)
            indb[path] = stat + (pkg,)
            yield (path,) + stat + pkg.astuple()
    db.executemany("INSERT INTO package VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
                   do_update())

    # Return a list of BinaryPackage objects
    return [item[2] for item in indb.values()]

def main():
    """Refresh the package cache and print the queue contents as JSON.

    Writes a single JSON object to standard output, with a "version"
    field (currently 1) and a "binary" list holding the astuple()
    value of every package returned by updatepackages().
    """
    db = createdb()
    pkgs = updatepackages(db)
    result = {
        "version" : 1,
        "binary" : [pkg.astuple() for pkg in pkgs],
        }
    # Persist the refreshed cache before producing any output.
    db.commit()
    # Function-call form: prints the same single string under Python 2
    # and is also valid Python 3 syntax.
    print(json.dumps(result))

if __name__ == "__main__":
    main()

© 2014-2024 Faster IT GmbH | imprint | privacy policy