More actions
Created page with "<syntaxhighlight lang="python"> #!/usr/bin/python3 import argparse import json import sys import redis a = argparse.ArgumentParser() a.add_argument('-s', dest='subscribe', action='append') a.add_argument('-p', dest='psubscribe', action='append') a.add_argument('-r', dest='raw', action='store_true') a.add_argument('-n', dest='nochannel', action='store_true') a.add_argument('-f', dest='prettyprint', action='store_true') a.add_argument('-c', dest='contains', action='store..." |
m Derg moved page Slisten.py to slisten.py without leaving a redirect: Misspelled title |
(No difference)
|
Latest revision as of 13:49, 12 September 2024
#!/usr/bin/python3
import argparse
import json
import sys
import redis
a = argparse.ArgumentParser()
a.add_argument('-s', dest='subscribe', action='append')
a.add_argument('-p', dest='psubscribe', action='append')
a.add_argument('-r', dest='raw', action='store_true')
a.add_argument('-n', dest='nochannel', action='store_true')
a.add_argument('-f', dest='prettyprint', action='store_true')
a.add_argument('-c', dest='contains', action='store')
a.add_argument('-j', dest='jsoncolor', action='store_true')
d = a.parse_args()
def highlight(byt):
table = {0:'000', 1:'001', 2:'002', 3:'003', 4:'004', 5:'005', 6:'006', 7:'a', 8:'b', 9:'t', 10:'n', 11:'v', 12:'f', 13:'r'}
i = 0
out = 'b\''
pb = False
for a in byt:
if a >= 32 and a <= 126:
if pb:
out += '\033[39m'
pb = False
if a == 39: # '
out += '\\\''
elif a == 92: # \
out += '\\\\'
else:
out += chr(a)
else:
i += 1
pb = True
out += '\033[9%sm' % ((i % 2) + 1)
t = table.get(a)
if t:
out += '\\%s' % t
else:
out += '\\x%02x' % a
if pb:
out += '\033[39m'
out += "'"
return out
if d.jsoncolor:
from pygments.formatters.terminal256 import Terminal256Formatter
from pygson.json_lexer import JSONLexer
import pygments
l = JSONLexer()
f = Terminal256Formatter()
if not d.subscribe and not d.psubscribe:
exit('Either -s or -p is required')
r = redis.Redis()
p = r.pubsub()
for a in d.subscribe or []: p.subscribe(a)
for a in d.psubscribe or []: p.psubscribe(a)
try:
for a in p.listen():
if a['type'] not in ('message', 'pmessage'): continue
data = a['data']
isjson = False
if d.contains and d.contains not in data: continue
if d.prettyprint:
try:
data = json.dumps(json.loads(data.decode('utf-8')), indent=1)
isjson = True
except:
pass
elif d.raw:
try:
data = data.decode('utf-8')
except:
pass
else:
try:
data = data.decode('utf-8')
j = json.loads(data)
if '\n' in data:
data = json.dumps(j)
isjson = True
except:
pass
chan = a['channel']
try:
chan = chan.decode('utf-8')
except: pass
if d.jsoncolor and isjson:
print(chan, pygments.highlight(data, l, f, outfile=None).strip())
else:
if isinstance(data, bytes):
print(chan, highlight(data))
else:
print(chan, ascii(data))
sys.stdout.flush()
except KeyboardInterrupt:
pass