"""Screen scraper for Patch Manager interface
The patch form URL is http://www.sourceforge.net/patch/index.php.
GET method
If I'm lucky, it can be used without authentication.
the input fields are: (* means hidden field)
*group_id=5470
*set=custom
_assigned_to=None
_status=None
The patch form returns an HTML table row for each entry; its cell text looks like:
100518
|
fix bltinmodule.c for 64-bit platforms |
2000-Jun-07 03:21 |
gvanrossum |
tmick |
If there are more than 50 patches, the following HTML is produced:
| | Next 50 --> |
Future plans:
support authentication
command-line interface for modifying patches
"""
import cgi
import re
import types
from urllib import urlencode
from urlparse import urljoin
from urllib import urlopen
import pg
from sfdb import PatchDBInterface
# Progress-message switch: set to 1 by the -v command-line flag and checked
# before each status print in this module.
VERBOSE = 0
# Database name given with the -d command-line flag; when left as None the
# script calls pg.connect() with no arguments instead.
DATABASE = None
class PatchListParser:
    """Regex-based extractor for the link targets on a patch summary page."""

    # Only upper-case HREF attributes whose value consists solely of
    # '?', '/', '=', '&', '_' and ASCII alphanumerics are recognised.
    rx_href = re.compile('HREF="([?/=&_A-Za-z0-9]+)"')

    def parse_hrefs(self, buf):
        """Return every HREF target found in buf, in document order."""
        # The pattern has a single group, so findall() yields the bare
        # target strings rather than tuples.
        return self.rx_href.findall(buf)

    def get_query_hrefs(self, buf):
        """Return the HREF targets in buf that begin with '?'."""
        # Targets are never empty (the pattern requires one character),
        # so indexing the first character is safe.
        return [target for target in self.parse_hrefs(buf)
                if target[0] == '?']
class PatchParser:
    """Minimal re-based parser that pulls key-values from patch page."""

    # NOTE(review): this pattern looks truncated -- the leading "]*>" reads
    # like the tail of a "<TAG[^>]*>" prefix, and the trailing " | " like
    # what is left after markup was stripped.  As written, the "|" makes a
    # lone space a complete match in which neither group participates.
    # Confirm against the upstream pattern before trusting parse() output.
    rx_entry = re.compile(']*>(.+): (.+) | ')

    def parse(self, buf):
        """Return a dict of the key/value pairs matched by rx_entry in buf.

        Group 1 of each match is the key and group 2 the value.  When a
        key occurs more than once the last occurrence wins.  Matches in
        which the groups did not participate are ignored.
        """
        entries = {}
        for mo in self.rx_entry.finditer(buf):
            key, value = mo.group(1, 2)
            if key is None:
                # Group-less match.  The previous hand-written loop used
                # mo.end(2) as the next search offset; that is -1 for a
                # group that did not participate, which restarted the
                # search at the beginning and never terminated.
                continue
            entries[key] = value
        return entries
def urldecode(query):
    """Decode a URL query string into a flat {name: value} dict.

    query is the text that follows the '?' in a URL.  Each name must
    occur exactly once; its single value is returned as a plain string
    rather than a one-element list.  Fields with blank values are dropped
    (parse_qs default).

    Raises ValueError if any name appears more than once.
    """
    # cgi.parse_qs() has been deprecated since Python 2.6 and was removed
    # in 3.8, so look for the function in its current home first and only
    # fall back to the cgi module on interpreters that predate the move.
    try:
        from urllib.parse import parse_qs       # Python 3
    except ImportError:
        try:
            from urlparse import parse_qs       # Python 2.6+
        except ImportError:
            from cgi import parse_qs            # older Python 2

    decoded = {}
    for name, values in parse_qs(query).items():
        if len(values) != 1:
            raise ValueError("unexpected duplicate entry")
        decoded[name] = values[0]
    return decoded
class PatchManager:
    """Scrape patch data from the Patch Manager web pages.

    get_patches() builds a browse query against `url` for `group_id`,
    walks the paged summary listing, and loads the detail page of every
    patch it finds.
    """

    url = "http://www.sourceforge.net/patch/index.php"
    group_id = 5470
    list_parser = PatchListParser()
    patch_parser = PatchParser()

    # XXX to get the right numeric values for assigned_to and status,
    # would need to scrape them out of the form...

    def get_patches(self, assigned_to='0', status='0'):
        """Return a dict mapping patch id (int) to its parsed detail page.

        assigned_to and status must be strings; they are sent as the
        '_assigned_to' and '_status' query fields.  Each dict value is
        whatever patch_parser.parse() returns for that patch's page.
        """
        assert type(assigned_to) == types.StringType
        assert type(status) == types.StringType
        url = self._get_initial_query(assigned_to, status)
        patches = {}
        for patch_id, detail_url in self._load_patch_summary(url):
            patches[patch_id] = self._load_patch_detail(detail_url)
        return patches

    def _get_initial_query(self, assigned_to, status):
        """Return the URL of the first summary page for the given filter."""
        params = {'group_id': self.group_id,
                  'set': 'custom',
                  'SUBMIT': 'Browse',
                  '_assigned_to': assigned_to,
                  '_status': status,
                  }
        return "%s?%s" % (self.url, urlencode(params))

    def _fetch(self, url):
        """Return the body of url, closing the connection afterwards."""
        if VERBOSE:
            print("load %s" % url)
        response = urlopen(url)
        try:
            return response.read()
        finally:
            # Release the socket even if read() fails part-way.
            response.close()

    def _load_patch_summary(self, url):
        """Return a list of (patch_id, detail_url) from all summary pages.

        Starts at url and follows 'browse' links whose offset is larger
        than that of the page being read, so links pointing back to
        earlier pages are not revisited.
        """
        todo = [(url, 0)]
        patches = []
        while todo:
            url, offset = todo.pop(0)
            buf = self._fetch(url)
            for href in self.list_parser.get_query_hrefs(buf):
                fields = urldecode(href[1:])
                # Query links that carry no 'func' field are of no
                # interest here; skip them instead of raising KeyError.
                func = fields.get('func')
                if func == 'detailpatch':
                    patches.append((int(fields['patch_id']),
                                    urljoin(self.url, href)))
                elif func == 'browse':
                    new_offset = int(fields['offset'])
                    if new_offset > offset:
                        todo.append((urljoin(self.url, href),
                                     new_offset))
        return patches

    def _load_patch_detail(self, url):
        """Fetch one patch detail page and return its parsed entries."""
        return self.patch_parser.parse(self._fetch(url))
if __name__ == "__main__":
    import getopt
    import sys

    # -v enables progress messages; -d NAME names the database to open.
    # Positional arguments are not accepted.
    options, leftovers = getopt.getopt(sys.argv[1:], 'vd:')
    assert len(leftovers) == 0
    for flag, value in options:
        if flag == '-v':
            VERBOSE = 1
        elif flag == '-d':
            DATABASE = value

    # Step 1: scrape every patch from the web interface.
    manager = PatchManager()
    if VERBOSE:
        print("Loading patches")
    patches = manager.get_patches()
    if VERBOSE:
        print("Retrieved %d patches" % len(patches))
        print("Inserting into local database")

    # Step 2: push the scraped patches into the local database.
    if not DATABASE:
        connection = pg.connect()
    else:
        connection = pg.connect(DATABASE)
    store = PatchDBInterface(connection)
    for patch_id, fields in patches.items():
        store.update(patch_id, fields)

    if VERBOSE:
        # num_deletes is reported below as the number of patches that
        # already existed; everything else counts as new.
        new_count = len(patches) - store.num_deletes
        print("Found %d new patches" % new_count)
        print("Updated %d existing patches" % store.num_deletes)