File: tlp.py
   1 #!/usr/bin/python
   2 
   3 # The MIT License (MIT)
   4 #
   5 # Copyright (c) 2026 pacman64
   6 #
   7 # Permission is hereby granted, free of charge, to any person obtaining a copy
   8 # of this software and associated documentation files (the "Software"), to deal
   9 # in the Software without restriction, including without limitation the rights
  10 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11 # copies of the Software, and to permit persons to whom the Software is
  12 # furnished to do so, subject to the following conditions:
  13 #
  14 # The above copyright notice and this permission notice shall be included in
  15 # all copies or substantial portions of the Software.
  16 #
  17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  20 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  23 # SOFTWARE.
  24 
  25 
# info is the help message: it's shown on stderr when the script is given no
# arguments, and on stdout when the first argument is one of the help options
#
# NOTE(review): some claims below don't match what func `handle_lines` does:
# when JSON-parsing a line fails, `v`/`value` are set to a `Skip` instance
# (not None), and counters `i`/`n` restart for each input source; confirm
# which of the two is the intended behavior
info = '''
tlp [options...] [python expression] [files/URIs...]


Transform Lines with Python runs a python expression on each line of text
input, encoded as UTF-8. Carriage-returns are always ignored in lines, as
well as any UTF-8-BOM on the first line of each input.

The expression can use either `l` or `line` for the current line, and `i` as
a 0-based line counter which keeps growing even across input-sources, when
given multiple inputs. Also available is `n`, a 1-based line counter which
otherwise works the same way.

Each line is automatically parsed as JSON: when successful, the parsed line
is available to the expression as `v`, or `value`, with value `err` set to
None, since parsing succeeded; on failure, `v` and `value` are set to None,
while `err` has the exception as a value. You can check success/failure by
checking if `err` is None, or not.

Input-sources can be either files or web-URIs. When not given any explicit
named sources, the standard input is used. It's even possible to reuse the
standard input using multiple single dashes (-) in the order needed: stdin
is only read once in this case, and kept for later reuse.

When the expression results in None, the current input line is ignored. When
the expression results in a boolean, its value determines whether each line
is emitted to the standard output, or ignored.

When the expression emits lists, tuples, or generators, each item is emitted
as its own line/result. Since empty containers emit no lines, these are the
most general type of results, acting as either filters, or input-amplifiers.


Examples

# numbers from 0 to 5, each on its own output line; no input is read/used
tlp = 'range(6)'

# all powers up to the 4th, using each input line auto-parsed into a `float`
tlp = 'range(1, 6)' | tlp '(v**p for p in range(1, 4+1))'

# separate input lines with an empty line between each; global var `empty`
# can be used to avoid bothering with nested shell-quoting
tlp = 'range(6)' | tlp '["", l] if i > 0 else l'

# ignore errors/exceptions, in favor of the original lines/values
tlp = '("abc", "123")' | tlp 'rescue(lambda: 2 * float(line), line)'

# ignore errors/exceptions, calling a fallback func with the exception
tlp = '("abc", "123")' | tlp 'rescue(lambda: 2 * float(line), str)'

# filtering lines out via None values
head -c 1024 /dev/urandom | strings | tlp 'l if len(l) < 20 else None'

# boolean-valued results are concise ways to filter lines out
head -c 1024 /dev/urandom | strings | tlp 'len(l) < 20'

# function/callable results are automatically called on the current line
head -c 1024 /dev/urandom | strings | tlp len

# emit 10 random integers between 1 and 10
tlp -m random = '(random.randint(1, 10) for _ in range(10))'

# emit standard input lines slowly, delaying output 0.5 seconds each time
tlp -m time '(time.sleep(0.5), line)[-1]'

# emit documentation for collections.defaultdict from the python stdlib
tlp = -m collections 'help(collections.defaultdict)' | cat
'''
  95 
  96 
  97 from itertools import islice
  98 from json import dumps, loads
  99 from math import isinf, isnan
 100 from re import compile as compile_uncached, IGNORECASE
 101 from sys import argv, exit, stderr, stdin
 102 from time import localtime, sleep, strftime
 103 from typing import Generator, Iterable
 104 
 105 
 106 if len(argv) < 2:
 107     print(info.strip(), file=stderr)
 108     exit(0)
 109 if len(argv) > 1 and argv[1] in ('-h', '--h', '-help', '--help'):
 110     print(info.strip())
 111     exit(0)
 112 
 113 
 114 def handle_no_input(expr):
 115     res = eval(expr)
 116     if isinstance(res, (list, range, tuple, Generator)):
 117         for e in res:
 118             e = adapt_result(e, None)
 119             if not (e is None):
 120                 print(e, flush=True)
 121         return
 122 
 123     res = adapt_result(res, None)
 124     if not (res is None):
 125         print(res, flush=True)
 126 
 127 def handle_lines(src, expr):
 128     # `comprehension` expressions seem to ignore local variables: even
 129     # lambda-based workarounds fail
 130     global i, n, l, line, v, val, value, e, err, error
 131 
 132     i = 0
 133     n = 1
 134     e = err = error = None
 135 
 136     for l in src:
 137         l = l.rstrip('\r\n').rstrip('\n')
 138         if i == 0:
 139             l = l.lstrip('\xef\xbb\xbf')
 140 
 141         line = l
 142         try:
 143             e = err = error = None
 144             v = val = value = loads(l)
 145         except BrokenPipeError as ex:
 146             raise ex
 147         except Exception as ex:
 148             e = err = error = ex
 149             v = val = value = Skip()
 150         res = eval(expr)
 151         i += 1
 152         n += 1
 153 
 154         if isinstance(res, (list, range, tuple, Generator)):
 155             for e in res:
 156                 e = adapt_result(e, None)
 157                 if not (e is None):
 158                     print(e, flush=True)
 159             continue
 160 
 161         res = adapt_result(res, line)
 162         if not (res is None):
 163             print(res, flush=True)
 164 
 165 def hold_lines(src, lines):
 166     for e in src:
 167         lines.append(e)
 168         yield e
 169 
 170 def adapt_result(res, fallback):
 171     if isinstance(res, BaseException):
 172         raise res
 173     if isinstance(res, Skip) or res is None or res is False:
 174         return None
 175     if callable(res):
 176         return res(fallback)
 177     if res is True:
 178         return fallback
 179     if isinstance(res, dict):
 180         return dumps(res, allow_nan=False)
 181     return str(res)
 182 
 183 def fail(msg, code = 1):
 184     print(str(msg), file=stderr)
 185     exit(code)
 186 
 187 def make_open_utf8(open):
 188     def open_utf8_readonly(path):
 189         return open(path, encoding='utf-8')
 190     return open_utf8_readonly
 191 
 192 def seemsurl(path):
 193     protocols = ('https://', 'http://', 'file://', 'ftp://', 'data:')
 194     return any(path.startswith(p) for p in protocols)
 195 
 196 class Skip:
 197     pass
 198 
 199 skip = Skip()
 200 
 201 def chunk(items, chunk_size):
 202     'Break iterable into chunks, each with up to the item-count given.'
 203 
 204     if isinstance(items, str):
 205         n = len(items)
 206         while n >= chunk_size:
 207             yield items[:chunk_size]
 208             items = items[chunk_size:]
 209             n -= chunk_size
 210         if n > 0:
 211             yield items
 212         return
 213 
 214     if not isinstance(chunk_size, int):
 215         raise Exception('non-integer chunk-size')
 216     if chunk_size < 1:
 217         raise Exception('non-positive chunk-size')
 218 
 219     it = iter(items)
 220     while True:
 221         head = tuple(islice(it, chunk_size))
 222         if not head:
 223             return
 224         yield head
 225 
 226 chunked = chunk
 227 
 228 # re_cache is used by custom func compile to cache previously-compiled
 229 # regular-expressions, which makes them quicker to (re)use in formulas
 230 re_cache = {}
 231 
 232 def re_compile(expr, flags = 0):
 233     'Speed-up using regexes across lines, by avoiding recompilations.'
 234 
 235     if flags in re_cache:
 236         cache = re_cache[flags]
 237     else:
 238         cache = {}
 239         re_cache[flags] = cache
 240     if expr in cache:
 241         return cache[expr]
 242 
 243     pat = compile_uncached(expr, flags)
 244     cache[expr] = pat
 245     return pat
 246 
 247 def icompile(expr):
 248     return re_compile(expr, IGNORECASE)
 249 
 250 def cond(*args):
 251     if len(args) == 0:
 252         return None
 253 
 254     for i, e in enumerate(args):
 255         if i % 2 == 0 and i < len(args) - 1 and e:
 256             return args[i + 1]
 257 
 258     return args[-1] if len(args) % 2 == 1 else None
 259 
 260 def dive(into, using):
 261     'Depth-first recursive caller for 1-input functions.'
 262 
 263     if callable(into):
 264         into, using = using, into
 265 
 266     def rec(v):
 267         if isinstance(v, dict):
 268             return {k: rec(v) for k, v in v.items()}
 269         if isinstance(v, Iterable) and not isinstance(v, str):
 270             return [rec(v) for v in v]
 271         return using(v)
 272 
 273     return rec(into)
 274 
 275 def divekeys(into, using):
 276     'Depth-first recursive caller for 2-input funcs which rename dict keys.'
 277 
 278     if callable(into):
 279         into, using = using, into
 280 
 281     def rec(v):
 282         if isinstance(v, dict):
 283             return {using(k): rec(v) for k, v in v.items()}
 284         if isinstance(v, Iterable) and not isinstance(v, str):
 285             return [rec(v) for i, v in enumerate(v)]
 286         return v
 287 
 288     return rec(None, into)
 289 
 290 def divekv(into, using, using2 = None):
 291     'Depth-first recursive caller for 2-input functions.'
 292 
 293     if using2 is None:
 294         if callable(into):
 295             into, using = using, into
 296     else:
 297         if not callable(using2):
 298             into, using, using2 = using2, into, using
 299 
 300     def rec(k, v):
 301         if isinstance(v, dict):
 302             return {k: rec(k, v) for k, v in v.items()}
 303         if isinstance(v, Iterable) and not isinstance(v, str):
 304             return [rec(i, v) for i, v in enumerate(v)]
 305         return using(k, v)
 306 
 307     def rec2(k, v):
 308         if isinstance(v, dict):
 309             return {str(using(k, v)): rec2(k, v) for k, v in v.items()}
 310         if isinstance(v, Iterable) and not isinstance(v, str):
 311             # return {str(using(i, v)): rec2(i, v) for i, v in enumerate(v)}
 312             return [rec2(i, v) for i, v in enumerate(v)]
 313         return using2(k, v)
 314 
 315     return rec(None, into) if using2 is None else rec2(None, into)
 316 
 317 kvdive = divekv
 318 
 319 def drop(src, *what):
 320     if isinstance(src, str):
 321         for s in what:
 322             src = src.replace(s, '')
 323         return src
 324 
 325     def kdrop(src, what):
 326         return {k: v for (k, v) in src.items() if not (k in what)}
 327 
 328     if isinstance(src, dict):
 329         return kdrop(src, set(what))
 330 
 331     if isinstance(src, Iterable):
 332         what = set(what)
 333         return [kdrop(e, what) for e in src if isinstance(e, dict)]
 334 
 335     return None
 336 
 337 dropped = drop
 338 
 339 def join(x, y = ' '):
 340     'Join values into a string, or make a dict from keys and values.'
 341 
 342     if isinstance(x, str):
 343         return x.join(str(v) for v in y)
 344     if isinstance(y, str):
 345         return y.join(str(v) for v in x)
 346     return {k: v for k, v in zip(x, y)}
 347 
 348 def maybe(f, x):
 349     try:
 350         return f(x)
 351     except Exception as _:
 352         return x
 353 
 354 def number(x):
 355     try:
 356         return int(x)
 357     except Exception as _:
 358         pass
 359     try:
 360         return float(x)
 361     except Exception as _:
 362         return x
 363 
 364 def pick(src, *keys):
 365     if isinstance(src, dict):
 366         return {k: src.get(k, None) for k in keys}
 367     return [{k: e.get(k, None) for k in keys} for e in src if isinstance(e, dict)]
 368 
 369 picked = pick
 370 
 371 def plain(s):
 372     'Ignore all ANSI-style sequences in a string.'
 373     return re_compile('''\x1b\\[([0-9;]+m|[0-9]*[A-HJKST])''').sub('', s)
 374 
 375 def predicate(x):
 376     'Helps various higher-order funcs, by standardizing `predicate` values.'
 377     if callable(x):
 378         return x
 379     if isinstance(x, float):
 380         if isnan(x):
 381             return lambda y: isinstance(y, float) and isnan(y)
 382         if isinf(x):
 383             return lambda y: isinstance(y, float) and isinf(y)
 384     return lambda y: x == y
 385 
 386 def rescue(attempt, fallback = None):
 387     try:
 388         return attempt()
 389     except BrokenPipeError as e:
 390         raise e
 391     except Exception as e:
 392         if callable(fallback):
 393             return fallback(e)
 394         return fallback
 395 
 396 rescued = rescue
 397 
 398 def retype(x):
 399     'Try to narrow the type of the value given.'
 400 
 401     if isinstance(x, float):
 402         n = int(x)
 403         return n if float(n) == x else x
 404 
 405     if not isinstance(x, str):
 406         return x
 407 
 408     try:
 409         return loads(x)
 410     except Exception:
 411         pass
 412 
 413     try:
 414         return int(x)
 415     except Exception:
 416         pass
 417 
 418     try:
 419         return float(x)
 420     except Exception:
 421         pass
 422 
 423     return x
 424 
 425 autocast = autocasted = mold = molded = recast = recasted = remold = retype
 426 remolded = retyped = retype
 427 
 428 def json0(x):
 429     return dumps(x, separators=(',', ':'), allow_nan=False, indent=None)
 430 
 431 j0 = json0
 432 
 433 def jsonl(x):
 434     if isinstance(x, Skip):
 435         return
 436 
 437     def emit(x):
 438         return dumps(x, separators=(', ', ': '), allow_nan=False, indent=None)
 439 
 440     if x is None:
 441         yield emit(x)
 442         return
 443 
 444     if isinstance(x, (bool, int, float, dict, str)):
 445         yield emit(x)
 446         return
 447 
 448     if isinstance(x, Iterable):
 449         for e in x:
 450             if isinstance(e, Skip):
 451                 continue
 452             yield emit(x)
 453         return
 454 
 455     yield emit(str(x))
 456 
 457 jl = jsonlines = ndjson = jsonl
 458 
 459 def typeof(x):
 460     # return str(type(x))
 461     return {
 462         type(None): 'null',
 463         bool: 'boolean',
 464         dict: 'object',
 465         float: 'number',
 466         int: 'number',
 467         str: 'string',
 468         list: 'array',
 469         tuple: 'array',
 470     }.get(type(x), 'other')
 471 
 472 jstype = typeof
 473 
 474 def wait(seconds, result):
 475     'Wait the given number of seconds, before returning its latter arg.'
 476 
 477     if not isinstance(seconds, (int, float)):
 478         if isinstance(result, (int, float)):
 479             seconds, result = result, seconds
 480     sleep(seconds)
 481     return result
 482 
 483 delay = wait
 484 
 485 def after(x, what):
 486     i = x.find(what)
 487     return '' if i < 0 else x[i+len(what):]
 488 
 489 def afterlast(x, what):
 490     i = x.rfind(what)
 491     return '' if i < 0 else x[i+len(what):]
 492 
 493 afterfinal = afterlast
 494 
 495 def before(x, what):
 496     i = x.find(what)
 497     return x if i < 0 else x[:i]
 498 
 499 def beforelast(x, what):
 500     i = x.rfind(what)
 501     return x if i < 0 else x[:i]
 502 
 503 beforefinal = beforelast
 504 
 505 def since(x, what):
 506     i = x.find(what)
 507     return '' if i < 0 else x[i:]
 508 
 509 def sincelast(x, what):
 510     i = x.rfind(what)
 511     return '' if i < 0 else x[i:]
 512 
 513 sincefinal = sincelast
 514 
 515 def until(x, what):
 516     i = x.find(what)
 517     return x if i < 0 else x[:i+len(what)]
 518 
 519 def untilfinal(x, what):
 520     i = x.rfind(what)
 521     return x if i < 0 else x[:i+len(what)]
 522 
 523 untillast = untilfinal
 524 
 525 def blue(s):
 526     return f'\x1b[38;2;0;95;215m{s}\x1b[0m'
 527 
 528 def blueback(s):
 529     return f'\x1b[48;2;0;95;215m\x1b[38;2;238;238;238m{s}\x1b[0m'
 530 
 531 bluebg = blueback
 532 
 533 def bold(s):
 534     return f'\x1b[1m{s}\x1b[0m'
 535 
 536 bolded = bold
 537 
 538 def gbm(s, good = False, bad = False, meh = False):
 539     '''
 540     Good, Bad, Meh ANSI-styles a plain string via ANSI-style sequences,
 541     according to 1..3 conditions given as boolean(ish) values: these are
 542     checked in order, so the first truish one wins.
 543     '''
 544 
 545     if good:
 546         return green(s)
 547     if bad:
 548         return red(s)
 549     if meh:
 550         return gray(s)
 551     return s
 552 
 553 def gray(s):
 554     return f'\x1b[38;2;168;168;168m{s}\x1b[0m'
 555 
 556 def grayback(s):
 557     return f'\x1b[48;2;168;168;168m\x1b[38;2;238;238;238m{s}\x1b[0m'
 558 
 559 def green(s):
 560     return f'\x1b[38;2;0;135;95m{s}\x1b[0m'
 561 
 562 def greenback(s):
 563     return f'\x1b[48;2;0;135;95m\x1b[38;2;238;238;238m{s}\x1b[0m'
 564 
 565 def highlight(s):
 566     return f'\x1b[7m{s}\x1b[0m'
 567 
 568 hilite = highlight
 569 
 570 def orange(s):
 571     return f'\x1b[38;2;215;95;0m{s}\x1b[0m'
 572 
 573 def orangeback(s):
 574     return f'\x1b[48;2;215;95;0m\x1b[38;2;238;238;238m{s}\x1b[0m'
 575 
 576 def purple(s):
 577     return f'\x1b[38;2;135;95;255m{s}\x1b[0m'
 578 
 579 def purpleback(s):
 580     return f'\x1b[48;2;135;95;255m\x1b[38;2;238;238;238m{s}\x1b[0m'
 581 
 582 def red(s):
 583     return f'\x1b[38;2;204;0;0m{s}\x1b[0m'
 584 
 585 def redback(s):
 586     return f'\x1b[38;2;204;0;0m\x1b[38;2;238;238;238m{s}\x1b[0m'
 587 
 588 def underline(s):
 589     return f'\x1b[4m{s}\x1b[0m'
 590 
 591 underlined = underline
 592 
 593 def message(msg, result = None):
 594     print(msg, file=stderr)
 595     return result
 596 
 597 msg = message
 598 
 599 # seen is used by func `once` to remember previously-given values
 600 seen = set()
 601 
 602 def once(x):
 603     if x in seen:
 604         return None
 605     seen.add(x)
 606     return x
 607 
 608 dedup = unique = once
 609 
 610 def utf8(x):
 611     try:
 612         if isinstance(x, str):
 613             x = x.encode('utf-8')
 614         return str(x, 'utf-8')
 615     except Exception:
 616         return None
 617 
 618 def ymdhms(when = None):
 619     fmt = f'%Y-%m-%d %H:%M:%S'
 620     if isinstance(when, (float, int)):
 621         return strftime(fmt, localtime(float(when)))
 622     if isinstance(when, tuple):
 623         return strftime(fmt, when)
 624     return strftime(fmt, localtime())
 625 
 626 
 627 cr = '\r'
 628 crlf = '\r\n'
 629 dquo = dquote = '"'
 630 empty = ''
 631 lcurly = '{'
 632 lf = '\n'
 633 rcurly = '}'
 634 space = ' '
 635 squo = squote = '\''
 636 tab = '\t'
 637 
 638 nil = none = null = None
 639 
 640 
# shadow builtin `exec` with None at module level
# NOTE(review): presumably meant to keep expressions from using it; confirm
exec = None
# module-level `open` is rebound to a 1-input func which always asks for
# UTF-8 text; the wrapper is made first, while `open` is still the builtin
open_utf8 = make_open_utf8(open)
open = open_utf8

# options to evaluate the expression once, without reading any input
no_input_opts = (
    '=', '-n', '--n', '-nil', '--nil', '-none', '--none', '-null', '--null',
)
# options to import modules, whose names are given in the next argument
modules_opts = (
    '-m', '--m', '-mod', '--mod', '-module', '--module',
    '-modules', '--modules',
)
# options to re-raise exceptions, instead of only showing their messages
trace_opts = ('-t', '--t', '-trace', '--trace', '-traceback', '--traceback')

args = argv[1:]
# URI-related imports only happen when some argument seems to be a URI
if any(seemsurl(e) for e in args):
    from io import TextIOWrapper
    from urllib.request import urlopen

no_input = False
trace_errors = False

# handle all leading options: the first argument which isn't an option ends
# the loop, and is left in `args`, along with everything after it
while len(args) > 0:
    # an explicit double-dash ends the options, and isn't kept
    if args[0] == '--':
        args = args[1:]
        break

    if args[0] in no_input_opts:
        no_input = True
        args = args[1:]
        continue

    if args[0] in modules_opts:
        try:
            if len(args) < 2:
                msg = 'a module name or a comma-separated list of modules'
                raise Exception('expected ' + msg)

            # each module imported becomes a global variable with its name
            g = globals()
            from importlib import import_module
            for e in args[1].split(','):
                g[e] = import_module(e)

            # these helper names are set to None once they're done with
            g = None
            import_module = None
            args = args[2:]
        except Exception as e:
            # missing module names and failed imports quit the script
            fail(e, 1)

        continue

    if args[0] in trace_opts:
        trace_errors = True
        args = args[1:]
        continue

    break
 697 
 698 
# main part of the script: compile the expression, then run it either once,
# or on each line from all the inputs given; the calls to `exit` raise
# SystemExit, which none of the `except` clauses below handle
try:
    # the first argument left after the options is the expression, while
    # all the arguments after it are the names of the inputs
    expr = '.'
    if len(args) > 0:
        expr = args[0]
        args = args[1:]

    # without any input, a missing/dot expression just shows the help
    if expr == '.' and no_input:
        print(info.strip(), file=stderr)
        exit(0)

    # a single dot stands for the current line
    if expr == '.':
        expr = 'line'

    # the source text of the expression is also given as its `filename`
    expr = compile(expr, expr, mode='eval')
    # builtin `compile` is shadowed with None from here on
    # NOTE(review): presumably meant to keep expressions from using it
    compile = None

    if no_input:
        handle_no_input(expr)
        exit(0)

    # no named inputs means using the standard input
    if len(args) == 0:
        handle_lines(stdin, expr)
        exit(0)

    got_stdin = False
    all_stdin = None
    dashes = args.count('-')

    for path in args:
        # single dashes stand for the standard input
        if path == '-':
            if dashes > 1:
                # stdin is used multiple times: its lines are kept the first
                # time they're read, and are reused all the other times
                if not got_stdin:
                    all_stdin = []
                    handle_lines(hold_lines(stdin, all_stdin), expr)
                    got_stdin = True
                else:
                    handle_lines(all_stdin, expr)
            else:
                handle_lines(stdin, expr)
            continue

        # URIs are opened and decoded as UTF-8 text; the names used here
        # are only imported when some argument seems to be a URI
        if seemsurl(path):
            with urlopen(path) as inp:
                with TextIOWrapper(inp, encoding='utf-8') as txt:
                    handle_lines(txt, expr)
            continue

        # anything else is handled as a file path
        with open_utf8(path) as txt:
            handle_lines(txt, expr)
except BrokenPipeError:
    # quit quietly, instead of showing a confusing error message
    stderr.close()
    exit(0)
except KeyboardInterrupt:
    exit(2)
except Exception as e:
    # the trace option re-raises the exception, which shows its traceback,
    # instead of only showing its message before quitting
    if trace_errors:
        raise e
    else:
        fail(e, 1)
 758         fail(e, 1)