File: tbp.py
   1 #!/usr/bin/python3
   2 
   3 # The MIT License (MIT)
   4 #
   5 # Copyright © 2026 pacman64
   6 #
   7 # Permission is hereby granted, free of charge, to any person obtaining a copy
   8 # of this software and associated documentation files (the “Software”), to deal
   9 # in the Software without restriction, including without limitation the rights
  10 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11 # copies of the Software, and to permit persons to whom the Software is
  12 # furnished to do so, subject to the following conditions:
  13 #
  14 # The above copyright notice and this permission notice shall be included in
  15 # all copies or substantial portions of the Software.
  16 #
  17 # THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  20 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  23 # SOFTWARE.
  24 
  25 
# Help text for the tool: it's printed (stripped of its surrounding blank
# lines) when the script is run without arguments, without an expression,
# or with one of the help options.
info = '''
tbp [options...] [python expression] [files/URIs...]


Transform Bytes with Python runs a python expression on each whole-input,
read as a bytes-type value.

The expression can use either `v`, `value`, `d`, or `data` for the current
input.

Input-sources can be either files or web-URIs. When not given any explicit
named sources, the standard input is used. It's even possible to reuse the
standard input using multiple single dashes (-) in the order needed: stdin
is only read once in this case, and kept for later reuse.

When the expression results in None, the current input is ignored. When the
expression results in a boolean, this determines whether the whole input is
copied/appended back to the standard output, or ignored.
'''
  45 
  46 
  47 from sys import argv, exit, stderr, stdin, stdout
  48 from time import sleep
  49 from typing import Generator
  50 
  51 
  52 if len(argv) < 2:
  53     print(info.strip(), file=stderr)
  54     exit(0)
  55 if len(argv) > 1 and argv[1] in ('-h', '--h', '-help', '--help'):
  56     print(info.strip())
  57     exit(0)
  58 
  59 
  60 class Skip:
  61     pass
  62 
  63 skip = Skip()
  64 
  65 def cond(*args):
  66     if len(args) == 0:
  67         return None
  68 
  69     for i, e in enumerate(args):
  70         if i % 2 == 0 and i < len(args) - 1 and e:
  71             return args[i + 1]
  72 
  73     return args[-1] if len(args) % 2 == 1 else None
  74 
  75 def rescue(attempt, fallback = None):
  76     try:
  77         return attempt()
  78     except BrokenPipeError as e:
  79         raise e
  80     except Exception as e:
  81         if callable(fallback):
  82             return fallback(e)
  83         return fallback
  84 
  85 rescued = rescue
  86 
  87 def wait(seconds, result):
  88     t = (int, float)
  89     if (not isinstance(seconds, t)) and isinstance(result, t):
  90         seconds, result = result, seconds
  91     sleep(seconds)
  92     return result
  93 
  94 delay = wait
  95 
  96 def uint_big_endian(src, size, start = 0):
  97     if not isinstance(src, bytes):
  98         return ValueError('can only get unsigned integers from bytes')
  99     if start + size >= len(src):
 100         msg = f'not enough bytes for {8 * size}-bit unsigned integers'
 101         raise ValueError(msg)
 102     return sum(int(src[start + i]) << (8 * i) for i in range(size, 0, -1))
 103 
 104 def uint16_be(src, start = 0):
 105     return uint_big_endian(src, 2, start)
 106 
 107 uint16be = uint16_be
 108 
 109 def uint32_be(src, start = 0):
 110     return uint_big_endian(src, 4, start)
 111 
 112 uint32be = uint32_be
 113 
 114 def uint64_be(src, start = 0):
 115     return uint_big_endian(src, 8, start)
 116 
 117 uint64be = uint64_be
 118 
 119 def uint_little_endian(src, size, start = 0):
 120     if not isinstance(src, bytes):
 121         return ValueError('can only get unsigned integers from bytes')
 122     if start + size >= len(src):
 123         msg = f'not enough bytes for {8 * size}-bit unsigned integers'
 124         raise ValueError(msg)
 125     return sum(int(src[start + i]) << (8 * i) for i in range(size))
 126 
 127 def uint16_le(src, start = 0):
 128     return uint_little_endian(src, 2, start)
 129 
 130 uint16le = uint16_le
 131 
 132 def uint32_le(src, start = 0):
 133     return uint_little_endian(src, 4, start)
 134 
 135 uint32le = uint32_le
 136 
 137 def uint64_le(src, start = 0):
 138     return uint_little_endian(src, 8, start)
 139 
 140 uint64le = uint64_le
 141 
 142 def make_open_read(open):
 143     'Restrict the file-open func to a read-only-binary file-open func.'
 144     def open_read(name):
 145         return open(name, mode='rb')
 146     return open_read
 147 
 148 def fail(msg, code = 1):
 149     print(f'\x1b[31m{str(msg)}\x1b[0m', file=stderr)
 150     exit(code)
 151 
 152 def message(msg, result = None):
 153     print(msg, file=stderr)
 154     return result
 155 
 156 msg = message
 157 
 158 def seemsurl(s):
 159     protocols = ('https://', 'http://', 'file://', 'ftp://', 'data:')
 160     return any(s.startswith(p) for p in protocols)
 161 
 162 def tobytes(x):
 163     if isinstance(x, (bytearray, bytes)):
 164         return x
 165     if isinstance(x, (bool, int)):
 166         return bytes((int(x), ))
 167     if isinstance(x, float):
 168         return bytes(str(x), encoding='utf-8')
 169     if isinstance(x, str):
 170         return bytes(x, encoding='utf-8')
 171     return bytes(x)
 172 
 173 def tointorbytes(x):
 174     return x if isinstance(x, int) else tobytes(x)
 175 
 176 def adapt_result(x, default):
 177     if x is True:
 178         return default
 179     if x is False:
 180         return None
 181 
 182     if isinstance(x, Skip):
 183         return None
 184 
 185     if callable(x):
 186         return x(default)
 187     return x
 188 
 189 def emit_result(w, x):
 190     if x is None:
 191         return
 192 
 193     if isinstance(x, int):
 194         w.write(tobytes(x))
 195         return
 196 
 197     if isinstance(x, (list, tuple, range, Generator)):
 198         for e in x:
 199             w.write(tobytes(e))
 200         return
 201 
 202     w.write(tobytes(x))
 203 
 204 def eval_expr(expr, using):
 205     global v, val, value, d, dat, data
 206     # offer several aliases for the variable with the input bytes
 207     v = val = value = d = dat = data = using
 208     return adapt_result(eval(expr), using)
 209 
 210 
 211 nil = none = null = None
 212 
 213 
 214 exec = None
 215 open = make_open_read(open)
 216 
 217 no_input_opts = (
 218     '=', '-n', '--n', '-nil', '--nil', '-none', '--none', '-null', '--null',
 219 )
 220 string_opts = ('-s', '--s', '-str', '--str', '-string', '--string')
 221 modules_opts = (
 222     '-m', '--m', '-mod', '--mod', '-module', '--module',
 223     '-modules', '--modules',
 224 )
 225 more_modules_opts = ('-mm', '--mm', '-more', '--more')
 226 
 227 args = argv[1:]
 228 load_input = True
 229 string_input = False
 230 expression = None
 231 
 232 while len(args) > 0:
 233     if args[0] == '--':
 234         args = args[1:]
 235         break
 236 
 237     if args[0] in no_input_opts:
 238         load_input = False
 239         args = args[1:]
 240         continue
 241 
 242     if args[0] in string_opts:
 243         string_input = True
 244         args = args[1:]
 245         continue
 246 
 247     if args[0] in no_input_opts:
 248         no_input = True
 249         args = args[1:]
 250         continue
 251 
 252     if args[0] in modules_opts:
 253         try:
 254             if len(args) < 2:
 255                 msg = 'a module name or a comma-separated list of modules'
 256                 raise Exception('expected ' + msg)
 257 
 258             g = globals()
 259             from importlib import import_module
 260             for e in args[1].split(','):
 261                 g[e] = import_module(e)
 262 
 263             g = None
 264             import_module = None
 265             args = args[2:]
 266         except Exception as e:
 267             fail(e, 1)
 268 
 269         continue
 270 
 271     if args[0] in more_modules_opts:
 272         import functools, itertools, json, math, random, statistics, string, time
 273         args = args[1:]
 274         continue
 275 
 276     break
 277 
 278 cr = '\r' if string_input else b'\r'
 279 crlf = '\r\n' if string_input else b'\r\n'
 280 dquo = dquote = '"' if string_input else b'"'
 281 empty = '' if string_input else b''
 282 lcurly = '{' if string_input else b'{'
 283 lf = '\n' if string_input else b'\n'
 284 rcurly = '}' if string_input else b'}'
 285 space = ' ' if string_input else b' '
 286 squo = squote = '\'' if string_input else b'\''
 287 tab = '\t' if string_input else b'\t'
 288 utf8bom = '\xef\xbb\xbf' if string_input else b'\xef\xbb\xbf'
 289 if string_input:
 290     bom = {
 291         'utf8': '\xef\xbb\xbf',
 292         'utf16be': '\xfe\xff',
 293         'utf16le': '\xff\xfe',
 294         'utf32be': '\x00\x00\xfe\xff',
 295         'utf32le': '\xff\xfe\x00\x00',
 296     }
 297 else:
 298     bom = {
 299         'utf8': b'\xef\xbb\xbf',
 300         'utf16be': b'\xfe\xff',
 301         'utf16le': b'\xff\xfe',
 302         'utf32be': b'\x00\x00\xfe\xff',
 303         'utf32le': b'\xff\xfe\x00\x00',
 304     }
 305 
 306 if len(args) > 0:
 307     expression = args[0]
 308     args = args[1:]
 309 
 310 if expression is None:
 311     print(info.strip(), file=stderr)
 312     exit(0)
 313 
 314 try:
 315     if not expression or expression == '.':
 316         expression = 'data'
 317     expression = compile(expression, expression, 'eval')
 318 
 319     got_stdin = False
 320     all_stdin = None
 321     dashes = args.count('-')
 322 
 323     data = None
 324 
 325     if not load_input:
 326         emit_result(stdout.buffer, eval_expr(expression, None))
 327         exit(0)
 328 
 329     if any(seemsurl(name) for name in args):
 330         from urllib.request import urlopen
 331 
 332     for name in args:
 333         if name == '-':
 334             if dashes > 1:
 335                 if not got_stdin:
 336                     all_stdin = stdin.buffer.read()
 337                     got_stdin = True
 338                 data = all_stdin
 339             else:
 340                 data = stdin.buffer.read()
 341 
 342             if string_input:
 343                 data = s = str(data, encoding='utf-8')
 344         elif seemsurl(name):
 345             with urlopen(name) as inp:
 346                 data = inp.read()
 347         else:
 348             with open(name) as inp:
 349                 data = inp.read()
 350 
 351         if string_input:
 352             data = s = str(data, encoding='utf-8')
 353         emit_result(stdout.buffer, eval_expr(expression, data))
 354 
 355     if len(args) == 0:
 356         data = stdin.buffer.read()
 357         if string_input:
 358             data = s = str(data, encoding='utf-8')
 359         emit_result(stdout.buffer, eval_expr(expression, data))
 360 except BrokenPipeError:
 361     # quit quietly, instead of showing a confusing error message
 362     stderr.close()
 363     exit(0)
 364 except KeyboardInterrupt:
 365     exit(2)
 366 except Exception as e:
 367     s = str(e)
 368     s = s if s else '<generic exception>'
 369     print(s, file=stderr)
 370     exit(1)