File: tbp.py
   1 #!/usr/bin/python3
   2 
   3 # The MIT License (MIT)
   4 #
   5 # Copyright © 2020-2025 pacman64
   6 #
   7 # Permission is hereby granted, free of charge, to any person obtaining a copy
   8 # of this software and associated documentation files (the “Software”), to deal
   9 # in the Software without restriction, including without limitation the rights
  10 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11 # copies of the Software, and to permit persons to whom the Software is
  12 # furnished to do so, subject to the following conditions:
  13 #
  14 # The above copyright notice and this permission notice shall be included in
  15 # all copies or substantial portions of the Software.
  16 #
  17 # THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  20 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  23 # SOFTWARE.
  24 
  25 
  26 info = '''
  27 tbp [options...] [python expression] [files/URIs...]
  28 
  29 
  30 Transform Bytes with Python runs a python expression on each whole-input,
  31 read as a bytes-type value.
  32 
  33 The expression can use either `v`, `value`, `d`, or `data` for the current
  34 input.
  35 
  36 Input-sources can be either files or web-URIs. When not given any explicit
  37 named sources, the standard input is used. It's even possible to reuse the
  38 standard input using multiple single dashes (-) in the order needed: stdin
  39 is only read once in this case, and kept for later reuse.
  40 
  41 When the expression results in None, the current input is ignored. When the
  42 expression results in a boolean, this determines whether the whole input is
  43 copied/appended back to the standard output, or ignored.
  44 '''
  45 
  46 
  47 from sys import argv, exit, stderr, stdin, stdout
  48 from time import sleep
  49 from typing import Generator
  50 
  51 
  52 if len(argv) < 2:
  53     print(info.strip(), file=stderr)
  54     exit(0)
  55 if len(argv) == 2 and argv[1] in ('-h', '--h', '-help', '--help'):
  56     print(info.strip())
  57     exit(0)
  58 
  59 
  60 class Skip:
  61     pass
  62 
  63 
  64 skip = Skip()
  65 
  66 
  67 def cond(*args):
  68     if len(args) == 0:
  69         return None
  70 
  71     for i, e in enumerate(args):
  72         if i % 2 == 0 and i < len(args) - 1 and e:
  73             return args[i + 1]
  74 
  75     return args[-1] if len(args) % 2 == 1 else None
  76 
  77 
  78 def rescue(attempt, fallback = None):
  79     try:
  80         return attempt()
  81     except Exception as e:
  82         if callable(fallback):
  83             return fallback(e)
  84         return fallback
  85 
  86 catch = rescue
  87 catched = rescue
  88 caught = rescue
  89 recover = rescue
  90 recovered = rescue
  91 rescued = rescue
  92 
  93 
  94 def wait(seconds, result):
  95     t = (int, float)
  96     if (not isinstance(seconds, t)) and isinstance(result, t):
  97         seconds, result = result, seconds
  98     sleep(seconds)
  99     return result
 100 
 101 delay = wait
 102 
 103 
 104 no_input_opts = (
 105     '=', '-n', '--n', '-nil', '--nil', '-none', '--none', '-null', '--null',
 106 )
 107 string_opts = ('-s', '--s', '-str', '--str', '-string', '--string')
 108 
 109 args = argv[1:]
 110 load_input = True
 111 string_input = False
 112 expression = None
 113 
 114 # handle all other leading options; the explicit help options are
 115 # handled earlier in the script
 116 while len(args) > 0:
 117     if args[0] in no_input_opts:
 118         load_input = False
 119         args = args[1:]
 120         continue
 121 
 122     if args[0] in string_opts:
 123         string_input = True
 124         args = args[1:]
 125         continue
 126 
 127     break
 128 
 129 if len(args) > 0:
 130     expression = args[0]
 131     args = args[1:]
 132 
 133 if expression is None:
 134     print(info.strip(), file=stderr)
 135     exit(0)
 136 
 137 
 138 def make_open_read(open):
 139     'Restrict the file-open func to a read-only-binary file-open func.'
 140     def open_read(name):
 141         return open(name, mode='rb')
 142     return open_read
 143 
 144 
 145 def fail(msg, code = 1):
 146     print(f'\x1b[31m{str(msg)}\x1b[0m', file=stderr)
 147     exit(code)
 148 
 149 
 150 def message(msg, result = None):
 151     print(msg, file=stderr)
 152     return result
 153 
 154 msg = message
 155 
 156 
 157 def seemsurl(s):
 158     protocols = ('https://', 'http://', 'file://', 'ftp://', 'data:')
 159     return any(s.startswith(p) for p in protocols)
 160 
 161 
 162 def tobytes(x):
 163     if isinstance(x, (bytearray, bytes)):
 164         return x
 165     if isinstance(x, (bool, int)):
 166         return bytes((int(x), ))
 167     if isinstance(x, float):
 168         return bytes(str(x), encoding='utf-8')
 169     if isinstance(x, str):
 170         return bytes(x, encoding='utf-8')
 171     return bytes(x)
 172 
 173 
 174 def tointorbytes(x):
 175     return x if isinstance(x, int) else tobytes(x)
 176 
 177 
 178 def adapt_result(x, default):
 179     if x is True:
 180         return default
 181     if x is False:
 182         return None
 183 
 184     if isinstance(x, Skip):
 185         return None
 186 
 187     if callable(x):
 188         return x(default)
 189     return x
 190 
 191 
 192 def emit_result(w, x):
 193     if x is None:
 194         return
 195 
 196     if isinstance(x, int):
 197         w.write(tobytes(x))
 198         return
 199 
 200     if isinstance(x, (list, tuple, range, Generator)):
 201         for e in x:
 202             w.write(tobytes(e))
 203         return
 204 
 205     w.write(tobytes(x))
 206 
 207 
 208 def eval_expr(expr, using):
 209     global v, val, value, d, dat, data
 210     # offer several aliases for the variable with the input bytes
 211     v = val = value = d = dat = data = using
 212     return adapt_result(eval(expr), using)
 213 
 214 
 215 cr = '\r' if string_input else b'\r'
 216 crlf = '\r\n' if string_input else b'\r\n'
 217 dquo = '"' if string_input else b'"'
 218 dquote = '"' if string_input else b'"'
 219 empty = '' if string_input else b''
 220 lcurly = '{' if string_input else b'{'
 221 lf = '\n' if string_input else b'\n'
 222 rcurly = '}' if string_input else b'}'
 223 space = ' ' if string_input else b' '
 224 squo = '\'' if string_input else b'\''
 225 squote = '\'' if string_input else b'\''
 226 tab = '\t' if string_input else b'\t'
 227 utf8bom = '\xef\xbb\xbf' if string_input else b'\xef\xbb\xbf'
 228 
 229 nil = None
 230 none = None
 231 null = None
 232 
 233 exec = None
 234 open = make_open_read(open)
 235 
 236 modules_opts = (
 237     '-m', '--m', '-mod', '--mod', '-module', '--module',
 238     '-modules', '--modules',
 239 )
 240 more_modules_opts = ('-mm', '--mm', '-more', '--more')
 241 
 242 while len(args) > 0:
 243     if args[0] in no_input_opts:
 244         no_input = True
 245         args = args[1:]
 246         continue
 247 
 248     if args[0] in modules_opts:
 249         try:
 250             if len(args) < 2:
 251                 msg = 'a module name or a comma-separated list of modules'
 252                 raise Exception('expected ' + msg)
 253 
 254             g = globals()
 255             from importlib import import_module
 256             for e in args[1].split(','):
 257                 g[e] = import_module(e)
 258 
 259             g = None
 260             import_module = None
 261             args = args[2:]
 262         except Exception as e:
 263             fail(e, 1)
 264 
 265         continue
 266 
 267     if args[0] in more_modules_opts:
 268         import functools, itertools, json, math, random, statistics, string, time
 269         args = args[1:]
 270         continue
 271 
 272     break
 273 
 274 
 275 try:
 276     if not expression or expression == '.':
 277         expression = 'data'
 278     expression = compile(expression, expression, 'eval')
 279 
 280     got_stdin = False
 281     all_stdin = None
 282     dashes = args.count('-')
 283 
 284     data = None
 285 
 286     if not load_input:
 287         emit_result(stdout.buffer, eval_expr(expression, None))
 288         exit(0)
 289 
 290     if any(seemsurl(name) for name in args):
 291         from urllib.request import urlopen
 292 
 293     for name in args:
 294         if name == '-':
 295             if dashes > 1:
 296                 if not got_stdin:
 297                     all_stdin = stdin.buffer.read()
 298                     got_stdin = True
 299                 data = all_stdin
 300             else:
 301                 data = stdin.buffer.read()
 302 
 303             if string_input:
 304                 data = s = str(data, encoding='utf-8')
 305         elif seemsurl(name):
 306             with urlopen(name) as inp:
 307                 data = inp.read()
 308         else:
 309             with open(name) as inp:
 310                 data = inp.read()
 311 
 312         if string_input:
 313             data = s = str(data, encoding='utf-8')
 314         emit_result(stdout.buffer, eval_expr(expression, data))
 315 
 316     if len(args) == 0:
 317         data = stdin.buffer.read()
 318         if string_input:
 319             data = s = str(data, encoding='utf-8')
 320         emit_result(stdout.buffer, eval_expr(expression, data))
 321 except BrokenPipeError:
 322     # quit quietly, instead of showing a confusing error message
 323     stderr.close()
 324     exit(0)
 325 except KeyboardInterrupt:
 326     exit(2)
 327 except Exception as e:
 328     s = str(e)
 329     s = s if s else '<generic exception>'
 330     print(f'\x1b[31m{s}\x1b[0m', file=stderr)
 331     exit(1)