#!/usr/bin/python3

# The MIT License (MIT)
#
# Copyright © 2026 pacman64
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the “Software”), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


info = '''
tlp [options...] [python expression] [files/URIs...]


Transform Lines with Python runs a python expression on each line of text
input, encoded as UTF-8. Carriage-returns are always ignored in lines, as
well as any UTF-8-BOM on the first line of each input.

The expression can use either `l` or `line` for the current line, and `i` as
a 0-based line counter which keeps growing even across input-sources, when
given multiple inputs. Also available is `n`, a 1-based line counter which
otherwise works the same way.

Each line is automatically parsed as JSON: when successful, the parsed line
is available to the expression as `v`, or `value`, with value `err` set to
None, since parsing succeeded; on failure, `v` and `value` are set to None,
while `err` has the exception as a value. You can check success/failure by
checking if `err` is None, or not.

Input-sources can be either files or web-URIs. When not given any explicit
named sources, the standard input is used. It's even possible to reuse the
standard input using multiple single dashes (-) in the order needed: stdin
is only read once in this case, and kept for later reuse.

When the expression results in None, the current input line is ignored. When
the expression results in a boolean, its value determines whether each line
is emitted to the standard output, or ignored.

When the expression emits lists, tuples, or generators, each item is emitted
as its own line/result. Since empty containers emit no lines, these are the
most general type of results, acting as either filters, or input-amplifiers.


Examples

# numbers from 0 to 5, each on its own output line; no input is read/used
tlp = 'range(6)'

# all powers up to the 4th, using each input line auto-parsed into a `float`
tlp = 'range(1, 6)' | tlp '(v**p for p in range(1, 4+1))'

# separate input lines with an empty line between each; global var `empty`
# can be used to avoid bothering with nested shell-quoting
tlp = 'range(6)' | tlp '["", l] if i > 0 else l'

# ignore errors/exceptions, in favor of the original lines/values
tlp = '("abc", "123")' | tlp 'rescue(lambda: 2 * float(line), line)'

# ignore errors/exceptions, calling a fallback func with the exception
tlp = '("abc", "123")' | tlp 'rescue(lambda: 2 * float(line), str)'

# filtering lines out via None values
head -c 1024 /dev/urandom | strings | tlp 'l if len(l) < 20 else None'

# boolean-valued results are concise ways to filter lines out
head -c 1024 /dev/urandom | strings | tlp 'len(l) < 20'

# function/callable results are automatically called on the current line
head -c 1024 /dev/urandom | strings | tlp len

# emit 10 random integers between 1 and 10
tlp -m random = '(random.randint(1, 10) for _ in range(10))'

# emit documentation for collections.defaultdict from the python stdlib
tlp = -m collections 'help(collections.defaultdict)' | cat
'''


from itertools import islice
from json import dumps, loads
from re import compile as compile_uncached, IGNORECASE
from sys import argv, exit, stderr, stdin
from time import localtime, sleep, strftime
from typing import Generator, Iterable


# line counters shared with the user's expressions: these are set only once,
# so they keep growing across all the input-sources given, as the help says
i = 0
n = 1


def _emit(res, fallback):
    '''
    Print a result to stdout: lists, ranges, tuples, and generators emit
    each of their items as its own line, while any other value emits at most
    one line. The fallback is what True/callable single results resolve to.
    '''

    if isinstance(res, (list, range, tuple, Generator)):
        for item in res:
            item = adapt_result(item, None)
            if item is not None:
                print(item, flush=True)
        return

    res = adapt_result(res, fallback)
    if res is not None:
        print(res, flush=True)

def _load_line(raw, first):
    '''
    Update the per-line global variables available to expressions, using
    the raw input line given: `first` tells if the line is the first one
    from its input, which is where a leading UTF-8 BOM is ignored.
    '''

    global l, line, v, val, value, e, err, error

    l = raw.rstrip('\r\n')
    # inputs are decoded text, so a UTF-8 BOM shows up as a single U+FEFF
    if first and l.startswith('\ufeff'):
        l = l[1:]
    line = l

    try:
        e = err = error = None
        v = val = value = loads(l)
    except Exception as ex:
        e = err = error = ex
        # NOTE(review): the help message says `v` becomes None on failure,
        # but a Skip marker is what's assigned here; confirm which is meant
        v = val = value = Skip()

def _import_modules(names):
    'Import all comma-separated module names given as global variables.'

    from importlib import import_module

    g = globals()
    for name in names.split(','):
        name = name.strip()
        if name:
            g[name] = import_module(name)

def handle_no_input(expr):
    'Evaluate the expression once, without reading any input, and emit it.'

    res = eval(expr)
    _emit(res, None)

def handle_lines(src, expr):
    'Evaluate the (compiled) expression for each line from the source given.'

    # `comprehension` expressions seem to ignore local variables: even
    # lambda-based workarounds fail; the per-line variables are globals
    global i, n

    first = True
    for raw in src:
        _load_line(raw, first)
        first = False

        res = eval(expr)
        i += 1
        n += 1
        _emit(res, line)

def handle_pipe(src, funcs):
    '''
    Run each line from the source given through all the pipeline steps,
    where each step is called with the result of the previous one.
    '''

    # `comprehension` expressions seem to ignore local variables: even
    # lambda-based workarounds fail; the per-line variables are globals
    global i, n
    # variable names `o` and `p` work like in the `pyp` tool, except
    # the pipeline steps were given as separate cmd-line arguments
    global o, p

    first = True
    for raw in src:
        _load_line(raw, first)
        first = False

        o = p = prev = line
        # seen is used by func `once` to remember previously-given values
        seen.clear()

        for f in funcs:
            p = f(p)
            # steps resulting in funcs are called on the previous value
            if callable(p):
                p = p(prev)
            prev = p

        res = p
        i += 1
        n += 1
        _emit(res, line)

def hold_lines(src, lines):
    'Yield all items from the source, while also remembering them in a list.'

    for e in src:
        lines.append(e)
        yield e

def adapt_result(res, fallback):
    '''
    Turn a result into the string to show, or into None when there's nothing
    to show: exceptions are raised, callables are called with the fallback,
    True stands for the fallback itself, and dictionaries become JSON.
    '''

    if isinstance(res, BaseException):
        raise res
    if isinstance(res, Skip) or res is None or res is False:
        return None
    if callable(res):
        return res(fallback)
    if res is True:
        return fallback
    if isinstance(res, dict):
        return dumps(res, allow_nan=False)
    return str(res)

def fail(msg, code = 1):
    'Show the message given on stderr, then quit with the exit-code given.'

    print(str(msg), file=stderr)
    exit(code)

def make_open_utf8(open):
    'Make a read-only UTF-8 file-opener out of the open func given.'

    def open_utf8_readonly(path):
        return open(path, encoding='utf-8')
    return open_utf8_readonly

def seemsurl(path):
    'Check if a path starts with any of the URI protocols supported.'

    protocols = ('https://', 'http://', 'file://', 'ftp://', 'data:')
    return path.startswith(protocols)

class Skip:
    'Type of the marker-values which result in no output.'

    pass

skip = Skip()

def chunk(items, chunk_size):
    'Break iterable into chunks, each with up to the item-count given.'

    # check the chunk-size before anything else, since strings used to
    # loop forever when given non-positive sizes
    if not isinstance(chunk_size, int):
        raise Exception('non-integer chunk-size')
    if chunk_size < 1:
        raise Exception('non-positive chunk-size')

    if isinstance(items, str):
        for start in range(0, len(items), chunk_size):
            yield items[start:start + chunk_size]
        return

    it = iter(items)
    while True:
        head = tuple(islice(it, chunk_size))
        if not head:
            return
        yield head

chunked = chunk

# re_cache is used by custom func compile to cache previously-compiled
# regular-expressions, which makes them quicker to (re)use in formulas
re_cache = {}

def re_compile(expr, flags = 0):
    'Speed-up using regexes across lines, by avoiding recompilations.'

    cache = re_cache.setdefault(flags, {})
    pat = cache.get(expr)
    if pat is None:
        pat = compile_uncached(expr, flags)
        cache[expr] = pat
    return pat

def icompile(expr):
    'Like re_compile, but for case-insensitive regexes.'

    return re_compile(expr, IGNORECASE)

def cond(*args):
    '''
    Pick the value right after the first truish condition, from the
    condition/value pairs given: an optional final unpaired argument is the
    fallback result, which is None when not given.
    '''

    for k in range(0, len(args) - 1, 2):
        if args[k]:
            return args[k + 1]
    return args[-1] if len(args) % 2 == 1 else None

def dive(into, using):
    'Depth-first recursive caller for 1-input functions.'

    # allow giving the arguments in either order
    if callable(into):
        into, using = using, into

    def rec(v):
        if isinstance(v, dict):
            return {k: rec(e) for k, e in v.items()}
        if isinstance(v, Iterable) and not isinstance(v, str):
            return [rec(e) for e in v]
        return using(v)

    return rec(into)

def divekeys(into, using):
    'Depth-first recursive caller for 1-input funcs which rename dict keys.'

    # allow giving the arguments in either order
    if callable(into):
        into, using = using, into

    def rec(v):
        if isinstance(v, dict):
            return {using(k): rec(e) for k, e in v.items()}
        if isinstance(v, Iterable) and not isinstance(v, str):
            return [rec(e) for e in v]
        return v

    return rec(into)

def divekv(into, using, using2 = None):
    'Depth-first recursive caller for 2-input functions.'

    # allow giving the data either before or after the func(s)
    if using2 is None:
        if callable(into):
            into, using = using, into
    else:
        if not callable(using2):
            into, using, using2 = using2, into, using

    def rec(k, v):
        if isinstance(v, dict):
            return {k: rec(k, e) for k, e in v.items()}
        if isinstance(v, Iterable) and not isinstance(v, str):
            return [rec(j, e) for j, e in enumerate(v)]
        return using(k, v)

    # when given 2 funcs, the first renames keys, the second changes values
    def rec2(k, v):
        if isinstance(v, dict):
            return {str(using(k, e)): rec2(k, e) for k, e in v.items()}
        if isinstance(v, Iterable) and not isinstance(v, str):
            return [rec2(j, e) for j, e in enumerate(v)]
        return using2(k, v)

    return rec(None, into) if using2 is None else rec2(None, into)

kvdive = divekv

def drop(src, *what):
    '''
    Remove all substrings given from a string, or all keys given from a
    dictionary, or from each dictionary in an iterable: the result is None
    for any other type of value.
    '''

    if isinstance(src, str):
        for s in what:
            src = src.replace(s, '')
        return src

    def kdrop(src, what):
        return {k: v for (k, v) in src.items() if k not in what}

    if isinstance(src, dict):
        return kdrop(src, set(what))

    if isinstance(src, Iterable):
        what = set(what)
        return [kdrop(e, what) for e in src if isinstance(e, dict)]

    return None

dropped = drop

def join(x, y = ' '):
    'Join values into a string, or make a dict from keys and values.'

    if isinstance(x, str):
        return x.join(str(v) for v in y)
    if isinstance(y, str):
        return y.join(str(v) for v in x)
    return dict(zip(x, y))

def pick(src, *keys):
    '''
    Keep only the keys given from a dictionary, or from each dictionary in
    an iterable: missing keys get None as their values.
    '''

    if isinstance(src, dict):
        return {k: src.get(k, None) for k in keys}
    return [{k: e.get(k, None) for k in keys} for e in src if isinstance(e, dict)]

picked = pick

def rescue(attempt, fallback = None):
    '''
    Call the func given, resulting in the fallback in case of exceptions:
    a callable fallback is called with the exception, to get the result.
    '''

    try:
        return attempt()
    except Exception as e:
        if callable(fallback):
            return fallback(e)
        return fallback

rescued = rescue

def retype(x):
    'Try to narrow the type of the value given.'

    if isinstance(x, float):
        # is_integer is False for infinities and NaNs, which int() rejects
        return int(x) if x.is_integer() else x

    if not isinstance(x, str):
        return x

    for convert in (loads, int, float):
        try:
            return convert(x)
        except Exception:
            # keep trying with the next/looser converter
            pass

    return x

autocast = autocasted = mold = molded = recast = recasted = remold = retype
remolded = retyped = retype

def json0(x):
    'Turn a value into a single line of JSON with no extra spaces.'

    return dumps(x, separators=(',', ':'), allow_nan=False, indent=None)

j0 = json0

def jsonl(x):
    '''
    Emit JSON Lines: scalars, strings, and dictionaries result in a single
    line, while each item from other iterables gets its own line.
    '''

    if isinstance(x, Skip):
        return

    def emit(x):
        return dumps(x, separators=(', ', ': '), allow_nan=False, indent=None)

    if x is None or isinstance(x, (bool, int, float, dict, str)):
        yield emit(x)
        return

    if isinstance(x, Iterable):
        for e in x:
            if isinstance(e, Skip):
                continue
            yield emit(e)
        return

    yield emit(str(x))

jl = jsonlines = ndjson = jsonl

def typeof(x):
    'Get the JSON/JavaScript-style type-name for the value given.'

    return {
        type(None): 'null',
        bool: 'boolean',
        dict: 'object',
        float: 'number',
        int: 'number',
        str: 'string',
        list: 'array',
        tuple: 'array',
    }.get(type(x), 'other')

jstype = typeof

def wait(seconds, result):
    'Wait the given number of seconds, before returning its latter arg.'

    # allow giving the arguments in either order, when unambiguous
    t = (int, float)
    if (not isinstance(seconds, t)) and isinstance(result, t):
        seconds, result = result, seconds
    sleep(seconds)
    return result

delay = wait

def after(x, what):
    'Get what follows the first match, or an empty string if none is found.'

    i = x.find(what)
    return '' if i < 0 else x[i+len(what):]

def afterlast(x, what):
    'Get what follows the last match, or an empty string if none is found.'

    i = x.rfind(what)
    return '' if i < 0 else x[i+len(what):]

afterfinal = afterlast

def before(x, what):
    'Get what precedes the first match, or the whole string if none is found.'

    i = x.find(what)
    return x if i < 0 else x[:i]

def beforelast(x, what):
    'Get what precedes the last match, or the whole string if none is found.'

    i = x.rfind(what)
    return x if i < 0 else x[:i]

beforefinal = beforelast

def since(x, what):
    'Get everything starting from the first match, or an empty string.'

    i = x.find(what)
    return '' if i < 0 else x[i:]

def sincelast(x, what):
    'Get everything starting from the last match, or an empty string.'

    i = x.rfind(what)
    return '' if i < 0 else x[i:]

sincefinal = sincelast

def until(x, what):
    'Get everything up to and including the first match, or the whole string.'

    i = x.find(what)
    return x if i < 0 else x[:i+len(what)]

def untilfinal(x, what):
    'Get everything up to and including the last match, or the whole string.'

    i = x.rfind(what)
    return x if i < 0 else x[:i+len(what)]

untillast = untilfinal

# the funcs below ANSI-style the values given: all `back` variants use a
# colored background, along with a light-gray foreground

def blue(s):
    return f'\x1b[38;2;0;95;215m{s}\x1b[0m'

def blueback(s):
    return f'\x1b[48;2;0;95;215m\x1b[38;2;238;238;238m{s}\x1b[0m'

bluebg = blueback

def bold(s):
    return f'\x1b[1m{s}\x1b[0m'

bolded = bold

def gbm(s, good = False, bad = False, meh = False):
    '''
    Good, Bad, Meh ANSI-styles a plain string via ANSI-style sequences,
    according to 1..3 conditions given as boolean(ish) values: these are
    checked in order, so the first truish one wins.
    '''

    if good:
        return green(s)
    if bad:
        return red(s)
    if meh:
        return gray(s)
    return s

def gray(s):
    return f'\x1b[38;2;168;168;168m{s}\x1b[0m'

def grayback(s):
    return f'\x1b[48;2;168;168;168m\x1b[38;2;238;238;238m{s}\x1b[0m'

def green(s):
    return f'\x1b[38;2;0;135;95m{s}\x1b[0m'

def greenback(s):
    return f'\x1b[48;2;0;135;95m\x1b[38;2;238;238;238m{s}\x1b[0m'

def highlight(s):
    return f'\x1b[7m{s}\x1b[0m'

hilite = highlight

def orange(s):
    return f'\x1b[38;2;215;95;0m{s}\x1b[0m'

def orangeback(s):
    return f'\x1b[48;2;215;95;0m\x1b[38;2;238;238;238m{s}\x1b[0m'

def purple(s):
    return f'\x1b[38;2;135;95;255m{s}\x1b[0m'

def purpleback(s):
    return f'\x1b[48;2;135;95;255m\x1b[38;2;238;238;238m{s}\x1b[0m'

def red(s):
    return f'\x1b[38;2;204;0;0m{s}\x1b[0m'

def redback(s):
    # code 48 picks the background color, unlike code 38 for the foreground
    return f'\x1b[48;2;204;0;0m\x1b[38;2;238;238;238m{s}\x1b[0m'

def underline(s):
    return f'\x1b[4m{s}\x1b[0m'

underlined = underline

def message(msg, result = None):
    'Show a message on stderr, before returning the result given.'

    print(msg, file=stderr)
    return result

msg = message

# seen is used by func `once` to remember previously-given values
seen = set()

def once(x):
    'Get the value given the first time it is seen, and None after that.'

    if x in seen:
        return None
    seen.add(x)
    return x

dedup = unique = once

def utf8(x):
    'Get the value given as a UTF-8-valid string, or None on failure.'

    try:
        if isinstance(x, str):
            x = x.encode('utf-8')
        return str(x, 'utf-8')
    except Exception:
        return None

def ymdhms(when = None):
    '''
    Format a date/time like `YYYY-MM-DD HH:MM:SS`, using the local time:
    numbers are seconds since the epoch, tuples are time-tuples, and
    anything else stands for the current time.
    '''

    fmt = '%Y-%m-%d %H:%M:%S'
    if isinstance(when, (float, int)):
        return strftime(fmt, localtime(float(when)))
    if isinstance(when, tuple):
        return strftime(fmt, when)
    return strftime(fmt, localtime())


# handy names, which can avoid nested shell-quoting in expressions
cr = '\r'
crlf = '\r\n'
dquo = dquote = '"'
empty = ''
lcurly = '{'
lf = '\n'
rcurly = '}'
space = ' '
squo = squote = '\''
tab = '\t'

nil = none = null = None


open_utf8 = make_open_utf8(open)

no_input_opts = (
    '=', '--n', '-nil', '--nil', '-none', '--none', '-null', '--null',
)
modules_opts = (
    '-m', '--m', '-mod', '--mod', '-module', '--module',
    '-modules', '--modules',
)
pipe_opts = ('-p', '--p', '-pipe', '--pipe')
trace_opts = ('-t', '--t', '-trace', '--trace', '-traceback', '--traceback')


# the script's entry point stays made of module-level statements, since
# expressions are evaluated against this module's global variables
if __name__ == '__main__':
    if len(argv) < 2:
        print(info.strip(), file=stderr)
        exit(0)
    if argv[1] in ('-h', '--h', '-help', '--help'):
        print(info.strip())
        exit(0)

    # keep expressions from running arbitrary statements, and make them
    # open files only as read-only UTF-8 text
    exec = None
    open = open_utf8

    args = argv[1:]
    # only pay for the web-related imports when they're actually needed
    if any(seemsurl(e) for e in args):
        from io import TextIOWrapper
        from urllib.request import urlopen

    no_input = False
    pipe_mode = False
    trace_errors = False

    while len(args) > 0:
        if args[0] == '--':
            args = args[1:]
            break

        if args[0] in no_input_opts:
            no_input = True
            args = args[1:]
            continue

        if args[0] in pipe_opts:
            pipe_mode = True
            args = args[1:]
            break

        if args[0] in modules_opts:
            if len(args) < 2:
                what = 'a module name or a comma-separated list of modules'
                fail('expected ' + what, 1)
            try:
                _import_modules(args[1])
            except Exception as ex:
                fail(ex, 1)
            args = args[2:]
            continue

        if args[0] in trace_opts:
            trace_errors = True
            args = args[1:]
            continue

        break

    try:
        if pipe_mode:
            if no_input:
                raise Exception('can\'t use pipe-mode when input is disabled')
            steps = [eval(s) for s in args]
            # pipeline steps are all evaluated by now
            compile = None
            eval = None
            exec = None
            open = None
            handle_pipe(stdin, steps)
            exit(0)

        expr = '.'
        if len(args) > 0:
            expr = args[0]
            args = args[1:]

        if expr == '.' and no_input:
            print(info.strip(), file=stderr)
            exit(0)

        if expr == '.':
            expr = 'line'

        expr = compile(expr, expr, mode='eval')
        compile = None

        if no_input:
            handle_no_input(expr)
            exit(0)

        if len(args) == 0:
            handle_lines(stdin, expr)
            exit(0)

        got_stdin = False
        all_stdin = None
        dashes = args.count('-')

        for path in args:
            if path == '-':
                if dashes > 1:
                    # stdin is read only once, and kept for later reuse
                    if not got_stdin:
                        all_stdin = []
                        handle_lines(hold_lines(stdin, all_stdin), expr)
                        got_stdin = True
                    else:
                        handle_lines(all_stdin, expr)
                else:
                    handle_lines(stdin, expr)
                continue

            if seemsurl(path):
                with urlopen(path) as inp:
                    with TextIOWrapper(inp, encoding='utf-8') as txt:
                        handle_lines(txt, expr)
                continue

            with open_utf8(path) as txt:
                handle_lines(txt, expr)
    except BrokenPipeError:
        # quit quietly, instead of showing a confusing error message
        stderr.close()
        exit(0)
    except KeyboardInterrupt:
        exit(2)
    except Exception as e:
        if trace_errors:
            raise
        else:
            fail(e, 1)