#!/usr/bin/python3 # The MIT License (MIT) # # Copyright © 2024 pacman64 # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the “Software”), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. from inspect import getfullargspec from itertools import islice from json import load, dump from sys import argv, stderr, stdin, stdout from typing import Any, Callable, Dict, Iterable, NoReturn, Tuple # extra imports for the `python-lambda` option from decimal import Decimal, getcontext from fractions import Fraction import functools from functools import \ cache, cached_property, cmp_to_key, get_cache_token, lru_cache, \ namedtuple, partial, partialmethod, recursive_repr, reduce, \ singledispatch, singledispatchmethod, total_ordering, update_wrapper, \ wraps import itertools from itertools import \ accumulate, chain, combinations, combinations_with_replacement, \ compress, count, cycle, dropwhile, filterfalse, groupby, islice, \ permutations, product, repeat, starmap, takewhile, tee, zip_longest try: from itertools import pairwise from itertools import batched except Exception: pass from json import dumps, loads import math Math = math from math import \ acos, acosh, asin, asinh, atan, atan2, atanh, ceil, comb, \ copysign, cos, cosh, degrees, dist, e, erf, erfc, exp, expm1, \ fabs, factorial, floor, fmod, frexp, fsum, gamma, gcd, hypot, inf, \ isclose, isfinite, isinf, isnan, isqrt, lcm, ldexp, lgamma, log, \ log10, log1p, log2, modf, nan, nextafter, perm, pi, pow, prod, \ radians, remainder, sin, sinh, sqrt, tan, tanh, tau, trunc, ulp try: from math import cbrt, exp2 except Exception: pass power = pow import operator import statistics from statistics import \ bisect_left, bisect_right, fmean, \ geometric_mean, harmonic_mean, mean, median, \ median_grouped, median_high, median_low, mode, multimode, pstdev, \ pvariance, quantiles, stdev, variance try: from statistics import \ correlation, covariance, linear_regression, mul except Exception: pass import string from string import \ Formatter, Template, ascii_letters, ascii_lowercase, ascii_uppercase, \ capwords, digits, hexdigits, octdigits, printable, punctuation, \ whitespace alphabet = ascii_letters letters = ascii_letters lowercase = ascii_lowercase uppercase = ascii_uppercase from textwrap import dedent, fill, indent, shorten, wrap from urllib.parse import \ parse_qs, parse_qsl, quote, quote_from_bytes, quote_plus, unquote, \ unquote_plus, unquote_to_bytes, unwrap, urldefrag, urlencode, urljoin, \ urlparse, urlsplit, urlunparse, urlunsplit from re import compile as zj_compile_re zj_info_msg = ''' zj [keys/indices...] Zoom Json digs into a subset of valid JSON input, using the given mix of keys and array-indices, the latter being either 0-based or negative, to index backward from the ends of arrays. Zooming on object keys is first tried as an exact key-match, failing that as a case-insensitive key-match (first such match): when both approaches fail, if the key is a valid integer, the key at the (even negative) index given is used. Invalid array-indices and missing object-keys result in null values, when none of the special keys/fallbacks shown later apply. You can slice arrays the exclusive/go/python way using index-pairs with a `:` between the start/end pair, as long as it's a single argument; you can even use `..` as the index-pair separator to include the stop index in the result. Either way, as with go/python, you can omit either of the indices when slicing. Special key `.` acts as implicit loops on arrays, and even objects without that specific key: in the unlikely case that an object has `.` as one of its keys, you can use one of loop-fallback aliases, shown later. Another special key is `+` (no quotes): when used, the rest of the keys are used `in parallel`, allowing multiple picks from the current value. When picking array items, you can also use either type (`:` or `..`) of slicing, even mixing it with individual indices. Similar to `+`, the `-` fallback-key drops keys, which means all items are picked, except for those mentioned after the `-`. Unlike the looping special key, after the first `+` special-key, all keys following it, special or not, are picked normally. In case any of the special keys are actual keys in the data loaded, some aliases are available: . /. ./ :. .: + /+ +/ :+ +: - /- -/ :- -: .i :i .info :info :info: .k :k .keys :keys :keys: .t :t .type :type :type: .l :l .len .length :len :len: :length :length: These aliases allow using the special functionality even on objects whose keys match some of these special names, as it's extremely unlikely data use all aliases as actual keys at any level. The only input supported is valid JSON coming from standard-input: there's no way to load files using their names. To load data from files/URIs use tools like `cat` or `curl`, and pipe their output into this tool. ''' zj_slice_re = zj_compile_re('''^(([+-]?[0-9]+)?)(:|\.\.)(([+-]?[0-9]+)?)$''') def zj_zoom(data: Any, keys: Tuple[str, ...]) -> Any: eval_due = False for i, k in enumerate(keys): try: if eval_due: data = eval(k)(data) eval_due = False continue if isinstance(data, dict): m = zj_match_key(data, k) if m in data: data = data[m] continue m = zj_slice_re.match(k) if m: data = {k: data[k] for k in zj_match_keys(data, k)} continue if isinstance(data, (list, tuple)): try: i = int(k) l = len(data) data = data[i] if -l <= i < l else None continue except Exception: m = zj_slice_re.match(k) if m: data = [data[i] for i in zj_match_indices(data, k)] continue if k in ('.pyl', ':pyl', 'pyl:', ':pyl:'): eval_due = True continue if k in ('.', '/.', './', ':.', '.:'): if isinstance(data, dict): rest = tuple(keys[i + 1:]) return {k: zj_zoom(v, rest) for k, v in data.items()} if isinstance(data, (list, tuple)): rest = tuple(keys[i + 1:]) return tuple(zj_zoom(v, rest) for v in data) # doing nothing amounts to an identity-op for simple values continue fn = zj_final_fallbacks.get(k, None) if fn: return fn(data, tuple(keys[i + 1:])) fn = zj_fallbacks.get(k, None) if fn: data = fn(data) continue if isinstance(data, (dict, list, tuple)): data = None continue kind = zj_type(data) msg = f'value of type {kind} has no properties to zoom into' raise Exception(msg) except Exception as e: key_path = ' > '.join(islice(keys, None, i + 1)) raise Exception(f'{key_path}: {e}') return data def zj_match_key(src: Dict, key: str) -> str: if key in src: return key low = key.casefold() for k in src.keys(): if low == k.casefold(): return k try: i = int(key) l = len(src) if i < 0: i += l if i < 0 or i >= l: return None for j, k in enumerate(src.keys()): if i == j: return k except Exception: return key return key def zj_match_keys(src: Any, key: str) -> Iterable: if isinstance(src, (list, tuple)): yield from zj_match_indices(src, key) yield from zj_match_fallbacks(src, key) return if isinstance(src, dict): if key in src: yield key return low = key.casefold() for k in src.keys(): if low == k.casefold(): yield k return yield from zj_match_indices(src, key) yield from zj_match_fallbacks(src, key) return yield from zj_match_fallbacks(src, key) def zj_match_indices(src: Any, key: str) -> Iterable: try: i = int(key) if isinstance(src, (list, tuple)): l = len(src) yield src[i] if -l <= i < l else None return if isinstance(src, dict): l = len(src) if i < 0: i += l if i < 0 or i >= l: return for j, k in enumerate(src.keys()): if i == j: yield k return return except Exception: pass m = zj_slice_re.match(key) if not m: return l = len(src) (start, _, kind, stop, _) = m.groups() start = int(start) if start != '' else 0 stop = int(stop) if stop != '' else l if start < 0: start += l start = max(start, 0) if stop < 0: stop += l stop = min(stop, l) if kind == '..': stop += 1 stop = min(stop, l) if start > stop: return if (start < 0 and stop < 0) or (start >= l and stop >= l): return if isinstance(src, dict): for i, k in enumerate(src.keys()): if i >= stop: return if start <= i: yield k return if isinstance(src, (list, tuple)): yield from range(start, stop) return def zj_match_fallbacks(src: Any, key: str) -> Iterable: fn = zj_fallbacks.get(key, None) if fn: yield fn(src) def zj_help(*_) -> NoReturn: print(zj_info_msg.strip(), file=stderr) exit(1) def zj_keys(src: Any) -> Any: if isinstance(src, dict): return tuple(src.keys()) if isinstance(src, (list, tuple)): return tuple(range(len(src))) return None def zj_info(x: Any) -> str: if isinstance(x, dict): return f'object ({len(x)} items)' if isinstance(x, (list, tuple)): return f'array ({len(x)} items)' return zj_type(x) def zj_type(x: Any) -> str: return { type(None): 'null', dict: 'object', float: 'number', int: 'number', str: 'string', list: 'array', tuple: 'array', }.get(type(x), 'other') zj_fallbacks: Dict[str, Callable] = { '.h': zj_help, '.help': zj_help, ':h': zj_help, ':help': zj_help, ':help:': zj_help, '.i': zj_info, '.info': zj_info, ':i': zj_info, ':info': zj_info, ':info:': zj_info, '.k': zj_keys, '.keys': zj_keys, ':keys': zj_keys, ':keys:': zj_keys, '.l': len, '.len': len, '.length': len, ':l': len, ':len': len, ':length': len, ':len:': len, ':length:': len, '.t': zj_type, '.type': zj_type, ':t': zj_type, ':type': zj_type, ':type:': zj_type, } def zj_pick(src: Any, keys: Tuple[str, ...]) -> Any: if isinstance(src, dict): picked = {} for k in keys: for k in zj_match_keys(src, k): picked[k] = src[k] return picked if isinstance(src, (list, tuple)): picked = [] for k in keys: for i in zj_match_indices(src, k): picked.append(src[i]) return tuple(picked) msg = f'can\'t pick properties from value of type {zj_type(src)}' raise Exception(msg) def zj_drop(src: Any, keys: Tuple[str, ...]) -> Any: if isinstance(src, dict): avoid = set() for k in keys: for k in zj_match_keys(src, k): avoid.add(k) return {k: v for k, v in src.items() if not k in avoid} if isinstance(src, (list, tuple)): l = len(src) avoid = set() for k in keys: for i in zj_match_indices(src, k): avoid.add(i if i >= 0 else i + l) return tuple(v for i, v in enumerate(src) if not i in avoid) msg = f'can\'t drop properties from value of type {zj_type(src)}' raise Exception(msg) zj_final_fallbacks: Dict[str, Callable] = { '+': zj_pick, ':+:': zj_pick, ':+': zj_pick, '+:': zj_pick, '/+': zj_pick, '+/': zj_pick, '-': zj_drop, ':-:': zj_drop, ':-': zj_drop, '-:': zj_drop, '/-': zj_drop, '-/': zj_drop, } apo = '\'' apos = '\'' backquote = '`' backtick = '`' ball = '●' block = '█' btick = '`' bullet = '•' cdot = '·' circle = '●' cross = '×' dquo = '"' dquote = '"' emdash = '—' endash = '–' ge = '≥' geq = '≥' hellip = '…' hole = '○' lcurly = '{' ldquo = '“' ldquote = '“' le = '≤' leq = '≤' mdash = '—' mdot = '·' miniball = '•' ndash = '–' neq = '≠' rcurly = '}' rdquo = '”' rdquote = '”' sball = '•' square = '■' squo = '\'' squote = '\'' def dive(into: Any, doing: Callable) -> Any: 'Transform a nested value by calling a func via depth-first recursion.' # support args in either order if callable(into): into, doing = doing, into return _dive_kv(None, into, doing) deepmap = dive dive1 = dive def divebin(x: Any, y: Any, doing: Callable) -> Any: 'Nested 2-value version of depth-first-recursive func dive.' # support args in either order if callable(x): x, y, doing = y, doing, x narg = required_arg_count(doing) if narg == 2: return dive(x, lambda a: dive(y, lambda b: doing(a, b))) if narg == 4: return dive(x, lambda i, a: dive(y, lambda j, b: doing(i, a, j, b))) raise Exception('divebin(...) only supports funcs with 2 or 4 args') bindive = divebin # diveboth = divebin # dualdive = divebin # duodive = divebin dive2 = divebin def _dive_kv(key: Any, into: Any, doing: Callable) -> Any: if isinstance(into, dict): return {k: _dive_kv(k, v, doing) for k, v in into.items()} if isinstance(into, Iterable) and not isinstance(into, str): return [_dive_kv(i, e, doing) for i, e in enumerate(into)] narg = required_arg_count(doing) return doing(key, into) if narg == 2 else doing(into) def recover(*args) -> Any: ''' Catch exceptions using a lambda/callback func, in one of 6 ways recover(zero_args_func) recover(zero_args_func, exception_replacement_value) recover(zero_args_func, one_arg_exception_handling_func) recover(one_arg_func, arg) recover(one_arg_func, arg, exception_replacement_value) recover(one_arg_func, arg, one_arg_exception_handling_func) ''' if len(args) == 1: f = args[0] try: return f() except Exception: return None elif len(args) == 2: f, fallback = args[0], args[1] if callable(f) and callable(fallback): try: return f() except Exception as e: nargs = required_arg_count(fallback) return fallback(e) if nargs == 1 else fallback() else: try: return f() if required_arg_count(f) == 0 else f(args[1]) except Exception: return fallback elif len(args) == 3: f, x, fallback = args[0], args[1], args[2] if callable(f) and callable(fallback): try: return f(x) except Exception as e: nargs = required_arg_count(fallback) return fallback(e) if nargs == 1 else fallback() else: try: return f(x) except Exception: return fallback else: raise Exception('recover(...) only works with 1, 2, or 3 args') attempt = recover attempted = recover recovered = recover recoverred = recover rescue = recover rescued = recover trycall = recover def required_arg_count(f: Callable) -> int: if isinstance(f, type): return 1 meta = getfullargspec(f) n = len(meta.args) if meta.defaults: n -= len(meta.defaults) return n typeof = zj_type # deny file-access to expression-evaluators open = None try: # load data, trying to handle help-like options as well try: data = load(stdin.buffer) except Exception as e: if len(argv) == 2 and argv[1] in ('-h', '--h', '-help', '--help'): zj_help(None) else: raise e data = zj_zoom(data, tuple(argv[1:])) dump(data, stdout, indent=2, allow_nan=False, check_circular=False) # dump(data, stdout, indent=None, allow_nan=False, check_circular=False) stdout.write('\n') except BrokenPipeError: # quit quietly, instead of showing a confusing error message stderr.close() except KeyboardInterrupt: exit(2) except Exception as e: print(f'\x1b[31m{e}\x1b[0m', file=stderr) exit(1)