#!/usr/bin/python3

# The MIT License (MIT)
#
# Copyright © 2024 pacman64
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the “Software”), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


info = '''
tlp [options...] [python expression] [files/URIs...]


Transform Lines with Python runs a python expression on each line of text
input, encoded as UTF-8. Carriage-returns are always ignored in lines, as
well as any UTF-8-BOM on the first line of each input.

The expression can use either `l` or `line` for the current line, and `i` as
a 0-based line counter, which keeps growing, even across input-sources, when
given more than one.

Input-sources can be either files or web-URIs. When not given any explicit
named sources, the standard input is used. It's even possible to reuse the
standard input using multiple single dashes (-) in the order needed: stdin
is only read once in this case, and kept for later reuse.

When the expression results in None, the current input line is ignored. When
the expression results in a boolean, this determines whether the line is
emitted to the standard output, or ignored.

When the expression emits lists, tuples, or generators, each item is emitted
as its own line/result. Since empty containers emit no lines, these are the
most general type of results, acting as either filters, or input-amplifiers.


Examples

# numbers from 0 to 5, each on its own output line; no input is read/used
tlp = 'range(6)'

# all powers up to the 4th, using each input line auto-parsed into a `float`
tlp = 'range(1, 6)' | tlp '(float(l)**p for p in range(1, 4+1))'

# separate input lines with an empty line between each; global var `empty`
# can be used to avoid bothering with nested shell-quoting
tlp = 'range(6)' | tlp '["", l] if i > 0 else l'

# ignore errors/exceptions, in favor of the original lines/values
tlp = '("abc", "123")' | tlp 'rescue(lambda: 2 * float(line), line)'

# ignore errors/exceptions, calling a fallback func with the exception
tlp = '("abc", "123")' | tlp 'rescue(lambda: 2 * float(line), str)'

# filtering lines out via None values
head -c 1024 /dev/urandom | strings | tlp 'l if len(l) < 20 else None'

# boolean-valued results are concise ways to filter lines out
head -c 1024 /dev/urandom | strings | tlp 'len(l) < 20'

# function/callable results are automatically called on the current line
head -c 1024 /dev/urandom | strings | tlp len
'''


from itertools import islice
from json import dumps, loads
# keep the builtin reachable: the name `compile` is reused further down for
# the regex-caching helper offered to user expressions
compile_py = compile
from re import compile as compile_uncached, IGNORECASE
from sys import argv, exit, stderr, stdin
from time import sleep
from typing import Generator, Iterable


# i is the 0-based line counter available to expressions: it's global and is
# never reset, so it keeps growing across input-sources, as the help says
i = 0

# _BOMS has the leading byte-order marks to ignore: the first is how a UTF-8
# BOM looks once decoded as UTF-8, the second is how its 3 bytes look when
# the input was decoded byte-by-byte (latin-1) instead
_BOMS = ('\ufeff', '\xef\xbb\xbf')


def _clean_line(raw, first):
    'Drop trailing CR/LF bytes, and a leading BOM when on a first line.'

    text = raw.rstrip('\r\n')
    if first:
        for bom in _BOMS:
            if text.startswith(bom):
                # only the exact prefix is removed, and only once
                return text[len(bom):]
    return text


def _parse_json(text):
    'Parse a line as JSON, resulting in a Skip value when that fails.'

    try:
        return loads(text)
    except Exception:
        return Skip()


def _emit_result(res, fallback):
    '''
    Print what an expression resulted in: lists, ranges, tuples, and
    generators emit each of their non-Skip items on its own line, while
    anything else goes through adapt_result, where None and Skip results
    emit nothing at all.
    '''

    if isinstance(res, (list, range, tuple, Generator)):
        for item in res:
            if not isinstance(item, Skip):
                print(item, flush=True)
        return

    res = adapt_result(res, fallback)
    if res is None or isinstance(res, Skip):
        return
    print(res, flush=True)


def handle_no_input(expr):
    'Evaluate the expression once, without reading any input, and emit it.'

    res = eval(expr)
    _emit_result(res, None)


def handle_lines(src, expr):
    '''
    Run the expression on each line from the iterable given. Expressions can
    use globals `l` and `line` (current line), `i` (line counter), as well as
    `v`, `val`, and `value` (the line parsed as JSON, or a Skip value).
    '''

    # `comprehension` expressions seem to ignore local variables: even
    # lambda-based workarounds fail
    global i, l, line, v, val, value

    first = True
    for e in src:
        l = line = _clean_line(e, first)
        first = False
        v = val = value = _parse_json(l)

        res = eval(expr)
        # emit before updating the counter, since lazy results (generators)
        # may still need to look up the current value of `i`
        _emit_result(res, line)
        i += 1


def handle_pipe(src, expressions):
    '''
    Run a pipeline of expressions on each line from the iterable given: each
    step can use `p` for the previous step's result, and `o` for the original
    line; callable results are called on the previous step's result.
    '''

    # `comprehension` expressions seem to ignore local variables: even
    # lambda-based workarounds fail
    global i, l, line, v, val, value
    # variable names `o` and `p` work like in the `pyp` tool, except
    # the pipeline steps were given as separate cmd-line arguments
    global o, p

    first = True
    for e in src:
        l = line = _clean_line(e, first)
        first = False
        o = p = prev = line
        v = val = value = _parse_json(l)

        for expr in expressions:
            p = eval(expr)
            if callable(p):
                p = p(prev)
            prev = p

        # emit before updating the counter, since lazy results (generators)
        # may still need to look up the current value of `i`
        _emit_result(p, line)
        i += 1


def hold_lines(src, lines):
    'Pass lines through, while also remembering them in the list given.'

    for e in src:
        lines.append(e)
        yield e


def adapt_result(res, fallback):
    '''
    Turn a non-container result into what to emit: None and False lead to
    None (emit nothing), True leads to the fallback given, callables are
    called on the fallback, dicts become JSON, Skip values are returned
    as given, and anything else is turned into a string.
    '''

    if isinstance(res, Skip):
        return res
    if res is None or res is False:
        return None
    if callable(res):
        return res(fallback)
    if res is True:
        return fallback
    if isinstance(res, dict):
        return dumps(res, allow_nan=False)
    return str(res)


class Skip:
    'Marker type for values/results which are never emitted.'


skip = Skip()


def chunk(items, chunk_size):
    'Break iterable into chunks, each with up to the item-count given.'

    # check the chunk-size before anything else: a non-positive size would
    # otherwise loop forever when chunking strings
    if not isinstance(chunk_size, int):
        raise Exception('non-integer chunk-size')
    if chunk_size < 1:
        raise Exception('non-positive chunk-size')

    if isinstance(items, str):
        # strings are chunked into (sub)strings, instead of tuples
        for start in range(0, len(items), chunk_size):
            yield items[start:start + chunk_size]
        return

    it = iter(items)
    while True:
        head = tuple(islice(it, chunk_size))
        if not head:
            return
        yield head

chunked = chunk


# re_cache is used by custom func compile to cache previously-compiled
# regular-expressions, which makes them quicker to (re)use in formulas
re_cache = {}

# ire_cache is like re_cache, except it's for case-insensitive regexes
ire_cache = {}


def compile(expr, flags = 0):
    'Speed-up using regexes across lines, by avoiding recompilations.'

    if flags != 0 and flags != IGNORECASE:
        msg = 'only the default and case-insensitive options are supported'
        raise Exception(msg)

    cache = re_cache if flags == 0 else ire_cache
    if expr in cache:
        return cache[expr]

    pat = compile_uncached(expr, flags)
    cache[expr] = pat
    return pat


def icompile(expr):
    'Case-insensitive counterpart of func compile.'
    return compile(expr, IGNORECASE)


def cond(*args):
    '''
    Pick the value right after the first truish condition, among pairs of
    conditions and values: an optional trailing value is the final fallback,
    with None being the result when there isn't one.
    '''

    if len(args) == 0:
        return None

    for k in range(0, len(args) - 1, 2):
        if args[k]:
            return args[k + 1]
    return args[-1] if len(args) % 2 == 1 else None


def dive(into, using):
    'Depth-first recursive caller for 1-input functions.'

    # allow giving the arguments in either order
    if callable(into):
        into, using = using, into

    def rec(v):
        if isinstance(v, dict):
            return {k: rec(e) for k, e in v.items()}
        if isinstance(v, Iterable) and not isinstance(v, str):
            return [rec(e) for e in v]
        return using(v)

    return rec(into)


def divekeys(into, using):
    'Depth-first recursive caller for 1-input funcs which rename dict keys.'

    # allow giving the arguments in either order
    if callable(into):
        into, using = using, into

    def rec(v):
        if isinstance(v, dict):
            return {using(k): rec(e) for k, e in v.items()}
        if isinstance(v, Iterable) and not isinstance(v, str):
            return [rec(e) for e in v]
        return v

    return rec(into)


def divekv(into, using, using2 = None):
    '''
    Depth-first recursive caller for 2-input functions, which are given keys
    (or indices) along with values: when given 2 functions, the first one
    renames dict keys, while the second one transforms leaf values.
    '''

    # allow giving the data either first or last
    if using2 is None:
        if callable(into):
            into, using = using, into
    else:
        if not callable(using2):
            into, using, using2 = using2, into, using

    def rec(k, v):
        if isinstance(v, dict):
            return {key: rec(key, e) for key, e in v.items()}
        if isinstance(v, Iterable) and not isinstance(v, str):
            return [rec(j, e) for j, e in enumerate(v)]
        return using(k, v)

    def rec2(k, v):
        if isinstance(v, dict):
            return {str(using(key, e)): rec2(key, e) for key, e in v.items()}
        if isinstance(v, Iterable) and not isinstance(v, str):
            return [rec2(j, e) for j, e in enumerate(v)]
        return using2(k, v)

    return rec(None, into) if using2 is None else rec2(None, into)

kvdive = divekv


def drop(src, *what):
    '''
    Remove substrings from a string, or remove keys from a dict, or from
    each dict in an iterable: anything else results in None.
    '''

    if isinstance(src, str):
        for s in what:
            src = src.replace(s, '')
        return src

    def kdrop(src, what):
        return {k: v for k, v in src.items() if not (k in what)}

    if isinstance(src, dict):
        return kdrop(src, set(what))

    if isinstance(src, Iterable):
        what = set(what)
        return [kdrop(e, what) for e in src]

    return None

dropped = drop


def join(x, y = ' '):
    'Join values into a string, or make a dict from keys and values.'

    if isinstance(x, str):
        return x.join(str(v) for v in y)
    if isinstance(y, str):
        return y.join(str(v) for v in x)
    return {k: v for k, v in zip(x, y)}


def pick(src, *keys):
    'Keep only the keys given from a dict, or from each dict in an iterable.'

    if isinstance(src, dict):
        return {k: src.get(k, None) for k in keys}
    return [{k: e.get(k, None) for k in keys} for e in src]


def rescue(attempt, fallback = None):
    '''
    Call the 0-input function given, handling exceptions either by calling
    the fallback with the exception, or by using the fallback as the result.
    '''

    try:
        return attempt()
    except Exception as e:
        if callable(fallback):
            return fallback(e)
        return fallback

catch = rescue
catched = rescue
caught = rescue
recover = rescue
recovered = rescue
rescued = rescue


def retype(x):
    'Try to narrow the type of the value given.'

    if isinstance(x, float):
        # is_integer is false for NaN and the infinities, which can't
        # be turned into integers
        return int(x) if x.is_integer() else x

    if not isinstance(x, str):
        return x

    # try parsers from the most general to the most specific fallback
    for convert in (loads, int, float):
        try:
            return convert(x)
        except Exception:
            pass

    return x

autocast = retype
autocasted = retype
mold = retype
molded = retype
recast = retype
recasted = retype
remold = retype
remolded = retype
retyped = retype


def json0(x):
    'Turn a value into the most compact single-line JSON string.'
    return dumps(x, separators=(',', ':'), allow_nan=False, indent=None)

j0 = json0


def jsonl(x):
    '''
    Emit JSON Lines: scalars, strings, and dicts emit a single line, other
    iterables emit a line for each of their non-Skip items, and any other
    value emits its string representation as a JSON string.
    '''

    if isinstance(x, Skip):
        return

    def emit(value):
        sep = (', ', ': ')
        return dumps(value, separators=sep, allow_nan=False, indent=None)

    if x is None or isinstance(x, (bool, int, float, dict, str)):
        yield emit(x)
        return

    if isinstance(x, Iterable):
        for e in x:
            if isinstance(e, Skip):
                continue
            yield emit(e)
        return

    yield emit(str(x))

jl = jsonl
jsonlines = jsonl
ndjson = jsonl


def typeof(x):
    'Name the type of a value, the way JSON/JavaScript would.'

    return {
        type(None): 'null',
        bool: 'boolean',
        dict: 'object',
        float: 'number',
        int: 'number',
        str: 'string',
        list: 'array',
        tuple: 'array',
    }.get(type(x), 'other')

jstype = typeof


def wait(seconds, result):
    'Wait the given number of seconds, before returning its latter arg.'

    # allow giving the arguments in either order, when unambiguous
    t = (int, float)
    if (not isinstance(seconds, t)) and isinstance(result, t):
        seconds, result = result, seconds
    sleep(seconds)
    return result

delay = wait


def after(x, what):
    'Get what follows the first match, or an empty string when not found.'
    i = x.find(what)
    return '' if i < 0 else x[i+len(what):]


def afterlast(x, what):
    'Get what follows the last match, or an empty string when not found.'
    i = x.rfind(what)
    return '' if i < 0 else x[i+len(what):]

afterfinal = afterlast


def before(x, what):
    'Get what precedes the first match, or the whole string when not found.'
    i = x.find(what)
    return x if i < 0 else x[:i]


def beforelast(x, what):
    'Get what precedes the last match, or the whole string when not found.'
    i = x.rfind(what)
    return x if i < 0 else x[:i]

beforefinal = beforelast


def since(x, what):
    'Get the string from its first match, or an empty string when not found.'
    i = x.find(what)
    return '' if i < 0 else x[i:]


def sincelast(x, what):
    'Get the string from its last match, or an empty string when not found.'
    i = x.rfind(what)
    return '' if i < 0 else x[i:]

sincefinal = sincelast


def until(x, what):
    'Get the string up to its first match, including the match itself.'
    i = x.find(what)
    return x if i < 0 else x[:i+len(what)]


def untilfinal(x, what):
    'Get the string up to its last match, including the match itself.'
    i = x.rfind(what)
    return x if i < 0 else x[:i+len(what)]

untillast = untilfinal


def blue(s):
    'ANSI-style the value given using a blue foreground.'
    return f'\x1b[38;2;0;95;215m{s}\x1b[0m'


def blueback(s):
    'ANSI-style the value given using a blue background.'
    return f'\x1b[48;2;0;95;215m\x1b[38;2;238;238;238m{s}\x1b[0m'

bluebg = blueback


def bold(s):
    'ANSI-style the value given as bold text.'
    return f'\x1b[1m{s}\x1b[0m'

bolded = bold


def gbm(s, good = False, bad = False, meh = False):
    '''
    Good, Bad, Meh ANSI-styles a plain string via ANSI-style sequences,
    according to 1..3 conditions given as boolean(ish) values: these are
    checked in order, so the first truish one wins.
    '''

    if good:
        return green(s)
    if bad:
        return red(s)
    if meh:
        return gray(s)
    return s


def gray(s):
    'ANSI-style the value given using a gray foreground.'
    return f'\x1b[38;2;168;168;168m{s}\x1b[0m'


def grayback(s):
    'ANSI-style the value given using a gray background.'
    return f'\x1b[48;2;168;168;168m\x1b[38;2;238;238;238m{s}\x1b[0m'

graybg = grayback


def green(s):
    'ANSI-style the value given using a green foreground.'
    return f'\x1b[38;2;0;135;95m{s}\x1b[0m'


def greenback(s):
    'ANSI-style the value given using a green background.'
    return f'\x1b[48;2;0;135;95m\x1b[38;2;238;238;238m{s}\x1b[0m'

greenbg = greenback


def highlight(s):
    'ANSI-style the value given using reversed/swapped colors.'
    return f'\x1b[7m{s}\x1b[0m'

hilite = highlight


def magenta(s):
    'ANSI-style the value given using a magenta foreground.'
    return f'\x1b[38;2;215;0;255m{s}\x1b[0m'


def magentaback(s):
    'ANSI-style the value given using a magenta background.'
    return f'\x1b[48;2;215;0;255m\x1b[38;2;238;238;238m{s}\x1b[0m'

magback = magentaback
magbg = magentaback
magentabg = magentaback


def orange(s):
    'ANSI-style the value given using an orange foreground.'
    return f'\x1b[38;2;215;95;0m{s}\x1b[0m'


def orangeback(s):
    'ANSI-style the value given using an orange background.'
    return f'\x1b[48;2;215;95;0m\x1b[38;2;238;238;238m{s}\x1b[0m'

orangebg = orangeback
orback = orangeback
orbg = orangeback


def purple(s):
    'ANSI-style the value given using a purple foreground.'
    return f'\x1b[38;2;135;95;255m{s}\x1b[0m'


def purpleback(s):
    'ANSI-style the value given using a purple background.'
    return f'\x1b[48;2;135;95;255m\x1b[38;2;238;238;238m{s}\x1b[0m'

purback = purpleback
purbg = purpleback
purplebg = purpleback


def red(s):
    'ANSI-style the value given using a red foreground.'
    return f'\x1b[38;2;204;0;0m{s}\x1b[0m'


def redback(s):
    'ANSI-style the value given using a red background.'
    # code 48 sets the background color, while code 38 sets the foreground
    return f'\x1b[48;2;204;0;0m\x1b[38;2;238;238;238m{s}\x1b[0m'

redbg = redback


def underline(s):
    'ANSI-style the value given as underlined text.'
    return f'\x1b[4m{s}\x1b[0m'

underlined = underline


def fail(msg, code = 1):
    'Show the message given on stderr, then quit with the exit-code given.'
    print(f'\x1b[31m{str(msg)}\x1b[0m', file=stderr)
    exit(code)


def make_open_utf8(open):
    'Make a read-only UTF-8 file-opener out of the opener func given.'

    def open_utf8_readonly(path):
        return open(path, encoding='utf-8')
    return open_utf8_readonly


def message(msg, result = None):
    'Show the message given on stderr, and return the latter arg.'
    print(msg, file=stderr)
    return result

msg = message


def seemsurl(path):
    'Check whether a path starts with any of the supported URI protocols.'
    protocols = ('https://', 'http://', 'file://', 'ftp://', 'data:')
    return path.startswith(protocols)


# handy string constants, which avoid nested shell-quoting in expressions
cr = '\r'
crlf = '\r\n'
dquo = '"'
dquote = '"'
empty = ''
lcurly = '{'
lf = '\n'
rcurly = '}'
s = ''
squo = '\''
squote = '\''

nil = None
none = None
null = None

# expressions can't `exec` code, and can only open files as read-only UTF-8
exec = None
open_utf8 = make_open_utf8(open)
open = open_utf8

no_input_opts = (
    '=', '--n', '-nil', '--nil', '-none', '--none', '-null', '--null',
)
modules_opts = (
    '-m', '--m', '-mod', '--mod', '-module', '--module',
    '-modules', '--modules',
)
more_modules_opts = ('-mm', '--mm', '-more', '--more')
pipe_opts = ('-p', '--p', '-pipe', '--pipe')
trace_opts = ('-t', '--t', '-trace', '--traceback', '-traceback', '--trace')


# the script's logic stays at module level (instead of inside a func), so
# all names it sets stay global, and thus available to the expressions
if __name__ == '__main__':
    if len(argv) < 2:
        print(info.strip(), file=stderr)
        exit(0)

    if len(argv) == 2 and argv[1] in ('-h', '--h', '-help', '--help'):
        print(info.strip())
        exit(0)

    args = argv[1:]
    # network-related modules are only loaded when URIs are involved
    if any(seemsurl(e) for e in args):
        from io import TextIOWrapper
        from urllib.request import urlopen

    no_input = False
    pipe_mode = False
    trace_errors = False

    while len(args) > 0:
        if args[0] in no_input_opts:
            no_input = True
            args = args[1:]
            continue

        if args[0] in pipe_opts:
            # all args after the pipe option are the pipeline's steps
            pipe_mode = True
            args = args[1:]
            break

        if args[0] in modules_opts:
            try:
                if len(args) < 2:
                    raise Exception(
                        'expected a module name or a comma-separated ' +
                        'list of modules'
                    )
                g = globals()
                from importlib import import_module
                for e in args[1].split(','):
                    e = e.strip()
                    g[e] = import_module(e)
                g = None
                import_module = None
                args = args[2:]
            except Exception as e:
                fail(e, 1)
            continue

        if args[0] in more_modules_opts:
            import functools, itertools, json, math, random, statistics
            import string, time
            args = args[1:]
            continue

        if args[0] in trace_opts:
            trace_errors = True
            args = args[1:]
            continue

        break

    try:
        if pipe_mode:
            if no_input:
                raise Exception('can\'t use pipe-mode when input is disabled')
            exprs = [compile_py(e, e, mode='eval') for e in args]
            compile_py = None
            handle_pipe(stdin, exprs)
            exit(0)

        expr = '.'
        if len(args) > 0:
            expr = args[0]
            args = args[1:]

        if expr == '.' and no_input:
            print(info.strip(), file=stderr)
            exit(0)

        # a single dot is a shortcut for the current line
        if expr == '.':
            expr = 'line'

        expr = compile_py(expr, expr, mode='eval')
        compile_py = None

        if no_input:
            handle_no_input(expr)
            exit(0)

        if len(args) == 0:
            handle_lines(stdin, expr)
            exit(0)

        got_stdin = False
        all_stdin = None
        dashes = args.count('-')

        for path in args:
            if path == '-':
                if dashes > 1:
                    # stdin is only read once, and kept for later reuse
                    if not got_stdin:
                        all_stdin = []
                        handle_lines(hold_lines(stdin, all_stdin), expr)
                        got_stdin = True
                    else:
                        handle_lines(all_stdin, expr)
                else:
                    handle_lines(stdin, expr)
                continue

            if seemsurl(path):
                with urlopen(path) as inp:
                    with TextIOWrapper(inp, encoding='utf-8') as txt:
                        handle_lines(txt, expr)
                continue

            with open_utf8(path) as txt:
                handle_lines(txt, expr)
    except BrokenPipeError:
        # quit quietly, instead of showing a confusing error message
        stderr.close()
        exit(0)
    except KeyboardInterrupt:
        exit(2)
    except Exception as e:
        if trace_errors:
            raise
        else:
            fail(e, 1)