File: tbp.py
   1 #!/usr/bin/python3
   2 
   3 # The MIT License (MIT)
   4 #
   5 # Copyright © 2020-2025 pacman64
   6 #
   7 # Permission is hereby granted, free of charge, to any person obtaining a copy
   8 # of this software and associated documentation files (the “Software”), to deal
   9 # in the Software without restriction, including without limitation the rights
  10 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11 # copies of the Software, and to permit persons to whom the Software is
  12 # furnished to do so, subject to the following conditions:
  13 #
  14 # The above copyright notice and this permission notice shall be included in
  15 # all copies or substantial portions of the Software.
  16 #
  17 # THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  20 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  23 # SOFTWARE.
  24 
  25 
# help message for the tool: it's shown on stderr when no arguments or no
# expression are given, and on stdout when a help option leads the arguments
info = '''
tbp [options...] [python expression] [files/URIs...]


Transform Bytes with Python runs a python expression on each whole-input,
read as a bytes-type value.

The expression can use either `v`, `value`, `d`, or `data` for the current
input.

Input-sources can be either files or web-URIs. When not given any explicit
named sources, the standard input is used. It's even possible to reuse the
standard input using multiple single dashes (-) in the order needed: stdin
is only read once in this case, and kept for later reuse.

When the expression results in None, the current input is ignored. When the
expression results in a boolean, this determines whether the whole input is
copied/appended back to the standard output, or ignored.
'''
  45 
  46 
  47 from sys import argv, exit, stderr, stdin, stdout
  48 from time import sleep
  49 from typing import Generator
  50 
  51 
  52 if len(argv) < 2:
  53     print(info.strip(), file=stderr)
  54     exit(0)
  55 if len(argv) > 1 and argv[1] in ('-h', '--h', '-help', '--help'):
  56     print(info.strip())
  57     exit(0)
  58 
  59 
  60 class Skip:
  61     pass
  62 
  63 skip = Skip()
  64 
  65 def cond(*args):
  66     if len(args) == 0:
  67         return None
  68 
  69     for i, e in enumerate(args):
  70         if i % 2 == 0 and i < len(args) - 1 and e:
  71             return args[i + 1]
  72 
  73     return args[-1] if len(args) % 2 == 1 else None
  74 
  75 def rescue(attempt, fallback = None):
  76     try:
  77         return attempt()
  78     except Exception as e:
  79         if callable(fallback):
  80             return fallback(e)
  81         return fallback
  82 
  83 catch = recover = recovered = rescued = rescue
  84 
  85 def wait(seconds, result):
  86     t = (int, float)
  87     if (not isinstance(seconds, t)) and isinstance(result, t):
  88         seconds, result = result, seconds
  89     sleep(seconds)
  90     return result
  91 
  92 delay = wait
  93 
  94 def uint_big_endian(src, size, start = 0):
  95     if not isinstance(src, bytes):
  96         return ValueError('can only get unsigned integers from bytes')
  97     if start + size >= len(src):
  98         msg = f'not enough bytes for {8 * size}-bit unsigned integers'
  99         raise ValueError(msg)
 100     return sum(int(src[start + i]) << (8 * i) for i in range(size, 0, -1))
 101 
 102 def uint16_be(src, start = 0):
 103     return uint_big_endian(src, 2, start)
 104 
 105 uint16be = uint16_be
 106 
 107 def uint32_be(src, start = 0):
 108     return uint_big_endian(src, 4, start)
 109 
 110 uint32be = uint32_be
 111 
 112 def uint64_be(src, start = 0):
 113     return uint_big_endian(src, 8, start)
 114 
 115 uint64be = uint64_be
 116 
 117 def uint_little_endian(src, size, start = 0):
 118     if not isinstance(src, bytes):
 119         return ValueError('can only get unsigned integers from bytes')
 120     if start + size >= len(src):
 121         msg = f'not enough bytes for {8 * size}-bit unsigned integers'
 122         raise ValueError(msg)
 123     return sum(int(src[start + i]) << (8 * i) for i in range(size))
 124 
 125 def uint16_le(src, start = 0):
 126     return uint_little_endian(src, 2, start)
 127 
 128 uint16le = uint16_le
 129 
 130 def uint32_le(src, start = 0):
 131     return uint_little_endian(src, 4, start)
 132 
 133 uint32le = uint32_le
 134 
 135 def uint64_le(src, start = 0):
 136     return uint_little_endian(src, 8, start)
 137 
 138 uint64le = uint64_le
 139 
 140 def make_open_read(open):
 141     'Restrict the file-open func to a read-only-binary file-open func.'
 142     def open_read(name):
 143         return open(name, mode='rb')
 144     return open_read
 145 
 146 def fail(msg, code = 1):
 147     print(f'\x1b[31m{str(msg)}\x1b[0m', file=stderr)
 148     exit(code)
 149 
 150 def message(msg, result = None):
 151     print(msg, file=stderr)
 152     return result
 153 
 154 msg = message
 155 
 156 def seemsurl(s):
 157     protocols = ('https://', 'http://', 'file://', 'ftp://', 'data:')
 158     return any(s.startswith(p) for p in protocols)
 159 
 160 def tobytes(x):
 161     if isinstance(x, (bytearray, bytes)):
 162         return x
 163     if isinstance(x, (bool, int)):
 164         return bytes((int(x), ))
 165     if isinstance(x, float):
 166         return bytes(str(x), encoding='utf-8')
 167     if isinstance(x, str):
 168         return bytes(x, encoding='utf-8')
 169     return bytes(x)
 170 
 171 def tointorbytes(x):
 172     return x if isinstance(x, int) else tobytes(x)
 173 
 174 def adapt_result(x, default):
 175     if x is True:
 176         return default
 177     if x is False:
 178         return None
 179 
 180     if isinstance(x, Skip):
 181         return None
 182 
 183     if callable(x):
 184         return x(default)
 185     return x
 186 
 187 def emit_result(w, x):
 188     if x is None:
 189         return
 190 
 191     if isinstance(x, int):
 192         w.write(tobytes(x))
 193         return
 194 
 195     if isinstance(x, (list, tuple, range, Generator)):
 196         for e in x:
 197             w.write(tobytes(e))
 198         return
 199 
 200     w.write(tobytes(x))
 201 
def eval_expr(expr, using):
    '''
    Evaluate the expression given, after binding the input given to all the
    input-variable aliases, and return the result as adapted by adapt_result.

    The aliases are module-level globals, which are rebound on each call;
    since eval is given no explicit namespaces, the expression also sees
    this func's own locals, namely `expr` and `using`.
    '''
    global v, val, value, d, dat, data
    # offer several aliases for the variable with the input bytes
    v = val = value = d = dat = data = using
    # NOTE(review): eval runs any expression it's given, which is the point
    # of this tool, but it also means expressions must come from trusted
    # sources only
    return adapt_result(eval(expr), using)
 207 
 208 
 209 nil = None
 210 none = None
 211 null = None
 212 
 213 exec = None
 214 open = make_open_read(open)
 215 
 216 no_input_opts = (
 217     '=', '-n', '--n', '-nil', '--nil', '-none', '--none', '-null', '--null',
 218 )
 219 string_opts = ('-s', '--s', '-str', '--str', '-string', '--string')
 220 modules_opts = (
 221     '-m', '--m', '-mod', '--mod', '-module', '--module',
 222     '-modules', '--modules',
 223 )
 224 more_modules_opts = ('-mm', '--mm', '-more', '--more')
 225 
 226 args = argv[1:]
 227 load_input = True
 228 string_input = False
 229 expression = None
 230 
 231 while len(args) > 0:
 232     if args[0] == '--':
 233         args = args[1:]
 234         break
 235 
 236     if args[0] in no_input_opts:
 237         load_input = False
 238         args = args[1:]
 239         continue
 240 
 241     if args[0] in string_opts:
 242         string_input = True
 243         args = args[1:]
 244         continue
 245 
 246     if args[0] in no_input_opts:
 247         no_input = True
 248         args = args[1:]
 249         continue
 250 
 251     if args[0] in modules_opts:
 252         try:
 253             if len(args) < 2:
 254                 msg = 'a module name or a comma-separated list of modules'
 255                 raise Exception('expected ' + msg)
 256 
 257             g = globals()
 258             from importlib import import_module
 259             for e in args[1].split(','):
 260                 g[e] = import_module(e)
 261 
 262             g = None
 263             import_module = None
 264             args = args[2:]
 265         except Exception as e:
 266             fail(e, 1)
 267 
 268         continue
 269 
 270     if args[0] in more_modules_opts:
 271         import functools, itertools, json, math, random, statistics, string, time
 272         args = args[1:]
 273         continue
 274 
 275     break
 276 
 277 cr = '\r' if string_input else b'\r'
 278 crlf = '\r\n' if string_input else b'\r\n'
 279 dquo = '"' if string_input else b'"'
 280 dquote = '"' if string_input else b'"'
 281 empty = '' if string_input else b''
 282 lcurly = '{' if string_input else b'{'
 283 lf = '\n' if string_input else b'\n'
 284 rcurly = '}' if string_input else b'}'
 285 space = ' ' if string_input else b' '
 286 squo = '\'' if string_input else b'\''
 287 squote = '\'' if string_input else b'\''
 288 tab = '\t' if string_input else b'\t'
 289 utf8bom = '\xef\xbb\xbf' if string_input else b'\xef\xbb\xbf'
 290 if string_input:
 291     bom = {
 292         'utf8': '\xef\xbb\xbf',
 293         'utf16be': '\xfe\xff',
 294         'utf16le': '\xff\xfe',
 295         'utf32be': '\x00\x00\xfe\xff',
 296         'utf32le': '\xff\xfe\x00\x00',
 297     }
 298 else:
 299     bom = {
 300         'utf8': b'\xef\xbb\xbf',
 301         'utf16be': b'\xfe\xff',
 302         'utf16le': b'\xff\xfe',
 303         'utf32be': b'\x00\x00\xfe\xff',
 304         'utf32le': b'\xff\xfe\x00\x00',
 305     }
 306 
 307 if len(args) > 0:
 308     expression = args[0]
 309     args = args[1:]
 310 
 311 if expression is None:
 312     print(info.strip(), file=stderr)
 313     exit(0)
 314 
 315 try:
 316     if not expression or expression == '.':
 317         expression = 'data'
 318     expression = compile(expression, expression, 'eval')
 319 
 320     got_stdin = False
 321     all_stdin = None
 322     dashes = args.count('-')
 323 
 324     data = None
 325 
 326     if not load_input:
 327         emit_result(stdout.buffer, eval_expr(expression, None))
 328         exit(0)
 329 
 330     if any(seemsurl(name) for name in args):
 331         from urllib.request import urlopen
 332 
 333     for name in args:
 334         if name == '-':
 335             if dashes > 1:
 336                 if not got_stdin:
 337                     all_stdin = stdin.buffer.read()
 338                     got_stdin = True
 339                 data = all_stdin
 340             else:
 341                 data = stdin.buffer.read()
 342 
 343             if string_input:
 344                 data = s = str(data, encoding='utf-8')
 345         elif seemsurl(name):
 346             with urlopen(name) as inp:
 347                 data = inp.read()
 348         else:
 349             with open(name) as inp:
 350                 data = inp.read()
 351 
 352         if string_input:
 353             data = s = str(data, encoding='utf-8')
 354         emit_result(stdout.buffer, eval_expr(expression, data))
 355 
 356     if len(args) == 0:
 357         data = stdin.buffer.read()
 358         if string_input:
 359             data = s = str(data, encoding='utf-8')
 360         emit_result(stdout.buffer, eval_expr(expression, data))
 361 except BrokenPipeError:
 362     # quit quietly, instead of showing a confusing error message
 363     stderr.close()
 364     exit(0)
 365 except KeyboardInterrupt:
 366     exit(2)
 367 except Exception as e:
 368     s = str(e)
 369     s = s if s else '<generic exception>'
 370     print(f'\x1b[31m{s}\x1b[0m', file=stderr)
 371     exit(1)