1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
#!/usr/bin/python
# list-queue -- list security-master queue contents
# Copyright (C) 2011 Florian Weimer <fw@deneb.enyo.de>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
# This script is intended to be run on security-master to get an
# unprocessed dump of the contents of the embargoed and unembargoed
# queues.
#
# The script reads .deb files. A caching database is written to
# ~/.cache.
######################################################################
# Configuration
# Queue directories that readdirs() scans for .deb files; each entry is
# an absolute directory path.
DIRECTORIES = ('/org/security-master.debian.org/queue/embargoed',
               '/org/security-master.debian.org/queue/unembargoed')
# End Configuration
######################################################################
import sys
import os.path
def setup_path():
    """Prepend the bundled library directory to sys.path.

    The directory is <base>/lib/python, where <base> is the parent of
    the directory holding this script (after symlinks in the script
    path have been resolved).
    """
    script = os.path.realpath(sys.argv[0])
    script_dir = os.path.dirname(script)
    base = os.path.dirname(script_dir)
    libdir = os.path.join(base, "lib", "python")
    sys.path.insert(0, libdir)


# Must run before the project-local imports below are resolved.
setup_path()
import sqlite3
import json
import debian_support
def createdb():
    """Open the per-user cache database, creating it if necessary.

    The database file is ~/.cache/secure-testing_list-debs.sqlite; the
    ~/.cache directory is created when it does not exist yet.  The
    "package" table holds one row per .deb file: its path, the size and
    mtime recorded when it was inspected, and the package metadata.

    Returns the open sqlite3 connection.  The caller is responsible for
    committing and closing it.
    """
    cache = os.path.expanduser("~/.cache")
    # Create first and inspect the failure afterwards, so a concurrent
    # run creating the directory at the same time does not make us fail.
    try:
        os.mkdir(cache)
    except OSError:
        if not os.path.isdir(cache):
            raise
    dbfile = os.path.join(cache, "secure-testing_list-debs.sqlite")
    db = sqlite3.connect(dbfile, isolation_level="IMMEDIATE")
    # page_size only takes effect on a database that is still empty, so
    # it is set before anything else touches the file.
    db.execute("PRAGMA page_size = 4096")
    db.execute("PRAGMA journal_mode = WAL")
    # Existing databases keep their old schema (IF NOT EXISTS); the
    # corrected mtime constraint applies to newly created caches.
    db.execute("""CREATE TABLE IF NOT EXISTS package (
  path TEXT NOT NULL PRIMARY KEY,
  size INTEGER NOT NULL CHECK (size >= 0),
  mtime INTEGER NOT NULL CHECK (mtime >= 0),
  name TEXT NOT NULL,
  version TEXT NOT NULL,
  arch TEXT NOT NULL,
  source TEXT NOT NULL,
  source_version TEXT NOT NULL
)""")
    return db
def readdirs(directories=None):
    """Scan directories for .deb files and record their size and mtime.

    directories -- iterable of directory paths to scan; when omitted
                   (None), the module-level DIRECTORIES setting is used.

    Entries whose name starts with "." or does not end in ".deb" are
    ignored.  Subdirectories are not descended into.

    Returns a dict mapping the joined path of each .deb file to a
    (size, mtime) tuple taken from os.stat().  Errors from os.listdir()
    or os.stat() propagate to the caller.
    """
    if directories is None:
        directories = DIRECTORIES
    result = {}
    for path in directories:
        for entry in os.listdir(path):
            if entry.startswith(".") or not entry.endswith(".deb"):
                continue
            name = os.path.join(path, entry)
            stat = os.stat(name)
            result[name] = (stat.st_size, stat.st_mtime)
    return result
def readpackages(db):
    """Load every cached row of the package table.

    db -- open database connection providing cursor().

    Returns a dict keyed by the first column (the path).  Each value is
    a (size, mtime, pkg) tuple, where pkg is a
    debian_support.BinaryPackage built from the remaining columns of
    the row.
    """
    cached = {}
    cursor = db.cursor()
    cursor.execute("SELECT * FROM package")
    for record in cursor:
        path = record[0]
        package = debian_support.BinaryPackage(record[3:])
        cached[path] = (record[1], record[2], package)
    return cached
def updatepackages(db):
    """Updates the package table from the file system.

    db -- open database connection; rows are deleted and inserted but
          nothing is committed here, that is left to the caller.

    Files that are new, or whose (size, mtime) differs from the cached
    row, are re-read with debian_support.inspect_deb().  Rows for files
    no longer on disk are removed.

    Returns the current list of package objects, in arbitrary order.
    Only packages whose file is present on disk are included.
    """
    ondisk = readdirs()
    indb = readpackages(db)

    # Expire old entries.  They are dropped from indb as well, otherwise
    # packages whose files have vanished would still be returned.
    expired = [path for path in indb if path not in ondisk]
    db.executemany("DELETE FROM package WHERE path = ?",
                   ((path,) for path in expired))
    for path in expired:
        del indb[path]

    # Update the cache in indb and the database
    need_update = [(path, stat) for (path, stat) in ondisk.items()
                   if path not in indb or stat != tuple(indb[path][0:2])]
    db.executemany("DELETE FROM package WHERE path = ?",
                   ((path,) for path, _ in need_update))

    def do_update():
        # Lazily inspects each changed file.  executemany() below drains
        # this generator, so indb is fully refreshed before we return.
        for (path, stat) in need_update:
            pkg = debian_support.inspect_deb(path)
            indb[path] = stat + (pkg,)
            # path + (size, mtime) + pkg.astuple() must supply the eight
            # values bound by the INSERT statement.
            yield (path,) + stat + pkg.astuple()
    db.executemany("INSERT INTO package VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
                   do_update())

    # Return a list of BinaryPackage objects
    return [item[2] for item in indb.values()]
def main():
    """Refresh the package cache and print the queue contents as JSON.

    The output is a single JSON object with the keys "version" (always
    1) and "binary" (one astuple() entry per package returned by
    updatepackages()).  Cache changes are committed before the
    connection is closed; if anything fails beforehand, the connection
    is closed without committing.
    """
    db = createdb()
    try:
        pkgs = updatepackages(db)
        result = {
            "version": 1,
            "binary": [pkg.astuple() for pkg in pkgs],
        }
        db.commit()
    finally:
        db.close()
    # Parenthesized single-argument form works as both the Python 2
    # print statement and the Python 3 print function.
    print(json.dumps(result))
# Script entry point: runs unconditionally when this file is executed.
main()
|