File: tbp.py
   1 #!/usr/bin/python3
   2 
   3 # The MIT License (MIT)
   4 #
   5 # Copyright © 2020-2025 pacman64
   6 #
   7 # Permission is hereby granted, free of charge, to any person obtaining a copy
   8 # of this software and associated documentation files (the “Software”), to deal
   9 # in the Software without restriction, including without limitation the rights
  10 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11 # copies of the Software, and to permit persons to whom the Software is
  12 # furnished to do so, subject to the following conditions:
  13 #
  14 # The above copyright notice and this permission notice shall be included in
  15 # all copies or substantial portions of the Software.
  16 #
  17 # THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  20 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  23 # SOFTWARE.
  24 
  25 
  26 info = '''
  27 tbp [options...] [python expression] [files/URIs...]
  28 
  29 
  30 Transform Bytes with Python runs a python expression on each whole-input,
  31 read as a bytes-type value.
  32 
  33 The expression can use either `v`, `value`, `d`, or `data` for the current
  34 input.
  35 
  36 Input-sources can be either files or web-URIs. When not given any explicit
  37 named sources, the standard input is used. It's even possible to reuse the
  38 standard input using multiple single dashes (-) in the order needed: stdin
  39 is only read once in this case, and kept for later reuse.
  40 
  41 When the expression results in None, the current input is ignored. When the
  42 expression results in a boolean, this determines whether the whole input is
  43 copied/appended back to the standard output, or ignored.
  44 '''
  45 
  46 
  47 from sys import argv, exit, stderr, stdin, stdout
  48 from time import sleep
  49 from typing import Generator
  50 
  51 
  52 if len(argv) < 2:
  53     print(info.strip(), file=stderr)
  54     exit(0)
  55 if len(argv) == 2 and argv[1] in ('-h', '--h', '-help', '--help'):
  56     print(info.strip())
  57     exit(0)
  58 
  59 
  60 class Skip:
  61     pass
  62 
  63 
  64 skip = Skip()
  65 
  66 
  67 def cond(*args):
  68     if len(args) == 0:
  69         return None
  70 
  71     for i, e in enumerate(args):
  72         if i % 2 == 0 and i < len(args) - 1 and e:
  73             return args[i + 1]
  74 
  75     return args[-1] if len(args) % 2 == 1 else None
  76 
  77 
  78 def rescue(attempt, fallback = None):
  79     try:
  80         return attempt()
  81     except Exception as e:
  82         if callable(fallback):
  83             return fallback(e)
  84         return fallback
  85 
  86 catch = rescue
  87 recover = rescue
  88 rescued = rescue
  89 
  90 
  91 def wait(seconds, result):
  92     t = (int, float)
  93     if (not isinstance(seconds, t)) and isinstance(result, t):
  94         seconds, result = result, seconds
  95     sleep(seconds)
  96     return result
  97 
  98 delay = wait
  99 
 100 
 101 no_input_opts = (
 102     '=', '-n', '--n', '-nil', '--nil', '-none', '--none', '-null', '--null',
 103 )
 104 string_opts = ('-s', '--s', '-str', '--str', '-string', '--string')
 105 
 106 args = argv[1:]
 107 load_input = True
 108 string_input = False
 109 expression = None
 110 
 111 # handle all other leading options; the explicit help options are
 112 # handled earlier in the script
 113 while len(args) > 0:
 114     if args[0] in no_input_opts:
 115         load_input = False
 116         args = args[1:]
 117         continue
 118 
 119     if args[0] in string_opts:
 120         string_input = True
 121         args = args[1:]
 122         continue
 123 
 124     break
 125 
 126 if len(args) > 0:
 127     expression = args[0]
 128     args = args[1:]
 129 
 130 if expression is None:
 131     print(info.strip(), file=stderr)
 132     exit(0)
 133 
 134 
 135 def make_open_read(open):
 136     'Restrict the file-open func to a read-only-binary file-open func.'
 137     def open_read(name):
 138         return open(name, mode='rb')
 139     return open_read
 140 
 141 
 142 def fail(msg, code = 1):
 143     print(f'\x1b[31m{str(msg)}\x1b[0m', file=stderr)
 144     exit(code)
 145 
 146 
 147 def message(msg, result = None):
 148     print(msg, file=stderr)
 149     return result
 150 
 151 msg = message
 152 
 153 
 154 def seemsurl(s):
 155     protocols = ('https://', 'http://', 'file://', 'ftp://', 'data:')
 156     return any(s.startswith(p) for p in protocols)
 157 
 158 
 159 def tobytes(x):
 160     if isinstance(x, (bytearray, bytes)):
 161         return x
 162     if isinstance(x, (bool, int)):
 163         return bytes((int(x), ))
 164     if isinstance(x, float):
 165         return bytes(str(x), encoding='utf-8')
 166     if isinstance(x, str):
 167         return bytes(x, encoding='utf-8')
 168     return bytes(x)
 169 
 170 
 171 def tointorbytes(x):
 172     return x if isinstance(x, int) else tobytes(x)
 173 
 174 
 175 def adapt_result(x, default):
 176     if x is True:
 177         return default
 178     if x is False:
 179         return None
 180 
 181     if isinstance(x, Skip):
 182         return None
 183 
 184     if callable(x):
 185         return x(default)
 186     return x
 187 
 188 
 189 def emit_result(w, x):
 190     if x is None:
 191         return
 192 
 193     if isinstance(x, int):
 194         w.write(tobytes(x))
 195         return
 196 
 197     if isinstance(x, (list, tuple, range, Generator)):
 198         for e in x:
 199             w.write(tobytes(e))
 200         return
 201 
 202     w.write(tobytes(x))
 203 
 204 
 205 def eval_expr(expr, using):
 206     global v, val, value, d, dat, data
 207     # offer several aliases for the variable with the input bytes
 208     v = val = value = d = dat = data = using
 209     return adapt_result(eval(expr), using)
 210 
 211 
 212 cr = '\r' if string_input else b'\r'
 213 crlf = '\r\n' if string_input else b'\r\n'
 214 dquo = '"' if string_input else b'"'
 215 dquote = '"' if string_input else b'"'
 216 empty = '' if string_input else b''
 217 lcurly = '{' if string_input else b'{'
 218 lf = '\n' if string_input else b'\n'
 219 rcurly = '}' if string_input else b'}'
 220 space = ' ' if string_input else b' '
 221 squo = '\'' if string_input else b'\''
 222 squote = '\'' if string_input else b'\''
 223 tab = '\t' if string_input else b'\t'
 224 utf8bom = '\xef\xbb\xbf' if string_input else b'\xef\xbb\xbf'
 225 
 226 nil = None
 227 none = None
 228 null = None
 229 
 230 exec = None
 231 open = make_open_read(open)
 232 
 233 modules_opts = (
 234     '-m', '--m', '-mod', '--mod', '-module', '--module',
 235     '-modules', '--modules',
 236 )
 237 more_modules_opts = ('-mm', '--mm', '-more', '--more')
 238 
 239 while len(args) > 0:
 240     if args[0] in no_input_opts:
 241         no_input = True
 242         args = args[1:]
 243         continue
 244 
 245     if args[0] in modules_opts:
 246         try:
 247             if len(args) < 2:
 248                 msg = 'a module name or a comma-separated list of modules'
 249                 raise Exception('expected ' + msg)
 250 
 251             g = globals()
 252             from importlib import import_module
 253             for e in args[1].split(','):
 254                 g[e] = import_module(e)
 255 
 256             g = None
 257             import_module = None
 258             args = args[2:]
 259         except Exception as e:
 260             fail(e, 1)
 261 
 262         continue
 263 
 264     if args[0] in more_modules_opts:
 265         import functools, itertools, json, math, random, statistics, string, time
 266         args = args[1:]
 267         continue
 268 
 269     break
 270 
 271 
 272 try:
 273     if not expression or expression == '.':
 274         expression = 'data'
 275     expression = compile(expression, expression, 'eval')
 276 
 277     got_stdin = False
 278     all_stdin = None
 279     dashes = args.count('-')
 280 
 281     data = None
 282 
 283     if not load_input:
 284         emit_result(stdout.buffer, eval_expr(expression, None))
 285         exit(0)
 286 
 287     if any(seemsurl(name) for name in args):
 288         from urllib.request import urlopen
 289 
 290     for name in args:
 291         if name == '-':
 292             if dashes > 1:
 293                 if not got_stdin:
 294                     all_stdin = stdin.buffer.read()
 295                     got_stdin = True
 296                 data = all_stdin
 297             else:
 298                 data = stdin.buffer.read()
 299 
 300             if string_input:
 301                 data = s = str(data, encoding='utf-8')
 302         elif seemsurl(name):
 303             with urlopen(name) as inp:
 304                 data = inp.read()
 305         else:
 306             with open(name) as inp:
 307                 data = inp.read()
 308 
 309         if string_input:
 310             data = s = str(data, encoding='utf-8')
 311         emit_result(stdout.buffer, eval_expr(expression, data))
 312 
 313     if len(args) == 0:
 314         data = stdin.buffer.read()
 315         if string_input:
 316             data = s = str(data, encoding='utf-8')
 317         emit_result(stdout.buffer, eval_expr(expression, data))
 318 except BrokenPipeError:
 319     # quit quietly, instead of showing a confusing error message
 320     stderr.close()
 321     exit(0)
 322 except KeyboardInterrupt:
 323     exit(2)
 324 except Exception as e:
 325     s = str(e)
 326     s = s if s else '<generic exception>'
 327     print(f'\x1b[31m{s}\x1b[0m', file=stderr)
 328     exit(1)