pkuipgw/pkuipgw

215 lines
6.3 KiB
Python
Executable File

#!/usr/bin/python3
#
# pkuipgw: PKU IPGW Client for Linux (v0.6.4)
# <https://www.linux-wiki.cn/>
# Copyright (c) 2007-2009,2011-2012 Chen Xing <cxcxcxcx@gmail.com>
# Copyright (c) 2014-2016 Casper Ti. Vector <caspervector@gmail.com>
# Copyright (c) 2015 Jianguo Chen <cjgls@pku.edu.cn>
from __future__ import unicode_literals
import getopt
import json
import re
import sys
import traceback
from getpass import getpass
from os.path import expanduser
if sys.version_info[0] < 3:
from ConfigParser import ConfigParser
from urllib import urlencode
from urllib2 import build_opener
else:
from configparser import ConfigParser
from urllib.parse import urlencode
from urllib.request import build_opener
class IpgwException(Exception):
    """Error raised for every failure this client reports itself.

    ``args[0]`` is an error-kind tag ('ArgError', 'ValueError',
    'ConfError' or 'IpgwError') that ``main`` uses to choose the exit
    status; 'ConfError' and 'IpgwError' carry a detail message as
    ``args[1]``.
    """
def uni(s):
    """Return *s* as text, decoding it from UTF-8 if it is not text yet."""
    # Exact type check (not isinstance), matching text literals only.
    if type(s) is not type(u''):
        return s.decode('UTF-8')
    return s
def width(s):
    """Return the display width of *s* as used for table alignment.

    Every character with a code point above 128 counts as two columns,
    every other character as one.
    """
    return sum(2 if ord(ch) > 128 else 1 for ch in s)
def pad(ws, record):
    """Join the cells of *record* into one line, padded to widths *ws*.

    A cell is padded to its column width plus two spaces only when some
    cell to its right has content, so the line gets no trailing padding.
    """
    line = ''
    need_gap = False
    # Walk right-to-left so we know whether anything follows each cell.
    for col_width, cell in list(zip(ws, record))[::-1]:
        if need_gap:
            cell = cell + ' ' * (col_width - width(cell) + 2)
        need_gap = need_gap or bool(cell)
        line = cell + line
    return line
def table(rs):
    """Render the rows *rs* as aligned text, one newline-ended line per row.

    Each column is as wide as its widest cell (measured with ``width``).
    """
    col_widths = []
    for column in zip(*rs):
        col_widths.append(max(map(width, column)))
    lines = [pad(col_widths, row) for row in rs]
    return ''.join(line + '\n' for line in lines)
def fmt_dict(d):
    """Format dict *d* as an aligned ``key: value`` table sorted by key."""
    rows = []
    for key in sorted(d):
        rows.append([key + ':', d[key]])
    return table(rows)
def my_input(prompt, pw = False):
    """Prompt the user and return one line of input as text.

    With ``pw`` true the line is read through ``getpass``; otherwise the
    prompt goes to stderr and the line is read from stdin.  Trailing
    CR/LF characters are stripped from the result.

    Raises IpgwException('ValueError') when input ends (EOF) before a
    line is read.
    """
    try:
        if not pw:
            sys.stderr.write(prompt)
            sys.stderr.flush()
            line = sys.stdin.readline()
            # readline() returns an empty string only at end of input.
            if not line:
                raise EOFError
        else:
            line = getpass(prompt)
    except EOFError:
        # Finish the prompt line before the error gets reported.
        sys.stderr.write('\n')
        raise IpgwException('ValueError')
    return uni(line.rstrip('\r\n'))
def my_open(opener, d):
    """POST the parameters *d* to the ITS client endpoint and parse the reply.

    ``opener`` must provide ``open(url, data)`` returning a response with
    ``read()`` and ``close()``.  The body is decoded as UTF-8, falling
    back to GB18030, and must be a JSON object whose keys and values are
    all text.

    Returns the decoded dict.  Raises IpgwException('IpgwError', ...)
    after echoing the raw body to stderr when the reply is not of that
    shape.
    """
    resp = opener.open(
        'https://its.pku.edu.cn/cas/ITSClient', urlencode(d).encode('UTF-8')
    )
    try:
        txt = resp.read()
    finally:
        # Release the connection even if reading fails.
        resp.close()
    try:
        txt = txt.decode('UTF-8')
    except UnicodeDecodeError:
        txt = txt.decode('GB18030')

    # The type of a plain literal is the text type that json.loads
    # produces (the file enables unicode_literals for Python 2).
    text_type = type('')
    try:
        ret = json.loads(txt)
        # Explicit check instead of `assert`, which `python -O` removes.
        if not isinstance(ret, dict) or not all(
            isinstance(k, text_type) and isinstance(v, text_type)
            for k, v in ret.items()
        ):
            raise ValueError('unexpected response structure')
    except ValueError:
        # json decoding errors are ValueError subclasses as well.
        sys.stderr.write(txt + '\n' if txt and not txt.endswith('\n') else txt)
        raise IpgwException(
            'IpgwError', "malformed response for command `%s'" % d['cmd']
        )
    return ret
def get_cred(configFiles):
    """Return the credentials as ``[('username', u), ('password', p)]``.

    With a non-empty *configFiles* list the values come from the
    ``pkuipgw`` section of those config files; with an empty list the
    user is prompted for both.

    Raises IpgwException('ConfError', ...) when no config file could be
    read, the section is missing, or either key is absent.
    """
    if not configFiles:
        username = my_input('Username: ')
        password = my_input('Password: ', pw = True)
        return [('username', username), ('password', password)]

    parser = ConfigParser()
    if not parser.read(configFiles):
        raise IpgwException('ConfError', 'no readable config file')
    if not parser.has_section('pkuipgw'):
        raise IpgwException(
            'ConfError', "section `pkuipgw' not found in config file"
        )
    # raw=True disables `%` interpolation: a password containing `%`
    # would otherwise raise an interpolation error or be rewritten.
    settings = dict(
        (uni(k), uni(v)) for k, v in parser.items('pkuipgw', raw = True)
    )
    if 'username' not in settings or 'password' not in settings:
        raise IpgwException(
            'ConfError', "both `username' and `password' required"
        )
    return [
        ('username', settings['username']),
        ('password', settings['password']),
    ]
def do_kick(opener, cred):
    """List the account's connections and disconnect the one the user picks.

    The ``getconnections`` reply must contain only a ``succ`` key whose
    value is a ``;``-separated list with four fields per connection; the
    first field of the chosen connection is sent as ``ip`` with the
    ``disconnect`` command.

    Returns the reply to ``disconnect``.  Raises IpgwException for a
    malformed or empty connection list ('IpgwError') or an invalid
    choice ('ValueError').
    """
    reply = my_open(opener, dict(cred + [('cmd', 'getconnections')]))

    malformed = list(reply.keys()) != ['succ']
    if not malformed:
        raw = reply['succ']
        if not raw:
            raise IpgwException('IpgwError', 'no connection to kick')
        fields = raw.split(';')
        malformed = len(fields) % 4 != 0
    if malformed:
        sys.stderr.write(fmt_dict(reply))
        raise IpgwException('IpgwError', 'invalid connection info')

    # Regroup the flat field list into one 4-field record per connection.
    conns = [fields[k : k + 4] for k in range(0, len(fields), 4)]
    total = len(conns)
    rows = [['(%d)' % idx] + conn for idx, conn in enumerate(conns)]
    sys.stderr.write('List of connections:\n')
    sys.stderr.write(table(rows))

    answer = my_input('Choose a connection to kick (0 - %d): ' % (total - 1))
    try:
        choice = int(answer)
    except ValueError:
        raise IpgwException('ValueError')
    if choice < 0 or choice >= total:
        raise IpgwException('ValueError')

    return my_open(opener, dict(cred + [
        ('cmd', 'disconnect'), ('ip', conns[choice][0])
    ]))
def do_connect(opener, cred, cmd, doAll):
    """Send the connect/disconnect request and return the parsed reply.

    * ``cmd == 'connect'``: ``open`` with credentials; ``iprange`` is
      'fee' when *doAll* is true, otherwise 'free'.
    * any other *cmd* with *doAll*: ``closeall`` with credentials.
    * any other *cmd* without *doAll*: ``close`` alone, no credentials.
    """
    if cmd == 'connect':
        iprange = 'fee' if doAll else 'free'
        params = cred + [('cmd', 'open'), ('iprange', iprange)]
    elif doAll:
        params = cred + [('cmd', 'closeall')]
    else:
        # A plain close is sent without username/password.
        params = [('cmd', 'close')]
    return my_open(opener, dict(params))
def in_main(argv):
    """Parse *argv*, run the requested command and print the reply.

    Accepted forms: ``[-c cfg ...| -i] (connect|disconnect) [all]`` or
    ``[-c cfg ...| -i] kick``.  Without ``-c`` and ``-i`` the files
    ``/etc/ipgwrc`` and ``~/.ipgwrc`` are used.  The reply is written to
    stdout as a table.

    Raises IpgwException('ArgError') for bad arguments and
    IpgwException('IpgwError', ...) when the reply has an ``error`` key.
    """
    try:
        opts, args = getopt.getopt(argv, 'c:i')
    except getopt.GetoptError:
        raise IpgwException('ArgError')

    configFiles = [val for key, val in opts if key == '-c']
    ask = any(key == '-i' for key, _ in opts)
    # `-i` (prompt for credentials) and `-c` are mutually exclusive.
    if ask and configFiles:
        raise IpgwException('ArgError')
    if not ask and not configFiles:
        configFiles = ['/etc/ipgwrc', expanduser('~/.ipgwrc')]

    if not args or args[0] not in ('connect', 'disconnect', 'kick'):
        raise IpgwException('ArgError')
    cmd, extra = args[0], args[1:]
    if not extra:
        doAll = False
    elif extra == ['all'] and cmd != 'kick':
        doAll = True
    else:
        # Unknown trailing arguments, or `kick all`.
        raise IpgwException('ArgError')

    cred = get_cred(configFiles)
    opener = build_opener()
    opener.addheaders = [('User-Agent', '.')]
    if cmd == 'kick':
        d = do_kick(opener, cred)
    else:
        d = do_connect(opener, cred, cmd, doAll)
    sys.stdout.write(fmt_dict(d))
    if 'error' in d:
        raise IpgwException('IpgwError', 'response indicating failure')
def main():
    """Run the client on ``sys.argv`` and exit with a status code.

    Exit status: 0 on success; 1 for argument, input or configuration
    errors; 2 for gateway errors; 3 for any other exception (after
    printing its traceback).
    """
    usage = (
        'Usage: pkuipgw [-c cfg_file [-c ...] | -i]'
        ' ((connect | disconnect) [all] | kick)\n'
        "Use `-i' to read username and password from keyboard.\n"
        "Use `connect all' to connect to non-free IPs as well.\n"
    )
    # Error kind -> (exit status, fixed message); None means the message
    # is built from the exception's own (kind, detail) arguments.
    outcomes = {
        'ArgError': (1, usage),
        'ValueError': (1, 'ValueError: invalid input\n'),
        'ConfError': (1, None),
        'IpgwError': (2, None),
    }
    try:
        in_main(sys.argv[1:])
    except IpgwException as ex:
        status, message = outcomes[ex.args[0]]
        if message is None:
            message = '%s: %s\n' % ex.args
        sys.stderr.write(message)
        sys.exit(status)
    except:
        # Top-level boundary: report anything unexpected and exit 3.
        traceback.print_exc()
        sys.exit(3)
    else:
        sys.exit(0)
# Run the client only when executed as a script, not when imported.
if __name__ == '__main__':
    main()