Pythonista 達の熱き闘いが,
今,始まる...!!
[2010/12/19 00:43:44] 登録
■ 名前
■ ステータス
HP | SP | 攻撃力 | 集中力 | 防御力 | 素早さ | 運 |
---|---|---|---|---|---|---|
1246 | 24 | 100 | 38 | 8 | 3 | 3 |
■ 必殺技
名前 | タイプ | レベル | 消費 SP |
---|---|---|---|
雀落とし | MultiAttackType | 3 | 20 |
暗器砲 | SuicideAttackType | 2 | 9 |
■ コード
#!/usr/bin/env python # -*- coding: utf-8 -*- # # bottlelib2.py - SSTP Bottle for SLPP/2.0 client library # Copyright (C) 2010 by Atzm WATANABE <atzm@atzm.org> # # This program is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License (version 2) as # published by the Free Software Foundation. It is distributed in the # hope that it will be useful, but WITHOUT ANY WARRANTY; without even the # implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR # PURPOSE. See the GNU General Public License for more details. # # $Id: bottlelib2.py,v 1.2 2010/12/05 14:49:13 atzm Exp $ # import re import time import socket import select import urllib import mimetools import traceback try: import cStringIO as _StringIO except ImportError: import StringIO as _StringIO URL_BOTTLE2 = 'http://bottle.mikage.to/bottle2.cgi' SLPP_HOST = 'bottle.mikage.to' SLPP_PORT = 9871 USER_AGENT = 'bottlelib2/1.0' BROADCAST_TYPE_VOTE = 'Vote' BROADCAST_TYPE_AGREE = 'Agree' def _sjis2unicode(s): return unicode(s, 'sjis', 'replace') def _unicode2sjis(u): return u.encode('sjis', 'replace') def _mid2unixtime(mid): return int('0x%s' % mid[:8], 16) def _build_response(stringio): message = mimetools.Message(stringio, 0) data = {} for key in message.keys(): key = key.lower() # canonicalize response key values = [_sjis2unicode(v) for v in message.getheaders(key)] data[key] = values return data class BottleClientError(RuntimeError): def __init__(self, message, detail=None): self.message = message self.detail = detail def __str__(self): if self.detail is None: message = self.message else: message = "%s (%s)" % (self.message, self.detail) return message class BottleHTTPClient: def __init__(self, url=URL_BOTTLE2, luid='', ua=USER_AGENT, proxy=None): if luid and len(luid) != 82: raise BottleClientError('invalid LUID', luid) self.url = url self.luid = luid self.ua = ua self.proxy = proxy self.channels = {} def _build_request(self, req): req = '\r\n'.join(req + ['Agent: %s' % self.ua]) return urllib.quote_plus(req) # public APIs def clear(self): self.channels = {} def send_cgi_request(self, url, req): class BottleURLopener(urllib.URLopener): version = self.ua try: req = self._build_request(req) fil = BottleURLopener(proxies=self.proxy).open(url, req) hdr = fil.info() sio = _StringIO.StringIO(fil.read()) fil.close() except: s = _StringIO.StringIO() traceback.print_exc(file=s) raise BottleClientError('could not send CGI request', s.getvalue()) res = _build_response(sio) return res.get('result', [''])[0] == 'OK', res, hdr def get_new_id(self): ok, res, hdr = self.send_cgi_request(self.url, ['Command: getNewId']) if not ok: msg = None if 'extramessage' in res: msg = res['extramessage'][0] raise BottleClientError('failed to get LUID', msg) if res['luid'][0] != 82: raise BottleClientError('got invalid LUID', luid) self.luid = res['luid'][0] return self.luid def get_channels(self): if not self.luid: raise BottleClientError('LUID is not set') ok, res, hdr = self.send_cgi_request(self.url, [ 'Command: getChannels', 'LUID: %s' % self.luid, ]) if not ok: msg = None if 'extramessage' in res: msg = res['extramessage'][0] raise BottleClientError('failed to get channels', msg) self.channels = {} for i in xrange(int(res['count'][0])): ch_index = i + 1 prefix = "ch%d_%%s" % ch_index # index means no specials? self.channels[res[prefix % 'name'][0]] = { 'index': ch_index, 'name': res[prefix % 'name'][0], 'ghost': res[prefix % 'ghost'][0], 'info': res[prefix % 'info'][0], 'warnpost': int(res[prefix % 'warnpost'][0]), 'nopost': int(res[prefix % 'nopost'][0]), 'count': int(res[prefix % 'count'][0]), } return self.channels def set_channels(self, channels): if not self.luid: raise BottleClientError('LUID is not set') req = ['Command: setChannels', 'LUID: %s' % self.luid] for i, channel in enumerate(channels): if channel not in self.channels: raise BottleClientError('invalid channel', channel) req.append('Ch%d: %s' % (i + 1, _unicode2sjis(channel))) ok, res, hdr = self.send_cgi_request(self.url, req) if not ok: msg = None if 'extramessage' in res: msg = res['extramessage'][0] raise BottleClientError('failed to set channels', msg) # after, must handle ChannelList return ok def send_broadcast(self, channel, sc, ghost=None): if not self.luid: raise BottleClientError('LUID is not set') if channel not in self.channels: raise BottleClientError('unknown channel', channel) if self.channels[channel]['nopost'] == 1: raise BottleClientError('posting channel is forbidden', channel) req = [ 'Command: sendBroadcast', 'LUID: %s' % self.luid, 'Channel: %s' % _unicode2sjis(channel), 'Talk: %s' % _unicode2sjis(sc), ] if ghost: req.append('Ghost: %s' % _unicode2sjis(ghost)) ok, res, hdr = self.send_cgi_request(self.url, req) if not ok: msg = None if 'extramessage' in res: msg = res['extramessage'][0] raise BottleClientError('failed to send broadcast', msg) return ok def vote_message(self, mid, type_): if not self.luid: raise BottleClientError('LUID is not set') if type_ not in [BROADCAST_TYPE_VOTE, BROADCAST_TYPE_AGREE]: raise BottleClientError('invalid vote type', type_) ok, res, hdr = self.send_cgi_request(self.url, [ 'Command: voteMessage', 'LUID: %s' % self.luid, 'MID: %s' % mid, 'VoteType: %s' % type_, ]) if not ok: msg = None if 'extramessage' in res: msg = res['extramessage'][0] raise BottleClientError('failed to vote message', msg) return int(res['votes'][0]) class BottleSLPPClient: def __init__(self, host=SLPP_HOST, port=SLPP_PORT): self.host = host self.port = port self.sock = None self.sockfp = None def _recv_slpp_message(self): buf = [] cmd = None for line in self.sockfp: if line == '\r\n': if cmd: break continue if not cmd: cmd = line.strip() elif line.find(':') > 0: buf.append(line) res = _build_response(_StringIO.StringIO(''.join(buf))) return cmd, res # public APIs def open(self): if self.sock or self.sockfp: self.close() self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((self.host, self.port)) self.sockfp = self.sock.makefile() def close(self): if self.sock: self.sock.close() if self.sockfp: self.sockfp.close() self.sock = None self.sockfp = None def connect(self, luid): self.sockfp.write('POST /\r\n\r\n' + luid + '\r\n') self.sockfp.flush() cmd, detail = self._recv_slpp_message() if not cmd: self.close() self.notify_connection_closed() return False ver, status, reason = cmd.split(None, 2) if ver not in ['HTTP/1.0', 'HTTP/1.1']: raise BottleClientError('unknown protocol', ver) if status != '200': raise BottleClientError('invalid status', status) if reason != 'OK': raise BottleClientError('invalid reason', reason) return True def handle_event(self, timeout=0): # default: non-blocking r, w, e = select.select([self.sockfp], [], [], timeout) if not r: return False cmd, detail = self._recv_slpp_message() if not cmd: self.close() self.notify_connection_closed() return False handler = getattr(self, '_handle_%s' % cmd, None) if not callable(handler): if __debug__: print 'unable to handle command %s: %s' % (cmd, str(detail)) return False handler(cmd, detail) return True # SLPP/2.0 command handlers def _handle_broadcastMessage(self, cmd, detail): mid = detail['mid'][0] sc = detail['script'][0] ch = detail['channel'][0] time_ = _mid2unixtime(mid) ifghost = None if 'ifghost' in detail: ifghost = detail['ifghost'][0] self.prep_sstp_message(time_, mid, sc, ch, ifghost, False, False) self.handle_sstp_message(time_, mid, sc, ch, ifghost, False, False) def _handle_forceBroadcastMessage(self, cmd, detail): if 'script' in detail: mid = detail['mid'][0] scrt = detail['script'][0] time_ = _mid2unixtime(mid) self.prep_sstp_message(time_, mid, scrt, None, None, False, True) self.handle_sstp_message(time_, mid, scrt, None, None, False, True) if 'dialogmessage' in detail: self.prep_dialog_message(detail['dialogmessage'][0]) self.handle_dialog_message(detail['dialogmessage'][0]) def _handle_allUsers(self, cmd, detail): self.prep_all_users(int(detail['num'][0])) self.handle_all_users(int(detail['num'][0])) def _handle_channelUsers(self, cmd, detail): self.prep_channel_users(detail['channel'][0], int(detail['num'][0])) self.handle_channel_users(detail['channel'][0], int(detail['num'][0])) def _handle_channelList(self, cmd, detail): num = int(detail['num'][0]) entries = {} if 'entry' in detail: entry_re = re.compile('(\S+)\s+\((\d+)\)') for e in detail['entry']: m = entry_re.match(e) if m: entries[m.group(1)] = int(m.group(2)) self.prep_channel_list(num, entries) self.handle_channel_list(num, entries) def _handle_broadcastInformation(self, cmd, detail): type_ = detail['type'][0] self.prep_broadcast_information(type_, detail, False) self.handle_broadcast_information(type_, detail, False) def _handle_forceBroadcastInformation(self, cmd, detail): type_ = detail['type'][0] self.prep_broadcast_information(type_, detail, True) self.handle_broadcast_information(type_, detail, True) def _handle_closeChannel(self, cmd, detail): self.prep_close_channel(detail['channel'][0]) self.handle_close_channel(detail['channel'][0]) def _handle_sendUnicast(self, cmd, detail): pass # ??? # callbacks for connection close def notify_connection_closed(self): pass # callbacks for preprocessing message def prep_sstp_message(self, time_, mid, sc, ch, ghost, unicast, forced): pass def prep_dialog_message(self, message): pass def prep_all_users(self, num): pass def prep_channel_users(self, channel, num): pass def prep_channel_list(self, num, entries): pass def prep_broadcast_information(self, type_, detail, forced): pass def prep_close_channel(self, channel): pass # callbacks for handling messages def handle_sstp_message(self, time_, mid, sc, ch, ghost, unicast, forced): pass def handle_dialog_message(self, message): pass def handle_all_users(self, num): pass def handle_channel_users(self, channel, num): pass def handle_channel_list(self, num, entries): pass def handle_broadcast_information(self, type_, detail, forced): pass def handle_close_channel(self, channel): pass class BottleClient(BottleSLPPClient, BottleHTTPClient): def __init__(self, host, port, url, luid='', ua=USER_AGENT, proxy=None): BottleHTTPClient.__init__(self, url, luid, ua, proxy) BottleSLPPClient.__init__(self, host, port) self.joined_channels = [] self.users = 0 self.started = False # public APIs def start(self): if not self.luid: self.get_new_id() self.open() self.connect(self.luid) self.get_channels() self.started = True return self.started def finish(self): self.close() self.clear() self.joined_channels = [] self.users = 0 self.started = False def join(self, channels): if not self.started: raise BottleClientError('not started yet') if not channels: raise BottleClientError('empty channels required') self.set_channels(channels) self.joined_channels = channels return True def run_forever(self): while self.started: self.handle_event(None) # callbacks for BottleSLPPClient def notify_connection_closed(self): self.finish() def prep_all_users(self, num): self.users = num def prep_channel_users(self, channel, num): if channel in self.channels: self.channels[channel]['count'] = num if channel not in self.joined_channels: self.joined_channels.append(channel) def prep_close_channel(self, channel): if channel in self.channels: del self.channels[channel] while channel in self.joined_channels: self.joined_channels.remove(channel) class BottleDumper(BottleClient): def handle_sstp_message(self, time_, mid, sc, ch, ghost, unicast, forced): print ' - SSTP message received:' print ' time :', time.ctime(time_) print ' mid :', mid print ' script :', sc print ' channel :', ch print ' ifghost :', ghost print ' unicast :', unicast print ' forced :', forced def handle_dialog_message(self, message): print ' - Dialog message received:' print ' message :', message def handle_all_users(self, num): print ' - All users notification received:' print ' num :', num def handle_channel_users(self, channel, num): print ' - Channel users notification received:' print ' channel :', channel print ' num :', num def handle_channel_list(self, num, entries): print ' - Channel list notification received:' print ' num :', num print ' entries :' for k, v in entries.iteritems(): print ' %s: %s' % (k, v) def handle_broadcast_information(self, type_, detail, forced): print ' - Broadcast information received:' for k, v in detail.iteritems(): print ' %-9s: %s' % (k, v) def handle_close_channel(self, channel): print ' - Close channel notification received:' print ' channel :', channel def dump_available_channels(self): print '***** available channels *****' for k, v in self.channels.iteritems(): print ' - %s:' % k for kk, vv in v.iteritems(): print ' %-9s: %s' % (kk, vv) print '******************************' def dump_joined_channels(self): print '****** joined channels ******' for k in dumper.joined_channels: print ' -', k print '******************************' if __name__ == '__main__': import os import sys import getopt options, rest = getopt.getopt(sys.argv[1:], 'h:p:u:l:', ['host=', 'port=', 'url=', 'luid=']) luid = os.getenv('SSTPBOTTLE_LUID', '') host = SLPP_HOST port = SLPP_PORT url = URL_BOTTLE2 for opt, val in options: if opt in ['-h', '--host']: host = val elif opt in ['-p', '--port']: port = int(val) elif opt in ['-u', '--url']: url = val elif opt in ['-l', '--luid']: luid = val if not luid: raise SystemExit('no LUID specified!') dumper = BottleDumper(host, port, url, luid) if not dumper.start(): raise BottleClientError('could not connect to a server') dumper.dump_available_channels() if not dumper.join(dumper.channels.keys()): raise BottleClientError('could not join channels') dumper.dump_joined_channels() try: dumper.run_forever() except KeyboardInterrupt: print dumper.finish() print r'\e'