File: tbp.py
   1 #!/usr/bin/python3
   2 
   3 # The MIT License (MIT)
   4 #
   5 # Copyright © 2020-2025 pacman64
   6 #
   7 # Permission is hereby granted, free of charge, to any person obtaining a copy
   8 # of this software and associated documentation files (the “Software”), to deal
   9 # in the Software without restriction, including without limitation the rights
  10 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11 # copies of the Software, and to permit persons to whom the Software is
  12 # furnished to do so, subject to the following conditions:
  13 #
  14 # The above copyright notice and this permission notice shall be included in
  15 # all copies or substantial portions of the Software.
  16 #
  17 # THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  20 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  23 # SOFTWARE.
  24 
  25 
  26 info = '''
  27 tbp [options...] [python expression] [files/URIs...]
  28 
  29 
  30 Transform Bytes with Python runs a python expression on each whole-input,
  31 read as a bytes-type value.
  32 
  33 The expression can use either `v`, `value`, `d`, or `data` for the current
  34 input.
  35 
  36 Input-sources can be either files or web-URIs. When not given any explicit
  37 named sources, the standard input is used. It's even possible to reuse the
  38 standard input using multiple single dashes (-) in the order needed: stdin
  39 is only read once in this case, and kept for later reuse.
  40 
  41 When the expression results in None, the current input is ignored. When the
  42 expression results in a boolean, this determines whether the whole input is
  43 copied/appended back to the standard output, or ignored.
  44 '''
  45 
  46 
  47 from sys import argv, exit, stderr, stdin, stdout
  48 from time import sleep
  49 from typing import Generator
  50 
  51 
  52 if len(argv) < 2:
  53     print(info.strip(), file=stderr)
  54     exit(0)
  55 if len(argv) == 2 and argv[1] in ('-h', '--h', '-help', '--help'):
  56     print(info.strip())
  57     exit(0)
  58 
  59 
  60 class Skip:
  61     pass
  62 
  63 skip = Skip()
  64 
  65 def cond(*args):
  66     if len(args) == 0:
  67         return None
  68 
  69     for i, e in enumerate(args):
  70         if i % 2 == 0 and i < len(args) - 1 and e:
  71             return args[i + 1]
  72 
  73     return args[-1] if len(args) % 2 == 1 else None
  74 
  75 def rescue(attempt, fallback = None):
  76     try:
  77         return attempt()
  78     except Exception as e:
  79         if callable(fallback):
  80             return fallback(e)
  81         return fallback
  82 
  83 catch = rescue
  84 recover = rescue
  85 rescued = rescue
  86 
  87 def wait(seconds, result):
  88     t = (int, float)
  89     if (not isinstance(seconds, t)) and isinstance(result, t):
  90         seconds, result = result, seconds
  91     sleep(seconds)
  92     return result
  93 
  94 delay = wait
  95 
  96 def uint_big_endian(src, size, start=0):
  97     if not isinstance(src, bytes):
  98         return ValueError('can only get unsigned integers from bytes')
  99     if start + size >= len(src):
 100         msg = f'not enough bytes for {8 * size}-bit unsigned integers'
 101         raise ValueError(msg)
 102     return sum(int(src[start + i]) << (8 * i) for i in range(size, 0, -1))
 103 
 104 def uint16_be(src, start=0):
 105     return uint_big_endian(src, 2, start)
 106 
 107 uint16be = uint16_be
 108 
 109 def uint32_be(src, start=0):
 110     return uint_big_endian(src, 4, start)
 111 
 112 uint32be = uint32_be
 113 
 114 def uint64_be(src, start=0):
 115     return uint_big_endian(src, 8, start)
 116 
 117 uint64be = uint64_be
 118 
 119 def uint_little_endian(src, size, start=0):
 120     if not isinstance(src, bytes):
 121         return ValueError('can only get unsigned integers from bytes')
 122     if start + size >= len(src):
 123         msg = f'not enough bytes for {8 * size}-bit unsigned integers'
 124         raise ValueError(msg)
 125     return sum(int(src[start + i]) << (8 * i) for i in range(size))
 126 
 127 def uint16_le(src, start=0):
 128     return uint_little_endian(src, 2, start)
 129 
 130 uint16le = uint16_le
 131 
 132 def uint32_le(src, start=0):
 133     return uint_little_endian(src, 4, start)
 134 
 135 uint32le = uint32_le
 136 
 137 def uint64_le(src, start=0):
 138     return uint_little_endian(src, 8, start)
 139 
 140 uint64le = uint64_le
 141 
 142 
 143 no_input_opts = (
 144     '=', '-n', '--n', '-nil', '--nil', '-none', '--none', '-null', '--null',
 145 )
 146 string_opts = ('-s', '--s', '-str', '--str', '-string', '--string')
 147 
 148 args = argv[1:]
 149 load_input = True
 150 string_input = False
 151 expression = None
 152 
 153 # handle all other leading options; the explicit help options are
 154 # handled earlier in the script
 155 while len(args) > 0:
 156     if args[0] in no_input_opts:
 157         load_input = False
 158         args = args[1:]
 159         continue
 160 
 161     if args[0] in string_opts:
 162         string_input = True
 163         args = args[1:]
 164         continue
 165 
 166     break
 167 
 168 if len(args) > 0:
 169     expression = args[0]
 170     args = args[1:]
 171 
 172 if expression is None:
 173     print(info.strip(), file=stderr)
 174     exit(0)
 175 
 176 
 177 def make_open_read(open):
 178     'Restrict the file-open func to a read-only-binary file-open func.'
 179     def open_read(name):
 180         return open(name, mode='rb')
 181     return open_read
 182 
 183 
 184 def fail(msg, code = 1):
 185     print(f'\x1b[31m{str(msg)}\x1b[0m', file=stderr)
 186     exit(code)
 187 
 188 
 189 def message(msg, result = None):
 190     print(msg, file=stderr)
 191     return result
 192 
 193 msg = message
 194 
 195 
 196 def seemsurl(s):
 197     protocols = ('https://', 'http://', 'file://', 'ftp://', 'data:')
 198     return any(s.startswith(p) for p in protocols)
 199 
 200 
 201 def tobytes(x):
 202     if isinstance(x, (bytearray, bytes)):
 203         return x
 204     if isinstance(x, (bool, int)):
 205         return bytes((int(x), ))
 206     if isinstance(x, float):
 207         return bytes(str(x), encoding='utf-8')
 208     if isinstance(x, str):
 209         return bytes(x, encoding='utf-8')
 210     return bytes(x)
 211 
 212 
 213 def tointorbytes(x):
 214     return x if isinstance(x, int) else tobytes(x)
 215 
 216 
 217 def adapt_result(x, default):
 218     if x is True:
 219         return default
 220     if x is False:
 221         return None
 222 
 223     if isinstance(x, Skip):
 224         return None
 225 
 226     if callable(x):
 227         return x(default)
 228     return x
 229 
 230 
 231 def emit_result(w, x):
 232     if x is None:
 233         return
 234 
 235     if isinstance(x, int):
 236         w.write(tobytes(x))
 237         return
 238 
 239     if isinstance(x, (list, tuple, range, Generator)):
 240         for e in x:
 241             w.write(tobytes(e))
 242         return
 243 
 244     w.write(tobytes(x))
 245 
 246 
def eval_expr(expr, using):
    '''
    Evaluate the expression given against the input value given, returning
    the result after passing it through adapt_result, which gets the same
    input value as its default.
    '''

    global v, val, value, d, dat, data
    # offer several aliases for the variable with the input bytes
    v = val = value = d = dat = data = using
    # eval is called without explicit namespaces, so the expression is
    # resolved against this func's locals and the script's globals
    return adapt_result(eval(expr), using)
 252 
 253 
# named constants for expressions to use: each one is a string when the
# string-input option was given, and a bytes value otherwise
cr = '\r' if string_input else b'\r'
crlf = '\r\n' if string_input else b'\r\n'
dquo = '"' if string_input else b'"'
dquote = '"' if string_input else b'"'
empty = '' if string_input else b''
lcurly = '{' if string_input else b'{'
lf = '\n' if string_input else b'\n'
rcurly = '}' if string_input else b'}'
space = ' ' if string_input else b' '
squo = '\'' if string_input else b'\''
squote = '\'' if string_input else b'\''
tab = '\t' if string_input else b'\t'
# utf8bom = '\xef\xbb\xbf' if string_input else b'\xef\xbb\xbf'

# byte-order marks, looked up by encoding name
if string_input:
    # NOTE(review): these strings hold one character per BOM byte; text
    # decoded from UTF-8 presumably starts with '\ufeff' instead, so
    # these may never match decoded input - confirm the intent
    bom = {
        'utf8': '\xef\xbb\xbf',
        'utf16be': '\xfe\xff',
        'utf16le': '\xff\xfe',
        'utf32be': '\x00\x00\xfe\xff',
        'utf32le': '\xff\xfe\x00\x00',
    }
else:
    bom = {
        'utf8': b'\xef\xbb\xbf',
        'utf16be': b'\xfe\xff',
        'utf16le': b'\xff\xfe',
        'utf32be': b'\x00\x00\xfe\xff',
        'utf32le': b'\xff\xfe\x00\x00',
    }

# alternative names for None
nil = None
none = None
null = None

# shadow the exec built-in with None, and replace open with its read-only
# binary variant; both names are what later code in this script sees
exec = None
open = make_open_read(open)
 290 
 291 modules_opts = (
 292     '-m', '--m', '-mod', '--mod', '-module', '--module',
 293     '-modules', '--modules',
 294 )
 295 more_modules_opts = ('-mm', '--mm', '-more', '--more')
 296 
 297 while len(args) > 0:
 298     if args[0] == '--':
 299         args = args[1:]
 300         break
 301 
 302     if args[0] in no_input_opts:
 303         no_input = True
 304         args = args[1:]
 305         continue
 306 
 307     if args[0] in modules_opts:
 308         try:
 309             if len(args) < 2:
 310                 msg = 'a module name or a comma-separated list of modules'
 311                 raise Exception('expected ' + msg)
 312 
 313             g = globals()
 314             from importlib import import_module
 315             for e in args[1].split(','):
 316                 g[e] = import_module(e)
 317 
 318             g = None
 319             import_module = None
 320             args = args[2:]
 321         except Exception as e:
 322             fail(e, 1)
 323 
 324         continue
 325 
 326     if args[0] in more_modules_opts:
 327         import functools, itertools, json, math, random, statistics, string, time
 328         args = args[1:]
 329         continue
 330 
 331     break
 332 
 333 
 334 try:
 335     if not expression or expression == '.':
 336         expression = 'data'
 337     expression = compile(expression, expression, 'eval')
 338 
 339     got_stdin = False
 340     all_stdin = None
 341     dashes = args.count('-')
 342 
 343     data = None
 344 
 345     if not load_input:
 346         emit_result(stdout.buffer, eval_expr(expression, None))
 347         exit(0)
 348 
 349     if any(seemsurl(name) for name in args):
 350         from urllib.request import urlopen
 351 
 352     for name in args:
 353         if name == '-':
 354             if dashes > 1:
 355                 if not got_stdin:
 356                     all_stdin = stdin.buffer.read()
 357                     got_stdin = True
 358                 data = all_stdin
 359             else:
 360                 data = stdin.buffer.read()
 361 
 362             if string_input:
 363                 data = s = str(data, encoding='utf-8')
 364         elif seemsurl(name):
 365             with urlopen(name) as inp:
 366                 data = inp.read()
 367         else:
 368             with open(name) as inp:
 369                 data = inp.read()
 370 
 371         if string_input:
 372             data = s = str(data, encoding='utf-8')
 373         emit_result(stdout.buffer, eval_expr(expression, data))
 374 
 375     if len(args) == 0:
 376         data = stdin.buffer.read()
 377         if string_input:
 378             data = s = str(data, encoding='utf-8')
 379         emit_result(stdout.buffer, eval_expr(expression, data))
 380 except BrokenPipeError:
 381     # quit quietly, instead of showing a confusing error message
 382     stderr.close()
 383     exit(0)
 384 except KeyboardInterrupt:
 385     exit(2)
 386 except Exception as e:
 387     s = str(e)
 388     s = s if s else '<generic exception>'
 389     print(f'\x1b[31m{s}\x1b[0m', file=stderr)
 390     exit(1)