#!/usr/bin/python3

# The MIT License (MIT)
#
# Copyright © 2020-2025 pacman64
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the “Software”), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


info = '''
tbp [options...] [python expression] [files/URIs...]


Transform Bytes with Python runs a python expression on each whole-input,
read as a bytes-type value.

The expression can use either `v`, `value`, `d`, or `data` for the current
input.

Input-sources can be either files or web-URIs. When not given any explicit
named sources, the standard input is used. It's even possible to reuse the
standard input using multiple single dashes (-) in the order needed: stdin
is only read once in this case, and kept for later reuse.

When the expression results in None, the current input is ignored. When the
expression results in a boolean, this determines whether the whole input is
copied/appended back to the standard output, or ignored.
'''


from sys import argv, exit, stderr, stdin, stdout
from time import sleep
from typing import Generator


class Skip:
    'Marker type: an expression resulting in a Skip instance emits nothing.'


skip = Skip()


def cond(*args):
    '''
    Emulate an if/else-if chain as a single expression.

    Arguments are read as (condition, result) pairs: the result paired with
    the first truthy condition is returned. A trailing unpaired argument is
    the fallback; without one, the fallback is None.
    '''

    # even positions hold conditions, odd positions hold their results;
    # zip ignores a trailing unpaired item, which is the fallback handled below
    for test, result in zip(args[0::2], args[1::2]):
        if test:
            return result
    return args[-1] if len(args) % 2 == 1 else None


def rescue(attempt, fallback = None):
    '''
    Call `attempt` with no arguments and return its result; if it raises an
    Exception, return `fallback` instead, or the result of calling `fallback`
    with the exception when `fallback` is callable.
    '''

    try:
        return attempt()
    except Exception as e:
        if callable(fallback):
            return fallback(e)
        return fallback


catch = recover = recovered = rescued = rescue


def wait(seconds, result):
    '''
    Sleep for the number of seconds given, then return `result`. The two
    arguments can be given in either order, as long as only one is a number.
    '''

    numeric = (int, float)
    if (not isinstance(seconds, numeric)) and isinstance(result, numeric):
        seconds, result = result, seconds
    sleep(seconds)
    return result


delay = wait


def _uint_from(src, size, start, byteorder):
    '''
    Decode `size` bytes of `src`, starting at index `start`, as an unsigned
    integer using the byte-order given ('big' or 'little'). A negative
    `start` counts from the end of `src`, like regular python indexing.

    Raises ValueError when `src` isn't a bytes/bytearray value, or when the
    bytes asked for don't all lie inside `src`.
    '''

    if not isinstance(src, (bytes, bytearray)):
        raise ValueError('can only get unsigned integers from bytes')

    if start < 0:
        start += len(src)
    # a read ending exactly at the end of the buffer is valid
    if start < 0 or start + size > len(src):
        msg = f'not enough bytes for {8 * size}-bit unsigned integers'
        raise ValueError(msg)
    return int.from_bytes(src[start:start + size], byteorder)


def uint_big_endian(src, size, start = 0):
    'Read a big-endian unsigned integer `size` bytes wide from bytes `src`.'
    return _uint_from(src, size, start, 'big')


def uint16_be(src, start = 0):
    'Read a 16-bit big-endian unsigned integer from bytes `src`.'
    return uint_big_endian(src, 2, start)


uint16be = uint16_be


def uint32_be(src, start = 0):
    'Read a 32-bit big-endian unsigned integer from bytes `src`.'
    return uint_big_endian(src, 4, start)


uint32be = uint32_be


def uint64_be(src, start = 0):
    'Read a 64-bit big-endian unsigned integer from bytes `src`.'
    return uint_big_endian(src, 8, start)


uint64be = uint64_be


def uint_little_endian(src, size, start = 0):
    'Read a little-endian unsigned integer `size` bytes wide from bytes `src`.'
    return _uint_from(src, size, start, 'little')


def uint16_le(src, start = 0):
    'Read a 16-bit little-endian unsigned integer from bytes `src`.'
    return uint_little_endian(src, 2, start)


uint16le = uint16_le


def uint32_le(src, start = 0):
    'Read a 32-bit little-endian unsigned integer from bytes `src`.'
    return uint_little_endian(src, 4, start)


uint32le = uint32_le


def uint64_le(src, start = 0):
    'Read a 64-bit little-endian unsigned integer from bytes `src`.'
    return uint_little_endian(src, 8, start)


uint64le = uint64_le


def make_open_read(open):
    'Restrict the file-open func to a read-only-binary file-open func.'
    def open_read(name):
        return open(name, mode='rb')
    return open_read


def fail(msg, code = 1):
    'Show the message given in red on stderr, then quit with the exit code.'
    print(f'\x1b[31m{str(msg)}\x1b[0m', file=stderr)
    exit(code)


def message(msg, result = None):
    'Show a message on stderr, then return `result`, to allow chaining.'
    print(msg, file=stderr)
    return result


msg = message


def seemsurl(s):
    'Check whether a string starts with one of the URI schemes supported.'
    protocols = ('https://', 'http://', 'file://', 'ftp://', 'data:')
    return s.startswith(protocols)


def tobytes(x):
    '''
    Turn a value into bytes: bytes-like values are kept as given, booleans
    and integers become a single byte (so they must be in the range 0..255),
    floats and strings are UTF-8 encoded, and anything else is handed to the
    bytes constructor.
    '''

    if isinstance(x, (bytearray, bytes)):
        return x
    if isinstance(x, (bool, int)):
        return bytes((int(x), ))
    if isinstance(x, float):
        return bytes(str(x), encoding='utf-8')
    if isinstance(x, str):
        return bytes(x, encoding='utf-8')
    return bytes(x)


def tointorbytes(x):
    'Keep integers as they are, and turn anything else into bytes.'
    return x if isinstance(x, int) else tobytes(x)


def adapt_result(x, default):
    '''
    Normalize an expression result: True stands for the input itself
    (`default`), False and Skip values stand for no output, and callables
    are called with the input to get the actual result.
    '''

    if x is True:
        return default
    if x is False or isinstance(x, Skip):
        return None
    if callable(x):
        return x(default)
    return x


def emit_result(w, x):
    '''
    Write a result to the binary writer `w`: None writes nothing, while
    lists, tuples, ranges, and generators have each of their items written
    in order; anything else is written as a single value.
    '''

    if x is None:
        return

    if isinstance(x, (list, tuple, range, Generator)):
        for e in x:
            w.write(tobytes(e))
        return

    w.write(tobytes(x))


def eval_expr(expr, using):
    '''
    Evaluate the compiled expression against the input given, and return
    its result after normalizing it with adapt_result.
    '''

    global v, val, value, d, dat, data
    # offer several aliases for the variable with the input bytes
    v = val = value = d = dat = data = using
    # NOTE: eval runs arbitrary code by design; the expression is expected
    # to only ever come from the command line of the user running this tool
    return adapt_result(eval(expr), using)


nil = None
none = None
null = None

# expressions see the module's globals: these names shadow the built-ins
# there, so expressions get no `exec`, and can only open files for reading
exec = None
open = make_open_read(open)

no_input_opts = (
    '=', '-n', '--n', '-nil', '--nil', '-none', '--none', '-null', '--null',
)
string_opts = ('-s', '--s', '-str', '--str', '-string', '--string')
modules_opts = (
    '-m', '--m', '-mod', '--mod', '-module', '--module',
    '-modules', '--modules',
)
more_modules_opts = ('-mm', '--mm', '-more', '--more')

# the modules auto-imported by any of the `more modules` options
more_modules = (
    'functools', 'itertools', 'json', 'math', 'random', 'statistics',
    'string', 'time',
)


def _import_modules(names):
    'Import the modules named as globals, so expressions can use them.'
    from importlib import import_module
    g = globals()
    for name in names:
        g[name] = import_module(name)


def _parse_options(args):
    '''
    Handle all leading options from the command-line arguments given, and
    return a tuple with the arguments left, whether to load inputs, and
    whether inputs are to be handled as strings, instead of bytes.
    '''

    load_input = True
    string_input = False

    while args:
        opt = args[0]

        if opt == '--':
            args = args[1:]
            break

        if opt in no_input_opts:
            load_input = False
            args = args[1:]
            continue

        if opt in string_opts:
            string_input = True
            args = args[1:]
            continue

        if opt in modules_opts:
            if len(args) < 2:
                what = 'a module name or a comma-separated list of modules'
                fail('expected ' + what, 1)
            try:
                _import_modules(args[1].split(','))
            except Exception as e:
                fail(e, 1)
            args = args[2:]
            continue

        if opt in more_modules_opts:
            _import_modules(more_modules)
            args = args[1:]
            continue

        break

    return args, load_input, string_input


def _literals(as_text):
    '''
    Make the table of named constants available to expressions: values are
    bytes, or their string counterparts when inputs are handled as strings.
    '''

    def lit(raw):
        # latin-1 maps each byte to the code point with the same number,
        # which is exactly how the text variants of these constants are made
        return str(raw, encoding='latin-1') if as_text else raw

    # NOTE(review): in text mode the byte-order marks are the raw byte
    # values as code points, not U+FEFF, so they presumably can't match a
    # BOM from UTF-8-decoded input; kept as they are for compatibility
    boms = {
        'utf8': b'\xef\xbb\xbf',
        'utf16be': b'\xfe\xff',
        'utf16le': b'\xff\xfe',
        'utf32be': b'\x00\x00\xfe\xff',
        'utf32le': b'\xff\xfe\x00\x00',
    }

    simple = {
        'cr': b'\r',
        'crlf': b'\r\n',
        'dquo': b'"',
        'dquote': b'"',
        'empty': b'',
        'lcurly': b'{',
        'lf': b'\n',
        'rcurly': b'}',
        'space': b' ',
        'squo': b'\'',
        'squote': b'\'',
        'tab': b'\t',
        'utf8bom': b'\xef\xbb\xbf',
    }

    table = {name: lit(raw) for name, raw in simple.items()}
    table['bom'] = {name: lit(raw) for name, raw in boms.items()}
    return table


def _run(expression, raw, as_text):
    '''
    Run the compiled expression on a whole input, and emit its result to
    the standard output; the input is UTF-8-decoded first in text mode.
    '''

    global s
    # decode only once: the value may already be a string
    if as_text and not isinstance(raw, str):
        raw = s = str(raw, encoding='utf-8')
    emit_result(stdout.buffer, eval_expr(expression, raw))


def main():
    'Run the command-line app, using the arguments from sys.argv.'

    if len(argv) < 2:
        print(info.strip(), file=stderr)
        exit(0)
    if argv[1] in ('-h', '--h', '-help', '--help'):
        print(info.strip())
        exit(0)

    args, load_input, string_input = _parse_options(argv[1:])
    # expressions are evaluated against the module's globals
    globals().update(_literals(string_input))

    if not args:
        print(info.strip(), file=stderr)
        exit(0)
    expression, args = args[0], args[1:]

    try:
        if not expression or expression == '.':
            expression = 'data'
        expression = compile(expression, expression, 'eval')

        if not load_input:
            emit_result(stdout.buffer, eval_expr(expression, None))
            exit(0)

        if any(seemsurl(name) for name in args):
            from urllib.request import urlopen

        # stdin can only be read once, so its bytes are kept when reused
        reuse_stdin = args.count('-') > 1
        stdin_bytes = None

        for name in args:
            if name == '-':
                if reuse_stdin:
                    if stdin_bytes is None:
                        stdin_bytes = stdin.buffer.read()
                    raw = stdin_bytes
                else:
                    raw = stdin.buffer.read()
            elif seemsurl(name):
                with urlopen(name) as inp:
                    raw = inp.read()
            else:
                with open(name) as inp:
                    raw = inp.read()

            _run(expression, raw, string_input)

        if not args:
            _run(expression, stdin.buffer.read(), string_input)
    except BrokenPipeError:
        # quit quietly, instead of showing a confusing error message
        stderr.close()
        exit(0)
    except KeyboardInterrupt:
        exit(2)
    except Exception as e:
        text = str(e)
        text = text if text else '<generic exception>'
        print(f'\x1b[31m{text}\x1b[0m', file=stderr)
        exit(1)


if __name__ == '__main__':
    main()