#!/usr/bin/python

# The MIT License (MIT)
#
# Copyright (c) 2026 pacman64
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


info = '''
tlp [options...] [python expression] [files/URIs...]


Transform Lines with Python runs a python expression on each line of text
input, encoded as UTF-8. Carriage-returns are always ignored in lines, as
well as any UTF-8-BOM on the first line of each input.

The expression can use either `l` or `line` for the current line, and `i` as
a 0-based line counter which keeps growing even across input-sources, when
given multiple inputs. Also available is `n`, a 1-based line counter which
otherwise works the same way.

Each line is automatically parsed as JSON: when successful, the parsed line
is available to the expression as `v`, or `value`, with value `err` set to
None, since parsing succeeded; on failure, `v` and `value` are set to a
special skip-value, which emits nothing when used as a result, while `err`
has the exception as a value. You can check success/failure by checking if
`err` is None, or not.

Input-sources can be either files or web-URIs. When not given any explicit
named sources, the standard input is used. It's even possible to reuse the
standard input using multiple single dashes (-) in the order needed: stdin
is only read once in this case, and kept for later reuse.

When the expression results in None, the current input line is ignored. When
the expression results in a boolean, its value determines whether each line
is emitted to the standard output, or ignored.

When the expression emits lists, tuples, or generators, each item is emitted
as its own line/result. Since empty containers emit no lines, these are the
most general type of results, acting as either filters, or input-amplifiers.


Examples

# numbers from 0 to 5, each on its own output line; no input is read/used
tlp = 'range(6)'

# all powers up to the 4th, using each input line auto-parsed into a `float`
tlp = 'range(1, 6)' | tlp '(v**p for p in range(1, 4+1))'

# separate input lines with an empty line between each; global var `empty`
# can be used to avoid bothering with nested shell-quoting
tlp = 'range(6)' | tlp '["", l] if i > 0 else l'

# ignore errors/exceptions, in favor of the original lines/values
tlp = '("abc", "123")' | tlp 'rescue(lambda: 2 * float(line), line)'

# ignore errors/exceptions, calling a fallback func with the exception
tlp = '("abc", "123")' | tlp 'rescue(lambda: 2 * float(line), str)'

# filtering lines out via None values
head -c 1024 /dev/urandom | strings | tlp 'l if len(l) < 20 else None'

# boolean-valued results are concise ways to filter lines out
head -c 1024 /dev/urandom | strings | tlp 'len(l) < 20'

# function/callable results are automatically called on the current line
head -c 1024 /dev/urandom | strings | tlp len

# emit 10 random integers between 1 and 10
tlp -m random = '(random.randint(1, 10) for _ in range(10))'

# emit standard input lines slowly, delaying output 0.5 seconds each time
tlp -m time '(time.sleep(0.5), line)[-1]'

# emit documentation for collections.defaultdict from the python stdlib
tlp = -m collections 'help(collections.defaultdict)' | cat
'''


from itertools import islice
from json import dumps, loads
from math import isinf, isnan
from re import compile as compile_uncached, IGNORECASE
from sys import argv, exit, stderr, stdin
from time import localtime, sleep, strftime
from typing import Generator, Iterable


# the quick help checks only run when used as a script, so importing this
# file (to test/reuse its helper funcs) neither prints anything nor quits
if __name__ == '__main__':
    if len(argv) < 2:
        print(info.strip(), file=stderr)
        exit(0)
    if argv[1] in ('-h', '--h', '-help', '--help'):
        print(info.strip())
        exit(0)


# i and n are the 0-based and 1-based line counters available to expressions:
# they're set up only once, so they keep growing across input-sources, as the
# help message promises
i = 0
n = 1

# _boms has the leading byte-order marks to ignore on first lines: the first
# is a properly-decoded BOM, the second is what its 3 UTF-8 bytes turn into
# when the input is decoded using a single-byte (latin-1-like) encoding
_boms = ('\ufeff', '\xef\xbb\xbf')


def _clean_line(text, first):
    'Ignore trailing CRs/LFs, along with any leading BOM on first lines.'

    text = text.rstrip('\r\n')
    if first:
        # BOMs are matched as exact prefixes: using str.lstrip would also
        # strip legitimate leading characters, such as an inverted `?`
        for bom in _boms:
            if text.startswith(bom):
                return text[len(bom):]
    return text

def _parse_line(text):
    'Update all the per-line global variables available to expressions.'

    # `comprehension` expressions seem to ignore local variables: even
    # lambda-based workarounds fail, hence all these global variables
    global l, line, v, val, value, e, err, error

    l = line = text
    try:
        v = val = value = loads(text)
        e = err = error = None
    except Exception as ex:
        # lines which aren't valid JSON aren't errors for the tool itself:
        # the exception is merely made available to the expression
        v = val = value = Skip()
        e = err = error = ex

def _emit(res, fallback):
    '''
    Show a result on stdout: lists, ranges, tuples, and generators have each
    of their items emitted on its own line, while anything else is emitted
    as a single line, unless it's a value which function adapt_result turns
    into None. The fallback is what True and callable results turn into.
    '''

    if isinstance(res, (list, range, tuple, Generator)):
        # the loop variable is deliberately a local one: generators are
        # evaluated lazily, so clobbering any global the expression uses
        # would change later items
        for item in res:
            item = adapt_result(item, None)
            if item is not None:
                print(item, flush=True)
        return

    res = adapt_result(res, fallback)
    if res is not None:
        print(res, flush=True)

def handle_no_input(expr):
    'Evaluate the expression given only once, without reading any input.'

    # NOTE: evaluating arbitrary code is the whole point of this tool, since
    # expressions come from the user's own command-line
    _emit(eval(expr), None)

def handle_lines(src, expr):
    '''
    Evaluate the expression given (either source code, or its compiled form)
    for each line from the iterable given, emitting results to stdout. Line
    counters `i` and `n` keep growing across calls.
    '''

    global i, n

    for pos, raw in enumerate(src):
        _parse_line(_clean_line(raw, pos == 0))
        res = eval(expr)
        # counters are updated before emitting, since generator results run
        # lazily: this is the order expressions have always observed
        i += 1
        n += 1
        _emit(res, line)

def handle_pipe(src, funcs):
    '''
    Run each line from the iterable given through all the funcs given, in
    order, emitting final results to stdout. Callable step-results are in
    turn called with the previous step's result.
    '''

    global i, n
    # variable names `o` and `p` work like in the `pyp` tool, except
    # the pipeline steps were given as separate cmd-line arguments
    global o, p

    for pos, raw in enumerate(src):
        _parse_line(_clean_line(raw, pos == 0))
        o = p = prev = line
        # seen is used by func `once` to remember previously-given values
        # NOTE(review): clearing it on each line makes `once` forget values
        # across lines in pipe-mode; kept as is, since that may be intended
        seen.clear()

        for f in funcs:
            p = f(p)
            if callable(p):
                p = p(prev)
            prev = p

        i += 1
        n += 1
        _emit(p, line)

def hold_lines(src, lines):
    'Remember all lines into the list given, while iterating over them.'

    for e in src:
        lines.append(e)
        yield e

def adapt_result(res, fallback):
    '''
    Turn a result into the string to show for it, or into None when nothing
    should be shown. Exception values are raised; skip-values, None, and
    False show nothing; callables are called with the fallback given, their
    results being returned as given; True turns into the fallback; dicts are
    turned into single-line JSON; anything else is turned into a string.
    '''

    if isinstance(res, BaseException):
        raise res
    if isinstance(res, Skip) or res is None or res is False:
        return None
    if callable(res):
        return res(fallback)
    if res is True:
        return fallback
    if isinstance(res, dict):
        return dumps(res, allow_nan=False)
    return str(res)

def fail(msg, code = 1):
    'Show the message given on stderr, then quit using the exit code given.'

    print(str(msg), file=stderr)
    exit(code)

def make_open_utf8(open):
    'Make a read-only UTF-8 text-opener out of the `open` func given.'

    def open_utf8_readonly(path):
        return open(path, encoding='utf-8')
    return open_utf8_readonly

def seemsurl(path):
    'Check if a string starts with any of the URI protocols supported.'

    protocols = ('https://', 'http://', 'file://', 'ftp://', 'data:')
    return path.startswith(protocols)

class Skip:
    'Values of this type never show anything, when used as results.'
    pass

skip = Skip()

def chunk(items, chunk_size):
    'Break iterable into chunks, each with up to the item-count given.'

    # check the chunk-size before anything else: a non-positive size used
    # to make the string case loop forever, emitting empty strings
    if not isinstance(chunk_size, int):
        raise Exception('non-integer chunk-size')
    if chunk_size < 1:
        raise Exception('non-positive chunk-size')

    if isinstance(items, str):
        # strings are chunked into (sub)strings, instead of tuples
        for start in range(0, len(items), chunk_size):
            yield items[start:start + chunk_size]
        return

    it = iter(items)
    while True:
        head = tuple(islice(it, chunk_size))
        if not head:
            return
        yield head

chunked = chunk

# re_cache is used by custom func compile to cache previously-compiled
# regular-expressions, which makes them quicker to (re)use in formulas
re_cache = {}

def re_compile(expr, flags = 0):
    'Speed-up using regexes across lines, by avoiding recompilations.'

    # there's a separate cache for each combination of flags
    cache = re_cache.setdefault(flags, {})
    pat = cache.get(expr)
    if pat is None:
        pat = compile_uncached(expr, flags)
        cache[expr] = pat
    return pat

def icompile(expr):
    'Compile/cache a case-insensitive regular expression.'
    return re_compile(expr, IGNORECASE)

def cond(*args):
    '''
    Emulate an if-else chain: args are condition/result pairs, optionally
    followed by a final fallback result. The result paired to the first
    truish condition is returned; when no condition holds, the result is
    the fallback, or None when there's no fallback.
    '''

    for k in range(0, len(args) - 1, 2):
        if args[k]:
            return args[k + 1]

    return args[-1] if len(args) % 2 == 1 else None

def dive(into, using):
    'Depth-first recursive caller for 1-input functions.'

    # allow giving the 2 args in either order
    if callable(into):
        into, using = using, into

    def rec(v):
        if isinstance(v, dict):
            return {k: rec(v) for k, v in v.items()}
        if isinstance(v, Iterable) and not isinstance(v, str):
            return [rec(v) for v in v]
        return using(v)

    return rec(into)

def divekeys(into, using):
    'Depth-first recursive caller for 1-input funcs which rename dict keys.'

    # allow giving the 2 args in either order
    if callable(into):
        into, using = using, into

    def rec(v):
        if isinstance(v, dict):
            return {using(k): rec(v) for k, v in v.items()}
        if isinstance(v, Iterable) and not isinstance(v, str):
            return [rec(v) for v in v]
        return v

    # rec only takes the value to dive into: calling it with 2 args, as
    # this func used to do, always failed
    return rec(into)

def divekv(into, using, using2 = None):
    'Depth-first recursive caller for 2-input functions.'

    # allow giving the args in various orders
    if using2 is None:
        if callable(into):
            into, using = using, into
    else:
        if not callable(using2):
            into, using, using2 = using2, into, using

    def rec(k, v):
        if isinstance(v, dict):
            return {k: rec(k, v) for k, v in v.items()}
        if isinstance(v, Iterable) and not isinstance(v, str):
            return [rec(i, v) for i, v in enumerate(v)]
        return using(k, v)

    # rec2 is used when given 2 funcs: the first one renames keys, while
    # the second one transforms leaf values
    def rec2(k, v):
        if isinstance(v, dict):
            return {str(using(k, v)): rec2(k, v) for k, v in v.items()}
        if isinstance(v, Iterable) and not isinstance(v, str):
            return [rec2(i, v) for i, v in enumerate(v)]
        return using2(k, v)

    return rec(None, into) if using2 is None else rec2(None, into)

kvdive = divekv

def drop(src, *what):
    '''
    Ignore things from the value given: substrings are removed from strings,
    keys are removed from dicts, and keys are removed from all dicts found
    in other iterables, whose non-dict items are ignored. The result is None
    for any other type of value.
    '''

    if isinstance(src, str):
        for s in what:
            src = src.replace(s, '')
        return src

    def kdrop(src, what):
        return {k: v for (k, v) in src.items() if not (k in what)}

    if isinstance(src, dict):
        return kdrop(src, set(what))

    if isinstance(src, Iterable):
        what = set(what)
        return [kdrop(e, what) for e in src if isinstance(e, dict)]

    return None

dropped = drop

def join(x, y = ' '):
    'Join values into a string, or make a dict from keys and values.'

    # the string among the 2 args, if any, is the separator
    if isinstance(x, str):
        return x.join(str(v) for v in y)
    if isinstance(y, str):
        return y.join(str(v) for v in x)
    return {k: v for k, v in zip(x, y)}

def pick(src, *keys):
    '''
    Keep only the keys given from a dict, or from all dicts found in an
    iterable, whose non-dict items are ignored. Missing keys are given
    None values.
    '''

    if isinstance(src, dict):
        return {k: src.get(k, None) for k in keys}
    return [{k: e.get(k, None) for k in keys} for e in src if isinstance(e, dict)]

picked = pick

def plain(s):
    'Ignore all ANSI-style sequences in a string.'
    return re_compile('''\x1b\\[([0-9;]+m|[0-9]*[A-HJKST])''').sub('', s)

def predicate(x):
    'Helps various higher-order funcs, by standardizing `predicate` values.'

    if callable(x):
        return x
    if isinstance(x, float):
        # NaN values never equal anything, so they need special handling
        if isnan(x):
            return lambda y: isinstance(y, float) and isnan(y)
        if isinf(x):
            return lambda y: isinstance(y, float) and isinf(y)
    return lambda y: x == y

def rescue(attempt, fallback = None):
    '''
    Call the 0-input func given, returning its result: when that fails, the
    result is the fallback given, or the result of calling it with the
    exception, when the fallback is a func.
    '''

    try:
        return attempt()
    except BrokenPipeError:
        # a closed stdout must always be allowed to quit the app
        raise
    except Exception as e:
        if callable(fallback):
            return fallback(e)
        return fallback

rescued = rescue

def retype(x):
    'Try to narrow the type of the value given.'

    if isinstance(x, float):
        # infinities and NaNs aren't integers: trying to convert those
        # would raise exceptions
        return int(x) if x.is_integer() else x

    if not isinstance(x, str):
        return x

    # try increasingly-lenient parsers, keeping the string when all fail
    for parse in (loads, int, float):
        try:
            return parse(x)
        except Exception:
            pass

    return x

autocast = autocasted = mold = molded = recast = recasted = remold = retype
remolded = retyped = retype

def json0(x):
    'Turn a value into single-line JSON with no optional spaces.'
    return dumps(x, separators=(',', ':'), allow_nan=False, indent=None)

j0 = json0

def jsonl(x):
    '''
    Emit JSON Lines: None, booleans, numbers, dicts, and strings result in
    a single line; other iterables result in a line for each of their items,
    except for skip-values; anything else results in a single JSON string.
    Skip-values result in no lines at all.
    '''

    if isinstance(x, Skip):
        return

    def emit(x):
        return dumps(x, separators=(', ', ': '), allow_nan=False, indent=None)

    if x is None:
        yield emit(x)
        return

    if isinstance(x, (bool, int, float, dict, str)):
        yield emit(x)
        return

    if isinstance(x, Iterable):
        for e in x:
            if isinstance(e, Skip):
                continue
            # emit each item on its own line, not the whole container
            yield emit(e)
        return

    yield emit(str(x))

jl = jsonlines = ndjson = jsonl

def typeof(x):
    'Get the JSON-style type-name for the value given.'

    return {
        type(None): 'null',
        bool: 'boolean',
        dict: 'object',
        float: 'number',
        int: 'number',
        str: 'string',
        list: 'array',
        tuple: 'array',
    }.get(type(x), 'other')

jstype = typeof

def wait(seconds, result):
    'Wait the given number of seconds, before returning its latter arg.'

    # allow giving the 2 args in either order
    if not isinstance(seconds, (int, float)):
        if isinstance(result, (int, float)):
            seconds, result = result, seconds
    sleep(seconds)
    return result

delay = wait

def after(x, what):
    'Get what follows the first match, or nothing when there is no match.'
    i = x.find(what)
    return '' if i < 0 else x[i+len(what):]

def afterlast(x, what):
    'Get what follows the last match, or nothing when there is no match.'
    i = x.rfind(what)
    return '' if i < 0 else x[i+len(what):]

afterfinal = afterlast

def before(x, what):
    'Get what precedes the first match, or everything when none match.'
    i = x.find(what)
    return x if i < 0 else x[:i]

def beforelast(x, what):
    'Get what precedes the last match, or everything when none match.'
    i = x.rfind(what)
    return x if i < 0 else x[:i]

beforefinal = beforelast

def since(x, what):
    'Get everything from the first match, or nothing when none match.'
    i = x.find(what)
    return '' if i < 0 else x[i:]

def sincelast(x, what):
    'Get everything from the last match, or nothing when none match.'
    i = x.rfind(what)
    return '' if i < 0 else x[i:]

sincefinal = sincelast

def until(x, what):
    'Get everything up to the first match included, or everything.'
    i = x.find(what)
    return x if i < 0 else x[:i+len(what)]

def untilfinal(x, what):
    'Get everything up to the last match included, or everything.'
    i = x.rfind(what)
    return x if i < 0 else x[:i+len(what)]

untillast = untilfinal

def blue(s):
    return f'\x1b[38;2;0;95;215m{s}\x1b[0m'

def blueback(s):
    return f'\x1b[48;2;0;95;215m\x1b[38;2;238;238;238m{s}\x1b[0m'

bluebg = blueback

def bold(s):
    return f'\x1b[1m{s}\x1b[0m'

bolded = bold

def gbm(s, good = False, bad = False, meh = False):
    '''
    Good, Bad, Meh ANSI-styles a plain string via ANSI-style sequences,
    according to 1..3 conditions given as boolean(ish) values: these are
    checked in order, so the first truish one wins.
    '''

    if good:
        return green(s)
    if bad:
        return red(s)
    if meh:
        return gray(s)
    return s

def gray(s):
    return f'\x1b[38;2;168;168;168m{s}\x1b[0m'

def grayback(s):
    return f'\x1b[48;2;168;168;168m\x1b[38;2;238;238;238m{s}\x1b[0m'

def green(s):
    return f'\x1b[38;2;0;135;95m{s}\x1b[0m'

def greenback(s):
    return f'\x1b[48;2;0;135;95m\x1b[38;2;238;238;238m{s}\x1b[0m'

def highlight(s):
    return f'\x1b[7m{s}\x1b[0m'

hilite = highlight

def orange(s):
    return f'\x1b[38;2;215;95;0m{s}\x1b[0m'

def orangeback(s):
    return f'\x1b[48;2;215;95;0m\x1b[38;2;238;238;238m{s}\x1b[0m'

def purple(s):
    return f'\x1b[38;2;135;95;255m{s}\x1b[0m'

def purpleback(s):
    return f'\x1b[48;2;135;95;255m\x1b[38;2;238;238;238m{s}\x1b[0m'

def red(s):
    return f'\x1b[38;2;204;0;0m{s}\x1b[0m'

def redback(s):
    # code 48 styles the background, like in all other `back` funcs: this
    # used to start with code 38, which only styles the foreground
    return f'\x1b[48;2;204;0;0m\x1b[38;2;238;238;238m{s}\x1b[0m'

def underline(s):
    return f'\x1b[4m{s}\x1b[0m'

underlined = underline

def message(msg, result = None):
    'Show a message on stderr, returning the latter arg, which can be None.'
    print(msg, file=stderr)
    return result

msg = message

# seen is used by func `once` to remember previously-given values
seen = set()

def once(x):
    'Return (hashable) values only the first time they are given.'
    if x in seen:
        return None
    seen.add(x)
    return x

dedup = unique = once

def utf8(x):
    'Ensure a string/bytes value is valid UTF-8, returning None otherwise.'
    try:
        if isinstance(x, str):
            x = x.encode('utf-8')
        return str(x, 'utf-8')
    except Exception:
        return None

def ymdhms(when = None):
    '''
    Format a local date/time as YYYY-MM-DD HH:MM:SS, given either a number
    of seconds since the epoch, or a time-tuple: the current time is used
    for any other type of value.
    '''

    fmt = '%Y-%m-%d %H:%M:%S'
    if isinstance(when, (float, int)):
        return strftime(fmt, localtime(float(when)))
    if isinstance(when, tuple):
        return strftime(fmt, when)
    return strftime(fmt, localtime())


cr = '\r'
crlf = '\r\n'
dquo = dquote = '"'
empty = ''
lcurly = '{'
lf = '\n'
rcurly = '}'
space = ' '
squo = squote = '\''
tab = '\t'

nil = none = null = None


open_utf8 = make_open_utf8(open)

no_input_opts = (
    '=', '-n', '--n', '-nil', '--nil', '-none', '--none', '-null', '--null',
)
modules_opts = (
    '-m', '--m', '-mod', '--mod', '-module', '--module',
    '-modules', '--modules',
)
pipe_opts = ('-p', '--p', '-pipe', '--pipe')
trace_opts = ('-t', '--t', '-trace', '--trace', '-traceback', '--traceback')


def _import_modules(names):
    'Make all modules from a comma-separated list available to expressions.'

    from importlib import import_module

    g = globals()
    for name in names.split(','):
        name = name.strip()
        if not name:
            continue

        g[name] = import_module(name)
        # dotted names can't be used as variables: also make the top-level
        # package available, unless that would replace something else
        top = name.partition('.')[0]
        if top != name and top not in g:
            g[top] = import_module(top)


# all the command-line handling only happens when run as a script: the code
# is deliberately kept at the top level, since expressions are evaluated
# using this file's global variables
if __name__ == '__main__':
    # keep expressions from running arbitrary statements, and from opening
    # files in any way other than as read-only UTF-8 text
    exec = None
    open = open_utf8

    args = argv[1:]
    if any(seemsurl(e) for e in args):
        from io import TextIOWrapper
        from urllib.request import urlopen

    no_input = False
    pipe_mode = False
    trace_errors = False

    # handle leading options
    while len(args) > 0:
        if args[0] == '--':
            args = args[1:]
            break

        if args[0] in no_input_opts:
            no_input = True
            args = args[1:]
            continue

        if args[0] in pipe_opts:
            # all args after a pipe option are pipeline steps
            pipe_mode = True
            args = args[1:]
            break

        if args[0] in modules_opts:
            try:
                if len(args) < 2:
                    raise Exception(
                        'expected a module name or a comma-separated ' +
                        'list of modules'
                    )
                _import_modules(args[1])
                args = args[2:]
            except Exception as e:
                fail(e, 1)

            continue

        if args[0] in trace_opts:
            trace_errors = True
            args = args[1:]
            continue

        break

    try:
        if pipe_mode:
            if no_input:
                raise Exception('can\'t use pipe-mode when input is disabled')
            steps = [eval(s) for s in args]
            compile = None
            eval = None
            exec = None
            open = None
            handle_pipe(stdin, steps)
            exit(0)

        expr = '.'
        if len(args) > 0:
            expr = args[0]
            args = args[1:]

        if expr == '.' and no_input:
            print(info.strip(), file=stderr)
            exit(0)

        # a single dot is a shortcut to emit all lines as given
        if expr == '.':
            expr = 'line'

        expr = compile(expr, expr, mode='eval')
        compile = None

        if no_input:
            handle_no_input(expr)
            exit(0)

        if len(args) == 0:
            handle_lines(stdin, expr)
            exit(0)

        got_stdin = False
        all_stdin = None
        dashes = args.count('-')

        for path in args:
            if path == '-':
                if dashes > 1:
                    # stdin can only be read once, so its lines are kept
                    # the first time, to be reused all later times
                    if not got_stdin:
                        all_stdin = []
                        handle_lines(hold_lines(stdin, all_stdin), expr)
                        got_stdin = True
                    else:
                        handle_lines(all_stdin, expr)
                else:
                    handle_lines(stdin, expr)
                continue

            if seemsurl(path):
                with urlopen(path) as inp:
                    with TextIOWrapper(inp, encoding='utf-8') as txt:
                        handle_lines(txt, expr)
                continue

            with open_utf8(path) as txt:
                handle_lines(txt, expr)
    except BrokenPipeError:
        # quit quietly, instead of showing a confusing error message
        stderr.close()
        exit(0)
    except KeyboardInterrupt:
        exit(2)
    except Exception as e:
        if trace_errors:
            raise
        else:
            fail(e, 1)