NAME = 'Technorati/Python' VERSION = '0.03' # Copyright (C) 2003 Phillip Pearson URL = 'http://www.myelin.co.nz/technorati_py/' # Permission is hereby granted, free of charge, to any person # obtaining a copy of this software and associated documentation files # (the "Software"), to deal in the Software without restriction, # including without limitation the rights to use, copy, modify, merge, # publish, distribute, sublicense, and/or sell copies of the Software, # and to permit persons to whom the Software is furnished to do so, # subject to the following conditions: # The above copyright notice and this permission notice shall be # included in all copies or substantial portions of the Software. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. # Related work: # # PyTechnorati by Mark Pilgrim: # http://diveintomark.org/projects/pytechnorati/ # # xmltramp/technorati.py by Aaron Swartz # http://www.aaronsw.com/2002/xmltramp/technorati.py __history__ = ''' v0.03 - now supporting the new 'search' command. v0.02 - now using the latest version of the API (no .xml URLs, format= and version= arguments) - you can now get more than just the first page of cosmos results (use start= or -s / --start) - now throwing an exception when we get an HTTP error - '--cosmos' command-line option added (same as --inbound) - now supporting all license key locations used by PyTechnorati v0.01 initial release http://www.myelin.co.nz/post/2003/5/12/#200305124 ''' import urllib, sgmllib, os, sys import appuifw, e32 app = appuifw.Application() def setLicense(license_key): "Set the license key" global LICENSE_KEY LICENSE_KEY = license_key def findkey(license_key=None): "Find out the current user's API key" class GotIt(Exception): def __init__(self, key): self.key = key def tryvar(key): if key: raise GotIt(key) def tryfile(fn): if DEBUG: print "trying",fn if os.path.exists(fn): tryvar(open(fn).readline().strip()) def modulepath(): return os.path.split(os.path.abspath(sys.argv[0]))[0] try: tryvar(license_key) tryvar(LICENSE_KEY) tryvar(os.environ.get('TECHNORATI_LICENSE_KEY', None)) for path in ('.', os.path.expanduser('~'), modulepath()): for leaf in ('.technoratikey', 'technoratikey.txt', 'apikey.txt'): tryfile(os.path.join(path, leaf)) except GotIt, g: setLicense(g.key) return LICENSE_KEY raise Exception, "Can't find license key" LICENSE_KEY = None DEBUG = 0 class opener(urllib.FancyURLopener): version = '%s v%s; %s' % (NAME, VERSION, URL) def http_error_default(self, url, fp, errcode, errmsg, headers, data=None): raise IOError, "HTTP error %s fetching http:%s" % (errcode, url) class BadUrlError(Exception): pass def call(proc, args, license_key=None): #if args['url'] in (None, ''): # raise BadUrlError("No URL supplied") args['key'] = findkey(license_key) args['format'] = 'xml' args['version'] = '0.9' url = 'http://api.technorati.com/%s?%s' % (proc, urllib.urlencode(args)) o = opener() f = o.open(url) xml = f.read() if DEBUG: print xml return xml def parse(parser, xml): parser.feed(xml) parser.close() return parser.data class genericParser(sgmllib.SGMLParser): def __init__(self, itemsName): sgmllib.SGMLParser.__init__(self) self.data = {} self.inresult = self.inweblog = self.initem = 0 self.weblog = None self.item = None self.data[itemsName] = self.items = [] self.collector = None def collect(self): assert self.collector is None, "already collecting: parse failure!" self.collector = [] def grab(self): s = "".join(self.collector) self.collector = None return s def grab_int(self): x = self.grab() if not x: return 0 return int(x) def handle_data(self, s): if self.collector is not None: self.collector.append(s) def start_document(self, attrs): pass def end_document(self): pass def start_result(self, attrs): self.inresult = 1 def end_result(self): self.inresult = 0 def start_item(self, attrs): self.initem = 1 self.item = {} def end_item(self): self.initem = 0 self.items.append(self.item) self.item = None def start_nearestpermalink(self, attrs): assert self.initem self.collect() def end_nearestpermalink(self): self.item['nearestpermalink'] = self.grab() def start_excerpt(self, attrs): assert self.initem self.collect() def end_excerpt(self): self.item['excerpt'] = self.grab() def start_linkcreated(self, attrs): assert self.initem self.collect() def end_linkcreated(self): self.item['linkcreated'] = self.grab() def start_weblog(self, attrs): assert self.initem or self.inresult, "found element outside or " self.inweblog = 1 self.weblog = {} def end_weblog(self): self.inweblog = 0 if self.initem: self.item['weblog'] = self.weblog #self.weblogs.append(self.weblog) elif self.inresult: self.data['weblog'] = self.weblog else: raise AssertionFailure, " element not in item or result...?" self.weblog = None def start_rankingstart(self, attrs): self.collect() def end_rankingstart(self): self.data['rankingstart'] = int(self.grab()) def start_url(self, attrs): self.collect() def end_url(self): if self.inweblog: self.weblog['url'] = self.grab() else: self.data['url'] = self.grab() def start_name(self, attrs): self.collect() def end_name(self): self.weblog['name'] = self.grab() def start_rssurl(self, attrs): self.collect() def end_rssurl(self): self.weblog['rssurl'] = self.grab() def start_inboundblogs(self, attrs): self.collect() def end_inboundblogs(self): if self.inweblog: x = self.weblog elif self.inresult: x = self.data else: raise AssertionFailure, " element not in or " x['inboundblogs'] = self.grab_int() def start_inboundlinks(self, attrs): self.collect() def end_inboundlinks(self): if self.inweblog: x = self.weblog elif self.inresult: x = self.data else: raise AssertionFailure, " element not in or " x['inboundlinks'] = self.grab_int() def start_lastupdate(self, attrs): self.collect() def end_lastupdate(self): self.weblog['lastupdate'] = self.grab() def getCosmos(url, start=None, license_key=None): "gets a blog's cosmos and returns an ApiResponse containing a Weblog object ('weblog') for the blog and a list ('inLinks') of Link objects for its neighbours" args = {'url': url} if start is not None: args['start'] = '%d' % start xml = call('cosmos', args, license_key) data = parse(genericParser('inbound'), xml) print data return data def getBlogInfo(url, license_key=None): "gets info about a blog and returns it as a Weblog object" xml = call('bloginfo', {'url': url}, license_key) data = parse(genericParser('weblogs'), xml) return data.get('weblog', None) def getOutboundBlogs(url, license_key=None): "gets a list of blogs linked to by a blog and returns an ApiResponse containing a Weblog object ('weblog') for the blog and a list ('outLinks') of Weblog objects for the linked-to blogs" xml = call('outbound', {'url': url}, license_key) data = parse(genericParser('outbound'), xml) return data def search(query, license_key=None): xml = call('search', {'query': query}, license_key) data = parse(genericParser('search'), xml) return data class App: def __init__(self): #pass try: self.view = self.textTab(u'') except Exception, E: print E try: app.body = appuifw.Text() app.set_tabs([u"Main"], self.handle_tab) self.lock = e32.Ao_lock() except Exception, E: print E #appuifw.Application().exit_key_handler = self.exit_key_handler #def exit_key_handler(self): # self.lock.signal() def handle_tab(self, index): self.view.activate() def main(self,url): # pass import sys #opts, rest = getopt.getopt(sys.argv[1:], 'dts:u:', ('debug', 'test', 'inbound', 'cosmos', 'start=', 'info', 'outbound', 'url=', 'search')) #arg = " ".join([x for x in rest if x.strip()]) #func = None #start = None #for opt,val in opts: # _map = {'inbound': getCosmos, # 'cosmos': getCosmos, # 'info': getBlogInfo, # 'outbound': getOutboundBlogs, # 'search': search, # } # if opt in ('-u', '--url'): # url = val # elif opt in ('-s', '--start'): # start = int(val) # elif opt in ('-d', '--debug'): # global DEBUG # DEBUG = 1 # elif opt in ('-t', '--test'): # func = test # elif opt.startswith('--') and _map.has_key(opt[2:]): # assert func is None, "Only one function (url, inbound, info or outbound) may be supplied" # func = _map[opt[2:]] try: func = getCosmos start = None arg = url if func is None: print "No function supplied; --url, --inbound, --info, --search or --outbound must be specified on the command line" return if start is not None: r = func(arg, start) else: r = func(arg) print r self.handle_tab(0) self.lock_wait() except Exception, E: print E class textTab: def __init__(self, text): self.t = appuifw.Text() self.view_text = text def activate(self): appuifw.Application().body = self.t appuifw.app.menu = [(u"Send", self.handle_send)] self.t.focus = True def handle_send(self): pass if __name__ == '__main__': findkey('8da19937392609b081d13208d3231924') url = appuifw.query(u'Enter Cosmos URI to check', 'text') app = App() app.main(url)