"""Screen scraper for Patch Manager interface

The patch form URL is http://www.sourceforge.net/patch/index.php.
GET method
If I'm lucky, it can be used without authentication.

The input fields are: (* means hidden field)
    *group_id=5470
    *custom=set
    _assigned_to=None
    _status=None

This script produces the following HTML for each entry (the markup is
not reproduced here, only the text it carries):
    100518 fix bltinmodule.c for 64-bit platforms 2000-Jun-07 03:21 gvanrossum tmick

If there are more than 50 patches, the following HTML is produced
(again text only):
    Next 50 -->

Future plans:
    support authentication
    command-line interface for modifying patches
"""

import collections
import re
import types

try:  # Python 3
    from urllib.parse import parse_qs, urlencode, urljoin
    from urllib.request import urlopen
except ImportError:  # Python 2.7
    from urllib import urlencode
    from urllib import urlopen
    from urlparse import parse_qs, urljoin

VERBOSE = 0
DATABASE = None

# ``types.StringType`` only exists on Python 2; fall back to ``str`` elsewhere.
_STRING_TYPE = getattr(types, "StringType", str)


class PatchListParser:
    """Minimal re-based parser that grabs relevant URLs from summary"""

    rx_href = re.compile('HREF="([?/=&_A-Za-z0-9]+)"')

    def parse_hrefs(self, buf):
        """Return every HREF target in *buf* that rx_href matches, in order."""
        return self.rx_href.findall(buf)

    def get_query_hrefs(self, buf):
        """Return only the HREF targets in *buf* that start with '?'."""
        return [href for href in self.parse_hrefs(buf)
                if href.startswith('?')]


class PatchParser:
    """Minimal re-based parser that pulls key-values from patch page"""

    # NOTE(review): this pattern starts with `]*>`, which looks like the
    # tail of a `<TAG[^>]*>` prefix that was lost together with other
    # markup -- confirm the intended pattern against the upstream file.
    rx_entry = re.compile(']*>(.+):\n(.+)')

    def parse(self, buf):
        """Return a dict mapping each matched key to its value.

        The search resumes right after the value of the previous match;
        when a key occurs more than once the last value wins.
        """
        entries = {}
        mo = self.rx_entry.search(buf)
        while mo is not None:
            key, value = mo.group(1, 2)
            entries[key] = value
            mo = self.rx_entry.search(buf, mo.end(2))
        return entries


def urldecode(query):
    """Decode a URL query string into a flat {name: value} dict.

    Raises ValueError if any name occurs more than once in *query*.
    """
    decoded = {}
    for key, values in parse_qs(query).items():
        if len(values) != 1:
            raise ValueError("unexpected duplicate entry")
        decoded[key] = values[0]
    return decoded


class PatchManager:
    """Loads the patch list and per-patch details from the Patch Manager."""

    url = "http://www.sourceforge.net/patch/index.php"
    group_id = 5470
    list_parser = PatchListParser()
    patch_parser = PatchParser()

    # XXX to get the right numeric values for assigned_to and status,
    # would need to scrape them out of the form...

    def get_patches(self, assigned_to='0', status='0'):
        """Return {patch_id: {field: value}} for every listed patch.

        assigned_to and status are passed to the browse form as strings.
        Raises TypeError if either of them is not a string.
        """
        for name, value in (('assigned_to', assigned_to),
                            ('status', status)):
            if not isinstance(value, _STRING_TYPE):
                raise TypeError("%s must be a string, not %r"
                                % (name, type(value)))
        url = self._get_initial_query(assigned_to, status)
        # A patch id may be linked more than once in the summary; keep one
        # URL per id (the last one, as before) so each detail page is
        # fetched only once.
        detail_urls = dict(self._load_patch_summary(url))
        patches = {}
        for patch_id, detail_url in detail_urls.items():
            patches[patch_id] = self._load_patch_detail(detail_url)
        return patches

    def _get_initial_query(self, assigned_to, status):
        """Build the URL of the first summary page for the given filters."""
        params = {'group_id': self.group_id,
                  'set': 'custom',
                  'SUBMIT': 'Browse',
                  '_assigned_to': assigned_to,
                  '_status': status,
                  }
        return "%s?%s" % (self.url, urlencode(params))

    def _fetch(self, url):
        """Return the body of *url* as text, closing the connection."""
        if VERBOSE:
            print("load %s" % url)
        response = urlopen(url)
        try:
            buf = response.read()
        finally:
            response.close()
        if not isinstance(buf, str):
            # Python 3 returns bytes; latin-1 maps every byte to one
            # character, so the str regexes see the page as on Python 2.
            buf = buf.decode('latin-1')
        return buf

    def _scan_summary(self, buf, offset):
        """Classify the query links found on one summary page.

        Returns (patches, followups): patches is a list of
        (patch_id, detail_url) tuples, followups a list of
        (page_url, page_offset) tuples for browse links whose offset is
        greater than *offset*.  Links without a 'func' field are ignored.
        """
        patches = []
        followups = []
        for href in self.list_parser.get_query_hrefs(buf):
            fields = urldecode(href[1:])
            func = fields.get('func')
            if func == 'detailpatch':
                patches.append((int(fields['patch_id']),
                                urljoin(self.url, href)))
            elif func == 'browse':
                new_offset = int(fields['offset'])
                # Only follow links that move forward through the list.
                if new_offset > offset:
                    followups.append((urljoin(self.url, href), new_offset))
        return patches, followups

    def _load_patch_summary(self, url):
        """Walk the summary pages starting at *url*.

        Returns a list of (patch_id, detail_url) tuples in the order the
        links were found.  Each page URL is loaded at most once.
        """
        todo = collections.deque([(url, 0)])
        queued = {url}
        patches = []
        while todo:
            page_url, offset = todo.popleft()
            found, followups = self._scan_summary(self._fetch(page_url),
                                                  offset)
            patches.extend(found)
            for followup in followups:
                if followup[0] not in queued:
                    queued.add(followup[0])
                    todo.append(followup)
        return patches

    def _load_patch_detail(self, url):
        """Fetch one patch page and return its parsed key-value entries."""
        return self.patch_parser.parse(self._fetch(url))


def main(argv=None):
    """Command-line entry point: scrape all patches into the database.

    Options: -v turns on progress messages, -d NAME selects the database
    passed to pg.connect().  *argv* defaults to sys.argv[1:].
    """
    global VERBOSE, DATABASE

    import getopt
    import sys

    # Only the command-line path needs the database packages.  They are
    # imported first so a missing package fails before any network work.
    import pg
    from sfdb import PatchDBInterface

    if argv is None:
        argv = sys.argv[1:]
    opts, args = getopt.getopt(argv, 'vd:')
    if args:
        sys.exit("unexpected arguments: %s" % ' '.join(args))
    for k, v in opts:
        if k == '-v':
            VERBOSE = 1
        elif k == '-d':
            DATABASE = v

    pmgr = PatchManager()
    if VERBOSE:
        print("Loading patches")
    p = pmgr.get_patches()
    if VERBOSE:
        print("Retrieved %d patches" % len(p))

    if VERBOSE:
        print("Inserting into local database")
    if DATABASE:
        db = pg.connect(DATABASE)
    else:
        db = pg.connect()
    pdbi = PatchDBInterface(db)
    for p_id, attrs in p.items():
        pdbi.update(p_id, attrs)
    if VERBOSE:
        new = len(p) - pdbi.num_deletes
        print("Found %d new patches" % new)
        print("Updated %d existing patches" % pdbi.num_deletes)


if __name__ == "__main__":
    main()