File: tbp.py
   1 #!/usr/bin/python3
   2 
   3 # The MIT License (MIT)
   4 #
   5 # Copyright © 2020-2025 pacman64
   6 #
   7 # Permission is hereby granted, free of charge, to any person obtaining a copy
   8 # of this software and associated documentation files (the “Software”), to deal
   9 # in the Software without restriction, including without limitation the rights
  10 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11 # copies of the Software, and to permit persons to whom the Software is
  12 # furnished to do so, subject to the following conditions:
  13 #
  14 # The above copyright notice and this permission notice shall be included in
  15 # all copies or substantial portions of the Software.
  16 #
  17 # THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  20 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  23 # SOFTWARE.
  24 
  25 
# Help message: shown when the script is run without any arguments, with a
# lone help option, or without an expression to run.
info = '''
tbp [options...] [python expression] [files/URIs...]


Transform Bytes with Python runs a python expression on each whole-input,
read as a bytes-type value.

The expression can use either `v`, `value`, `d`, or `data` for the current
input.

Input-sources can be either files or web-URIs. When not given any explicit
named sources, the standard input is used. It's even possible to reuse the
standard input using multiple single dashes (-) in the order needed: stdin
is only read once in this case, and kept for later reuse.

When the expression results in None, the current input is ignored. When the
expression results in a boolean, this determines whether the whole input is
copied/appended back to the standard output, or ignored.
'''
  45 
  46 
  47 from sys import argv, exit, stderr, stdin, stdout
  48 from time import sleep
  49 from typing import Generator
  50 
  51 
  52 if len(argv) < 2:
  53     print(info.strip(), file=stderr)
  54     exit(0)
  55 if len(argv) == 2 and argv[1] in ('-h', '--h', '-help', '--help'):
  56     print(info.strip())
  57     exit(0)
  58 
  59 
  60 class Skip:
  61     pass
  62 
  63 
  64 skip = Skip()
  65 
  66 
  67 def cond(*args):
  68     if len(args) == 0:
  69         return None
  70 
  71     for i, e in enumerate(args):
  72         if i % 2 == 0 and i < len(args) - 1 and e:
  73             return args[i + 1]
  74 
  75     return args[-1] if len(args) % 2 == 1 else None
  76 
  77 
  78 def rescue(attempt, fallback = None):
  79     try:
  80         return attempt()
  81     except Exception as e:
  82         if callable(fallback):
  83             return fallback(e)
  84         return fallback
  85 
  86 catch = rescue
  87 recover = rescue
  88 rescued = rescue
  89 
  90 
  91 def wait(seconds, result):
  92     t = (int, float)
  93     if (not isinstance(seconds, t)) and isinstance(result, t):
  94         seconds, result = result, seconds
  95     sleep(seconds)
  96     return result
  97 
  98 delay = wait
  99 
 100 
 101 def uint_big_endian(src, size, start=0):
 102     if not isinstance(src, bytes):
 103         return ValueError('can only get unsigned integers from bytes')
 104     if start + size >= len(src):
 105         msg = f'not enough bytes for {8 * size}-bit unsigned integers'
 106         raise ValueError(msg)
 107     return sum(int(src[start + i]) << (8 * i) for i in range(size, 0, -1))
 108 
 109 
 110 def uint16_be(src, start=0):
 111     return uint_big_endian(src, 2, start)
 112 
 113 uint16be = uint16_be
 114 
 115 def uint32_be(src, start=0):
 116     return uint_big_endian(src, 4, start)
 117 
 118 uint32be = uint32_be
 119 
 120 def uint64_be(src, start=0):
 121     return uint_big_endian(src, 8, start)
 122 
 123 uint64be = uint64_be
 124 
 125 
 126 def uint_little_endian(src, size, start=0):
 127     if not isinstance(src, bytes):
 128         return ValueError('can only get unsigned integers from bytes')
 129     if start + size >= len(src):
 130         msg = f'not enough bytes for {8 * size}-bit unsigned integers'
 131         raise ValueError(msg)
 132     return sum(int(src[start + i]) << (8 * i) for i in range(size))
 133 
 134 
 135 def uint16_le(src, start=0):
 136     return uint_little_endian(src, 2, start)
 137 
 138 uint16le = uint16_le
 139 
 140 def uint32_le(src, start=0):
 141     return uint_little_endian(src, 4, start)
 142 
 143 uint32le = uint32_le
 144 
 145 def uint64_le(src, start=0):
 146     return uint_little_endian(src, 8, start)
 147 
 148 uint64le = uint64_le
 149 
 150 
 151 no_input_opts = (
 152     '=', '-n', '--n', '-nil', '--nil', '-none', '--none', '-null', '--null',
 153 )
 154 string_opts = ('-s', '--s', '-str', '--str', '-string', '--string')
 155 
 156 args = argv[1:]
 157 load_input = True
 158 string_input = False
 159 expression = None
 160 
 161 # handle all other leading options; the explicit help options are
 162 # handled earlier in the script
 163 while len(args) > 0:
 164     if args[0] in no_input_opts:
 165         load_input = False
 166         args = args[1:]
 167         continue
 168 
 169     if args[0] in string_opts:
 170         string_input = True
 171         args = args[1:]
 172         continue
 173 
 174     break
 175 
 176 if len(args) > 0:
 177     expression = args[0]
 178     args = args[1:]
 179 
 180 if expression is None:
 181     print(info.strip(), file=stderr)
 182     exit(0)
 183 
 184 
 185 def make_open_read(open):
 186     'Restrict the file-open func to a read-only-binary file-open func.'
 187     def open_read(name):
 188         return open(name, mode='rb')
 189     return open_read
 190 
 191 
 192 def fail(msg, code = 1):
 193     print(f'\x1b[31m{str(msg)}\x1b[0m', file=stderr)
 194     exit(code)
 195 
 196 
 197 def message(msg, result = None):
 198     print(msg, file=stderr)
 199     return result
 200 
 201 msg = message
 202 
 203 
 204 def seemsurl(s):
 205     protocols = ('https://', 'http://', 'file://', 'ftp://', 'data:')
 206     return any(s.startswith(p) for p in protocols)
 207 
 208 
 209 def tobytes(x):
 210     if isinstance(x, (bytearray, bytes)):
 211         return x
 212     if isinstance(x, (bool, int)):
 213         return bytes((int(x), ))
 214     if isinstance(x, float):
 215         return bytes(str(x), encoding='utf-8')
 216     if isinstance(x, str):
 217         return bytes(x, encoding='utf-8')
 218     return bytes(x)
 219 
 220 
 221 def tointorbytes(x):
 222     return x if isinstance(x, int) else tobytes(x)
 223 
 224 
 225 def adapt_result(x, default):
 226     if x is True:
 227         return default
 228     if x is False:
 229         return None
 230 
 231     if isinstance(x, Skip):
 232         return None
 233 
 234     if callable(x):
 235         return x(default)
 236     return x
 237 
 238 
 239 def emit_result(w, x):
 240     if x is None:
 241         return
 242 
 243     if isinstance(x, int):
 244         w.write(tobytes(x))
 245         return
 246 
 247     if isinstance(x, (list, tuple, range, Generator)):
 248         for e in x:
 249             w.write(tobytes(e))
 250         return
 251 
 252     w.write(tobytes(x))
 253 
 254 
 255 def eval_expr(expr, using):
 256     global v, val, value, d, dat, data
 257     # offer several aliases for the variable with the input bytes
 258     v = val = value = d = dat = data = using
 259     return adapt_result(eval(expr), using)
 260 
 261 
 262 cr = '\r' if string_input else b'\r'
 263 crlf = '\r\n' if string_input else b'\r\n'
 264 dquo = '"' if string_input else b'"'
 265 dquote = '"' if string_input else b'"'
 266 empty = '' if string_input else b''
 267 lcurly = '{' if string_input else b'{'
 268 lf = '\n' if string_input else b'\n'
 269 rcurly = '}' if string_input else b'}'
 270 space = ' ' if string_input else b' '
 271 squo = '\'' if string_input else b'\''
 272 squote = '\'' if string_input else b'\''
 273 tab = '\t' if string_input else b'\t'
 274 # utf8bom = '\xef\xbb\xbf' if string_input else b'\xef\xbb\xbf'
 275 if string_input:
 276     bom = {
 277         'utf8': '\xef\xbb\xbf',
 278         'utf16be': '\xfe\xff',
 279         'utf16le': '\xff\xfe',
 280         'utf32be': '\x00\x00\xfe\xff',
 281         'utf32le': '\xff\xfe\x00\x00',
 282     }
 283 else:
 284     bom = {
 285         'utf8': b'\xef\xbb\xbf',
 286         'utf16be': b'\xfe\xff',
 287         'utf16le': b'\xff\xfe',
 288         'utf32be': b'\x00\x00\xfe\xff',
 289         'utf32le': b'\xff\xfe\x00\x00',
 290     }
 291 
 292 nil = None
 293 none = None
 294 null = None
 295 
 296 exec = None
 297 open = make_open_read(open)
 298 
 299 modules_opts = (
 300     '-m', '--m', '-mod', '--mod', '-module', '--module',
 301     '-modules', '--modules',
 302 )
 303 more_modules_opts = ('-mm', '--mm', '-more', '--more')
 304 
 305 while len(args) > 0:
 306     if args[0] in no_input_opts:
 307         no_input = True
 308         args = args[1:]
 309         continue
 310 
 311     if args[0] in modules_opts:
 312         try:
 313             if len(args) < 2:
 314                 msg = 'a module name or a comma-separated list of modules'
 315                 raise Exception('expected ' + msg)
 316 
 317             g = globals()
 318             from importlib import import_module
 319             for e in args[1].split(','):
 320                 g[e] = import_module(e)
 321 
 322             g = None
 323             import_module = None
 324             args = args[2:]
 325         except Exception as e:
 326             fail(e, 1)
 327 
 328         continue
 329 
 330     if args[0] in more_modules_opts:
 331         import functools, itertools, json, math, random, statistics, string, time
 332         args = args[1:]
 333         continue
 334 
 335     break
 336 
 337 
 338 try:
 339     if not expression or expression == '.':
 340         expression = 'data'
 341     expression = compile(expression, expression, 'eval')
 342 
 343     got_stdin = False
 344     all_stdin = None
 345     dashes = args.count('-')
 346 
 347     data = None
 348 
 349     if not load_input:
 350         emit_result(stdout.buffer, eval_expr(expression, None))
 351         exit(0)
 352 
 353     if any(seemsurl(name) for name in args):
 354         from urllib.request import urlopen
 355 
 356     for name in args:
 357         if name == '-':
 358             if dashes > 1:
 359                 if not got_stdin:
 360                     all_stdin = stdin.buffer.read()
 361                     got_stdin = True
 362                 data = all_stdin
 363             else:
 364                 data = stdin.buffer.read()
 365 
 366             if string_input:
 367                 data = s = str(data, encoding='utf-8')
 368         elif seemsurl(name):
 369             with urlopen(name) as inp:
 370                 data = inp.read()
 371         else:
 372             with open(name) as inp:
 373                 data = inp.read()
 374 
 375         if string_input:
 376             data = s = str(data, encoding='utf-8')
 377         emit_result(stdout.buffer, eval_expr(expression, data))
 378 
 379     if len(args) == 0:
 380         data = stdin.buffer.read()
 381         if string_input:
 382             data = s = str(data, encoding='utf-8')
 383         emit_result(stdout.buffer, eval_expr(expression, data))
 384 except BrokenPipeError:
 385     # quit quietly, instead of showing a confusing error message
 386     stderr.close()
 387     exit(0)
 388 except KeyboardInterrupt:
 389     exit(2)
 390 except Exception as e:
 391     s = str(e)
 392     s = s if s else '<generic exception>'
 393     print(f'\x1b[31m{s}\x1b[0m', file=stderr)
 394     exit(1)