File: tbp.py
   1 #!/usr/bin/python
   2 
   3 # The MIT License (MIT)
   4 #
   5 # Copyright (c) 2026 pacman64
   6 #
   7 # Permission is hereby granted, free of charge, to any person obtaining a copy
   8 # of this software and associated documentation files (the "Software"), to deal
   9 # in the Software without restriction, including without limitation the rights
  10 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11 # copies of the Software, and to permit persons to whom the Software is
  12 # furnished to do so, subject to the following conditions:
  13 #
  14 # The above copyright notice and this permission notice shall be included in
  15 # all copies or substantial portions of the Software.
  16 #
  17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  20 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  23 # SOFTWARE.
  24 
  25 
# Help/usage text for the tool: it is printed (stripped) to stderr when the
# script gets no arguments or no expression, and to stdout when the first
# argument is one of the help options.
info = '''
tbp [options...] [python expression] [files/URIs...]


Transform Bytes with Python runs a python expression on each whole-input,
read as a bytes-type value.

The expression can use either `v`, `value`, `d`, or `data` for the current
input.

Input-sources can be either files or web-URIs. When not given any explicit
named sources, the standard input is used. It's even possible to reuse the
standard input using multiple single dashes (-) in the order needed: stdin
is only read once in this case, and kept for later reuse.

When the expression results in None, the current input is ignored. When the
expression results in a boolean, this determines whether the whole input is
copied/appended back to the standard output, or ignored.
'''
  45 
  46 
  47 from sys import argv, exit, stderr, stdin, stdout
  48 from time import sleep
  49 from typing import Generator
  50 
  51 
  52 if len(argv) < 2:
  53     print(info.strip(), file=stderr)
  54     exit(0)
  55 if len(argv) > 1 and argv[1] in ('-h', '--h', '-help', '--help'):
  56     print(info.strip())
  57     exit(0)
  58 
  59 
  60 class Skip:
  61     pass
  62 
  63 skip = Skip()
  64 
  65 def cond(*args):
  66     if len(args) == 0:
  67         return None
  68 
  69     for i, e in enumerate(args):
  70         if i % 2 == 0 and i < len(args) - 1 and e:
  71             return args[i + 1]
  72 
  73     return args[-1] if len(args) % 2 == 1 else None
  74 
  75 def maybe(f, x):
  76     try:
  77         return f(x)
  78     except Exception as _:
  79         return x
  80 
  81 def number(x):
  82     try:
  83         return int(x)
  84     except Exception as _:
  85         pass
  86     try:
  87         return float(x)
  88     except Exception as _:
  89         return x
  90 
  91 def rescue(attempt, fallback = None):
  92     try:
  93         return attempt()
  94     except BrokenPipeError as e:
  95         raise e
  96     except Exception as e:
  97         if callable(fallback):
  98             return fallback(e)
  99         return fallback
 100 
 101 rescued = rescue
 102 
 103 def wait(seconds, result):
 104     t = (int, float)
 105     if (not isinstance(seconds, t)) and isinstance(result, t):
 106         seconds, result = result, seconds
 107     sleep(seconds)
 108     return result
 109 
 110 delay = wait
 111 
 112 def uint_big_endian(src, size, start = 0):
 113     if not isinstance(src, bytes):
 114         return ValueError('can only get unsigned integers from bytes')
 115     if start + size >= len(src):
 116         msg = f'not enough bytes for {8 * size}-bit unsigned integers'
 117         raise ValueError(msg)
 118     return sum(int(src[start + i]) << (8 * i) for i in range(size, 0, -1))
 119 
 120 def uint16_be(src, start = 0):
 121     return uint_big_endian(src, 2, start)
 122 
 123 uint16be = uint16_be
 124 
 125 def uint32_be(src, start = 0):
 126     return uint_big_endian(src, 4, start)
 127 
 128 uint32be = uint32_be
 129 
 130 def uint64_be(src, start = 0):
 131     return uint_big_endian(src, 8, start)
 132 
 133 uint64be = uint64_be
 134 
 135 def uint_little_endian(src, size, start = 0):
 136     if not isinstance(src, bytes):
 137         return ValueError('can only get unsigned integers from bytes')
 138     if start + size >= len(src):
 139         msg = f'not enough bytes for {8 * size}-bit unsigned integers'
 140         raise ValueError(msg)
 141     return sum(int(src[start + i]) << (8 * i) for i in range(size))
 142 
 143 def uint16_le(src, start = 0):
 144     return uint_little_endian(src, 2, start)
 145 
 146 uint16le = uint16_le
 147 
 148 def uint32_le(src, start = 0):
 149     return uint_little_endian(src, 4, start)
 150 
 151 uint32le = uint32_le
 152 
 153 def uint64_le(src, start = 0):
 154     return uint_little_endian(src, 8, start)
 155 
 156 uint64le = uint64_le
 157 
 158 def make_open_read(open):
 159     'Restrict the file-open func to a read-only-binary file-open func.'
 160     def open_read(name):
 161         return open(name, mode='rb')
 162     return open_read
 163 
 164 def fail(msg, code = 1):
 165     print(f'\x1b[31m{str(msg)}\x1b[0m', file=stderr)
 166     exit(code)
 167 
 168 def message(msg, result = None):
 169     print(msg, file=stderr)
 170     return result
 171 
 172 msg = message
 173 
 174 def seemsurl(s):
 175     protocols = ('https://', 'http://', 'file://', 'ftp://', 'data:')
 176     return any(s.startswith(p) for p in protocols)
 177 
 178 def tobytes(x):
 179     if isinstance(x, (bytearray, bytes)):
 180         return x
 181     if isinstance(x, (bool, int)):
 182         return bytes([int(x)])
 183     if isinstance(x, float):
 184         return bytes(str(x), encoding='utf-8')
 185     if isinstance(x, str):
 186         return bytes(x, encoding='utf-8')
 187     return bytes(x)
 188 
 189 def tointorbytes(x):
 190     return x if isinstance(x, int) else tobytes(x)
 191 
 192 def adapt_result(x, default):
 193     if x is True:
 194         return default
 195     if x is False:
 196         return None
 197 
 198     if isinstance(x, Skip):
 199         return None
 200 
 201     if callable(x):
 202         return x(default)
 203     return x
 204 
 205 def emit_result(w, x):
 206     if x is None:
 207         return
 208 
 209     if isinstance(x, int):
 210         w.write(tobytes(x))
 211         return
 212 
 213     if isinstance(x, (list, tuple, range, Generator)):
 214         for e in x:
 215             w.write(tobytes(e))
 216         return
 217 
 218     w.write(tobytes(x))
 219 
def eval_expr(expr, using):
    '''
    Evaluate the expression given, after binding the input value given to
    all the module-level names expressions can use for it, and return the
    result as adapted by adapt_result, with the input as its default value.

    Since eval is called without explicit namespaces, the expression runs
    in this func's scope: the parameter names `expr` and `using` are thus
    visible to it as well, besides all the module-level names.
    '''

    global v, val, value, d, dat, data
    # offer several aliases for the variable with the input bytes
    v = val = value = d = dat = data = using
    return adapt_result(eval(expr), using)
 225 
 226 
 227 nil = none = null = None
 228 
 229 
 230 exec = None
 231 open = make_open_read(open)
 232 
 233 no_input_opts = (
 234     '=', '-n', '--n', '-nil', '--nil', '-none', '--none', '-null', '--null',
 235 )
 236 string_opts = ('-s', '--s', '-str', '--str', '-string', '--string')
 237 modules_opts = (
 238     '-m', '--m', '-mod', '--mod', '-module', '--module',
 239     '-modules', '--modules',
 240 )
 241 more_modules_opts = ('-mm', '--mm', '-more', '--more')
 242 
 243 args = argv[1:]
 244 load_input = True
 245 string_input = False
 246 expression = None
 247 
 248 while len(args) > 0:
 249     if args[0] == '--':
 250         args = args[1:]
 251         break
 252 
 253     if args[0] in no_input_opts:
 254         load_input = False
 255         args = args[1:]
 256         continue
 257 
 258     if args[0] in string_opts:
 259         string_input = True
 260         args = args[1:]
 261         continue
 262 
 263     if args[0] in no_input_opts:
 264         no_input = True
 265         args = args[1:]
 266         continue
 267 
 268     if args[0] in modules_opts:
 269         try:
 270             if len(args) < 2:
 271                 msg = 'a module name or a comma-separated list of modules'
 272                 raise Exception('expected ' + msg)
 273 
 274             g = globals()
 275             from importlib import import_module
 276             for e in args[1].split(','):
 277                 g[e] = import_module(e)
 278 
 279             g = None
 280             import_module = None
 281             args = args[2:]
 282         except Exception as e:
 283             fail(e, 1)
 284 
 285         continue
 286 
 287     if args[0] in more_modules_opts:
 288         import functools, itertools, json, math, random, statistics, string, time
 289         args = args[1:]
 290         continue
 291 
 292     break
 293 
 294 cr = '\r' if string_input else b'\r'
 295 crlf = '\r\n' if string_input else b'\r\n'
 296 dquo = dquote = '"' if string_input else b'"'
 297 empty = '' if string_input else b''
 298 lcurly = '{' if string_input else b'{'
 299 lf = '\n' if string_input else b'\n'
 300 rcurly = '}' if string_input else b'}'
 301 space = ' ' if string_input else b' '
 302 squo = squote = '\'' if string_input else b'\''
 303 tab = '\t' if string_input else b'\t'
 304 utf8bom = '\xef\xbb\xbf' if string_input else b'\xef\xbb\xbf'
 305 if string_input:
 306     bom = {
 307         'utf8': '\xef\xbb\xbf',
 308         'utf16be': '\xfe\xff',
 309         'utf16le': '\xff\xfe',
 310         'utf32be': '\x00\x00\xfe\xff',
 311         'utf32le': '\xff\xfe\x00\x00',
 312     }
 313 else:
 314     bom = {
 315         'utf8': b'\xef\xbb\xbf',
 316         'utf16be': b'\xfe\xff',
 317         'utf16le': b'\xff\xfe',
 318         'utf32be': b'\x00\x00\xfe\xff',
 319         'utf32le': b'\xff\xfe\x00\x00',
 320     }
 321 
 322 if len(args) > 0:
 323     expression = args[0]
 324     args = args[1:]
 325 
 326 if expression is None:
 327     print(info.strip(), file=stderr)
 328     exit(0)
 329 
 330 try:
 331     if not expression or expression == '.':
 332         expression = 'data'
 333     expression = compile(expression, expression, 'eval')
 334 
 335     got_stdin = False
 336     all_stdin = None
 337     dashes = args.count('-')
 338 
 339     data = None
 340 
 341     if not load_input:
 342         emit_result(stdout.buffer, eval_expr(expression, None))
 343         exit(0)
 344 
 345     if any(seemsurl(name) for name in args):
 346         from urllib.request import urlopen
 347 
 348     for name in args:
 349         if name == '-':
 350             if dashes > 1:
 351                 if not got_stdin:
 352                     all_stdin = stdin.buffer.read()
 353                     got_stdin = True
 354                 data = all_stdin
 355             else:
 356                 data = stdin.buffer.read()
 357 
 358             if string_input:
 359                 data = s = str(data, encoding='utf-8')
 360         elif seemsurl(name):
 361             with urlopen(name) as inp:
 362                 data = inp.read()
 363         else:
 364             with open(name) as inp:
 365                 data = inp.read()
 366 
 367         if string_input:
 368             data = s = str(data, encoding='utf-8')
 369         emit_result(stdout.buffer, eval_expr(expression, data))
 370 
 371     if len(args) == 0:
 372         data = stdin.buffer.read()
 373         if string_input:
 374             data = s = str(data, encoding='utf-8')
 375         emit_result(stdout.buffer, eval_expr(expression, data))
 376 except BrokenPipeError:
 377     # quit quietly, instead of showing a confusing error message
 378     stderr.close()
 379     exit(0)
 380 except KeyboardInterrupt:
 381     exit(2)
 382 except Exception as e:
 383     s = str(e)
 384     s = s if s else '<generic exception>'
 385     print(s, file=stderr)
 386     exit(1)