File: tlp.py
   1 #!/usr/bin/python
   2 
   3 # The MIT License (MIT)
   4 #
   5 # Copyright (c) 2026 pacman64
   6 #
   7 # Permission is hereby granted, free of charge, to any person obtaining a copy
   8 # of this software and associated documentation files (the "Software"), to deal
   9 # in the Software without restriction, including without limitation the rights
  10 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11 # copies of the Software, and to permit persons to whom the Software is
  12 # furnished to do so, subject to the following conditions:
  13 #
  14 # The above copyright notice and this permission notice shall be included in
  15 # all copies or substantial portions of the Software.
  16 #
  17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  20 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  23 # SOFTWARE.
  24 
  25 
  26 info = '''
  27 tlp [options...] [python expression] [files/URIs...]
  28 
  29 
  30 Transform Lines with Python runs a python expression on each line of text
  31 input, encoded as UTF-8. Carriage-returns are always ignored in lines, as
  32 well as any UTF-8-BOM on the first line of each input.
  33 
  34 The expression can use either `l` or `line` for the current line, and `i` as
  35 a 0-based line counter which keeps growing even across input-sources, when
  36 given multiple inputs. Also available is `n`, a 1-based line counter which
  37 otherwise works the same way.
  38 
  39 Each line is automatically parsed as JSON: when successful, the parsed line
  40 is available to the expression as `v`, or `value`, with value `err` set to
  41 None, since parsing succeeded; on failure, `v` and `value` are set to None,
  42 while `err` has the exception as a value. You can check success/failure by
  43 checking if `err` is None, or not.
  44 
  45 Input-sources can be either files or web-URIs. When not given any explicit
  46 named sources, the standard input is used. It's even possible to reuse the
  47 standard input using multiple single dashes (-) in the order needed: stdin
  48 is only read once in this case, and kept for later reuse.
  49 
  50 When the expression results in None, the current input line is ignored. When
  51 the expression results in a boolean, its value determines whether each line
  52 is emitted to the standard output, or ignored.
  53 
  54 When the expression emits lists, tuples, or generators, each item is emitted
  55 as its own line/result. Since empty containers emit no lines, these are the
  56 most general type of results, acting as either filters, or input-amplifiers.
  57 
  58 
  59 Examples
  60 
  61 # numbers from 0 to 5, each on its own output line; no input is read/used
  62 tlp = 'range(6)'
  63 
  64 # all powers up to the 4th, using each input line auto-parsed into a `float`
  65 tlp = 'range(1, 6)' | tlp '(v**p for p in range(1, 4+1))'
  66 
  67 # separate input lines with an empty line between each; global var `empty`
  68 # can be used to avoid bothering with nested shell-quoting
  69 tlp = 'range(6)' | tlp '["", l] if i > 0 else l'
  70 
  71 # ignore errors/exceptions, in favor of the original lines/values
  72 tlp = '("abc", "123")' | tlp 'rescue(lambda: 2 * float(line), line)'
  73 
  74 # ignore errors/exceptions, calling a fallback func with the exception
  75 tlp = '("abc", "123")' | tlp 'rescue(lambda: 2 * float(line), str)'
  76 
  77 # filtering lines out via None values
  78 head -c 1024 /dev/urandom | strings | tlp 'l if len(l) < 20 else None'
  79 
  80 # boolean-valued results are concise ways to filter lines out
  81 head -c 1024 /dev/urandom | strings | tlp 'len(l) < 20'
  82 
  83 # function/callable results are automatically called on the current line
  84 head -c 1024 /dev/urandom | strings | tlp len
  85 
  86 # emit 10 random integers between 1 and 10
  87 tlp -m random = '(random.randint(1, 10) for _ in range(10))'
  88 
  89 # emit standard input lines slowly, delaying output 0.5 seconds each time
  90 tlp -m time '(time.sleep(0.5), line)[-1]'
  91 
  92 # emit documentation for collections.defaultdict from the python stdlib
  93 tlp = -m collections 'help(collections.defaultdict)' | cat
  94 '''
  95 
  96 
  97 from itertools import islice
  98 from json import dumps, loads
  99 from re import compile as compile_uncached, IGNORECASE
 100 from sys import argv, exit, stderr, stdin
 101 from time import localtime, sleep, strftime
 102 from typing import Generator, Iterable
 103 
 104 
 105 if len(argv) < 2:
 106     print(info.strip(), file=stderr)
 107     exit(0)
 108 if len(argv) > 1 and argv[1] in ('-h', '--h', '-help', '--help'):
 109     print(info.strip())
 110     exit(0)
 111 
 112 
 113 def handle_no_input(expr):
 114     res = eval(expr)
 115     if isinstance(res, (list, range, tuple, Generator)):
 116         for e in res:
 117             e = adapt_result(e, None)
 118             if not (e is None):
 119                 print(e, flush=True)
 120         return
 121 
 122     res = adapt_result(res, None)
 123     if not (res is None):
 124         print(res, flush=True)
 125 
 126 def handle_lines(src, expr):
 127     # `comprehension` expressions seem to ignore local variables: even
 128     # lambda-based workarounds fail
 129     global i, n, l, line, v, val, value, e, err, error
 130 
 131     i = 0
 132     n = 1
 133     e = err = error = None
 134 
 135     for l in src:
 136         l = l.rstrip('\r\n').rstrip('\n')
 137         if i == 0:
 138             l = l.lstrip('\xef\xbb\xbf')
 139 
 140         line = l
 141         try:
 142             e = err = error = None
 143             v = val = value = loads(l)
 144         except BrokenPipeError as ex:
 145             raise ex
 146         except Exception as ex:
 147             e = err = error = ex
 148             v = val = value = Skip()
 149         res = eval(expr)
 150         i += 1
 151         n += 1
 152 
 153         if isinstance(res, (list, range, tuple, Generator)):
 154             for e in res:
 155                 e = adapt_result(e, None)
 156                 if not (e is None):
 157                     print(e, flush=True)
 158             continue
 159 
 160         res = adapt_result(res, line)
 161         if not (res is None):
 162             print(res, flush=True)
 163 
 164 def hold_lines(src, lines):
 165     for e in src:
 166         lines.append(e)
 167         yield e
 168 
 169 def adapt_result(res, fallback):
 170     if isinstance(res, BaseException):
 171         raise res
 172     if isinstance(res, Skip) or res is None or res is False:
 173         return None
 174     if callable(res):
 175         return res(fallback)
 176     if res is True:
 177         return fallback
 178     if isinstance(res, dict):
 179         return dumps(res, allow_nan=False)
 180     return str(res)
 181 
 182 def fail(msg, code = 1):
 183     print(str(msg), file=stderr)
 184     exit(code)
 185 
 186 def make_open_utf8(open):
 187     def open_utf8_readonly(path):
 188         return open(path, encoding='utf-8')
 189     return open_utf8_readonly
 190 
 191 def seemsurl(path):
 192     protocols = ('https://', 'http://', 'file://', 'ftp://', 'data:')
 193     return any(path.startswith(p) for p in protocols)
 194 
 195 class Skip:
 196     pass
 197 
 198 skip = Skip()
 199 
 200 def chunk(items, chunk_size):
 201     'Break iterable into chunks, each with up to the item-count given.'
 202 
 203     if isinstance(items, str):
 204         n = len(items)
 205         while n >= chunk_size:
 206             yield items[:chunk_size]
 207             items = items[chunk_size:]
 208             n -= chunk_size
 209         if n > 0:
 210             yield items
 211         return
 212 
 213     if not isinstance(chunk_size, int):
 214         raise Exception('non-integer chunk-size')
 215     if chunk_size < 1:
 216         raise Exception('non-positive chunk-size')
 217 
 218     it = iter(items)
 219     while True:
 220         head = tuple(islice(it, chunk_size))
 221         if not head:
 222             return
 223         yield head
 224 
 225 chunked = chunk
 226 
 227 # re_cache is used by custom func compile to cache previously-compiled
 228 # regular-expressions, which makes them quicker to (re)use in formulas
 229 re_cache = {}
 230 
 231 def re_compile(expr, flags = 0):
 232     'Speed-up using regexes across lines, by avoiding recompilations.'
 233 
 234     if flags in re_cache:
 235         cache = re_cache[flags]
 236     else:
 237         cache = {}
 238         re_cache[flags] = cache
 239     if expr in cache:
 240         return cache[expr]
 241 
 242     pat = compile_uncached(expr, flags)
 243     cache[expr] = pat
 244     return pat
 245 
 246 def icompile(expr):
 247     return re_compile(expr, IGNORECASE)
 248 
 249 def cond(*args):
 250     if len(args) == 0:
 251         return None
 252 
 253     for i, e in enumerate(args):
 254         if i % 2 == 0 and i < len(args) - 1 and e:
 255             return args[i + 1]
 256 
 257     return args[-1] if len(args) % 2 == 1 else None
 258 
 259 def dive(into, using):
 260     'Depth-first recursive caller for 1-input functions.'
 261 
 262     if callable(into):
 263         into, using = using, into
 264 
 265     def rec(v):
 266         if isinstance(v, dict):
 267             return {k: rec(v) for k, v in v.items()}
 268         if isinstance(v, Iterable) and not isinstance(v, str):
 269             return [rec(v) for v in v]
 270         return using(v)
 271 
 272     return rec(into)
 273 
 274 def divekeys(into, using):
 275     'Depth-first recursive caller for 2-input funcs which rename dict keys.'
 276 
 277     if callable(into):
 278         into, using = using, into
 279 
 280     def rec(v):
 281         if isinstance(v, dict):
 282             return {using(k): rec(v) for k, v in v.items()}
 283         if isinstance(v, Iterable) and not isinstance(v, str):
 284             return [rec(v) for i, v in enumerate(v)]
 285         return v
 286 
 287     return rec(None, into)
 288 
 289 def divekv(into, using, using2 = None):
 290     'Depth-first recursive caller for 2-input functions.'
 291 
 292     if using2 is None:
 293         if callable(into):
 294             into, using = using, into
 295     else:
 296         if not callable(using2):
 297             into, using, using2 = using2, into, using
 298 
 299     def rec(k, v):
 300         if isinstance(v, dict):
 301             return {k: rec(k, v) for k, v in v.items()}
 302         if isinstance(v, Iterable) and not isinstance(v, str):
 303             return [rec(i, v) for i, v in enumerate(v)]
 304         return using(k, v)
 305 
 306     def rec2(k, v):
 307         if isinstance(v, dict):
 308             return {str(using(k, v)): rec2(k, v) for k, v in v.items()}
 309         if isinstance(v, Iterable) and not isinstance(v, str):
 310             # return {str(using(i, v)): rec2(i, v) for i, v in enumerate(v)}
 311             return [rec2(i, v) for i, v in enumerate(v)]
 312         return using2(k, v)
 313 
 314     return rec(None, into) if using2 is None else rec2(None, into)
 315 
 316 kvdive = divekv
 317 
 318 def drop(src, *what):
 319     if isinstance(src, str):
 320         for s in what:
 321             src = src.replace(s, '')
 322         return src
 323 
 324     def kdrop(src, what):
 325         return {k: v for (k, v) in src.items() if not (k in what)}
 326 
 327     if isinstance(src, dict):
 328         return kdrop(src, set(what))
 329 
 330     if isinstance(src, Iterable):
 331         what = set(what)
 332         return [kdrop(e, what) for e in src if isinstance(e, dict)]
 333 
 334     return None
 335 
 336 dropped = drop
 337 
 338 def join(x, y = ' '):
 339     'Join values into a string, or make a dict from keys and values.'
 340 
 341     if isinstance(x, str):
 342         return x.join(str(v) for v in y)
 343     if isinstance(y, str):
 344         return y.join(str(v) for v in x)
 345     return {k: v for k, v in zip(x, y)}
 346 
 347 def pick(src, *keys):
 348     if isinstance(src, dict):
 349         return {k: src.get(k, None) for k in keys}
 350     return [{k: e.get(k, None) for k in keys} for e in src if isinstance(e, dict)]
 351 
 352 picked = pick
 353 
 354 def plain(s):
 355     'Ignore all ANSI-style sequences in a string.'
 356     return re_compile('''\x1b\\[([0-9;]+m|[0-9]*[A-HJKST])''').sub('', s)
 357 
 358 def predicate(x):
 359     'Helps various higher-order funcs, by standardizing `predicate` values.'
 360     if callable(x):
 361         return x
 362     if isinstance(x, float):
 363         if isnan(x):
 364             return lambda y: isinstance(y, float) and isnan(y)
 365         if isinf(x):
 366             return lambda y: isinstance(y, float) and isinf(y)
 367     return lambda y: x == y
 368 
 369 def rescue(attempt, fallback = None):
 370     try:
 371         return attempt()
 372     except BrokenPipeError as e:
 373         raise e
 374     except Exception as e:
 375         if callable(fallback):
 376             return fallback(e)
 377         return fallback
 378 
 379 rescued = rescue
 380 
 381 def retype(x):
 382     'Try to narrow the type of the value given.'
 383 
 384     if isinstance(x, float):
 385         n = int(x)
 386         return n if float(n) == x else x
 387 
 388     if not isinstance(x, str):
 389         return x
 390 
 391     try:
 392         return loads(x)
 393     except Exception:
 394         pass
 395 
 396     try:
 397         return int(x)
 398     except Exception:
 399         pass
 400 
 401     try:
 402         return float(x)
 403     except Exception:
 404         pass
 405 
 406     return x
 407 
 408 autocast = autocasted = mold = molded = recast = recasted = remold = retype
 409 remolded = retyped = retype
 410 
 411 def json0(x):
 412     return dumps(x, separators=(',', ':'), allow_nan=False, indent=None)
 413 
 414 j0 = json0
 415 
 416 def jsonl(x):
 417     if isinstance(x, Skip):
 418         return
 419 
 420     def emit(x):
 421         return dumps(x, separators=(', ', ': '), allow_nan=False, indent=None)
 422 
 423     if x is None:
 424         yield emit(x)
 425         return
 426 
 427     if isinstance(x, (bool, int, float, dict, str)):
 428         yield emit(x)
 429         return
 430 
 431     if isinstance(x, Iterable):
 432         for e in x:
 433             if isinstance(e, Skip):
 434                 continue
 435             yield emit(x)
 436         return
 437 
 438     yield emit(str(x))
 439 
 440 jl = jsonlines = ndjson = jsonl
 441 
 442 def typeof(x):
 443     # return str(type(x))
 444     return {
 445         type(None): 'null',
 446         bool: 'boolean',
 447         dict: 'object',
 448         float: 'number',
 449         int: 'number',
 450         str: 'string',
 451         list: 'array',
 452         tuple: 'array',
 453     }.get(type(x), 'other')
 454 
 455 jstype = typeof
 456 
 457 def wait(seconds, result):
 458     'Wait the given number of seconds, before returning its latter arg.'
 459 
 460     if not isinstance(seconds, (int, float)):
 461         if isinstance(result, (int, float)):
 462             seconds, result = result, seconds
 463     sleep(seconds)
 464     return result
 465 
 466 delay = wait
 467 
 468 def after(x, what):
 469     i = x.find(what)
 470     return '' if i < 0 else x[i+len(what):]
 471 
 472 def afterlast(x, what):
 473     i = x.rfind(what)
 474     return '' if i < 0 else x[i+len(what):]
 475 
 476 afterfinal = afterlast
 477 
 478 def before(x, what):
 479     i = x.find(what)
 480     return x if i < 0 else x[:i]
 481 
 482 def beforelast(x, what):
 483     i = x.rfind(what)
 484     return x if i < 0 else x[:i]
 485 
 486 beforefinal = beforelast
 487 
 488 def since(x, what):
 489     i = x.find(what)
 490     return '' if i < 0 else x[i:]
 491 
 492 def sincelast(x, what):
 493     i = x.rfind(what)
 494     return '' if i < 0 else x[i:]
 495 
 496 sincefinal = sincelast
 497 
 498 def until(x, what):
 499     i = x.find(what)
 500     return x if i < 0 else x[:i+len(what)]
 501 
 502 def untilfinal(x, what):
 503     i = x.rfind(what)
 504     return x if i < 0 else x[:i+len(what)]
 505 
 506 untillast = untilfinal
 507 
 508 def blue(s):
 509     return f'\x1b[38;2;0;95;215m{s}\x1b[0m'
 510 
 511 def blueback(s):
 512     return f'\x1b[48;2;0;95;215m\x1b[38;2;238;238;238m{s}\x1b[0m'
 513 
 514 bluebg = blueback
 515 
 516 def bold(s):
 517     return f'\x1b[1m{s}\x1b[0m'
 518 
 519 bolded = bold
 520 
 521 def gbm(s, good = False, bad = False, meh = False):
 522     '''
 523     Good, Bad, Meh ANSI-styles a plain string via ANSI-style sequences,
 524     according to 1..3 conditions given as boolean(ish) values: these are
 525     checked in order, so the first truish one wins.
 526     '''
 527 
 528     if good:
 529         return green(s)
 530     if bad:
 531         return red(s)
 532     if meh:
 533         return gray(s)
 534     return s
 535 
 536 def gray(s):
 537     return f'\x1b[38;2;168;168;168m{s}\x1b[0m'
 538 
 539 def grayback(s):
 540     return f'\x1b[48;2;168;168;168m\x1b[38;2;238;238;238m{s}\x1b[0m'
 541 
 542 def green(s):
 543     return f'\x1b[38;2;0;135;95m{s}\x1b[0m'
 544 
 545 def greenback(s):
 546     return f'\x1b[48;2;0;135;95m\x1b[38;2;238;238;238m{s}\x1b[0m'
 547 
 548 def highlight(s):
 549     return f'\x1b[7m{s}\x1b[0m'
 550 
 551 hilite = highlight
 552 
 553 def orange(s):
 554     return f'\x1b[38;2;215;95;0m{s}\x1b[0m'
 555 
 556 def orangeback(s):
 557     return f'\x1b[48;2;215;95;0m\x1b[38;2;238;238;238m{s}\x1b[0m'
 558 
 559 def purple(s):
 560     return f'\x1b[38;2;135;95;255m{s}\x1b[0m'
 561 
 562 def purpleback(s):
 563     return f'\x1b[48;2;135;95;255m\x1b[38;2;238;238;238m{s}\x1b[0m'
 564 
 565 def red(s):
 566     return f'\x1b[38;2;204;0;0m{s}\x1b[0m'
 567 
 568 def redback(s):
 569     return f'\x1b[38;2;204;0;0m\x1b[38;2;238;238;238m{s}\x1b[0m'
 570 
 571 def underline(s):
 572     return f'\x1b[4m{s}\x1b[0m'
 573 
 574 underlined = underline
 575 
 576 def message(msg, result = None):
 577     print(msg, file=stderr)
 578     return result
 579 
 580 msg = message
 581 
 582 # seen is used by func `once` to remember previously-given values
 583 seen = set()
 584 
 585 def once(x):
 586     if x in seen:
 587         return None
 588     seen.add(x)
 589     return x
 590 
 591 dedup = unique = once
 592 
 593 def utf8(x):
 594     try:
 595         if isinstance(x, str):
 596             x = x.encode('utf-8')
 597         return str(x, 'utf-8')
 598     except Exception:
 599         return None
 600 
 601 def ymdhms(when = None):
 602     fmt = f'%Y-%m-%d %H:%M:%S'
 603     if isinstance(when, (float, int)):
 604         return strftime(fmt, localtime(float(when)))
 605     if isinstance(when, tuple):
 606         return strftime(fmt, when)
 607     return strftime(fmt, localtime())
 608 
 609 
 610 cr = '\r'
 611 crlf = '\r\n'
 612 dquo = dquote = '"'
 613 empty = ''
 614 lcurly = '{'
 615 lf = '\n'
 616 rcurly = '}'
 617 space = ' '
 618 squo = squote = '\''
 619 tab = '\t'
 620 
 621 nil = none = null = None
 622 
 623 
 624 exec = None
 625 open_utf8 = make_open_utf8(open)
 626 open = open_utf8
 627 
 628 no_input_opts = (
 629     '=', '-n', '--n', '-nil', '--nil', '-none', '--none', '-null', '--null',
 630 )
 631 modules_opts = (
 632     '-m', '--m', '-mod', '--mod', '-module', '--module',
 633     '-modules', '--modules',
 634 )
 635 trace_opts = ('-t', '--t', '-trace', '--trace', '-traceback', '--traceback')
 636 
 637 args = argv[1:]
 638 if any(seemsurl(e) for e in args):
 639     from io import TextIOWrapper
 640     from urllib.request import urlopen
 641 
 642 no_input = False
 643 trace_errors = False
 644 
 645 while len(args) > 0:
 646     if args[0] == '--':
 647         args = args[1:]
 648         break
 649 
 650     if args[0] in no_input_opts:
 651         no_input = True
 652         args = args[1:]
 653         continue
 654 
 655     if args[0] in modules_opts:
 656         try:
 657             if len(args) < 2:
 658                 msg = 'a module name or a comma-separated list of modules'
 659                 raise Exception('expected ' + msg)
 660 
 661             g = globals()
 662             from importlib import import_module
 663             for e in args[1].split(','):
 664                 g[e] = import_module(e)
 665 
 666             g = None
 667             import_module = None
 668             args = args[2:]
 669         except Exception as e:
 670             fail(e, 1)
 671 
 672         continue
 673 
 674     if args[0] in trace_opts:
 675         trace_errors = True
 676         args = args[1:]
 677         continue
 678 
 679     break
 680 
 681 
 682 try:
 683     expr = '.'
 684     if len(args) > 0:
 685         expr = args[0]
 686         args = args[1:]
 687 
 688     if expr == '.' and no_input:
 689         print(info.strip(), file=stderr)
 690         exit(0)
 691 
 692     if expr == '.':
 693         expr = 'line'
 694 
 695     expr = compile(expr, expr, mode='eval')
 696     compile = None
 697 
 698     if no_input:
 699         handle_no_input(expr)
 700         exit(0)
 701 
 702     if len(args) == 0:
 703         handle_lines(stdin, expr)
 704         exit(0)
 705 
 706     got_stdin = False
 707     all_stdin = None
 708     dashes = args.count('-')
 709 
 710     for path in args:
 711         if path == '-':
 712             if dashes > 1:
 713                 if not got_stdin:
 714                     all_stdin = []
 715                     handle_lines(hold_lines(stdin, all_stdin), expr)
 716                     got_stdin = True
 717                 else:
 718                     handle_lines(all_stdin, expr)
 719             else:
 720                 handle_lines(stdin, expr)
 721             continue
 722 
 723         if seemsurl(path):
 724             with urlopen(path) as inp:
 725                 with TextIOWrapper(inp, encoding='utf-8') as txt:
 726                     handle_lines(txt, expr)
 727             continue
 728 
 729         with open_utf8(path) as txt:
 730             handle_lines(txt, expr)
 731 except BrokenPipeError:
 732     # quit quietly, instead of showing a confusing error message
 733     stderr.close()
 734     exit(0)
 735 except KeyboardInterrupt:
 736     exit(2)
 737 except Exception as e:
 738     if trace_errors:
 739         raise e
 740     else:
 741         fail(e, 1)