#!/usr/bin/python3

# The MIT License (MIT)
#
# Copyright © 2026 pacman64
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the “Software”), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


info = '''
tlp [options...] [python expression] [files/URIs...]


Transform Lines with Python runs a python expression on each line of text
input, encoded as UTF-8. Carriage-returns are always ignored in lines, as
well as any UTF-8-BOM on the first line of each input.

The expression can use either `l` or `line` for the current line, and `i` as
a 0-based line counter which keeps growing even across input-sources, when
given multiple inputs. Also available is `n`, a 1-based line counter which
otherwise works the same way.

Each line is automatically parsed as JSON: when successful, the parsed line
is available to the expression as `v`, or `value`, with value `err` set to
None, since parsing succeeded; on failure, `v` and `value` are set to a
skip-marker value, which emits nothing when used as the result, while `err`
has the exception as a value. You can check success/failure by checking if
`err` is None, or not.

Input-sources can be either files or web-URIs. When not given any explicit
named sources, the standard input is used. It's even possible to reuse the
standard input using multiple single dashes (-) in the order needed: stdin
is only read once in this case, and kept for later reuse.

When the expression results in None, the current input line is ignored. When
the expression results in a boolean, its value determines whether each line
is emitted to the standard output, or ignored.

When the expression emits lists, tuples, or generators, each item is emitted
as its own line/result. Since empty containers emit no lines, these are the
most general type of results, acting as either filters, or input-amplifiers.


Examples

# numbers from 0 to 5, each on its own output line; no input is read/used
tlp = 'range(6)'

# all powers up to the 4th, using each input line auto-parsed into a `float`
tlp = 'range(1, 6)' | tlp '(v**p for p in range(1, 4+1))'

# separate input lines with an empty line between each; global var `empty`
# can be used to avoid bothering with nested shell-quoting
tlp = 'range(6)' | tlp '["", l] if i > 0 else l'

# ignore errors/exceptions, in favor of the original lines/values
tlp = '("abc", "123")' | tlp 'rescue(lambda: 2 * float(line), line)'

# ignore errors/exceptions, calling a fallback func with the exception
tlp = '("abc", "123")' | tlp 'rescue(lambda: 2 * float(line), str)'

# filtering lines out via None values
head -c 1024 /dev/urandom | strings | tlp 'l if len(l) < 20 else None'

# boolean-valued results are concise ways to filter lines out
head -c 1024 /dev/urandom | strings | tlp 'len(l) < 20'

# function/callable results are automatically called on the current line
head -c 1024 /dev/urandom | strings | tlp len

# emit 10 random integers between 1 and 10
tlp -m random = '(random.randint(1, 10) for _ in range(10))'

# emit documentation for collections.defaultdict from the python stdlib
tlp = -m collections 'help(collections.defaultdict)' | cat
'''


from itertools import islice
from json import dumps, loads
from re import compile as compile_uncached, IGNORECASE
from sys import argv, exit, stderr, stdin
from time import localtime, sleep, strftime
from typing import Generator, Iterable


# i and n are the 0-based and 1-based line counters available to expressions:
# they are set up only once, so they keep growing across all input-sources
i = 0
n = 1


def _emit(res, fallback):
    '''
    Print a result: lists, ranges, tuples, and generators emit each of their
    items as its own line, while any other value is emitted as a single line,
    if at all. The fallback is what boolean/callable results resolve to.
    '''

    if isinstance(res, (list, range, tuple, Generator)):
        # a local name is used on purpose, so the global `e` is left alone
        for item in res:
            item = adapt_result(item, None)
            if item is not None:
                print(item, flush=True)
        return

    res = adapt_result(res, fallback)
    if res is not None:
        print(res, flush=True)


def _load_line(raw, first):
    '''
    Update all line-related global variables from the raw input line given,
    which includes trying to parse it as JSON; any leading UTF-8 BOM is
    removed only when `first` is true.
    '''

    # `comprehension` expressions seem to ignore local variables: even
    # lambda-based workarounds fail
    global l, line, v, val, value, e, err, error

    l = raw.rstrip('\r\n')
    if first:
        # the BOM is removed as an exact prefix, both in its decoded form,
        # and as the 3 chars its bytes turn into when mis-decoded
        for bom in ('\ufeff', '\xef\xbb\xbf'):
            if l.startswith(bom):
                l = l[len(bom):]
                break

    line = l
    try:
        v = val = value = loads(l)
        e = err = error = None
    except Exception as ex:
        e = err = error = ex
        v = val = value = Skip()


def handle_no_input(expr):
    'Evaluate the expression just once, without reading any input.'

    _emit(eval(expr), None)


def handle_lines(src, expr):
    '''
    Evaluate the compiled expression for each line from the iterable given,
    emitting its results. Line counters `i` and `n` aren't reset, so they
    keep counting across multiple calls.
    '''

    global i, n

    first = True
    for raw in src:
        _load_line(raw, first)
        first = False

        res = eval(expr)
        # counters are updated before emitting, so lazy results (generators)
        # see the updated values, as they always have
        i += 1
        n += 1
        _emit(res, line)


def handle_pipe(src, funcs):
    '''
    Run each line from the iterable given through all the functions given,
    in order, emitting the final results. Steps resulting in callables are
    called with the value the step started from.
    '''

    global i, n
    # variable names `o` and `p` work like in the `pyp` tool, except
    # the pipeline steps were given as separate cmd-line arguments
    global o, p

    first = True
    for raw in src:
        _load_line(raw, first)
        first = False

        o = p = prev = line
        # seen is used by func `once` to remember previously-given values
        seen.clear()

        for f in funcs:
            p = f(p)
            if callable(p):
                p = p(prev)
            prev = p

        res = p
        i += 1
        n += 1
        _emit(res, line)


def hold_lines(src, lines):
    'Yield all items from src, also appending each to the list given.'

    for e in src:
        lines.append(e)
        yield e


def adapt_result(res, fallback):
    '''
    Turn a result into the string to emit, or None to emit nothing at all.
    Exception values are raised, callables are called with the fallback,
    True stands for the fallback itself, and dictionaries become JSON.
    '''

    if isinstance(res, BaseException):
        raise res
    if isinstance(res, Skip) or res is None or res is False:
        return None
    if callable(res):
        return res(fallback)
    if res is True:
        return fallback
    if isinstance(res, dict):
        return dumps(res, allow_nan=False)
    return str(res)


def fail(msg, code = 1):
    'Show the message given on stderr, then quit with the exit-code given.'

    print(str(msg), file=stderr)
    exit(code)


def make_open_utf8(open):
    'Make a read-only UTF-8 file-opener out of the open func given.'

    def open_utf8_readonly(path):
        return open(path, encoding='utf-8')
    return open_utf8_readonly


def seemsurl(path):
    'Check if a string starts with any of the URI protocols supported.'

    protocols = ('https://', 'http://', 'file://', 'ftp://', 'data:')
    return path.startswith(protocols)


class Skip:
    'Marker type for values which emit nothing when used as results.'

    pass


skip = Skip()


def chunk(items, chunk_size):
    '''
    Break iterable into chunks, each with up to the item-count given.
    Strings are chunked into substrings, anything else into tuples. Invalid
    chunk-sizes raise an exception, as soon as the result is first used.
    '''

    # the chunk-size is checked before anything else, since a non-positive
    # one would make the string-chunking loop below go on forever
    if not isinstance(chunk_size, int):
        raise Exception('non-integer chunk-size')
    if chunk_size < 1:
        raise Exception('non-positive chunk-size')

    if isinstance(items, str):
        for start in range(0, len(items), chunk_size):
            yield items[start:start + chunk_size]
        return

    it = iter(items)
    while True:
        head = tuple(islice(it, chunk_size))
        if not head:
            return
        yield head


chunked = chunk

# re_cache is used by custom func compile to cache previously-compiled
# regular-expressions, which makes them quicker to (re)use in formulas
re_cache = {}


def re_compile(expr, flags = 0):
    'Speed-up using regexes across lines, by avoiding recompilations.'

    cache = re_cache.setdefault(flags, {})
    if expr in cache:
        return cache[expr]

    pat = compile_uncached(expr, flags)
    cache[expr] = pat
    return pat


def icompile(expr):
    'Compile (and cache) a case-insensitive regular expression.'

    return re_compile(expr, IGNORECASE)


def cond(*args):
    '''
    Pick the value right after the first truish condition, checking args as
    condition/value pairs; a trailing unpaired arg is the fallback result,
    which is None when missing.
    '''

    if len(args) == 0:
        return None

    for i, e in enumerate(args):
        if i % 2 == 0 and i < len(args) - 1 and e:
            return args[i + 1]

    return args[-1] if len(args) % 2 == 1 else None


def dive(into, using):
    '''
    Depth-first recursive caller for 1-input functions. The 2 args can be
    given in either order, as long as only the function is callable.
    '''

    if callable(into):
        into, using = using, into

    def rec(v):
        if isinstance(v, dict):
            return {k: rec(x) for k, x in v.items()}
        if isinstance(v, Iterable) and not isinstance(v, str):
            return [rec(x) for x in v]
        return using(v)

    return rec(into)


def divekeys(into, using):
    '''
    Depth-first recursive renamer of dict keys: the function given is called
    with each key, at any depth, and its result is used as the new key. The
    2 args can be given in either order, as long as only the function is
    callable. Non-dict iterables, except strings, turn into lists.
    '''

    if callable(into):
        into, using = using, into

    def rec(v):
        if isinstance(v, dict):
            return {using(k): rec(x) for k, x in v.items()}
        if isinstance(v, Iterable) and not isinstance(v, str):
            return [rec(x) for x in v]
        return v

    return rec(into)


def divekv(into, using, using2 = None):
    '''
    Depth-first recursive caller for 2-input functions, which are given both
    keys/indices and values. When given 2 functions, the first one renames
    dict keys, while the second one transforms the leaf values.
    '''

    if using2 is None:
        if callable(into):
            into, using = using, into
    else:
        if not callable(using2):
            into, using, using2 = using2, into, using

    def rec(k, v):
        if isinstance(v, dict):
            return {k: rec(k, x) for k, x in v.items()}
        if isinstance(v, Iterable) and not isinstance(v, str):
            return [rec(j, x) for j, x in enumerate(v)]
        return using(k, v)

    def rec2(k, v):
        if isinstance(v, dict):
            return {str(using(k, x)): rec2(k, x) for k, x in v.items()}
        if isinstance(v, Iterable) and not isinstance(v, str):
            return [rec2(j, x) for j, x in enumerate(v)]
        return using2(k, v)

    return rec(None, into) if using2 is None else rec2(None, into)


kvdive = divekv


def drop(src, *what):
    '''
    Remove all the substrings given from a string, or all the keys given
    from a dictionary, or from all dictionaries in an iterable; any other
    type of value results in None.
    '''

    if isinstance(src, str):
        for s in what:
            src = src.replace(s, '')
        return src

    def kdrop(src, what):
        return {k: v for (k, v) in src.items() if not (k in what)}

    if isinstance(src, dict):
        return kdrop(src, set(what))

    if isinstance(src, Iterable):
        what = set(what)
        return [kdrop(e, what) for e in src if isinstance(e, dict)]

    return None


dropped = drop


def join(x, y = ' '):
    'Join values into a string, or make a dict from keys and values.'

    if isinstance(x, str):
        return x.join(str(v) for v in y)
    if isinstance(y, str):
        return y.join(str(v) for v in x)
    return {k: v for k, v in zip(x, y)}


def pick(src, *keys):
    '''
    Keep only the keys given from a dictionary, or from all dictionaries in
    an iterable: missing keys are given None values.
    '''

    if isinstance(src, dict):
        return {k: src.get(k, None) for k in keys}
    return [{k: e.get(k, None) for k in keys} for e in src if isinstance(e, dict)]


picked = pick


def rescue(attempt, fallback = None):
    '''
    Call the function given, resulting in the fallback on exceptions: when
    the fallback is callable, it's called with the exception caught.
    '''

    try:
        return attempt()
    except BrokenPipeError as e:
        raise e
    except Exception as e:
        if callable(fallback):
            return fallback(e)
        return fallback


rescued = rescue


def retype(x):
    'Try to narrow the type of the value given.'

    if isinstance(x, float):
        # is_integer is false for infinities and NaNs, which int can't handle
        return int(x) if x.is_integer() else x

    if not isinstance(x, str):
        return x

    try:
        return loads(x)
    except Exception:
        pass

    try:
        return int(x)
    except Exception:
        pass

    try:
        return float(x)
    except Exception:
        pass

    return x


autocast = autocasted = mold = molded = recast = recasted = remold = retype
remolded = retyped = retype


def json0(x):
    'Turn a value into its most compact single-line JSON string.'

    return dumps(x, separators=(',', ':'), allow_nan=False, indent=None)


j0 = json0


def jsonl(x):
    '''
    Turn a value into JSON Lines: non-dict iterables emit each of their
    items as its own line, except for skip-markers, which emit nothing.
    '''

    if isinstance(x, Skip):
        return

    def emit(x):
        return dumps(x, separators=(', ', ': '), allow_nan=False, indent=None)

    if x is None:
        yield emit(x)
        return

    if isinstance(x, (bool, int, float, dict, str)):
        yield emit(x)
        return

    if isinstance(x, Iterable):
        for e in x:
            if isinstance(e, Skip):
                continue
            yield emit(e)
        return

    yield emit(str(x))


jl = jsonlines = ndjson = jsonl


def typeof(x):
    'Get the JSON-style type-name of a value, or `other` when not JSON-like.'

    # return str(type(x))
    return {
        type(None): 'null',
        bool: 'boolean',
        dict: 'object',
        float: 'number',
        int: 'number',
        str: 'string',
        list: 'array',
        tuple: 'array',
    }.get(type(x), 'other')


jstype = typeof


def wait(seconds, result):
    'Wait the given number of seconds, before returning its latter arg.'

    t = (int, float)
    # allow giving the 2 args in the other order
    if (not isinstance(seconds, t)) and isinstance(result, t):
        seconds, result = result, seconds
    sleep(seconds)
    return result


delay = wait


def after(x, what):
    'Get what follows the first match, or an empty string on no match.'

    i = x.find(what)
    return '' if i < 0 else x[i+len(what):]


def afterlast(x, what):
    'Get what follows the last match, or an empty string on no match.'

    i = x.rfind(what)
    return '' if i < 0 else x[i+len(what):]


afterfinal = afterlast


def before(x, what):
    'Get what precedes the first match, or the whole string on no match.'

    i = x.find(what)
    return x if i < 0 else x[:i]


def beforelast(x, what):
    'Get what precedes the last match, or the whole string on no match.'

    i = x.rfind(what)
    return x if i < 0 else x[:i]


beforefinal = beforelast


def since(x, what):
    'Get everything from the first match on, or an empty string on no match.'

    i = x.find(what)
    return '' if i < 0 else x[i:]


def sincelast(x, what):
    'Get everything from the last match on, or an empty string on no match.'

    i = x.rfind(what)
    return '' if i < 0 else x[i:]


sincefinal = sincelast


def until(x, what):
    'Get everything up to the first match, which is included in the result.'

    i = x.find(what)
    return x if i < 0 else x[:i+len(what)]


def untilfinal(x, what):
    'Get everything up to the last match, which is included in the result.'

    i = x.rfind(what)
    return x if i < 0 else x[:i+len(what)]


untillast = untilfinal


def blue(s):
    'ANSI-style a value using a blue foreground.'

    return f'\x1b[38;2;0;95;215m{s}\x1b[0m'


def blueback(s):
    'ANSI-style a value using a blue background.'

    return f'\x1b[48;2;0;95;215m\x1b[38;2;238;238;238m{s}\x1b[0m'


bluebg = blueback


def bold(s):
    'ANSI-style a value as bold.'

    return f'\x1b[1m{s}\x1b[0m'


bolded = bold


def gbm(s, good = False, bad = False, meh = False):
    '''
    Good, Bad, Meh ANSI-styles a plain string via ANSI-style sequences,
    according to 1..3 conditions given as boolean(ish) values: these are
    checked in order, so the first truish one wins.
    '''

    if good:
        return green(s)
    if bad:
        return red(s)
    if meh:
        return gray(s)
    return s


def gray(s):
    'ANSI-style a value using a gray foreground.'

    return f'\x1b[38;2;168;168;168m{s}\x1b[0m'


def grayback(s):
    'ANSI-style a value using a gray background.'

    return f'\x1b[48;2;168;168;168m\x1b[38;2;238;238;238m{s}\x1b[0m'


def green(s):
    'ANSI-style a value using a green foreground.'

    return f'\x1b[38;2;0;135;95m{s}\x1b[0m'


def greenback(s):
    'ANSI-style a value using a green background.'

    return f'\x1b[48;2;0;135;95m\x1b[38;2;238;238;238m{s}\x1b[0m'


def highlight(s):
    'ANSI-style a value by swapping its foreground and background colors.'

    return f'\x1b[7m{s}\x1b[0m'


hilite = highlight


def orange(s):
    'ANSI-style a value using an orange foreground.'

    return f'\x1b[38;2;215;95;0m{s}\x1b[0m'


def orangeback(s):
    'ANSI-style a value using an orange background.'

    return f'\x1b[48;2;215;95;0m\x1b[38;2;238;238;238m{s}\x1b[0m'


def purple(s):
    'ANSI-style a value using a purple foreground.'

    return f'\x1b[38;2;135;95;255m{s}\x1b[0m'


def purpleback(s):
    'ANSI-style a value using a purple background.'

    return f'\x1b[48;2;135;95;255m\x1b[38;2;238;238;238m{s}\x1b[0m'


def red(s):
    'ANSI-style a value using a red foreground.'

    return f'\x1b[38;2;204;0;0m{s}\x1b[0m'


def redback(s):
    'ANSI-style a value using a red background.'

    # code 48 sets the background color, like in all other `back` funcs
    return f'\x1b[48;2;204;0;0m\x1b[38;2;238;238;238m{s}\x1b[0m'


def underline(s):
    'ANSI-style a value as underlined.'

    return f'\x1b[4m{s}\x1b[0m'


underlined = underline


def message(msg, result = None):
    'Show a message on stderr, resulting in the latter arg given.'

    print(msg, file=stderr)
    return result


msg = message

# seen is used by func `once` to remember previously-given values
seen = set()


def once(x):
    'Result in the value given only the first time it is seen, else None.'

    if x in seen:
        return None
    seen.add(x)
    return x


dedup = unique = once


def utf8(x):
    'Decode/validate a value as a UTF-8 string, resulting in None on failure.'

    try:
        if isinstance(x, str):
            x = x.encode('utf-8')
        return str(x, 'utf-8')
    except Exception:
        return None


def ymdhms(when = None):
    '''
    Format a local date/time as `YYYY-MM-DD HH:MM:SS`: the time can be given
    as a number of seconds since the unix epoch, or as a time-tuple; anything
    else means the current time.
    '''

    fmt = '%Y-%m-%d %H:%M:%S'
    if isinstance(when, (float, int)):
        return strftime(fmt, localtime(float(when)))
    if isinstance(when, tuple):
        return strftime(fmt, when)
    return strftime(fmt, localtime())


cr = '\r'
crlf = '\r\n'
dquo = dquote = '"'
empty = ''
lcurly = '{'
lf = '\n'
rcurly = '}'
space = ' '
squo = squote = '\''
tab = '\t'

nil = none = null = None


# keep expressions from running statements, or opening files for writing
exec = None
open_utf8 = make_open_utf8(open)
open = open_utf8

no_input_opts = (
    '=', '--n', '-nil', '--nil', '-none', '--none', '-null', '--null',
)
modules_opts = (
    '-m', '--m', '-mod', '--mod', '-module', '--module',
    '-modules', '--modules',
)
pipe_opts = ('-p', '--p', '-pipe', '--pipe')
trace_opts = ('-t', '--t', '-trace', '--trace', '-traceback', '--traceback')


def main():
    'Handle the command-line options, then run the expression on all inputs.'

    # these names are disabled globally, once they're no longer needed, so
    # user-given expressions can't use them
    global compile, eval, exec, open

    if len(argv) < 2:
        print(info.strip(), file=stderr)
        exit(0)
    if argv[1] in ('-h', '--h', '-help', '--help'):
        print(info.strip())
        exit(0)

    args = argv[1:]
    no_input = False
    pipe_mode = False
    trace_errors = False

    while len(args) > 0:
        if args[0] == '--':
            args = args[1:]
            break

        if args[0] in no_input_opts:
            no_input = True
            args = args[1:]
            continue

        if args[0] in pipe_opts:
            # all args after the pipe option are pipeline steps
            pipe_mode = True
            args = args[1:]
            break

        if args[0] in modules_opts:
            try:
                if len(args) < 2:
                    what = 'a module name or a comma-separated list of modules'
                    raise Exception('expected ' + what)

                # imported modules become globals, so expressions can use them
                from importlib import import_module
                g = globals()
                for name in args[1].split(','):
                    g[name] = import_module(name)
                args = args[2:]
            except Exception as ex:
                fail(ex, 1)

            continue

        if args[0] in trace_opts:
            trace_errors = True
            args = args[1:]
            continue

        break

    try:
        if pipe_mode:
            if no_input:
                raise Exception('can\'t use pipe-mode when input is disabled')
            # NOTE: steps are arbitrary python code from the command-line,
            # evaluated using the script's own globals
            g = globals()
            steps = [eval(s, g) for s in args]
            compile = None
            eval = None
            exec = None
            open = None
            handle_pipe(stdin, steps)
            exit(0)

        expr = '.'
        if len(args) > 0:
            expr = args[0]
            args = args[1:]

        if expr == '.' and no_input:
            print(info.strip(), file=stderr)
            exit(0)

        if expr == '.':
            expr = 'line'

        expr = compile(expr, expr, mode='eval')
        compile = None

        if no_input:
            handle_no_input(expr)
            exit(0)

        if len(args) == 0:
            handle_lines(stdin, expr)
            exit(0)

        if any(seemsurl(path) for path in args):
            from io import TextIOWrapper
            from urllib.request import urlopen

        got_stdin = False
        all_stdin = None
        dashes = args.count('-')

        for path in args:
            if path == '-':
                if dashes > 1:
                    # stdin is only read once, and kept for its later reuses
                    if not got_stdin:
                        all_stdin = []
                        handle_lines(hold_lines(stdin, all_stdin), expr)
                        got_stdin = True
                    else:
                        handle_lines(all_stdin, expr)
                else:
                    handle_lines(stdin, expr)
                continue

            if seemsurl(path):
                with urlopen(path) as inp:
                    with TextIOWrapper(inp, encoding='utf-8') as txt:
                        handle_lines(txt, expr)
                continue

            with open_utf8(path) as txt:
                handle_lines(txt, expr)
    except BrokenPipeError:
        # quit quietly, instead of showing a confusing error message
        stderr.close()
        exit(0)
    except KeyboardInterrupt:
        exit(2)
    except Exception as ex:
        if trace_errors:
            raise
        fail(ex, 1)


if __name__ == '__main__':
    main()