mirror of
https://github.com/ssb22/web-imap-etc.git
synced 2023-06-20 11:18:33 +00:00
809 lines
43 KiB
Python
809 lines
43 KiB
Python
#!/usr/bin/env python
|
|
# (compatible with both Python 2 and Python 3)
|
|
|
|
# webcheck.py v1.576 (c) 2014-23 Silas S. Brown.
|
|
# See webcheck.html for description and usage instructions
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
# CHANGES
|
|
# -------
|
|
# If you want to compare this code to old versions, most old
|
|
# versions are being kept on SourceForge's E-GuideDog SVN repository
|
|
# http://sourceforge.net/p/e-guidedog/code/HEAD/tree/ssb22/setup/
|
|
# use: svn co http://svn.code.sf.net/p/e-guidedog/code/ssb22/setup
|
|
# and on GitHub at https://github.com/ssb22/web-imap-etc
|
|
# and on GitLab at https://gitlab.com/ssb22/web-imap-etc
|
|
# and on Bitbucket https://bitbucket.org/ssb22/web-imap-etc
|
|
# and at https://gitlab.developers.cam.ac.uk/ssb22/web-imap-etc
|
|
# and in China: https://gitee.com/ssb22/web-imap-etc
|
|
|
|
# Upper limit on the number of worker threads fetching at once.
max_threads = 10

# Seconds to leave between fetches (3 insufficient for StackExchange rate limit).
delay = 5

# If True, will also keep any ETag headers as well as Last-Modified.
keep_etags = False

# webcheck's non-Webdriver URLs are for monitoring public services and
# there's not a lot of point in SSL authentication; failures due to
# server/client certificate misconfigurations are more trouble than
# they're worth.
verify_SSL_certificates = False
|
|
|
|
import traceback, time, pickle, gzip, re, os, sys, socket, hashlib
|
|
try: import htmlentitydefs # Python 2
|
|
except ImportError: import html.entities as htmlentitydefs # Python 3
|
|
try: from HTMLParser import HTMLParser # Python 2
|
|
except ImportError: # Python 3
|
|
from html.parser import HTMLParser as _HTMLParser
|
|
class HTMLParser(_HTMLParser):
|
|
def __init__(self): _HTMLParser.__init__(self,convert_charrefs=False)
|
|
try: from commands import getoutput
|
|
except: from subprocess import getoutput
|
|
try: import urlparse # Python 2
|
|
except ImportError: import urllib.parse as urlparse # Python 3
|
|
try: from StringIO import StringIO # Python 2
|
|
except: from io import BytesIO as StringIO # Python 3
|
|
try: import Queue # Python 2
|
|
except: import queue as Queue # Python 3
|
|
try: unichr # Python 2
|
|
except: unichr,xrange = chr,range # Python 3
|
|
try: from urllib2 import quote,HTTPCookieProcessor,HTTPErrorProcessor,build_opener,HTTPSHandler,urlopen,Request,HTTPError,URLError # Python 2
|
|
except: # Python 3
|
|
from urllib.parse import quote
|
|
from urllib.request import HTTPCookieProcessor,build_opener,HTTPSHandler,urlopen,Request,HTTPErrorProcessor
|
|
from urllib.error import HTTPError,URLError
|
|
def B(s):
    """Return a byte-string made from a "" literal.

    On Python 3 (where "" and u"" are the same type) a text string is
    encoded as UTF-8; anything else, and everything on Python 2, is
    returned unchanged.
    """
    literals_are_unicode = type("") == type(u"")
    if literals_are_unicode and type(s) == type(""):
        return s.encode('utf-8')  # Python 3
    return s  # Python 2, or already bytes
|
|
def S(b):
    """Return b as the native "" string type.

    A value that already has the type of a "" literal is returned as-is
    (always the case for Python 2 byte-strings); otherwise it is decoded
    as UTF-8 (Python 3 bytes).
    """
    if type(b) != type(""):
        return b.decode('utf-8')  # Python 3
    return b  # Python 2
|
|
def U(s):
    """Return s as a Unicode string, decoding byte-strings as UTF-8."""
    already_unicode = type(s) == type(u"")
    if not already_unicode:
        s = s.decode('utf-8')
    return s
|
|
def UL(s):
    """Return s as a Unicode string, decoding byte-strings as Latin-1.

    Latin-1 maps every byte to the code point of the same value, so the
    result can be turned back into the original bytes with
    .encode('latin1').
    """
    already_unicode = type(s) == type(u"")
    if not already_unicode:
        s = s.decode('latin1')
    return s
|
|
def getBuf(f):
    """Return f.buffer if that can be read, otherwise f itself."""
    try:
        buf = f.buffer  # Python 3
    except:
        buf = f  # Python 2
    return buf
|
|
try: import ssl
|
|
except: # you won't be able to check https:// URLs
|
|
ssl = 0 ; verify_SSL_certificates = False
|
|
if '--single-thread' in sys.argv: max_threads = 1 # use --single-thread if something gets stuck and you need Ctrl-C to generate a meaningful traceback
|
|
if max_threads > 1:
|
|
try: import thread # Python 2
|
|
except ImportError: import _thread as thread # Python 3
|
|
|
|
default_filename = "webcheck" + os.extsep + "list"


def read_input_file(fname=default_filename):
    """Read a list file (or directory of list files) as a stack of lines.

    The return value is a list in REVERSE reading order, so that the
    caller can pop() lines off the end to get them in order.

    If fname is a directory, every file in it is read (names ending in
    "~" or ".bak" are ignored) and each line taken from a file other
    than default_filename is suffixed with " # from <filename>".  The
    file called default_filename, if present, is arranged to be popped
    first, followed by the other files in os.listdir() order.

    If fname cannot be opened (not a file or resolvable link to one,
    e.g. a lockfile in a webcheck.list directory), [] is returned.
    """
    if os.path.isdir(fname):  # support webcheck.list etc as a directory
        files = os.listdir(fname)
        stack = []
        if default_filename in files:  # do this one first
            stack = read_input_file(fname + os.sep + default_filename)
            files.remove(default_filename)
        for f in files:
            if f.endswith("~") or f.lower().endswith(".bak"):
                continue  # ignore
            tagged = [(l + " # from " + f)
                      for l in read_input_file(fname + os.sep + f)]
            # The result is a pop-from-the-end stack, so a file that is
            # to be read LATER has to sit UNDERNEATH (i.e. before) what
            # has been collected so far.
            stack = tagged + stack
        return stack
    try:
        o = open(fname)
    except (IOError, OSError):
        return []  # not a file or resolvable link to one
    try:
        data = o.read()
    finally:
        o.close()
    lines = data.replace("\r", "\n").split("\n")
    lines.reverse()  # so can pop() them in order
    return lines
|
|
def read_input():
    """Parse the input list into a table of jobs grouped by domain.

    Lines come from read_input_file() and are popped one at a time.
    Recognised line types, in the order they are tested:
      - blank lines and comments (ignored);
      - ":include <file>" (that file's lines are pushed on the stack);
      - frequency commands "daily", "weekly", "monthly", "days N"
        (optional trailing colon), setting the days value for the
        rules that follow;
      - "PYTHONPATH=..." and "PATH=..." (prepended to sys.path and to
        the PATH environment variable respectively);
      - an optional "else:" prefix, attaching the rest of the line as
        fallback logic to the previous rule;
      - "also:" (another text for the previous URL), "Header: value"
        (extra request header; an empty value deletes it), "c://"
        shell commands, "{...}" webdriver instructions, and otherwise
        "URL text".

    Returns a dict: domain -> { url -> checklist }, where checklist is
    a list of (days, text, elseLogic) and elseLogic is None or
    (url, checklist).  Any extra headers in force are appended to the
    url key, separated by newlines.
    """
    ret = {}  # domain -> { url -> checklist [(days,text,elseLogic)] }
    # elseLogic = None or (url,checklist)
    # domains to treat as equivalent for rate reduce
    equivalentDomains = {
        "superuser.com": "stackoverflow.com",
        "stackexchange.com": "stackoverflow.com",
    }
    days = 0
    extraHeaders = []
    url = mainDomain = None
    lines = read_input_file()
    lastList = None
    while lines:
        line = line_withComment = " ".join(lines.pop().split())
        if " #" in line:
            line = line[:line.index(" #")].strip()
        if not line or line_withComment[0] == '#':
            continue

        if line.startswith(":include"):
            lines += [(l + " # from " + line)
                      for l in read_input_file(line.split(None, 1)[1])]
            continue

        if line.endswith(':'):
            freqCmd = line[:-1]
        else:
            freqCmd = line
        if freqCmd.lower() == "daily":
            days = 1
        elif freqCmd.lower() == "weekly":
            days = 7
        elif freqCmd.lower() == "monthly":
            days = 30
        elif freqCmd.startswith("days"):
            days = int(freqCmd.split()[1])
        else:
            freqCmd = None
        if freqCmd:
            continue

        if line.startswith("PYTHONPATH="):
            # for importing selenium etc, if it's not installed system-wide
            sys.path = line.split("=", 1)[1].replace("$PYTHONPATH:", "").replace(":$PYTHONPATH", "").split(":") + sys.path
            continue
        if line.startswith("PATH="):
            os.environ["PATH"] = ":".join(line.split("=", 1)[1].replace("$PATH:", "").replace(":$PATH", "").split(":") + os.environ.get("PATH", "").split(":"))
            continue

        isElse = False
        if line.startswith("else:"):
            isElse = True
            line = line[5:].lstrip()
            line_withComment = line_withComment[5:].lstrip()
            assert line, "else: must be followed on same line"

        if line.startswith('also:') and url:
            text = line_withComment[5:].strip()
            # and leave url and mainDomain as-is (same as above line), TODO: interaction of 'also:' (and extra headers lines) with 'else:' might not be what users expect
        elif ':' in line.split()[0] and not line.split(':', 1)[1].startswith('//'):
            header, value = line.split(':', 1)
            value = value.strip()
            if not value or header.lower() == 'user-agent':
                # no value = delete header; user-agent can be set only
                # once so auto-delete any previous setting.  Build a new
                # list rather than removing while iterating (which skips
                # the entry after each removal), and compare header
                # names case-insensitively, as the user-agent test does.
                prefix = header.lower() + ':'
                extraHeaders = [e for e in extraHeaders
                                if not e.lower().startswith(prefix)]
            if value:
                extraHeaders.append(line)
            continue
        elif line.startswith("c://") and ' ; ' in line_withComment:  # shell command
            url, text = line_withComment.split(' ; ', 1)
            # mainDomain = url # if can parallelise
            mainDomain = ""  # might be better not to, if it's ssh commands etc
        elif line.startswith('{') and '}' in line_withComment:  # webdriver
            actions = line_withComment[1:line_withComment.index('}')].split()
            balanceBrackets(actions)
            text = line_withComment[line_withComment.index('}') + 1:].strip()
            mainDomain = '.'.join(urlparse.urlparse(actions[0]).netloc.rsplit('.', 2)[-2:])  # assumes 1st action is a URL
            url = "wd://" + chr(0).join(actions)
            if extraHeaders:
                url += '\n' + '\n'.join(extraHeaders)
        else:  # not webdriver
            lSplit = line_withComment.split(None, 1)
            if len(lSplit) == 1:
                url, text = lSplit[0], ""  # RSS only
            else:
                url, text = lSplit
            assert "://" in url
            mainDomain = '.'.join(urlparse.urlparse(url).netloc.rsplit('.', 2)[-2:])
            if extraHeaders:
                url += '\n' + '\n'.join(extraHeaders)

        if isElse:
            assert lastList, "else without suitable rule before it"
            # must be days=0 because don't want to re-check the days
            # count when just retrieved and failed something possibly on
            # same URL ('else:' can be used for simple retrying)
            lastList[-1] = lastList[-1][:2] + ((url, [(0, text, None)]),)
            lastList = lastList[-1][2][1]  # so 'else' can be used as 'else if'
        else:
            domainKey = equivalentDomains.get(mainDomain, mainDomain)
            lastList = ret.setdefault(domainKey, {}).setdefault(url, [])
            lastList.append((days, text, None))
    return ret
|
|
|
|
def balanceBrackets(wordList):
    """Merge adjacent words of wordList, in place, so that bracketed or
    quoted webdriver instructions that were split on spaces become single
    items again.

    Only the first and last character of each word are inspected for
    [ ] and double quotes (plus ->" and =" inside a word), so be careful
    if revising this, as it is also applied to URLs.  Returns None.
    TODO: make sure the quote handling doesn't interfere with brackets.
    """
    pos = 0
    level = 0
    while pos < len(wordList) - 1:
        word = wordList[pos]
        levelBefore = level
        opensHere = word[0] in '["'
        if not opensHere and not level:
            # a selection (x->"...) or text entry (x="...) whose quoted
            # value starts inside this word
            opensHere = ('->"' in word and not word.endswith('->"')) or '="' in word
        if opensHere:
            level += 1
        if word[-1] in ']"':
            level -= 1
        if level > 0:
            # still open: absorb the next word and look at the merged one again
            wordList[pos] = word + " " + wordList.pop(pos + 1)
            level = levelBefore
        else:
            pos += 1
            level = 0
|
|
|
|
class HTMLStrings(HTMLParser):
    """HTML parser that collects the text of a document.

    Runs of whitespace are collapsed to one space (even across tags),
    block-level start tags contribute a space, and the contents of
    <script> and <style> are left out.  Use feed() and close() as usual
    and then call text().

    Character and entity references are added as the UTF-8 encoding of
    the character with each byte represented as a Latin-1 character
    (htmlStrings feeds this parser Latin-1-decoded data and encodes the
    result back to Latin-1).
    """

    def __init__(self):
        HTMLParser.__init__(self)
        self.theTxt = []   # text fragments collected so far
        self.omit = False  # True while inside <script> or <style>

    def handle_data(self, data):
        if self.omit or not data:
            return
        elif not data.strip():
            self.ensure(' ')
        else:
            d2 = data.lstrip()
            if not d2 == data:
                self.ensure(' ')  # (always collapse multiple spaces, even across tags)
            if d2:
                # the replace() turns a UTF-8 encoded no-break space into a normal space
                self.theTxt.append(re.sub('[ \t\r\n]+', ' ', d2.replace(unichr(160).encode('utf-8').decode('latin1'), ' ')))

    def ensure(self, thing):
        """Append thing unless the collected text already ends with it."""
        if self.theTxt and self.theTxt[-1].endswith(thing):
            return
        self.theTxt.append(thing)

    def handle_starttag(self, tag, attrs):
        if tag in "p br div h1 h2 h3 h4 h5 h6 th tr td table dt dd".split():
            self.ensure(' ')  # space rather than newline because we might want to watch for a string that goes across headings etc
        elif tag in ["script", "style"]:
            self.omit = True

    def handle_endtag(self, tag):
        if tag in ["script", "style"]:
            self.omit = False

    def handle_startendtag(self, tag, attrs):
        self.handle_starttag(tag, attrs)
        self.handle_endtag(tag)

    def unescape(self, attr):
        # as we don't use attrs above, no point trying to unescape them
        # and possibly falling over if something's malformed
        return attr

    def handle_charref(self, ref):
        """Handle a numeric reference: decimal, or hex with x or X prefix.

        A reference that does not name an encodable character (bad
        digits, out of range, surrogate) is kept as literal text, as is
        done for unknown entity names, rather than raising and aborting
        the whole extraction.
        """
        try:
            if ref[:1] in ('x', 'X'):
                codepoint = int(ref[1:], 16)
            else:
                codepoint = int(ref)
            chars = unichr(codepoint).encode('utf-8').decode('latin1')
        except (ValueError, OverflowError):  # includes UnicodeEncodeError
            self.handle_data('&#' + ref + ';')
            return
        self.handle_data(chars)

    def handle_entityref(self, ref):
        if ref in htmlentitydefs.name2codepoint:
            self.handle_data(unichr(htmlentitydefs.name2codepoint[ref]).encode('utf-8').decode('latin1'))
        else:
            self.handle_data(('&' + ref + ';'))

    def text(self):
        """Return everything collected so far as one stripped string."""
        return u''.join(self.theTxt).strip()
|
|
def htmlStrings(html):
    """Extract the text of an HTML document using HTMLStrings.

    Returns (text, errorMessage).  On success the text has the same
    kind of string type as the input (Unicode in, Unicode out;
    otherwise the result is encoded back to Latin-1) and errorMessage
    is "".  If anything goes wrong, the input is returned unchanged
    together with a message giving the parser position and traceback.
    """
    extractor = HTMLStrings()
    try:
        extractor.feed(UL(html))
        extractor.close()
        extracted = extractor.text()
        if type(html) != type(u""):
            extracted = extracted.encode("latin1")
        return extracted, ""
    except:
        # returning html might still work for 'was that text still
        # there' queries; error message is displayed only if it doesn't
        position = extractor.getpos()
        return html, "\n- problem extracting strings from HTML at line %d offset %d\n%s" % (position + (traceback.format_exc(),))
|
|
|
|
def main():
    """Run all the checks and save progress.

    Queues one job per domain from read_input(), loads the previous
    timestamps from ".webcheck-last" (starting with an empty table if
    that cannot be loaded), runs worker_thread in up to max_threads
    threads (the calling thread counts as one of them), waits for the
    queue to be finished, and writes ".webcheck-last" again if anything
    changed.
    """
    global jobs, previous_timestamps

    # 1 job per domain:
    jobs = Queue.Queue()
    for v in read_input().values():
        jobs.put(v)

    # NOTE: unpickling runs code chosen by whoever wrote the file, so
    # .webcheck-last must only ever be a file this program wrote itself.
    try:
        with open(".webcheck-last", "rb") as f:
            previous_timestamps = pickle.Unpickler(f).load()
    except Exception:
        previous_timestamps = {}
    old_previous_timestamps = previous_timestamps.copy()

    for i in xrange(1, max_threads):
        if jobs.empty():
            break  # enough are going already
        thread.start_new_thread(worker_thread, ())
    worker_thread()
    jobs.join()

    if previous_timestamps == old_previous_timestamps:
        return  # no point saving if no changes
    try:
        with open(".webcheck-last", "wb") as f:
            pickle.Pickler(f).dump(previous_timestamps)
    except Exception:
        sys.stdout.write("Problem writing .webcheck-last (progress was NOT saved):\n" + traceback.format_exc() + "\n")
|
|
|
|
def default_opener():
    """Build the opener used for normal URLs.

    It always handles cookies (HTTPCookieProcessor needed for some
    redirects); on Python 2.7.9+ with verify_SSL_certificates off it
    also gets an HTTPS handler with an unverified SSL context.  The
    default headers are the default User-agent and Accept-Encoding: gzip.
    """
    handlers = [HTTPCookieProcessor()]
    if sys.version_info >= (2, 7, 9) and not verify_SSL_certificates:
        handlers.append(HTTPSHandler(context=ssl._create_unverified_context()))
    opener = build_opener(*handlers)
    opener.addheaders = [
        ('User-agent', default_ua),
        ('Accept-Encoding', 'gzip'),
    ]
    return opener
|
|
|
|
# User-Agent sent unless overridden.  It can be overridden on a per-site
# basis with a "User-Agent: whatever" line, and the override undone
# again with "User-Agent:" on a line by itself.  Please override
# sparingly or with webmaster permission.  Let's not even mention it in
# the readme: we don't want to encourage people to hide their tools
# from webmasters unnecessarily.
default_ua = 'Mozilla/5.0 or whatever you like (actually Webcheck)'
|
|
|
|
class Delayer:
    """Spaces out fetches: wait() sleeps until at least `delay` seconds
    have passed since the last call to done()."""

    def __init__(self):
        self.last_fetch_finished = 0

    def wait(self):
        """Sleep for whatever remains of the delay, then write a progress
        dot to standard error if that is a terminal."""
        remaining = self.last_fetch_finished + delay - time.time()
        time.sleep(max(0, remaining))
        if sys.stderr.isatty():
            sys.stderr.write('.')
            sys.stderr.flush()

    def done(self):
        """Record that a fetch has just finished."""
        self.last_fetch_finished = time.time()
|
|
|
|
def worker_thread(*args):
    """Take jobs off the queue and process them until none are left.

    A job is a dict of url -> checklist (read_input makes one per
    domain).  Its URLs are handled in sorted order, sharing one Delayer.
    A URL fetched before is skipped when the smallest 'days' value in
    its checklist has not yet elapsed, unless --test-all is given.
    Exceptions from a job are printed and the job is still marked done.
    """
    opener = [None]  # doJob creates the opener in slot 0 when first needed
    while True:
        try:
            job = jobs.get(False)
        except:
            return  # no more jobs left
        try:
            delayer = Delayer()
            # sorted will group http and https together; reversed so
            # that pop() takes them in sorted order
            pending = sorted(job.items())[::-1]
            while pending:
                url, checklist = pending.pop()
                # any extra headers follow the URL, one per line
                parts = url.split('\n')
                url, extraHeaders = parts[0], parts[1:]
                fetchKey = (url, 'lastFetch')
                # (--test-all is different from removing .webcheck.last
                # because it shouldn't also re-output old items in RSS feeds)
                if fetchKey in previous_timestamps and not '--test-all' in sys.argv:
                    minDays = min(d[0] for d in checklist)
                    if minDays and previous_timestamps[fetchKey] + minDays >= dayNo():
                        continue
                # (keep it even if minDays==0, because that might be
                # changed by later edits of webcheck.list)
                previous_timestamps[fetchKey] = dayNo()
                more = doJob(opener, delayer, url, checklist, extraHeaders)
                if more:
                    # elseLogic yielded more items for this job (don't
                    # give to another thread, we need the same delayer
                    # as it might be retry on same URL)
                    more.reverse()  # try to keep pop() sequence in order
                    pending += more
        except Exception as e:
            print("Unhandled exception processing job " + repr(job))
            print(traceback.format_exc())
        jobs.task_done()
|
|
|
|
def doJob(opener, delayer, url, checklist, extraHeaders):
    """Fetch one URL and run every check in its checklist.

    The URL scheme selects how the content is obtained: dns:// (DNS
    lookup), wd:// (webdriver), up:// (is the server up), e://
    (edbrowse), c:// (shell command), blocks-lynx://, head://,
    gemini://, or anything else as a normal URL via tryRead.

    opener is a one-element list holding the shared opener (created
    here on first use); delayer is the Delayer for this job; checklist
    is a list of (days, text, elseLogic).

    Returns the list of elseLogic items to process next: those of the
    checks that failed, or all of them if a webdriver or edbrowse run
    failed.  Returns None when there is nothing to check (not modified,
    retrieval problem, or edbrowse could not be started).  Output of
    failed checks that have no elseLogic is written to standard output.
    """
    failRet = [c[2] for c in checklist if c[2]]
    delayer.wait()

    if url.startswith("dns://"):  # DNS lookup
        u = None
        try:
            # TODO this 'sorted' is lexicographical not numeric; it should be OK for most simple cases though (keeping things in a defined order so can check 2 or 3 IPs on same line if the numbers are consecutive and hold same number of digits). Might be better if parse and numeric sort
            addresses = set('(' + x[-1][0] + ')' for x in socket.getaddrinfo(url[6:], 1))
            content = B(' '.join(sorted(addresses)))
        except:
            content = B("DNS lookup failed")
        textContent = content

    elif url.startswith("wd://"):  # run webdriver (this type of url is set internally: see read_input)
        uaHeaders = [e for e in extraHeaders if e.lower().startswith('user-agent:')]
        if uaHeaders:
            ua = uaHeaders[0].split(':', 1)[1].strip()
        else:
            ua = default_ua
        u = None
        content, wasError = run_webdriver(ua, url[5:].split(chr(0)), not failRet)
        if wasError:
            return failRet
        textContent = None  # parse 'content' if needed
        url = url[5:].split(chr(0), 1)[0]  # for display

    elif url.startswith("up://"):  # just test if server is up, and no error if not
        u = None
        try:
            if sys.version_info >= (2, 7, 9) and not verify_SSL_certificates:
                urlopen(url[5:], context=ssl._create_unverified_context(), timeout=60)
            else:
                urlopen(url[5:], timeout=60)
            content = B("yes")
        except:
            content = B("no")
        textContent = content

    elif url.startswith("e://"):  # run edbrowse
        from subprocess import Popen, PIPE
        edEnv = os.environ.copy()
        # ensure unique cache dir if we're running several threads (TODO: what about edbrowse 3.7.6 and below, which hard-codes a single cache dir in /tmp: had we better ensure only one of these is run at a time, just in case? 3.7.7+ honours TMPDIR)
        edEnv["TMPDIR"] = getoutput("(TMPDIR=/dev/shm mktemp -d -t edXXXXXX || mktemp -d -t edXXXXXX) 2>/dev/null")
        try:
            child = Popen(["edbrowse", "-e"], -1, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=edEnv)
        except OSError:
            print("webcheck misconfigured: couldn't run edbrowse")
            return  # no need to update delayer, and probably no need to return failRet if it's an edbrowse misconfiguration
        u = None
        # but this isn't really the page source (asking edbrowse for page source would be equivalent to fetching it ourselves; it doesn't tell us the DOM)
        content, stderr = child.communicate(B("b " + url[4:].replace('\\', '\n') + "\n,p\nqt\n"))
        try:
            import shutil
            shutil.rmtree(edEnv["TMPDIR"])
        except:
            pass
        if child.returncode:
            if not failRet:
                print("edbrowse failed on " + url)
                # Most likely the failure was some link didn't exist when it should have, so show the output for debugging
                print("edbrowse output was: " + repr(content) + "\n")
            delayer.done()
            return failRet
        textContent = content.replace(B('{'), B(' ')).replace(B('}'), B(' '))  # edbrowse uses {...} to denote links
        url = url[4:].split('\\', 1)[0]  # for display

    elif url.startswith("c://"):  # run command
        content = getoutput(url[len("c://"):])
        u = textContent = None

    elif url.startswith("blocks-lynx://"):
        req = Request(url[len("blocks-lynx://"):])
        req.get_method = lambda: 'HEAD'
        req.add_header('User-agent', 'Lynx/2.8.9dev.4 libwww-FM/2.14')
        u, content = None, B("no")  # not blocking Lynx?
        try:
            urlopen(req, timeout=60)
        except Exception as e:
            if type(e) in [HTTPError, socket.error, socket.timeout, ssl.SSLError]:
                # MIGHT be blocking Lynx (SSLError can be raised if hit the timeout), check:
                req.add_header('User-agent', default_ua)
                try:
                    urlopen(req, timeout=60)
                    content = B("yes")  # error ONLY with Lynx, not with default UA
                except Exception as e2:
                    pass  # error with default UA as well, so don't flag this one as a Lynx-test failure
            else:
                print("Info: " + url + " got " + str(type(e)) + " (check the server exists at all?)")
                try:
                    print(e.message)
                except:
                    pass
        textContent = content

    elif url.startswith("head://"):
        req = Request(url[len("head://"):])
        req.get_method = lambda: 'HEAD'
        for h in extraHeaders:
            req.add_header(*tuple(x.strip() for x in h.split(':', 1)))
        if not any(h.lower().startswith("user-agent:") for h in extraHeaders):
            req.add_header('User-agent', default_ua)
        u = None
        if sys.version_info >= (2, 7, 9) and not verify_SSL_certificates:
            response = urlopen(req, context=ssl._create_unverified_context(), timeout=60)
        else:
            response = urlopen(req, timeout=60)
        content = textContent = B(str(response.info()))

    elif url.startswith("gemini://"):
        u = None
        content, textContent = get_gemini(url)

    else:  # normal URL
        if opener[0] is None:
            opener[0] = default_opener()
        # don't monitorError for RSS feeds (don't try to RSS-parse an error message)
        monitorError = all(t[1] and not t[1].startswith('#') for t in checklist)
        u, content = tryRead(url, opener[0], extraHeaders, monitorError)
        textContent = None

    delayer.done()
    if content is None:
        return  # not modified (so nothing to report), or problem retrieving (which will have been reported by tryRead0: TODO: return failRet in these circumstances so elseLogic can proceed?)

    if u:
        lastModified = u.info().get("Last-Modified", None)
        if lastModified:
            previous_timestamps[(url, 'lastMod')] = lastModified
        if keep_etags:
            etag = u.info().get("ETag", None)
            if etag:
                previous_timestamps[(url, 'ETag')] = etag

    toRet = []
    for item in checklist:
        t = item[1]
        if t.startswith('>'):
            # check against the source rather than the extracted text
            out = check(t[1:], content, "Source of " + url, "")
        elif not t or t.startswith('#'):
            parseRSS(url, content, t.replace('#', '', 1).strip())
            out = None
        else:
            if textContent is None:
                textContent, errmsg = htmlStrings(content)
            else:
                errmsg = ""
            out = check(t, textContent, url, errmsg)
        if not out:
            continue
        if item[2]:
            toRet.append(item[2])
        else:
            sys.stdout.write(out)  # don't use 'print' or may have problems with threads
    return toRet
|
|
|
|
class NoTracebackException(Exception):
    """An error for which run_webdriver prints just the message, not a
    traceback."""

    def __init__(self, message):
        self.message = message
|
|
def run_webdriver(ua, actionList, reportErrors):
    """Run a list of webdriver actions in a headless browser.

    Headless Chrome is tried first (with the given user agent), then
    PhantomJS.  ua is the User-Agent string, actionList the actions for
    run_webdriver_inner, and reportErrors says what to do if the
    actions fail: if true the error is printed and NOT reflected in the
    return value, otherwise nothing is printed and wasError is returned
    as True.

    Returns (content, wasError).  content is always a byte-string: the
    page source(s) collected by run_webdriver_inner, or an empty
    byte-string if the actions failed or no browser could be started
    (in which case a misconfiguration message is printed and wasError
    is True).
    """
    global webdriver  # so run_webdriver_inner has it
    try:
        from selenium import webdriver
    except:
        print("webcheck misconfigured: can't import selenium (did you forget to set PYTHONPATH?)")
        return B(""), True
    try:
        from selenium.webdriver.chrome.options import Options
        opts = Options()
        opts.add_argument("--headless")
        opts.add_argument("--disable-gpu")
        opts.add_argument("--user-agent=" + ua)
        try:
            from inspect import getfullargspec as getargspec  # Python 3
        except ImportError:
            try:
                from inspect import getargspec  # Python 2
            except ImportError:
                getargspec = None
        # which keyword the Chrome constructor takes depends on what its
        # signature says, so look before calling
        try:
            useOptions = 'options' in getargspec(webdriver.chrome.webdriver.WebDriver.__init__).args
        except:
            useOptions = False
        if useOptions:
            browser = webdriver.Chrome(options=opts)
        else:
            browser = webdriver.Chrome(chrome_options=opts)
    except Exception as eChrome:  # probably no HeadlessChrome, try PhantomJS
        os.environ["QT_QPA_PLATFORM"] = "offscreen"
        sa = ['--ssl-protocol=any']
        if not verify_SSL_certificates:
            sa.append('--ignore-ssl-errors=true')
        try:
            browser = webdriver.PhantomJS(service_args=sa, service_log_path=os.path.devnull)
        except Exception as jChrome:
            print("webcheck misconfigured: can't create either HeadlessChrome (%s) or PhantomJS (%s). Check installation. (PATH=%s, cwd=%s, webdriver version %s)" % (str(eChrome), str(jChrome), repr(os.environ.get("PATH", "")), repr(os.getcwd()), repr(webdriver.__version__)))
            return B(""), True
    # Start from an empty BYTE-string, so that the failure paths return
    # the same type as run_webdriver_inner does on success.
    r = B("")
    wasError = False
    try:
        r = run_webdriver_inner(actionList, browser)
    except NoTracebackException as e:
        if reportErrors:
            print(e.message)
        else:
            wasError = True
    except:
        if reportErrors:
            print(traceback.format_exc())
        else:
            wasError = True
    browser.quit()
    return r, wasError
|
|
|
|
def run_webdriver_inner(actionList, browser):
    """Perform webdriver actions and return the page source(s) seen.

    Each action in actionList is one of:
      http...         load that URL
      "text"          wait (up to 30 tries, `delay` seconds apart) for
                      text to appear in the source
      [spec]          click the element
      /start/delay    click through a list of items to reveal each one:
                      start is .class or .class.closeClass or an ID
                      prefix; delay may be followed by :from-to and a
                      final ! to make click failures fatal
      spec->value     set a selection box (empty value deselects all)
      spec*0, spec*1  clear / check a checkbox
      spec=value      type text into an input box
      digits          sleep that many seconds
    where spec is #id-or-name, .class, .class#N (Nth of that class) or
    link text.  Unknown actions are reported on standard output and
    ignored.

    Returns a byte-string: the sources captured during click-through
    actions plus the final source, joined with newlines.  Raises
    NoTracebackException on a wait timeout or a fatal click failure;
    other exceptions from the browser propagate.
    """
    browser.set_window_size(1024, 768)
    browser.implicitly_wait(2)  # we have our own 'wait for text' and delay values, so the implicit wait does not have to be too high

    def findElem(spec):
        if spec.startswith('#'):
            try:
                return browser.find_element_by_id(spec[1:])
            except:
                return browser.find_element_by_name(spec[1:])
        elif spec.startswith('.'):
            if '#' in spec:
                # .class#1, .class#2 etc to choose the Nth element of that class
                return browser.find_elements_by_class_name(spec[1:spec.index('#')])[int(spec.split('#')[1]) - 1]
            else:
                return browser.find_element_by_class_name(spec[1:])
        else:
            return browser.find_element_by_link_text(spec)

    def getSrc():
        # Source of the page with the sources of its frames (and their
        # frames) appended, UTF-8 encoded.
        def f(b, switchBack=()):
            try:
                src = b.find_element_by_xpath("//*").get_attribute("outerHTML")
            except:
                return u"getSrc webdriver exception but can retry"  # can get timing-related WebDriverException: Message: Error - Unable to load Atom 'find_element'
            for el in ['frame', 'iframe']:
                for frame in b.find_elements_by_tag_name(el):
                    try:
                        b.switch_to.frame(frame)
                    except:  # StaleElementReferenceException is possible for some reason
                        src += "(Unable to switch to frame " + str(frame) + ") "
                        continue
                    src += f(b, switchBack + (frame,))
                    # go back to where we were: top level, then down
                    # through the enclosing frames again
                    b.switch_to.default_content()
                    for fr in switchBack:
                        b.switch_to.frame(fr)
            return src
        return f(browser).encode('utf-8')

    snippets = []
    for a in actionList:
        if a.startswith('http'):
            browser.get(a)
        elif a.startswith('"') and a.endswith('"'):
            # wait for "string" to appear in the source
            tries = 30
            while tries and not myFind(a[1:-1], getSrc()):
                time.sleep(delay)
                tries -= 1
            if not tries:
                try:
                    current_url = browser.current_url
                except:
                    current_url = "(unable to obtain)"
                # don't quote current URL: if the resulting email is viewed in (at least some versions of) MHonArc, a bug can result in " being added to the href
                raise NoTracebackException("webdriver timeout while waiting for %s, current URL is %s content \"%s\"\n" % (repr(a[1:-1]), current_url, repr(getSrc())))
        elif a.startswith('[') and a.endswith(']'):  # click
            findElem(a[1:-1]).click()
        elif a.startswith('/') and '/' in a[1:]:  # click through items in a list to reveal each one (assume w/out Back)
            start = a[1:a.rindex('/')]
            delayAfter = a[a.rindex('/') + 1:]
            curNo, startNo, endNo = 0, 1, 0
            propagate_errors = False
            if ':' in delayAfter:
                delayAfter, rest = delayAfter.split(':')
                if rest.endswith('!'):
                    propagate_errors = True
                    rest = rest[:-1]
                if '-' in rest:
                    startNo, endNo = rest.split('-')
                    startNo, endNo = int(startNo), int(endNo)
                else:
                    assert 0, "don't know how to parse " + rest
            try:
                delayAfter = int(delayAfter)
            except:
                delayAfter = 1
            if start.startswith('.'):
                startClass = start[1:]
                if '.' in startClass:
                    startClass, closeClass = startClass.split('.')
                else:
                    closeClass = None
                if startNo > 1 and sys.stderr.isatty():
                    sys.stderr.write('(skip %d)' % (startNo - 1))
                    sys.stderr.flush()
                for m in browser.find_elements_by_class_name(startClass):
                    curNo += 1
                    if curNo < startNo:
                        continue
                    if endNo and curNo > endNo:
                        break
                    try:
                        m.click()
                        if sys.stderr.isatty():
                            sys.stderr.write('*')
                            sys.stderr.flush()
                    except:
                        if sys.stderr.isatty():
                            sys.stderr.write('?')
                            sys.stderr.flush()
                        if propagate_errors:
                            raise NoTracebackException(a + " failed to open instance " + str(curNo))
                        else:
                            continue
                    time.sleep(delayAfter)
                    snippets.append(getSrc())
                    if closeClass:
                        l = list(browser.find_elements_by_class_name(closeClass))
                        for c in l:
                            try:
                                c.click()
                                if sys.stderr.isatty():
                                    sys.stderr.write('x')
                                    sys.stderr.flush()
                                if not browser.find_elements_by_class_name(closeClass) == l:
                                    break  # it did something
                            except:
                                pass  # maybe it wasn't that one
                        time.sleep(delayAfter)
            else:
                # find every ID that begins with 'start', quoted or not;
                # one snapshot of the source serves both searches
                src = getSrc()
                l = re.findall(B(' [iI][dD] *="(' + re.escape(start) + '[^"]*)'), src) + re.findall(B(' [iI][dD] *=(' + re.escape(start) + '[^"> ]*)'), src)
                for m in l:
                    curNo += 1
                    if curNo < startNo:
                        continue
                    if endNo and curNo > endNo:
                        break
                    try:
                        # the matches are byte-strings (the source is
                        # searched as bytes) but the ID has to be given
                        # to the webdriver as text
                        browser.find_element_by_id(S(m)).click()
                        if sys.stderr.isatty():
                            sys.stderr.write('*')  # webdriver's '.' for click-multiple
                            sys.stderr.flush()
                    except:
                        if sys.stderr.isatty():
                            sys.stderr.write('?')
                            sys.stderr.flush()
                        if propagate_errors:
                            raise NoTracebackException(a + " failed to open instance " + str(curNo))
                        else:
                            continue
                    time.sleep(delayAfter)
                    snippets.append(getSrc())
        elif '->' in a:  # set a selection box
            spec, val = a.split('->', 1)
            e = webdriver.support.ui.Select(findElem(spec))
            if val.startswith('"') and val.endswith('"'):
                val = val[1:-1]
            if val:
                e.select_by_visible_text(val)
            else:
                e.deselect_all()
        elif a.endswith('*0'):  # clear a checkbox
            e = findElem(a[:-2])
            if e.is_selected():
                e.click()
        elif a.endswith('*1'):  # check a checkbox
            e = findElem(a[:-2])
            if not e.is_selected():
                e.click()
        elif '=' in a:  # put text in an input box
            spec, val = a.split('=', 1)
            if val.startswith('"') and val.endswith('"'):
                val = val[1:-1]
            findElem(spec).send_keys(val)
        elif re.match("[0-9]+$", a):
            time.sleep(int(a))
        else:
            sys.stdout.write("Ignoring webdriver unknown action " + repr(a) + '\n')
        if sys.stderr.isatty():
            sys.stderr.write(':')  # webdriver's '.'
            sys.stderr.flush()
        time.sleep(delay)
    snippets.append(getSrc())
    return B('\n').join(snippets)
|
|
|
|
def get_gemini(url, nestLevel=0):
    """Fetch a gemini:// URL, following up to 10 redirects.

    url may be text or a byte-string; nestLevel counts the redirects
    followed so far.  Returns (content, textContent) as byte-strings:
      - status 2x: the body, plus the text to check: the body with link
        targets removed for text/gemini, None if the type mentions
        html (so the caller extracts text from the HTML), otherwise
        the body itself;
      - status 3x: the result of fetching the redirect target;
      - anything else (input prompt, error message, or certificate
        required): the meta line, twice.
    The server's certificate is not verified.  Socket and SSL errors
    propagate to the caller.
    """
    if nestLevel > 9:
        return B("Too many redirects"), B("Too many redirects")
    url = B(url)
    host0 = host = re.match(B("gemini://([^/?#]*)"), url).groups(1)[0]
    port = re.match(B(".*:([0-9]+)$"), host)
    if port:
        port = int(port.groups(1)[0])
        host = host[:host.rindex(B(":"))]
    else:
        port = 1965
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.settimeout(60)
        if hasattr(ssl, "create_default_context"):
            # ssl.wrap_socket() no longer exists in Python 3.12+, so use
            # a context, set up to skip verification as the module-level
            # function did
            ctx = ssl.create_default_context()
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
            if getattr(ssl, "HAS_SNI", False):
                s = ctx.wrap_socket(s, server_hostname=S(host))
            else:
                s = ctx.wrap_socket(s)
        else:
            s = ssl.wrap_socket(s)  # old Python without SSLContext helpers
        s.connect((host, port))
        s.sendall(url + B("\r\n"))  # sendall: send() may stop part-way
        g = []
        while not g or g[-1]:  # read until the server closes the connection
            g.append(s.recv(4096))
    finally:
        s.close()
    g = B("").join(g)
    if B("\r\n") in g:
        header, body = g.split(B("\r\n"), 1)
    else:
        header, body = g, B("")
    if B(" ") in header:
        status, meta = header.split(B(" "), 1)
    else:
        status, meta = B("?"), header
    try:
        status = int(status)
    except:
        status = 0
    if 20 <= status <= 29:
        if meta.startswith(B("text/gemini")):
            txtonly = re.sub(B("\n *=> +[^ ]*"), B("\n"), body)
        elif B("html") in meta:
            txtonly = None  # will result in htmlStrings
        else:
            txtonly = body
        return body, txtonly
    elif 30 <= status <= 39:
        if meta.startswith(B("gemini://")):
            return get_gemini(meta, nestLevel + 1)
        elif meta.startswith(B("/")):
            return get_gemini(B("gemini://") + host0 + meta, nestLevel + 1)
        else:
            # TODO: handle ../ ourselves? or let server do it? (early protocol specification and practice unclear)
            return get_gemini(url[:url.rindex(B("/")) + 1] + meta, nestLevel + 1)
    else:
        return meta, meta  # input prompt, error message, or certificate required
|
|
|
|
def dayNo():
    """Return today's day number: the timestamp of the start of the
    current local date, divided by the number of seconds in a day."""
    startOfToday = time.localtime()[:3] + (0,) * 6  # year, month, day, rest zero
    return int(time.mktime(startOfToday)) / (3600 * 24)
|
|
|
|
def tryRead(url, opener, extraHeaders, monitorError=True, refreshTry=5):
    """Fetch url with opener, adding extra and conditional headers.

    extraHeaders is a list of "Name: value" strings added for this
    request only (the opener's own header list is put back afterwards);
    a User-agent among them replaces the opener's default one.  Unless
    --test-all is given, If-Modified-Since is sent when a Last-Modified
    value is stored for the URL, and (if keep_etags) If-None-Match when
    an ETag is stored.  monitorError is passed on to tryRead0.  Up to
    refreshTry immediate "meta refresh" redirects are followed.

    Returns what tryRead0 returns: (response, content).
    """
    oldAddHeaders = opener.addheaders[:]
    try:
        for h in extraHeaders:
            if h.lower().startswith("user-agent") and opener.addheaders[0][0] == "User-agent":
                # User-agent override (will be restored after by oldAddHeaders) (TODO: override in run_webdriver also)
                del opener.addheaders[0]
            opener.addheaders.append(tuple(x.strip() for x in h.split(':', 1)))
        testAll = '--test-all' in sys.argv
        if (url, 'lastMod') in previous_timestamps and not testAll:
            opener.addheaders.append(("If-Modified-Since", previous_timestamps[(url, 'lastMod')]))
        if keep_etags and (url, 'ETag') in previous_timestamps and not testAll:
            # If-None-Match has to carry the stored ETag, not the date
            opener.addheaders.append(("If-None-Match", previous_timestamps[(url, 'ETag')]))
        ret = tryRead0(url, opener, monitorError)
    finally:
        opener.addheaders = oldAddHeaders
    if refreshTry:  # meta refresh redirects
        u, content = ret
        if content:
            # TODO: if string found, remove comments and re-check (or even parse properly) ?
            m = re.search(br'(?is)<head>.*?<meta http-equiv="refresh" content="0; *url=([^"]*)".*?>.*?</head>', content)
        else:
            m = None  # content==None if 304 not modified
        if m:
            m = m.groups(1)[0]
            if type(u"") == type(""):
                m = m.decode('latin1')
            return tryRead(urlparse.urljoin(url, m), opener, extraHeaders, monitorError, refreshTry - 1)
    return ret
|
|
|
|
def tryRead0(url,opener,monitorError):
    """Open url once and return a (response, body) pair.

    The first attempt uses the supplied opener.  If that fails with
    anything other than an HTTPError, a second attempt is made with a
    freshly built opener that carries none of the caller's headers.

    Returns:
        (response, body) on success, body having been passed through tryGzip;
        (None, None) for a 304, for an HTTP error when monitorError is
        false, or for any failure of the second attempt other than an
        HTTP error being monitored;
        (None, body) or (u, body) with the error page's body when an
        HTTP error occurs and monitorError is true.

    Problems are reported on sys.stdout rather than raised.
    """
    # it seems some versions of the library percent-encode automatically
    # but others don't, so do it here
    def percentEncode(run): return quote(run.group())
    url = re.sub("[^!-~]+",percentEncode,url)
    u = None
    try:
        u = opener.open(url,timeout=60)
        return u,tryGzip(u.read())
    except HTTPError as e:
        if e.code==304:
            return None,None # not modified
        if monitorError:
            # as might want to monitor some phrase on a 404 page
            return None,tryGzip(e.fp.read())
        sys.stdout.write("Error "+str(e.code)+" retrieving "+linkify(url)+"\n")
        return None,None
    except: # try it with a fresh opener and no headers
        try:
            skipVerify = sys.version_info >= (2,7,9) and not verify_SSL_certificates
            handlers = [OurRedirHandler(),HTTPCookieProcessor()]
            if skipVerify:
                handlers.append(HTTPSHandler(context=ssl._create_unverified_context()))
            u = build_opener(*handlers).open(url,timeout=60)
            return u,tryGzip(u.read())
        except HTTPError as e:
            if monitorError:
                # u is whatever was last assigned above (possibly None)
                return u,tryGzip(e.fp.read())
            sys.stdout.write("Error "+str(e.code)+" retrieving "+linkify(url)+"\n")
            return None,None
        except URLError as e:
            # don't need full traceback for URLError, just the message itself
            sys.stdout.write("Problem retrieving "+linkify(url)+"\n"+str(e)+"\n")
            return None,None
        except socket.timeout:
            sys.stdout.write("Timed out retrieving "+linkify(url)+"\n")
            return None,None
        except:
            # full traceback by default
            sys.stdout.write("Problem retrieving "+linkify(url)+"\n"+traceback.format_exc())
            return None,None
|
|
class OurRedirHandler(HTTPErrorProcessor):
    """Response processor that follows HTTP redirects itself.

    Used by the "fresh opener" fallback: the redirect target is
    percent-encoded before being opened, because not all versions of the
    library do this.  Each hop is opened with a new opener whose
    OurRedirHandler has nestLevel one higher, so chains are limited.

    Responses that are not redirects (or that have no ``code`` attribute)
    are returned unchanged.
    """

    # Status codes treated as redirects (308 is the permanent counterpart of 307)
    redirectCodes = (301,302,303,307,308)

    def __init__(self,nestLevel=0): self.nestLevel = nestLevel

    def our_response(self,request,response,prefix):
        """Return response, or the result of following its redirect.

        request: the request that produced response; its full URL is the
            base for resolving a relative Location.
        response: the response object to inspect.
        prefix: "http:" or "https:", prepended to scheme-relative
            ("//host/...") targets.

        Raises Exception("too many redirects") once nestLevel exceeds 9.
        """
        try: code=response.code
        except: return response
        if code not in self.redirectCodes: return response
        location = response.headers.get('Location')
        if not location: return response # redirect status without a target: nothing to follow
        url = re.sub("[^!-~]+",lambda m:quote(m.group()),location) # not all versions of the library do this, so we'll do it here if simple-open failed
        if self.nestLevel>9: raise Exception("too many redirects")
        if url.startswith("//"): url=prefix+url
        elif not re.match("[A-Za-z][A-Za-z0-9+.-]*:",url):
            # Location may be a relative reference (e.g. "/path"); resolve it
            # against the URL that was requested
            url = urlparse.urljoin(request.get_full_url(),url)
        handlers = [OurRedirHandler(self.nestLevel+1),HTTPCookieProcessor()]
        if sys.version_info >= (2,7,9) and not verify_SSL_certificates:
            handlers.append(HTTPSHandler(context=ssl._create_unverified_context()))
        return build_opener(*handlers).open(url,timeout=60)

    def http_response(self,request,response):
        """Process a response received over plain HTTP."""
        return self.our_response(request,response,"http:")

    def https_response(self,request,response):
        """Process a response received over HTTPS."""
        return self.our_response(request,response,"https:")
|
|
|
|
def tryGzip(t):
    """Return t decompressed if it can be read as gzip data, else t unchanged."""
    try:
        unzipper = gzip.GzipFile(filename='',mode='rb',compresslevel=9,fileobj=StringIO(t))
        return unzipper.read()
    except:
        # Any failure to decompress is taken to mean the data was not gzipped
        return t
|
|
|
|
def check(text,content,url,errmsg):
    """Apply one check specification to fetched content.

    text: the specification, optionally followed by " #comment" (the
        comment must be preceded by a space, otherwise it is treated as
        part of the text, as this is sometimes needed in codes).
        Forms handled:
          {start...end}  pass to extract() to pull out items
          !phrase        alert if content DOES contain phrase
          phrase         alert if content does NOT contain phrase
    content: the fetched page body.
    url: where content came from (used in messages).
    errmsg: extra text appended to any alert message.

    Returns the alert message as a string, or None when there is nothing
    to report (including the extract case, which reports for itself).
    """
    if ' #' in text: text,comment = text.split(' #',1)
    else: comment = ""
    orig_comment = comment = comment.strip()
    if comment: comment="\n "+paren(comment)
    text = text.strip()
    assert text # or should have gone to parseRSS instead
    if text.startswith('{') and text.endswith('}') and '...' in text:
        extract(url,content,text[1:-1].split('...'),orig_comment)
    elif text.startswith("!"): # 'not', so alert if DOES contain
        if len(text)==1: return # TODO: print error?
        if myFind(text[1:],content):
            # linkify here too, so both kinds of alert print the URL the same way
            return linkify(url)+" contains "+text[1:]+comment+errmsg+"\n"
    elif not myFind(text,content): # alert if DOESN'T contain
        r=linkify(url)+" no longer contains "+text+comment+errmsg+"\n"
        if '??show?' in orig_comment: getBuf(sys.stdout).write(B("Debug: contents of "+linkify(url)+" is:\n")+content+B('\n')) # TODO: document this
        return r
|
|
|
|
def parseRSS(url,content,comment):
    """Parse an RSS/Atom document and hand its items to handleRSS.

    url: the feed's address, used to resolve relative links and in messages.
    content: the raw feed body (decoded as UTF-8 on Python 3).
    comment: the user's comment for this feed, passed on to handleRSS.

    Each item is collected as four lists of text fragments:
    [title, link, description/summary, pubDate].  A parse error is
    reported on sys.stdout and whatever was collected before it is still
    passed on.
    """
    from xml.parsers import expat
    parser = expat.ParserCreate()
    fieldIndex = {'title':0,'link':1,'description':2,'summary':2,'pubDate':3}
    items = [[[],[],[],[]]]
    current = {'field':None} # index of the field now being filled, if any

    def newItem(): items.append([[],[],[],[]])

    def onStart(name,attrs):
        if name in ('item','entry'): newItem()
        current['field'] = fieldIndex.get(name)
        # (note this isn't the ONLY way an href could get in:
        # <link>http...</link> is also possible, and is handled by onText
        # below, hence onEnd is important for separating links)
        if name=='link' and 'href' in attrs:
            items[-1][1].append(attrs['href']+' ')

    def onEnd(name):
        if name in ('item','entry'):
            # ensure any <link>s outside <item>s are separated
            newItem()
            current['field'] = None
        elif name in ('description','summary','title','link'):
            # ensure any additional ones are space-separated
            if current['field'] is not None: items[-1][current['field']].append(' ')
            current['field'] = None

    def onText(data):
        if data and current['field'] is not None:
            items[-1][current['field']].append(data)

    parser.StartElementHandler = onStart
    parser.EndElementHandler = onEnd
    parser.CharacterDataHandler = onText
    if type(u"")==type(""): content = content.decode("utf-8") # Python 3 (expat needs 'strings' on each platform)
    try:
        parser.Parse(re.sub("&[A-Za-z]*;",entityref,content),1)
    except expat.error as e:
        # and continue with handleRSS ? (it won't erase our existing items
        # if the new list is empty, as it will be in the case of the parse
        # error having been caused by a temporary server error)
        sys.stdout.write("RSS parse error in "+url+paren(comment)+":\n"+repr(e)+"\n(You might want to check if this URL is still serving RSS)\n\n")
    for item in items:
        # handle links relative to the RSS itself
        # NOTE(review): several links in one item are concatenated here with
        # no separator - confirm this is intended
        words = "".join(item[1]).strip().split()
        item[1] = "".join(urlparse.urljoin(url,w) for w in words).strip()
        for j in (0,2,3):
            item[j] = re.sub(r"\s+"," ",u"".join(U(x) for x in item[j])).strip()
    handleRSS(url,items,comment)
|
|
def entityref(m):
    """Regex substitution callback: expand one "&name;" entity reference.

    m: a match object whose whole match is "&" + name + ";".

    Named entities are looked up in htmlentitydefs.name2codepoint; failing
    that, "#x..." and "#..." names are read as hexadecimal and decimal
    code points.  The reference is returned untouched when it cannot be
    resolved or when it stands for one of "<", ">" or "&".  Otherwise the
    character itself is returned (UTF-8 encoded when the native string
    type is not Unicode).
    """
    name = m.group()[1:-1]
    char = None
    try:
        char = unichr(htmlentitydefs.name2codepoint[name])
    except:
        try:
            if name.startswith("#x"): char = unichr(int(name[2:],16))
            elif name.startswith("#"): char = unichr(int(name[1:]))
        except:
            pass
    if not char or char in "<>&":
        return "&"+name+";"
    if type(u"")==type(""):
        return char
    return char.encode('utf-8')
|
|
def paren(comment):
    """Tidy a user comment for display after a URL.

    The "??track-links-only?" marker is removed and runs of whitespace are
    collapsed.  An empty result, or one already wrapped in parentheses, is
    returned as-is; anything else comes back as " (comment)".
    """
    words = comment.replace("??track-links-only?","").split()
    comment = " ".join(words)
    if not comment:
        return comment
    if comment.startswith('(') and comment.endswith(')'):
        return comment
    return " ("+comment+")"
|
|
def _decodeNumericEntities(txt):
    "Replace &#NNN; and &#xHHH; references in txt by the characters they name, leaving malformed or out-of-range references as they are"
    def decode(m,base):
        try: return unichr(int(m.group(1),base))
        except (ValueError,OverflowError): return m.group() # e.g. "&#;" or a code point beyond the supported range
    txt = re.sub("&#([0-9]*);",lambda m:decode(m,10),txt)
    return re.sub("&#x([0-9A-Fa-f]*);",lambda m:decode(m,16),txt)

def handleRSS(url,items,comment,itemType="RSS/Atom"):
    """Report feed items not seen before and forget ones that have gone.

    url: the feed (or page) the items came from.
    items: iterable of (title, link, txt, date) entries; entries with an
        empty title are ignored.
    comment: the user's comment; if it contains "??track-links-only?" the
        item's identity is based on date and link rather than title and text.
    itemType: word used for the items in the printed summary.

    Seen items are recorded in previous_timestamps under
    (url,'seenItem',md5-digest) keys.  Keys for this url that are no
    longer in the feed are deleted, unless no valid item was found at all.
    New items are written, UTF-8 encoded, to standard output.  With
    '--show-seen-rss' on the command line, already-seen items are shown too.
    """
    newItems = [] ; pKeep = set()
    for title,link,txt,date in items:
        if not title: continue # valid entry must have title
        if "??track-links-only?" in comment: hashTitle,hashTxt = date,"" # TODO: document this, it's for when text might change because for example we're fetching it through an add-annotation CGI that can change, but don't ignore if the publication date has changed due to an update (TODO: might be better to do this via a 'pipe to postprocessing' option instead?)
        else: hashTitle,hashTxt = title,re.sub("</?[A-Za-z][^>]*>","",txt) # (ignore HTML markup in RSS, since it sometimes includes things like renumbered IDs)
        k = (url,'seenItem',hashlib.md5(repr((hashTitle,link,hashTxt)).encode("utf-8")).digest()) # TODO: option not to call hashlib, in case someone has the space and is concerned about the small probability of hash collisions? (The Python2-only version of webcheck just used Python's built-in hash(), but in Python 3 that is no longer stable across sessions, so use md5)
        pKeep.add(k)
        if k in previous_timestamps and not '--show-seen-rss' in sys.argv: continue # seen this one already
        previous_timestamps[k] = True
        # decode &#..; HTML entities (sometimes used for CJK), but leave < etc
        # as-is (in RSS it would have originated with a double-'escaped' <
        # within 'escaped' html markup); a bad reference must not abort the feed
        txt = _decodeNumericEntities(txt)
        txt = re.sub("</?[A-Za-z][^>]*>",simplifyTag,txt) # avoid overly-verbose HTML (but still allow some)
        txt = re.sub("<[pPbBiIuUsS]></[pPbBiIuUsS]>","",txt).strip() # sometimes left after simplifyTag removes img
        if txt: txt += '\n'
        newItems.append(title+'\n'+txt+linkify(link))
    if not pKeep: return # if the feed completely failed to fetch, don't erase what we have
    for k in list(previous_timestamps.keys()):
        if k[:2]==(url,'seenItem') and not k in pKeep:
            del previous_timestamps[k] # dropped from the feed
    if newItems: getBuf(sys.stdout).write((str(len(newItems))+" new "+itemType+" items in "+url+paren(comment)+' :\n'+'\n---\n'.join(n.strip() for n in newItems)+'\n\n').encode('utf-8'))
|
|
def simplifyAttr(match):
    """Regex substitution callback: keep an href attribute, drop any other.

    match: a match object whose whole match is one attribute including
        its leading space.
    """
    attr = match.group()
    isHref = attr.lower().startswith(" href=")
    return attr if isHref else ""
|
|
def simplifyTag(match):
    """Regex substitution callback: reduce one HTML tag to a simple form.

    match: a match object whose whole match is a single tag.

    <a> tags keep only their href attribute (via simplifyAttr); p, br, em,
    strong, b, i, u and s tags lose all attributes; every other tag is
    removed.  Tag names are compared case-insensitively, as HTML tag names
    are not case-sensitive.
    """
    m = match.group()
    parts = m.split()
    t = parts[0].replace('<','').replace('>','').replace('/','').lower()
    if t=='a': return re.sub(' [A-Za-z]+="[^"]*"',simplifyAttr,m)
    elif t in ['p','br','em','strong','b','i','u','s']:
        # more than one whitespace-separated part means there is something
        # after the tag name (attributes may follow a newline or tab, not
        # only a space)
        if len(parts)>1: return parts[0]+'>' # strip attributes
        else: return m
    else: return "" # strip entire tag
|
|
def linkify(link):
    """Return link with "(" and ")" percent-encoded.

    This is for email clients etc that terminate URLs at parens.
    """
    for bracket,escaped in (("(","%28"),(")","%29")):
        link = link.replace(bracket,escaped)
    return link
|
|
|
|
def extract(url,content,startEndMarkers,comment):
    """Pull every start...end delimited span out of content and report new ones.

    url: where content came from.
    content: the fetched page body.
    startEndMarkers: two-element sequence [start, end] of marker strings.
    comment: the user's comment, passed on to handleRSS.

    Each non-empty span found between a start marker and the next end
    marker becomes an item handed to handleRSS with itemType "extracted".
    A message is printed if nothing at all was extracted.

    Raises AssertionError if startEndMarkers does not have exactly two
    elements.
    """
    assert len(startEndMarkers)==2, "Should have exactly one '...' between the braces when extracting items"
    start,end = startEndMarkers
    content,start,end = B(content),B(start),B(end)
    i=0 ; items = []
    while True:
        i = content.find(start,i)
        if i==-1: break
        j = content.find(end,i+len(start))
        if j==-1: break
        # 'replace' so that a page that is not valid UTF-8 still yields items
        # instead of raising UnicodeDecodeError
        c = content[i+len(start):j].decode('utf-8','replace').strip()
        if c: items.append(('Auto-extracted text:','',c,"")) # NB the 'title' field must not be empty (unless we relocate that logic to parseRSS instead of handleRSS)
        # always move forward: with both markers empty, j+len(end)==i and
        # the loop would otherwise never end
        i = max(j+len(end),i+1)
    if not items: print ("No items were extracted from "+url+" via "+S(start)+"..."+S(end)+" (check that site changes haven't invalidated this extraction rule)")
    handleRSS(url,items,comment,"extracted")
|
|
|
|
def myFind(text,content):
    """Report whether text occurs in content.

    Both arguments are first passed through B().  If text begins with
    "*", the rest of it is used as a regular expression and the result of
    re.search is returned.  Otherwise an exact substring match returns
    True, and failing that the comparison is repeated on the
    normalisePunc() forms of both arguments.
    """
    needle,haystack = B(text),B(content)
    if needle[:1]==B("*"):
        return re.search(needle[1:],haystack)
    if needle in haystack:
        return True
    return normalisePunc(needle) in normalisePunc(haystack)
|
|
def normalisePunc(t):
    """Return t in a form suitable for loose text comparison.

    Normalises apostrophes; collapses (but doesn't ignore) whitespace and
    ignores double-quotes because they might have been <Q> elements;
    folds case.  Some other characters are also replaced or removed, as
    listed in the table below.
    """
    dash,apostrophe,space,nothing = B("-"),B("'"),B(" "),B("")
    substitutions = (
        (u"\u2013".encode('utf-8'),dash), # en-dash
        (u"\u2019".encode('utf-8'),apostrophe),
        (u"\u2018".encode('utf-8'),apostrophe),
        (u"\u201C".encode('utf-8'),nothing),
        (u"\u201D".encode('utf-8'),nothing),
        (B('"'),nothing),
        (u"\u00A0".encode('utf-8'),space),
        (u"\uFEFF".encode('utf-8'),nothing),
        (u"\u200B".encode('utf-8'),nothing),
    )
    for old,new in substitutions:
        t = t.replace(old,new)
    # keep the first character of each whitespace run and drop the rest
    collapsed = re.sub(B(r"(\s)\s+"),B(r"\1"),t)
    return collapsed.lower()
|
|
|
|
# Run the checker only when this file is executed as a script
if __name__=="__main__":
    main()
|