#!/usr/bin/python3

# The MIT License (MIT)
#
# Copyright © 2024 pacman64
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the “Software”), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


info = '''
tjp [options...] [python expression] [file/URI...]


Transform Json with Python runs a python expression on a single JSON-encoded
input. The expression can use either `v`, `value`, `d`, or `data` for the
decoded input.

Invalid-JSON inputs result in an error, with no chance of recovery.

Input-sources can be either files or web-URIs. When not given a named input,
the standard input is used.


Examples

# numbers from 0 to 5; no input is read/used
tjp = 'range(6)'

# using bases 1 to 5, find all their powers up to the 4th
tjp = '((n**p for p in range(1, 4+1)) for n in range(1, 6))'

# keep only the last 2 items from the input
tjp = 'range(1, 6)' | tjp 'data[-2:]'

# chunk/regroup input items into arrays of up to 3 items each
tjp = 'range(1, 8)' | tjp 'chunk(data, 3)'

# ignore errors/exceptions, in favor of a fallback value
tjp = 'rescue(lambda: 2 * float("no way"), "fallback value")'

# ignore errors/exceptions, calling a fallback func with the exception
tjp = 'rescue(lambda: 2 * float("no way"), str)'

# use dot-syntax on JSON data
tjp = '{"abc": {"xyz": 123}}' | tjp -d 'data.abc.xyz'

# use dot-syntax on JSON data; keywords as properties are syntax-errors
tjp = '{"abc": {"def": 123}}' | tjp -d 'data.abc["def"]'

# func results are automatically called on the input
tjp = '{"abc": 123, "def": 456}' | tjp len
'''


from itertools import islice
from json import dump, load, loads
from re import compile as compile_uncached, IGNORECASE
from sys import argv, exit, stderr, stdin, stdout
from typing import Iterable

# keep a handle on the builtin `compile`, since the regex-caching func
# defined further down deliberately reuses that name
compile_py = compile


# no args: show the help message on stderr
if len(argv) < 2:
    print(info.strip(), file=stderr)
    exit(0)

# explicit help option: show the help message on stdout
if len(argv) == 2 and argv[1] in ('-h', '--h', '-help', '--help'):
    print(info.strip())
    exit(0)


class Skip:
    'Marker type: values of this type are left out of the final result.'
    pass


skip = Skip()


class Dottable:
    'Enable convenient dot-syntax access to dictionary values.'

    def __getattr__(self, key):
        # only called when normal attribute lookup fails: missing keys
        # result in None, instead of raising an AttributeError
        return self.__dict__.get(key)

    def __getitem__(self, key):
        return self.__dict__.get(key)

    def __iter__(self):
        return iter(self.__dict__)


def dotate(x):
    'Recursively ensure all dictionaries in a value are dot-accessible.'

    if isinstance(x, dict):
        d = Dottable()
        d.__dict__ = {k: dotate(v) for k, v in x.items()}
        return d
    if isinstance(x, list):
        return [dotate(e) for e in x]
    if isinstance(x, tuple):
        return tuple(dotate(e) for e in x)
    return x


dotated = dotate
dote = dotate
doted = dotate
dotified = dotate
dotify = dotate
dottified = dotate
dottify = dotate


def chunk(items, chunk_size):
    '''
    Break iterable into chunks, each with up to the item-count given.

    Strings are chunked into substrings; any other iterable is chunked into
    tuples. Being a generator, a bad chunk-size raises an exception only when
    the first chunk is asked for.
    '''

    # validate the chunk-size before anything else: checking only after the
    # string branch let non-positive sizes loop forever on string inputs
    if not isinstance(chunk_size, int):
        raise Exception('non-integer chunk-size')
    if chunk_size < 1:
        raise Exception('non-positive chunk-size')

    if isinstance(items, str):
        for start in range(0, len(items), chunk_size):
            yield items[start:start + chunk_size]
        return

    it = iter(items)
    while True:
        head = tuple(islice(it, chunk_size))
        if not head:
            return
        yield head


chunked = chunk


# re_cache is used by custom func compile to cache previously-compiled
# regular-expressions, which makes them quicker to (re)use in formulas
re_cache = {}

# ire_cache is like re_cache, except it's for case-insensitive regexes
ire_cache = {}


def compile(expr, flags = 0):
    '''
    Speed-up using regexes across lines, by avoiding recompilations.

    Only flags 0 (the default) and IGNORECASE are supported: anything else
    raises an exception.
    '''

    if flags != 0 and flags != IGNORECASE:
        msg = 'only the default and case-insensitive options are supported'
        raise Exception(msg)

    cache = re_cache if flags == 0 else ire_cache
    if expr in cache:
        return cache[expr]

    pat = compile_uncached(expr, flags)
    cache[expr] = pat
    return pat


def icompile(expr):
    'Case-insensitive variant of func compile, with its own cache.'
    return compile(expr, IGNORECASE)


def cond(*args):
    '''
    Pick the value following the first truthy condition, the args being
    condition/value pairs; a trailing unpaired arg is the fallback value,
    and None is the result when nothing matches and there's no fallback.
    '''

    # conditions are at the even indices, except for a trailing fallback
    for i in range(0, len(args) - 1, 2):
        if args[i]:
            return args[i + 1]
    return args[-1] if len(args) % 2 == 1 else None


def dive(into, using):
    '''
    Depth-first recursive caller for 1-input functions.

    The func is called on all leaf values: dictionaries keep their keys, while
    other non-string iterables turn into lists. The 2 args can be given in
    either order.
    '''

    if callable(into):
        into, using = using, into

    def rec(v):
        if isinstance(v, dict):
            return {key: rec(val) for key, val in v.items()}
        if isinstance(v, Iterable) and not isinstance(v, str):
            return [rec(e) for e in v]
        return using(v)

    return rec(into)


def divekeys(into, using):
    '''
    Depth-first recursive key-renamer: the func given is called with each
    dictionary key, its result being the new key. Leaf values are kept as
    they are, while non-string iterables turn into lists. The 2 args can be
    given in either order.
    '''

    if callable(into):
        into, using = using, into

    def rec(v):
        if isinstance(v, dict):
            return {using(key): rec(val) for key, val in v.items()}
        if isinstance(v, Iterable) and not isinstance(v, str):
            return [rec(e) for e in v]
        return v

    # the recursive helper takes just the value: calling it with 2 args, as
    # was done before, always failed with a TypeError
    return rec(into)


def divekv(into, using, using2 = None):
    '''
    Depth-first recursive caller for 2-input functions.

    With a single func, it's called with the key/index and value of each
    leaf, the top-level value getting None as its key. With 2 funcs, the
    first one renames dictionary keys (results are turned into strings),
    while the second one transforms the leaf values. The data can be given
    either first or last.
    '''

    if using2 is None:
        if callable(into):
            into, using = using, into
    else:
        if not callable(using2):
            into, using, using2 = using2, into, using

    def rec(k, v):
        if isinstance(v, dict):
            return {key: rec(key, val) for key, val in v.items()}
        if isinstance(v, Iterable) and not isinstance(v, str):
            return [rec(i, e) for i, e in enumerate(v)]
        return using(k, v)

    def rec2(k, v):
        if isinstance(v, dict):
            return {
                str(using(key, val)): rec2(key, val)
                for key, val in v.items()
            }
        if isinstance(v, Iterable) and not isinstance(v, str):
            return [rec2(i, e) for i, e in enumerate(v)]
        return using2(k, v)

    return rec(None, into) if using2 is None else rec2(None, into)


kvdive = divekv


def drop(src, *what):
    '''
    Remove all the substrings given from a string, or all the keys given
    from a dictionary, or from each dictionary in an iterable. Anything else
    results in None.
    '''

    if isinstance(src, str):
        for s in what:
            src = src.replace(s, '')
        return src

    def kdrop(src, what):
        return {k: v for k, v in src.items() if k not in what}

    if isinstance(src, dict):
        return kdrop(src, set(what))

    if isinstance(src, Iterable):
        what = set(what)
        return [kdrop(e, what) for e in src]

    return None


dropped = drop


def join(x, y = ' '):
    'Join values into a string, or make a dict from keys and values.'

    # whichever arg is a string is the separator; the first arg is checked
    # first, so 2 strings use the first one to join the chars of the second
    if isinstance(x, str):
        return x.join(str(v) for v in y)
    if isinstance(y, str):
        return y.join(str(v) for v in x)
    return dict(zip(x, y))


def pick(src, *keys):
    '''
    Keep only the keys given from a dictionary, or from each dictionary in
    an iterable; missing keys get None as their values.
    '''

    if isinstance(src, dict):
        return {k: src.get(k, None) for k in keys}
    return [{k: e.get(k, None) for k in keys} for e in src]


picked = pick


def rescue(attempt, fallback = None):
    '''
    Call the func given, handling exceptions via the fallback, which is
    either a value to use directly, or a func to call with the exception.
    '''

    try:
        return attempt()
    except Exception as e:
        if callable(fallback):
            return fallback(e)
        return fallback


catch = rescue
catched = rescue
caught = rescue
recover = rescue
recovered = rescue
rescued = rescue


def retype(x):
    'Try to narrow the type of the value given.'

    if isinstance(x, float):
        # infinities and NaNs can't turn into integers: checking first
        # avoids the exceptions int() raises for them
        return int(x) if x.is_integer() else x

    if not isinstance(x, str):
        return x

    # try parsers from most to least general, keeping the first success;
    # failures are expected here, and deliberately ignored
    for parse in (loads, int, float):
        try:
            return parse(x)
        except Exception:
            continue

    return x


autocast = retype
autocasted = retype
mold = retype
molded = retype
recast = retype
recasted = retype
remold = retype
remolded = retype
retyped = retype


def typeof(x):
    'Get the JSON-style type-name for a value, or "other" when none fits.'

    return {
        type(None): 'null',
        bool: 'boolean',
        dict: 'object',
        float: 'number',
        int: 'number',
        str: 'string',
        list: 'array',
        tuple: 'array',
    }.get(type(x), 'other')


jstype = typeof


def result_needs_fixing(x):
    '''
    Check if a value has anything in it other than None, booleans, numbers,
    strings, dictionaries, lists, and tuples.
    '''

    if x is None or isinstance(x, (bool, int, float, str)):
        return False
    rec = result_needs_fixing
    if isinstance(x, dict):
        return any(rec(k) or rec(v) for k, v in x.items())
    if isinstance(x, (list, tuple)):
        return any(rec(e) for e in x)
    return True


def fix_result(x, default):
    '''
    Recursively turn a value into one made only of JSON-compatible types.

    Funcs are called with the original data (the `default` arg), iterables
    turn into tuples, dot-accessible objects turn into dictionaries, Skip
    values are left out of containers, exceptions are raised, and anything
    else is turned into a string.
    '''

    if x is type:
        return type(default).__name__

    # if expression results in a func, auto-call it with the original data
    if callable(x):
        x = x(default)

    if x is None or isinstance(x, (bool, int, float, str)):
        return x

    rec = fix_result

    if isinstance(x, dict):
        return {
            rec(k, default): rec(v, default) for k, v in x.items() if not
                (isinstance(k, Skip) or isinstance(v, Skip))
        }

    # this check must come before the iterable one: dot-accessible objects
    # are also iterable, which would turn them into tuples of their keys
    if isinstance(x, Dottable):
        return rec(x.__dict__, default)

    if isinstance(x, Iterable):
        return tuple(rec(e, default) for e in x if not isinstance(e, Skip))

    if isinstance(x, Exception):
        raise x

    return None if isinstance(x, Skip) else str(x)


def fail(msg, code = 1):
    'Show the message given in red on stderr, then quit with the exit code.'
    print(f'\x1b[31m{str(msg)}\x1b[0m', file=stderr)
    exit(code)


def message(msg, result = None):
    'Show a message on stderr, returning the second arg given, if any.'
    print(msg, file=stderr)
    return result


msg = message


def seemsurl(path):
    'Check if a string starts with any of the supported URI protocols.'
    protocols = ('https://', 'http://', 'file://', 'ftp://', 'data:')
    return path.startswith(protocols)


def matchkey(kv, key):
    '''
    Find the dictionary key matching the string given: exact matches are
    tried first, then case-insensitive ones, then the string is tried as a
    (possibly negative) index into the keys. When nothing matches, the
    result is the string given.
    '''

    if key in kv:
        return key

    low = key.lower()
    for k in kv.keys():
        if low == k.lower():
            return k

    try:
        i = int(key)
        l = len(kv)
        if i < 0:
            i += l

        if not (-l <= i < l):
            return key
        for j, k in enumerate(kv.keys()):
            if i == j:
                return k
    except Exception as _:
        return key

    return key


def zoom(data, keys):
    '''
    Follow a sequence of keys/indices into nested data: missing keys and
    invalid indices result in None, while trying to go into anything other
    than dictionaries, lists, or tuples raises an exception.
    '''

    for k in keys:
        if isinstance(data, dict):
            data = data.get(matchkey(data, k), None)
            continue

        if isinstance(data, (list, tuple)):
            try:
                k = int(k)
                l = len(data)
                data = data[k] if -l <= k < l else None
            except Exception as _:
                data = None
            continue

        raise Exception(f'{k}: can\'t zoom on value of type {typeof(data)}')

    return data


def make_eval_once(run):
    'Wrap an evaluator so it disables global name `eval` upon its first use.'

    def eval_once(expr):
        global eval
        eval = None
        return run(expr)
    return eval_once


eval = make_eval_once(eval)


# convenience names available to expressions
cr = '\r'
crlf = '\r\n'
dquo = '"'
dquote = '"'
empty = ''
lcurly = '{'
lf = '\n'
rcurly = '}'
s = ''
squo = '\''
squote = '\''

nil = None
none = None
null = None


no_input_opts = (
    '=', '-n', '--n', '-nil', '--nil', '-none', '--none', '-null', '--null',
)
compact_output_opts = (
    '-c', '--c', '-compact', '--compact', '-j0', '--j0', '-json0', '--json0',
)
dot_opts = ('-d', '--d', '-dot', '--dot', '-dots', '--dots')
modules_opts = (
    '-m', '--m', '-mod', '--mod', '-module', '--module',
    '-modules', '--modules',
)
more_modules_opts = ('-mm', '--mm', '-more', '--more')
trace_opts = ('-t', '--t', '-trace', '--trace', '-traceback', '--traceback')
zoom_opts = ('-z', '--z', '-zj', '--zj', '-zoom', '--zoom')


# settings controlled by the leading command-line options
args = argv[1:]
no_input = False
zoom_stdin = False
trace_errors = False
dottable_input = False
compact_output = False

# handle all leading options, stopping at the first arg which isn't one
while args:
    if args[0] in no_input_opts:
        no_input = True
        args = args[1:]
    elif args[0] in compact_output_opts:
        compact_output = True
        args = args[1:]
    elif args[0] in dot_opts:
        dottable_input = True
        args = args[1:]
    elif args[0] in modules_opts:
        # import the comma-separated modules named by the next arg, making
        # them available to the expression as global names
        try:
            if len(args) < 2:
                raise Exception(
                    'expected '
                    'a module name or a comma-separated list of modules'
                )

            g = globals()
            from importlib import import_module
            for e in args[1].split(','):
                g[e] = import_module(e)
            # clear the import helpers before the expression can run
            g = None
            import_module = None

            args = args[2:]
        except Exception as e:
            fail(e, 1)
    elif args[0] in more_modules_opts:
        # preload several commonly-useful modules from the standard library
        import functools
        import itertools
        import json
        import math
        import random
        import statistics
        import string
        import time
        args = args[1:]
    elif args[0] in trace_opts:
        trace_errors = True
        args = args[1:]
    elif args[0] in zoom_opts:
        # the zoom option ends option-handling right away
        zoom_stdin = True
        args = args[1:]
        break
    else:
        break


try:
    # the first arg left is the expression, which defaults to the input
    if args:
        expr, args = args[0], args[1:]
    else:
        expr = 'data'
    if expr == '.':
        expr = 'data'
    expr = compile_py(expr, expr, mode='eval')

    # NOTE(review): in zoom mode the first arg after the option is still
    # taken as the expression (compiled, but never evaluated), and this
    # check also limits zooming to a single key; confirm that's intended
    if len(args) > 1:
        raise Exception('can\'t use more than 1 input')
    path = args[0] if args else '-'

    # load the input data, if any
    if no_input:
        data = None
    elif zoom_stdin:
        data = zoom(load(stdin), args)
    elif path == '-':
        data = load(stdin)
    elif seemsurl(path):
        from io import TextIOWrapper
        from urllib.request import urlopen
        with urlopen(path) as inp, TextIOWrapper(inp, encoding='utf-8') as txt:
            data = load(txt)
    else:
        with open(path, encoding='utf-8') as inp:
            data = load(inp)

    if dottable_input and not zoom_stdin:
        data = dotate(data)

    # all the names expressions can use for the input data
    v = value = d = data

    if not zoom_stdin:
        # clear these global names before running the expression
        compile_py = None
        exec = None
        open = None
        v = eval(expr)

    if result_needs_fixing(v):
        v = fix_result(v, data)

    # emit the result as JSON, always ending the output with a line-feed
    if compact_output:
        indent, separators = None, (',', ':')
    else:
        indent, separators = 2, (',', ': ')
    dump(v, stdout, indent=indent, separators=separators, allow_nan=False)
    print()
except BrokenPipeError:
    # quit quietly, instead of showing a confusing error message
    stderr.close()
    exit(0)
except KeyboardInterrupt:
    exit(2)
except Exception as e:
    # the trace option shows full tracebacks, instead of just the message
    if trace_errors:
        raise e
    fail(e, 1)