File: tbp.py 1 #!/usr/bin/python3 2 3 # The MIT License (MIT) 4 # 5 # Copyright © 2020-2025 pacman64 6 # 7 # Permission is hereby granted, free of charge, to any person obtaining a copy 8 # of this software and associated documentation files (the “Software”), to deal 9 # in the Software without restriction, including without limitation the rights 10 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 11 # copies of the Software, and to permit persons to whom the Software is 12 # furnished to do so, subject to the following conditions: 13 # 14 # The above copyright notice and this permission notice shall be included in 15 # all copies or substantial portions of the Software. 16 # 17 # THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 18 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 19 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 20 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 21 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 22 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 23 # SOFTWARE. 24 25 26 info = ''' 27 tbp [options...] [python expression] [files/URIs...] 28 29 30 Transform Bytes with Python runs a python expression on each whole-input, 31 read as a bytes-type value. 32 33 The expression can use either `v`, `value`, `d`, or `data` for the current 34 input. 35 36 Input-sources can be either files or web-URIs. When not given any explicit 37 named sources, the standard input is used. It's even possible to reuse the 38 standard input using multiple single dashes (-) in the order needed: stdin 39 is only read once in this case, and kept for later reuse. 40 41 When the expression results in None, the current input is ignored. When the 42 expression results in a boolean, this determines whether the whole input is 43 copied/appended back to the standard output, or ignored. 44 ''' 45 46 47 from sys import argv, exit, stderr, stdin, stdout 48 from time import sleep 49 from typing import Generator 50 51 52 if len(argv) < 2: 53 print(info.strip(), file=stderr) 54 exit(0) 55 if len(argv) == 2 and argv[1] in ('-h', '--h', '-help', '--help'): 56 print(info.strip()) 57 exit(0) 58 59 60 class Skip: 61 pass 62 63 64 skip = Skip() 65 66 67 def cond(*args): 68 if len(args) == 0: 69 return None 70 71 for i, e in enumerate(args): 72 if i % 2 == 0 and i < len(args) - 1 and e: 73 return args[i + 1] 74 75 return args[-1] if len(args) % 2 == 1 else None 76 77 78 def rescue(attempt, fallback = None): 79 try: 80 return attempt() 81 except Exception as e: 82 if callable(fallback): 83 return fallback(e) 84 return fallback 85 86 catch = rescue 87 recover = rescue 88 rescued = rescue 89 90 91 def wait(seconds, result): 92 t = (int, float) 93 if (not isinstance(seconds, t)) and isinstance(result, t): 94 seconds, result = result, seconds 95 sleep(seconds) 96 return result 97 98 delay = wait 99 100 101 no_input_opts = ( 102 '=', '-n', '--n', '-nil', '--nil', '-none', '--none', '-null', '--null', 103 ) 104 string_opts = ('-s', '--s', '-str', '--str', '-string', '--string') 105 106 args = argv[1:] 107 load_input = True 108 string_input = False 109 expression = None 110 111 # handle all other leading options; the explicit help options are 112 # handled earlier in the script 113 while len(args) > 0: 114 if args[0] in no_input_opts: 115 load_input = False 116 args = args[1:] 117 continue 118 119 if args[0] in string_opts: 120 string_input = True 121 args = args[1:] 122 continue 123 124 break 125 126 if len(args) > 0: 127 expression = args[0] 128 args = args[1:] 129 130 if expression is None: 131 print(info.strip(), file=stderr) 132 exit(0) 133 134 135 def make_open_read(open): 136 'Restrict the file-open func to a read-only-binary file-open func.' 137 def open_read(name): 138 return open(name, mode='rb') 139 return open_read 140 141 142 def fail(msg, code = 1): 143 print(f'\x1b[31m{str(msg)}\x1b[0m', file=stderr) 144 exit(code) 145 146 147 def message(msg, result = None): 148 print(msg, file=stderr) 149 return result 150 151 msg = message 152 153 154 def seemsurl(s): 155 protocols = ('https://', 'http://', 'file://', 'ftp://', 'data:') 156 return any(s.startswith(p) for p in protocols) 157 158 159 def tobytes(x): 160 if isinstance(x, (bytearray, bytes)): 161 return x 162 if isinstance(x, (bool, int)): 163 return bytes((int(x), )) 164 if isinstance(x, float): 165 return bytes(str(x), encoding='utf-8') 166 if isinstance(x, str): 167 return bytes(x, encoding='utf-8') 168 return bytes(x) 169 170 171 def tointorbytes(x): 172 return x if isinstance(x, int) else tobytes(x) 173 174 175 def adapt_result(x, default): 176 if x is True: 177 return default 178 if x is False: 179 return None 180 181 if isinstance(x, Skip): 182 return None 183 184 if callable(x): 185 return x(default) 186 return x 187 188 189 def emit_result(w, x): 190 if x is None: 191 return 192 193 if isinstance(x, int): 194 w.write(tobytes(x)) 195 return 196 197 if isinstance(x, (list, tuple, range, Generator)): 198 for e in x: 199 w.write(tobytes(e)) 200 return 201 202 w.write(tobytes(x)) 203 204 205 def eval_expr(expr, using): 206 global v, val, value, d, dat, data 207 # offer several aliases for the variable with the input bytes 208 v = val = value = d = dat = data = using 209 return adapt_result(eval(expr), using) 210 211 212 cr = '\r' if string_input else b'\r' 213 crlf = '\r\n' if string_input else b'\r\n' 214 dquo = '"' if string_input else b'"' 215 dquote = '"' if string_input else b'"' 216 empty = '' if string_input else b'' 217 lcurly = '{' if string_input else b'{' 218 lf = '\n' if string_input else b'\n' 219 rcurly = '}' if string_input else b'}' 220 space = ' ' if string_input else b' ' 221 squo = '\'' if string_input else b'\'' 222 squote = '\'' if string_input else b'\'' 223 tab = '\t' if string_input else b'\t' 224 # utf8bom = '\xef\xbb\xbf' if string_input else b'\xef\xbb\xbf' 225 if string_input: 226 bom = { 227 'utf8': '\xef\xbb\xbf', 228 'utf16be': '\xfe\xff', 229 'utf16le': '\xff\xfe', 230 'utf32be': '\x00\x00\xfe\xff', 231 'utf32le': '\xff\xfe\x00\x00', 232 } 233 else: 234 bom = { 235 'utf8': b'\xef\xbb\xbf', 236 'utf16be': b'\xfe\xff', 237 'utf16le': b'\xff\xfe', 238 'utf32be': b'\x00\x00\xfe\xff', 239 'utf32le': b'\xff\xfe\x00\x00', 240 } 241 242 nil = None 243 none = None 244 null = None 245 246 exec = None 247 open = make_open_read(open) 248 249 modules_opts = ( 250 '-m', '--m', '-mod', '--mod', '-module', '--module', 251 '-modules', '--modules', 252 ) 253 more_modules_opts = ('-mm', '--mm', '-more', '--more') 254 255 while len(args) > 0: 256 if args[0] in no_input_opts: 257 no_input = True 258 args = args[1:] 259 continue 260 261 if args[0] in modules_opts: 262 try: 263 if len(args) < 2: 264 msg = 'a module name or a comma-separated list of modules' 265 raise Exception('expected ' + msg) 266 267 g = globals() 268 from importlib import import_module 269 for e in args[1].split(','): 270 g[e] = import_module(e) 271 272 g = None 273 import_module = None 274 args = args[2:] 275 except Exception as e: 276 fail(e, 1) 277 278 continue 279 280 if args[0] in more_modules_opts: 281 import functools, itertools, json, math, random, statistics, string, time 282 args = args[1:] 283 continue 284 285 break 286 287 288 try: 289 if not expression or expression == '.': 290 expression = 'data' 291 expression = compile(expression, expression, 'eval') 292 293 got_stdin = False 294 all_stdin = None 295 dashes = args.count('-') 296 297 data = None 298 299 if not load_input: 300 emit_result(stdout.buffer, eval_expr(expression, None)) 301 exit(0) 302 303 if any(seemsurl(name) for name in args): 304 from urllib.request import urlopen 305 306 for name in args: 307 if name == '-': 308 if dashes > 1: 309 if not got_stdin: 310 all_stdin = stdin.buffer.read() 311 got_stdin = True 312 data = all_stdin 313 else: 314 data = stdin.buffer.read() 315 316 if string_input: 317 data = s = str(data, encoding='utf-8') 318 elif seemsurl(name): 319 with urlopen(name) as inp: 320 data = inp.read() 321 else: 322 with open(name) as inp: 323 data = inp.read() 324 325 if string_input: 326 data = s = str(data, encoding='utf-8') 327 emit_result(stdout.buffer, eval_expr(expression, data)) 328 329 if len(args) == 0: 330 data = stdin.buffer.read() 331 if string_input: 332 data = s = str(data, encoding='utf-8') 333 emit_result(stdout.buffer, eval_expr(expression, data)) 334 except BrokenPipeError: 335 # quit quietly, instead of showing a confusing error message 336 stderr.close() 337 exit(0) 338 except KeyboardInterrupt: 339 exit(2) 340 except Exception as e: 341 s = str(e) 342 s = s if s else '<generic exception>' 343 print(f'\x1b[31m{s}\x1b[0m', file=stderr) 344 exit(1)