Toggle menu
Toggle personal menu
Not logged in
Your IP address will be publicly visible if you make any edits.

slisten.py

From Randomness wiki
Revision as of 13:49, 12 September 2024 by Derg (talk | contribs) (Derg moved page Slisten.py to slisten.py without leaving a redirect: Misspelled title)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
#!/usr/bin/python3
import argparse
import json
import sys

import redis

# Command-line interface.  Each row is (flag, attribute on `d`, argparse
# action); the rows are registered in this order, which is also the order
# in which they appear in the generated usage text.
_OPTIONS = (
    ('-s', 'subscribe', 'append'),        # channel name; repeatable
    ('-p', 'psubscribe', 'append'),       # channel pattern; repeatable
    ('-r', 'raw', 'store_true'),          # decode payload as UTF-8, no JSON handling
    ('-n', 'nochannel', 'store_true'),    # sets d.nochannel
    ('-f', 'prettyprint', 'store_true'),  # re-serialize JSON payloads with indentation
    ('-c', 'contains', 'store'),          # only show payloads containing this value
    ('-j', 'jsoncolor', 'store_true'),    # colourize JSON output via pygments
)

a = argparse.ArgumentParser()
for _flag, _dest, _action in _OPTIONS:
    a.add_argument(_flag, dest=_dest, action=_action)

# Parsed options; read by the rest of the script as `d.<attribute>`.
d = a.parse_args()

def highlight(byt):
    """Format a bytes-like value as a ``b'...'`` literal for terminal output.

    Printable ASCII (32..126) is copied through, with ``'`` and ``\\``
    backslash-escaped.  Every other byte is written as an escape sequence
    (``\\000``..``\\006``, ``\\a \\b \\t \\n \\v \\f \\r`` or ``\\xNN``) preceded
    by an ANSI foreground colour code.  The colour alternates between codes
    92 and 91 from one escaped byte to the next so adjacent escapes can be
    told apart, and the default foreground (code 39) is restored before the
    next printable character and at the end of the string.

    Args:
        byt: an iterable of integer byte values, e.g. ``bytes``.

    Returns:
        The formatted ``str``, including the ANSI escape codes.
    """
    reset = '\033[39m'
    # Short escape names for the low control characters.
    escapes = {code: '%03d' % code for code in range(7)}
    escapes.update({7: 'a', 8: 'b', 9: 't', 10: 'n', 11: 'v', 12: 'f', 13: 'r'})
    # Printable characters that still need a backslash inside b'...'.
    quoted = {39: "\\'", 92: '\\\\'}

    pieces = ["b'"]
    escaped_seen = 0   # number of escaped bytes so far; drives the colour
    coloured = False   # True while a colour code is active

    for code in byt:
        if 32 <= code <= 126:
            if coloured:
                pieces.append(reset)
                coloured = False
            pieces.append(quoted.get(code) or chr(code))
            continue

        escaped_seen += 1
        coloured = True
        pieces.append('\033[9%sm' % (escaped_seen % 2 + 1))
        short = escapes.get(code)
        pieces.append('\\%s' % short if short else '\\x%02x' % code)

    if coloured:
        pieces.append(reset)
    pieces.append("'")
    return ''.join(pieces)

# Colourized JSON output is optional: pygments and the pygson lexer are
# imported only when -j was given, so they are not needed otherwise.
if d.jsoncolor:
    import pygments
    from pygments.formatters.terminal256 import Terminal256Formatter
    from pygson.json_lexer import JSONLexer

    # Lexer and formatter are built once here and reused for each message
    # that is highlighted in the receive loop.
    l, f = JSONLexer(), Terminal256Formatter()

# At least one channel (-s) or pattern (-p) must be given, otherwise there
# is nothing to listen to.  sys.exit() is used instead of the bare exit()
# helper: exit() is injected by the `site` module for interactive use and
# is missing when site is not loaded.  With a string argument sys.exit()
# prints the message to stderr and exits with status 1.
if not (d.subscribe or d.psubscribe):
    sys.exit('Either -s or -p is required')

# Client created with redis-py's default connection settings.
r = redis.Redis()
p = r.pubsub()

# argparse leaves an 'append' option as None when it was never given,
# hence the `or []` fallback.
for channel in d.subscribe or []:
    p.subscribe(channel)
for pattern in d.psubscribe or []:
    p.psubscribe(pattern)

# Main receive loop: print every published message until Ctrl-C.

# argparse delivers the -c value as str, while the payloads handled below
# are bytes (they are .decode()d further down).  `str in bytes` raises
# TypeError, so the filter is encoded once, up front.
_contains_bytes = d.contains.encode('utf-8') if d.contains else None

# Errors meaning "payload is not UTF-8" or "payload is not JSON":
# UnicodeDecodeError and json.JSONDecodeError are both ValueError
# subclasses; RecursionError covers pathologically nested JSON.  Catching
# only these (instead of a bare except) lets KeyboardInterrupt through.
_NOT_TEXT_OR_JSON = (ValueError, RecursionError)

try:
    for msg in p.listen():
        # Anything that is not a published message is skipped.
        if msg['type'] not in ('message', 'pmessage'):
            continue

        data = msg['data']
        isjson = False   # payload parsed as JSON
        pretty = False   # payload was re-serialized with indentation (-f)

        if d.contains:
            needle = _contains_bytes if isinstance(data, bytes) else d.contains
            if needle not in data:
                continue

        if d.prettyprint:
            # Only valid JSON is rewritten; anything else stays as bytes.
            try:
                data = json.dumps(json.loads(data.decode('utf-8')), indent=1)
                isjson = True
                pretty = True
            except _NOT_TEXT_OR_JSON:
                pass
        elif d.raw:
            # Plain UTF-8 decode, no JSON handling.
            try:
                data = data.decode('utf-8')
            except UnicodeDecodeError:
                pass
        else:
            try:
                data = data.decode('utf-8')
                parsed = json.loads(data)
                # Collapse multi-line JSON so each message stays on one line.
                if '\n' in data:
                    data = json.dumps(parsed)
                isjson = True
            except _NOT_TEXT_OR_JSON:
                pass

        chan = msg['channel']
        try:
            chan = chan.decode('utf-8')
        except (AttributeError, UnicodeDecodeError):
            # Already text, or not valid UTF-8: print it as it is.
            pass

        if d.jsoncolor and isjson:
            rendered = pygments.highlight(data, l, f, outfile=None).strip()
        elif isinstance(data, bytes):
            # Undecodable (or, with -f, non-JSON) payload: show escaped bytes.
            rendered = highlight(data)
        elif pretty:
            # json.dumps output is already pure ASCII; passing it through
            # ascii() would escape the newlines and undo the indentation.
            rendered = data
        else:
            rendered = ascii(data)

        # -n was parsed but never consulted before; honour it by leaving
        # the channel name out of the output line.
        if d.nochannel:
            print(rendered, flush=True)
        else:
            print(chan, rendered, flush=True)
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop listening; exit quietly.
    pass