#!/usr/bin/python3 # The MIT License (MIT) # # Copyright © 2024 pacman64 # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the “Software”), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. info = ''' tj [options...] [python expression] [filepath/URI...] Transform Json loads JSON data, runs a Python expression on it, and emits the result as JSON. Parsed input-data are available to the expression as any of the variables named `v`, `value`, `d`, and `data`. If no file/URI is given, it loads JSON data from its standard input. If the argument before the expression is a single equals sign (a `=`, without the quotes), no data are read/parsed, and the expression is evaluated as given. Options, where leading double-dashes are also allowed, except for alias `=`: -c compact single-line JSON output (JSON-0) -compact same as -c -j0 same as -c -json0 same as -c -json-0 same as -c -h show this help message -help same as -h -nil don't read any input -no-input same as -nil -noinput same as -nil -none same as -nil -null same as -nil -null-input same as -nil -nullinput same as -nil = same as -nil -d recursively make dictionary values dot-accessible -dot same as -d -dots same as -d -p show a performance/time-profile of the expression run -prof same as -p -profile same as -p -t show a full traceback of this script for exceptions -trace same as -t -traceback same as -t -z zoom JSON value read from stdin, using all the keys given -zj same as -z -zoom same as -z Extra Functions after(x, y) ignore items until the one given; for strings and sequences afterfinal(x, y) backward counterpart of func after afterlast(x, y) same as func afterfinal arrayish(x) check if value is a list, a tuple, or a generator basename(s) get the final/file part of a pathname before(x, y) ignore items since the one given; for strings and sequences beforefinal(x, y) backward counterpart of func before beforelast(x, y) same as func beforefinal chunk(x, size) split/resequence items into chunks of the length given chunked(x, size) same as func chunk compose(*args) make a func which chain-calls all funcs given composed(*args) same as func compose cond(*args) expression-friendly fully-evaluated if-else chain debase64(s) decode base64 strings, including data-URIs dedup(x) ignore later (re)occurrences of values in a sequence dejson(x, f=None) safe parse JSON from strings denan(x, y) turn a floating-point NaN values into the fallback given denil(*args) return the first non-null/none value among those given denone(*args) same as func denil denull(*args) same as func denil dirname(s) get the folder/directory/parent part of a pathname dive(x, f) transform value in depth-first-recursive fashion divebin(x, y, f) binary (2-input) version of recursive-transform func dive drop(x, *what) ignore keys or substrings; for strings, dicts, dict-lists dropped(x, *v) same as func drop each(x, f) generalization of built-in func map endict(x) turn non-dictionary values into dicts with string keys enfloat(x, f=nan) turn values into floats, offering a fallback on failure enint(x, f=None) turn values into ints, offering a fallback on failure enlist(x) turn non-list values into lists entuple(x) turn non-tuple values into tuples ext(s) return the file-extension part of a pathname, if available fields(s) split fields AWK-style from the string given filtered(x, f) same as func keep flat(*args) flatten everything into an unnested sequence fromto(x, y, ?f) sequence integers, end-value included group(x, ?by) group values into dicts of lists; optional transform func grouped(x, ?by) same as func group harden(f, v) make funcs which return values instead of exceptions hardened(f, v) same as func harden countif(x, f) count how many values make the func given true-like idiota(x, ?f) dict-counterpart of func iota ints(x, y, ?f) make sequences of increasing integers, which include the end iota(x, ?f) make an integer sequence from 1 up to the number given join(x, y) join values into a string; make a dict from keys and values json0(x) turn a value into its smallest JSON-string representation json2(x) turn a value into a 2-space-indented multi-line JSON string jsonl(x) turn a value into a sequence of single-line (JSONL) strings keep(x, pred) generalization of built-in func filter kept(x, pred) same as func keep links(x) auto-detect all hyperlink-like (HTTP/HTTPS) substrings mapped(x, f) same as func each number(x) try to parse as an int, on failure try to parse as a float numbers(x) auto-detect all numbers in the value given numstats(x) calculate various `single-pass` numeric stats once(x, y=None) avoid returning the same value more than once; stateful func pick(x, *what) keep only the keys given; works on dicts, or dict-sequences picked(x, *what) same a func pick plain(s) ignore ANSI-style sequences in strings quoted(s, q='"') surround a string with the (optional) quoting-symbol given recover(*args) recover from exceptions with a fallback value reject(x, pred) generalization of built-in func filter, with opposite logic since(x, y) ignore items before the one given; for strings and sequences sincefinal(x, y) backward counterpart of func since sincelast(x, y) same as func sincefinal split(x, y) split string by separator; split sequence into several ones squeeze(s) strip/trim a string, squishing inner runs of spaces stround(x, d=6) format numbers into decimal-number strings tally(x, ?by) count/tally values, using an optional transformation func tallied(x, ?by) same as func tally trap(x, f=None) try running a func, handing exceptions to a fallback func trycall(*args) same as func recover unique(x) same as func dedup uniqued(x) same as func dedup unjson(x, f=None) same as func dejson unquoted(s) ignore surrounding quotes, if present until(x, y) ignore items after the one given; for strings and sequences untilfinal(x, y) backward counterpart of func until untillast(x, y) same as func untilfinal wait(seconds, x) wait the given number of seconds, before returning a value wat(*args) What Are These (wat) shows help/doc messages for funcs Examples # numbers from 0 to 5; no input is read/used tj = 'range(6)' # using bases 1 to 5, find all their powers up to the 4th tj = '((n**p for p in range(1, 4+1)) for n in range(1, 6))' # keep only the last 2 items from the input tj = 'range(1, 6)' | tj 'data[-2:]' # chunk/regroup input items into arrays of up to 3 items each tj = 'range(1, 8)' | tj 'chunk(data, 3)' # ignore all items before the first one with just a 5 in it tj = 'range(8)' | tj 'since(data, 5)' # ignore errors/exceptions, in favor of a fallback value tj = 'safe(lambda: 2 * float("no way"), "fallback value")' # ignore errors/exceptions, calling a fallback func with the exception tj = 'safe(lambda: 2 * float("no way"), lambda err: str(err))' # use dot-syntax on JSON data tj = '{"abc": {"xyz": 123}}' | tj -d 'data.abc.xyz' # use dot-syntax on JSON data; keywords as properties are syntax-errors tj = '{"abc": {"def": 123}}' | tj -d 'data.abc["def"]' # func results are automatically called on the input tj = '{"abc": 123, "def": 456}' | tj len ''' from sys import argv, exit, stderr, stdin, stdout if __name__ != '__main__': print('don\'t import this script, run it directly instead', file=stderr) exit(1) # no args or a leading help-option arg means show the help message and quit help_opts = ('-h', '--h', '-help', '--help') if len(argv) < 2 or (len(argv) == 2 and argv[1] in help_opts): print(info.strip(), file=stderr) exit(0) from io import StringIO from itertools import islice from json import load from typing import \ AbstractSet, Annotated, Any, AnyStr, \ AsyncContextManager, AsyncGenerator, AsyncIterable, AsyncIterator, \ Awaitable, BinaryIO, ByteString, Callable, cast, \ ClassVar, Collection, Container, \ ContextManager, Coroutine, Deque, Dict, Final, \ final, ForwardRef, FrozenSet, Generator, Generic, get_args, get_origin, \ get_type_hints, Hashable, IO, ItemsView, \ Iterable, Iterator, KeysView, List, Literal, Mapping, \ MappingView, Match, MutableMapping, MutableSequence, MutableSet, \ NamedTuple, NewType, no_type_check, no_type_check_decorator, \ NoReturn, Optional, overload, \ Protocol, Reversible, \ runtime_checkable, Sequence, Set, Sized, SupportsAbs, \ SupportsBytes, SupportsComplex, SupportsFloat, SupportsIndex, \ SupportsInt, SupportsRound, Text, TextIO, Tuple, Type, \ TypedDict, TypeVar, \ TYPE_CHECKING, Union, ValuesView try: from typing import \ assert_never, assert_type, clear_overloads, Concatenate, \ dataclass_transform, get_overloads, is_typeddict, LiteralString, \ Never, NotRequired, ParamSpec, ParamSpecArgs, ParamSpecKwargs, \ Required, reveal_type, Self, TypeAlias, TypeGuard, TypeVarTuple, \ Unpack from typing import \ AwaitableGenerator, override, TypeAliasType, type_check_only except Exception: pass def conforms(x: Any) -> bool: ''' Check if a value is JSON-compatible, which includes checking values recursively, in case of composite/nestable values. ''' if x is None or isinstance(x, (bool, int, str)): return True if isinstance(x, float): return not (isnan(x) or isinf(x)) if isinstance(x, (list, tuple)): return all(conforms(e) for e in x) if isinstance(x, dict): return all(conforms(k) and conforms(v) for k, v in x.items()) return False def seems_url(s: str) -> bool: protocols = ('https://', 'http://', 'file://', 'ftp://', 'data:') return any(s.startswith(p) for p in protocols) def result_needs_fixing(x: Any) -> bool: ''' See if func fix_result needs to be called: avoiding that can speed things up and save memory, when a composite value is big enough. ''' if x is None or isinstance(x, (bool, int, float, str)): return False rec = result_needs_fixing if isinstance(x, dict): return any(rec(k) or rec(v) for k, v in x.items()) if isinstance(x, (list, tuple)): return any(rec(e) for e in x) return True def fix_result(x: Any, default: Any) -> Any: 'Adapt a value so it can be output.' if x is type: return type(default).__name__ # if expression results in a func, auto-call it with the original data if callable(x): c = required_arg_count(x) if c == 1: x = x(default) else: m = f'func auto-call only works with 1-arg funcs (func wanted {c})' raise Exception(m) if x is None or isinstance(x, (bool, int, float, str)): return x rec = fix_result if isinstance(x, dict): return { rec(k, default): rec(v, default) for k, v in x.items() if not (isinstance(k, Skip) or isinstance(v, Skip)) } if isinstance(x, Iterable): return tuple(rec(e, default) for e in x if not isinstance(e, Skip)) if isinstance(x, Dottable): return rec(x.__dict__, default) if isinstance(x, DotCallable): return rec(x.value, default) if isinstance(x, Exception): raise x return None if isinstance(x, Skip) else str(x) def disabled_exec(*args, **kwargs) -> None: _ = args _ = kwargs raise Exception('built-in func `exec` is disabled') def disabled_open(*args, **kwargs) -> None: _ = args _ = kwargs raise Exception('built-in func `open` is disabled') from re import compile as zj_compile_re zj_info_msg = ''' zj [keys/indices...] Zoom Json digs into a subset of valid JSON input, using the given mix of keys and array-indices, the latter being either 0-based or negative, to index backward from the ends of arrays. Zooming on object keys is first tried as an exact key-match, failing that as a case-insensitive key-match (first such match): when both approaches fail, if the key is a valid integer, the key at the (even negative) index given is used. Invalid array-indices and missing object-keys result in null values, when none of the special keys/fallbacks shown later apply. You can slice arrays the exclusive/go/python way using index-pairs with a `:` between the start/end pair, as long as it's a single argument; you can even use `..` as the index-pair separator to include the stop index in the result. Either way, as with go/python, you can omit either of the indices when slicing. Special key `.` acts as implicit loops on arrays, and even objects without that specific key: in the unlikely case that an object has `.` as one of its keys, you can use one of loop-fallback aliases, shown later. Another special key is `+` (no quotes): when used, the rest of the keys are used `in parallel`, allowing multiple picks from the current value. When picking array items, you can also use either type (`:` or `..`) of slicing, even mixing it with individual indices. Similar to `+`, the `-` fallback-key drops keys, which means all items are picked, except for those mentioned after the `-`. Unlike the looping special key, after the first `+` special-key, all keys following it, special or not, are picked normally. In case any of the special keys are actual keys in the data loaded, some aliases are available: . /. ./ :. .: + /+ +/ :+ +: - /- -/ :- -: .i :i .info :info :info: .k :k .keys :keys :keys: .t :t .type :type :type: .l :l .len .length :len :len: :length :length: These aliases allow using the special functionality even on objects whose keys match some of these special names, as it's extremely unlikely data use all aliases as actual keys at any level. The only input supported is valid JSON coming from standard-input: there's no way to load files using their names. To load data from files/URIs use tools like `cat` or `curl`, and pipe their output into this tool. ''' zj_slice_re = zj_compile_re('''^(([+-]?[0-9]+)?)(:|\.\.)(([+-]?[0-9]+)?)$''') def zj_zoom(data: Any, keys: Tuple[str, ...]) -> Any: eval_due = False for i, k in enumerate(keys): try: if eval_due: data = eval(k)(data) eval_due = False continue if isinstance(data, dict): m = zj_match_key(data, k) if m in data: data = data[m] continue m = zj_slice_re.match(k) if m: data = {k: data[k] for k in zj_match_keys(data, k)} continue if isinstance(data, (list, tuple)): try: i = int(k) l = len(data) data = data[i] if -l <= i < l else None continue except Exception: m = zj_slice_re.match(k) if m: data = [data[i] for i in zj_match_indices(data, k)] continue if k in ('.pyl', ':pyl', 'pyl:', ':pyl:'): eval_due = True continue if k in ('.', '/.', './', ':.', '.:'): if isinstance(data, dict): rest = tuple(keys[i + 1:]) return {k: zj_zoom(v, rest) for k, v in data.items()} if isinstance(data, (list, tuple)): rest = tuple(keys[i + 1:]) return tuple(zj_zoom(v, rest) for v in data) # doing nothing amounts to an identity-op for simple values continue fn = zj_final_fallbacks.get(k, None) if fn: return fn(data, tuple(keys[i + 1:])) fn = zj_fallbacks.get(k, None) if fn: data = fn(data) continue if isinstance(data, (dict, list, tuple)): data = None continue kind = zj_type(data) msg = f'value of type {kind} has no properties to zoom into' raise Exception(msg) except Exception as e: key_path = ' > '.join(islice(keys, None, i + 1)) raise Exception(f'{key_path}: {e}') return data def zj_match_key(src: Dict, key: str) -> str: if key in src: return key low = key.casefold() for k in src.keys(): if low == k.casefold(): return k try: i = int(key) l = len(src) if i < 0: i += l if i < 0 or i >= l: return None for j, k in enumerate(src.keys()): if i == j: return k except Exception: return key return key def zj_match_keys(src: Any, key: str) -> Iterable: if isinstance(src, (list, tuple)): yield from zj_match_indices(src, key) yield from zj_match_fallbacks(src, key) return if isinstance(src, dict): if key in src: yield key return low = key.casefold() for k in src.keys(): if low == k.casefold(): yield k return yield from zj_match_indices(src, key) yield from zj_match_fallbacks(src, key) return yield from zj_match_fallbacks(src, key) def zj_match_indices(src: Any, key: str) -> Iterable: try: i = int(key) if isinstance(src, (list, tuple)): l = len(src) yield src[i] if -l <= i < l else None return if isinstance(src, dict): l = len(src) if i < 0: i += l if i < 0 or i >= l: return for j, k in enumerate(src.keys()): if i == j: yield k return return except Exception: pass m = zj_slice_re.match(key) if not m: return l = len(src) (start, _, kind, stop, _) = m.groups() start = int(start) if start != '' else 0 stop = int(stop) if stop != '' else l if start < 0: start += l start = max(start, 0) if stop < 0: stop += l stop = min(stop, l) if kind == '..': stop += 1 stop = min(stop, l) if start > stop: return if (start < 0 and stop < 0) or (start >= l and stop >= l): return if isinstance(src, dict): for i, k in enumerate(src.keys()): if i >= stop: return if start <= i: yield k return if isinstance(src, (list, tuple)): yield from range(start, stop) return def zj_match_fallbacks(src: Any, key: str) -> Iterable: fn = zj_fallbacks.get(key, None) if fn: yield fn(src) def zj_help(*_) -> NoReturn: print(zj_info_msg.strip(), file=stderr) exit(1) def zj_keys(src: Any) -> Any: if isinstance(src, dict): return tuple(src.keys()) if isinstance(src, (list, tuple)): return tuple(range(len(src))) return None def zj_info(x: Any) -> str: if isinstance(x, dict): return f'object ({len(x)} items)' if isinstance(x, (list, tuple)): return f'array ({len(x)} items)' return zj_type(x) def zj_type(x: Any) -> str: return { type(None): 'null', dict: 'object', float: 'number', int: 'number', str: 'string', list: 'array', tuple: 'array', }.get(type(x), 'other') zj_fallbacks: Dict[str, Callable] = { '.h': zj_help, '.help': zj_help, ':h': zj_help, ':help': zj_help, ':help:': zj_help, '.i': zj_info, '.info': zj_info, ':i': zj_info, ':info': zj_info, ':info:': zj_info, '.k': zj_keys, '.keys': zj_keys, ':keys': zj_keys, ':keys:': zj_keys, '.l': len, '.len': len, '.length': len, ':l': len, ':len': len, ':length': len, ':len:': len, ':length:': len, '.t': zj_type, '.type': zj_type, ':t': zj_type, ':type': zj_type, ':type:': zj_type, } def zj_pick(src: Any, keys: Tuple[str, ...]) -> Any: if isinstance(src, dict): picked = {} for k in keys: for k in zj_match_keys(src, k): picked[k] = src[k] return picked if isinstance(src, (list, tuple)): picked = [] for k in keys: for i in zj_match_indices(src, k): picked.append(src[i]) return tuple(picked) msg = f'can\'t pick properties from value of type {zj_type(src)}' raise Exception(msg) def zj_drop(src: Any, keys: Tuple[str, ...]) -> Any: if isinstance(src, dict): avoid = set() for k in keys: for k in zj_match_keys(src, k): avoid.add(k) return {k: v for k, v in src.items() if not k in avoid} if isinstance(src, (list, tuple)): l = len(src) avoid = set() for k in keys: for i in zj_match_indices(src, k): avoid.add(i if i >= 0 else i + l) return tuple(v for i, v in enumerate(src) if not i in avoid) msg = f'can\'t drop properties from value of type {zj_type(src)}' raise Exception(msg) zj_final_fallbacks: Dict[str, Callable] = { '+': zj_pick, ':+:': zj_pick, ':+': zj_pick, '+:': zj_pick, '/+': zj_pick, '+/': zj_pick, '-': zj_drop, ':-:': zj_drop, ':-': zj_drop, '-:': zj_drop, '/-': zj_drop, '-/': zj_drop, } from base64 import \ standard_b64encode, standard_b64decode, \ standard_b64encode as base64bytes, standard_b64decode as debase64bytes from collections import \ ChainMap, Counter, defaultdict, deque, namedtuple, OrderedDict, \ UserDict, UserList, UserString from copy import copy, deepcopy from datetime import \ MAXYEAR, MINYEAR, date, datetime, time, timedelta, timezone, tzinfo try: from datetime import now, UTC except Exception: now = lambda: datetime(2000, 1, 1).now() from decimal import Decimal, getcontext from difflib import \ context_diff, diff_bytes, Differ, get_close_matches, HtmlDiff, \ IS_CHARACTER_JUNK, IS_LINE_JUNK, ndiff, restore, SequenceMatcher, \ unified_diff from fractions import Fraction import functools from functools import \ cache, cached_property, cmp_to_key, get_cache_token, lru_cache, \ namedtuple, partial, partialmethod, recursive_repr, reduce, \ singledispatch, singledispatchmethod, total_ordering, update_wrapper, \ wraps from glob import glob, iglob try: from graphlib import CycleError, TopologicalSorter except Exception: pass from hashlib import \ file_digest, md5, pbkdf2_hmac, scrypt, sha1, sha224, sha256, sha384, \ sha512 from inspect import getfullargspec, getsource import itertools from itertools import \ accumulate, chain, combinations, combinations_with_replacement, \ compress, count, cycle, dropwhile, filterfalse, groupby, islice, \ permutations, product, repeat, starmap, takewhile, tee, zip_longest try: from itertools import pairwise from itertools import batched except Exception: pass from json import dump, dumps, loads import math Math = math from math import \ acos, acosh, asin, asinh, atan, atan2, atanh, ceil, comb, \ copysign, cos, cosh, degrees, dist, e, erf, erfc, exp, expm1, \ fabs, factorial, floor, fmod, frexp, fsum, gamma, gcd, hypot, inf, \ isclose, isfinite, isinf, isnan, isqrt, lcm, ldexp, lgamma, log, \ log10, log1p, log2, modf, nan, nextafter, perm, pi, pow, prod, \ radians, remainder, sin, sinh, sqrt, tan, tanh, tau, trunc, ulp try: from math import cbrt, exp2 except Exception: pass power = pow import operator from pathlib import Path from pprint import \ isreadable, isrecursive, pformat, pp, pprint, PrettyPrinter, saferepr from random import \ betavariate, choice, choices, expovariate, gammavariate, gauss, \ getrandbits, getstate, lognormvariate, normalvariate, paretovariate, \ randbytes, randint, random, randrange, sample, seed, setstate, \ shuffle, triangular, uniform, vonmisesvariate, weibullvariate compile_py = compile # keep built-in func compile for later from re import compile as compile_uncached, Pattern, IGNORECASE import statistics from statistics import \ bisect_left, bisect_right, fmean, \ geometric_mean, harmonic_mean, mean, median, \ median_grouped, median_high, median_low, mode, multimode, pstdev, \ pvariance, quantiles, stdev, variance try: from statistics import \ correlation, covariance, linear_regression, mul except Exception: pass import string from string import \ Formatter, Template, ascii_letters, ascii_lowercase, ascii_uppercase, \ capwords, digits, hexdigits, octdigits, printable, punctuation, \ whitespace alphabet = ascii_letters letters = ascii_letters lowercase = ascii_lowercase uppercase = ascii_uppercase from textwrap import dedent, fill, indent, shorten, wrap from time import \ altzone, asctime, \ ctime, daylight, get_clock_info, \ gmtime, localtime, mktime, monotonic, monotonic_ns, perf_counter, \ perf_counter_ns, process_time, process_time_ns, \ sleep, strftime, strptime, struct_time, thread_time, thread_time_ns, \ time, time_ns, timezone, tzname try: from time import \ clock_getres, clock_gettime, clock_gettime_ns, clock_settime, \ clock_settime_ns, pthread_getcpuclockid, tzset except Exception: pass from unicodedata import \ bidirectional, category, combining, decimal, decomposition, digit, \ east_asian_width, is_normalized, lookup, mirrored, name, normalize, \ numeric from urllib.parse import \ parse_qs, parse_qsl, quote, quote_from_bytes, quote_plus, unquote, \ unquote_plus, unquote_to_bytes, unwrap, urldefrag, urlencode, urljoin, \ urlparse, urlsplit, urlunparse, urlunsplit class Skip: 'Custom type which some funcs type-check to skip values in containers.' def __init__(self, *args) -> None: pass # skip is a ready-to-use value which some funcs filter against: this way # filtering values becomes a special case of transforming values skip = Skip() # re_cache is used by custom func compile to cache previously-compiled # regular-expressions, which makes them quicker to (re)use in formulas re_cache: Dict[str, Pattern] = {} # ire_cache is like re_cache, except it's for case-insensitive regexes ire_cache: Dict[str, Pattern] = {} # ansi_style_re detects the most commonly-used ANSI-style sequences, and # is used in func plain ansi_style_re = compile_uncached('\x1b\\[([0-9;]+m|[0-9]*[A-HJKST])') # number_re detects numbers, and is used in func numbers number_re = compile_uncached('\\W-?[0-9]+(\\.[0-9]*)?\\W') # link_re detects web links, and is used in func links link_re_src = 'https?://[A-Za-z0-9+_.:%-]+(/[A-Za-z0-9+_.%/,#?&=-]*)*' link_re = compile_uncached(link_re_src) # paddable_tab_re detects single tabs and possible runs of spaces around # them, and is used in func squeeze paddable_tab_re = compile_uncached(' *\t *') # seen remembers values already given to func `once` seen = set() # commented_re detects strings/lines which start as unix-style comments commented_re = compile_uncached('^ *#') # emptyish_re detects empty/emptyish strings/lines, the latter being strings # with only spaces in them emptyish_re = compile_uncached('^ *\r?\n?$') # spaces_re detects runs of 2 or more spaces, and is used in func squeeze spaces_re = compile_uncached(' +') # awk_sep_re splits like AWK does by default, and is used in func fields awk_sep_re = compile_uncached(' *\t *| +') # some convenience aliases to commonly-used values false = False true = True nil = None nihil = None none = None null = None s = '' months = [ 'January', 'February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December', ] monweek = [ 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday', ] sunweek = [ 'Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', ] phy = { 'kilo': 1_000, 'mega': 1_000_000, 'giga': 1_000_000_000, 'tera': 1_000_000_000_000, 'peta': 1_000_000_000_000_000, 'exa': 1_000_000_000_000_000_000, 'zetta': 1_000_000_000_000_000_000_000, 'c': 299_792_458, 'kcd': 683, 'na': 602214076000000000000000, 'femto': 1e-15, 'pico': 1e-12, 'nano': 1e-9, 'micro': 1e-6, 'milli': 1e-3, 'e': 1.602176634e-19, 'f': 96_485.33212, 'h': 6.62607015e-34, 'k': 1.380649e-23, 'mu': 1.66053906892e-27, 'ge': 9.7803267715, 'gn': 9.80665, } physics = phy # using literal strings on the cmd-line is often tricky/annoying: some of # these aliases can help get around multiple levels of string-quoting; no # quotes are needed as the script will later make these values accessible # via the property/dot syntax sym = { 'amp': '&', 'ampersand': '&', 'ansiclear': '\x1b[0m', 'ansinormal': '\x1b[0m', 'ansireset': '\x1b[0m', 'apo': '\'', 'apos': '\'', 'ast': '*', 'asterisk': '*', 'at': '@', 'backquote': '`', 'backslash': '\\', 'backtick': '`', 'ball': '●', 'bang': '!', 'bigsigma': 'Σ', 'block': '█', 'bquo': '`', 'bquote': '`', 'bslash': '\\', 'btick': '`', 'bullet': '•', 'caret': '^', 'cdot': '·', 'circle': '●', 'colon': ':', 'comma': ',', 'cr': '\r', 'crlf': '\r\n', 'cross': '×', 'cs': ', ', 'dash': '—', 'dollar': '$', 'dot': '.', 'dquo': '"', 'dquote': '"', 'emark': '!', 'emdash': '—', 'empty': '', 'endash': '–', 'eq': '=', 'et': '&', 'euro': '€', 'ge': '≥', 'geq': '≥', 'gt': '>', 'hellip': '…', 'hole': '○', 'hyphen': '-', 'infinity': '∞', 'lcurly': '{', 'ldquo': '“', 'ldquote': '“', 'le': '≤', 'leq': '≤', 'lf': '\n', 'lt': '<', 'mdash': '—', 'mdot': '·', 'miniball': '•', 'minus': '-', 'ndash': '–', 'neq': '≠', 'perc': '%', 'percent': '%', 'period': '.', 'plus': '+', 'qmark': '?', 'que': '?', 'rcurly': '}', 'rdquo': '”', 'rdquote': '”', 'sball': '•', 'semi': ';', 'semicolon': ';', 'sharp': '#', 'slash': '/', 'space': ' ', 'square': '■', 'squo': '\'', 'squote': '\'', 'tab': '\t', 'tilde': '~', 'underscore': '_', 'uscore': '_', 'utf8bom': '\xef\xbb\xbf', 'utf16be': '\xfe\xff', 'utf16le': '\xff\xfe', } symbols = sym units = { 'cup2l': 0.23658824, 'floz2l': 0.0295735295625, 'floz2ml': 29.5735295625, 'ft2m': 0.3048, 'gal2l': 3.785411784, 'in2cm': 2.54, 'lb2kg': 0.45359237, 'mi2km': 1.609344, 'mpg2kpl': 0.425143707, 'nmi2km': 1.852, 'oz2g': 28.34952312, 'psi2pa': 6894.757293168, 'ton2kg': 907.18474, 'yd2m': 0.9144, 'mol': 602214076000000000000000, 'mole': 602214076000000000000000, 'hour': 3_600, 'day': 86_400, 'week': 604_800, 'hr': 3_600, 'wk': 604_800, 'kb': 1024, 'mb': 1024**2, 'gb': 1024**3, 'tb': 1024**4, 'pb': 1024**5, } # some convenience aliases to various funcs from the python stdlib geomean = geometric_mean harmean = harmonic_mean sd = stdev popsd = pstdev var = variance popvar = pvariance randbeta = betavariate randexp = expovariate randgamma = gammavariate randlognorm = lognormvariate randnorm = normalvariate randweibull = weibullvariate capitalize = str.capitalize casefold = str.casefold center = str.center # count = str.count decode = bytes.decode encode = str.encode endswith = str.endswith expandtabs = str.expandtabs find = str.find format = str.format index = str.index isalnum = str.isalnum isalpha = str.isalpha isascii = str.isascii isdecimal = str.isdecimal isdigit = str.isdigit isidentifier = str.isidentifier islower = str.islower isnumeric = str.isnumeric isprintable = str.isprintable isspace = str.isspace istitle = str.istitle isupper = str.isupper # join = str.join ljust = str.ljust lower = str.lower lowered = str.lower lstrip = str.lstrip maketrans = str.maketrans partition = str.partition removeprefix = str.removeprefix removesuffix = str.removesuffix replace = str.replace rfind = str.rfind rindex = str.rindex rjust = str.rjust rpartition = str.rpartition rsplit = str.rsplit rstrip = str.rstrip # split = str.split splitlines = str.splitlines startswith = str.startswith strip = str.strip swapcase = str.swapcase title = str.title translate = str.translate upper = str.upper uppered = str.upper zfill = str.zfill every = all rev = reversed reverse = reversed some = any length = len blowtabs = str.expandtabs hasprefix = str.startswith hassuffix = str.endswith ltrim = str.lstrip stripstart = str.lstrip trimspace = str.strip trimstart = str.lstrip rtrim = str.rstrip stripend = str.rstrip trimend = str.rstrip stripped = str.strip trim = str.strip trimmed = str.strip trimprefix = str.removeprefix trimsuffix = str.removesuffix def required_arg_count(f: Callable) -> int: if isinstance(f, type): return 1 meta = getfullargspec(f) n = len(meta.args) if meta.defaults: n -= len(meta.defaults) return n def identity(x: Any) -> Any: ''' Return the value given: this is the default transformer for several higher-order funcs, which effectively keeps original items as given. ''' return x idem = identity iden = identity def after(x: Union[str, Iterable], what: Any) -> Union[str, Iterable]: 'Skip parts of strings/sequences up to the substring/value given.' return (strafter if isinstance(x, str) else itemsafter)(x, what) def afterlast(x: Union[str, Iterable], what: Any) -> Union[str, Iterable]: 'Skip parts of strings/sequences up to the last substring/value given.' return (strafterlast if isinstance(x, str) else itemsafterlast)(x, what) afterfinal = afterlast def arrayish(x: Any) -> bool: 'Check if a value is array-like enough.' return isinstance(x, (list, tuple, range, Generator)) isarrayish = arrayish def base64(x): return base64bytes(str(x).encode()).decode() def basename(s: str) -> str: 'Get a filepath\'s last part, if present.' return Path(s).name def before(x: Union[str, Iterable], what: Any) -> Union[str, Iterable]: 'End strings/sequences right before a substring/value\'s appearance.' return (strbefore if isinstance(x, str) else itemsbefore)(x, what) def beforelast(x: Union[str, Iterable], what: Any) -> Union[str, Iterable]: 'End strings/sequences right before a substring/value\'s last appearance.' return (strbeforelast if isinstance(x, str) else itemsbeforelast)(x, what) beforefinal = beforelast def cases(x: Any, *args: Any) -> Any: ''' Simulate a switch statement on a value, using matches/result pairs from the arguments given; when given an even number of extra args, None is used as a final fallback result; when given an odd number of extra args, the last argument is used as a final `default` value, if needed. ''' for i in range(0, len(args) - len(args) % 2, 2): test, res = args[i], args[i+1] if isinstance(test, (list, tuple)) and x in test: return res if isinstance(test, float) and isnan(test) and isnan(x): return res if x == test: return res return None if len(args) % 2 == 0 else args[-1] switch = cases def chunk(items: Iterable, chunk_size: int) -> Iterable: 'Break iterable into chunks, each with up to the item-count given.' if isinstance(items, str): n = len(items) while n >= chunk_size: yield items[:chunk_size] items = items[chunk_size:] n -= chunk_size if n > 0: yield items return if not isinstance(chunk_size, int): raise Exception('non-integer chunk-size') if chunk_size < 1: raise Exception('non-positive chunk-size') it = iter(items) while True: head = tuple(islice(it, chunk_size)) if not head: return yield head chunked = chunk def commented(s: str) -> bool: 'Check if a string starts as a unix-style comment.' return commented_re.match(s) != None iscommented = commented def compile(s: str, case_sensitive: bool = True) -> Pattern: 'Cached regex `compiler`, so it\'s quicker to (re)use in formulas.' cache = re_cache if case_sensitive else ire_cache options = 0 if case_sensitive else IGNORECASE if s in cache: return cache[s] e = compile_uncached(s, options) cache[s] = e return e def compose(*what: Callable) -> Callable: def composite(x: Any) -> Any: for f in what: x = f(x) return x return composite composed = compose lcompose = compose lcomposed = compose def cond(*args: Any) -> Any: ''' Simulate a chain of if-else statements, using condition/result pairs from the arguments given; when given an even number of args, None is used as a final fallback result; when given an odd number of args, the last argument is used as a final `else` value, if needed. ''' for i in range(0, len(args) - len(args) % 2, 2): if args[i]: return args[i+1] return None if len(args) % 2 == 0 else args[-1] def conform(x: Any, denan: Any = None, deinf: Any = None, fn = str) -> Any: 'Make values JSON-compatible.' if isinstance(x, float): # turn NaNs and Infinities into the replacement values given if isnan(x): return denan if isinf(x): return deinf return x if isinstance(x, (bool, int, float, str)): return x if isinstance(x, dict): return { str(k): conform(v) for k, v in x.items() if not (isinstance(k, Skip) or isinstance(v, Skip)) } if isinstance(x, Iterable): return [conform(e) for e in x if not isinstance(e, Skip)] if isinstance(x, DotCallable): return x.value return fn(x) fix = conform def countif(src: Iterable, check: Callable) -> int: ''' Count how many values make the func given true-like. This func works with sequences, dictionaries, and strings. ''' if callable(src): src, check = check, src check = predicate(check) total = 0 if isinstance(src, dict): for v in src.values(): if check(v): total += 1 else: for v in src: if check(v): total += 1 return total # def debase64(x): # return debase64bytes(str(x).encode()).decode() def debase64(s: str) -> bytes: 'Convert away from base64 encoding, including data-URIs.' if s.startswith('data:'): i = s.find(',') if i >= 0: return standard_b64decode(s[i + 1:]) return standard_b64decode(s) unbase64 = debase64 def dedup(v: Iterable) -> Iterable: 'Ignore reappearing items from iterables, after their first occurrence.' got = set() for e in v: if not e in got: got.add(e) yield e dedupe = dedup deduped = dedup deduplicate = dedup deduplicated = dedup undup = dedup undupe = dedup unduped = dedup unduplicate = dedup unduplicated = dedup unique = dedup uniqued = dedup def defunc(x: Any) -> Any: 'Call if value is a func, or return it back as given.' return x() if callable(x) else x callmemaybe = defunc defunct = defunc unfunc = defunc unfunct = defunc def dejson(x: Any, catch: Union[Callable[[Exception], Any], Any] = None) -> Any: 'Safely parse JSON from strings.' try: return loads(x) if isinstance(x, str) else x except Exception as e: return catch(e) if callable(catch) else catch unjson = dejson def denan(x: Any, fallback: Any = None) -> Any: 'Replace floating-point NaN with the alternative value given.' return x if not (isinstance(x, float) and isnan(x)) else fallback def denil(*args: Any) -> Any: 'Avoid None values, if possible: first value which isn\'t None wins.' for e in args: if e != None: return e return None denone = denil denull = denil def dirname(s: str) -> str: 'Ignore the last part of a filepath.' return str(Path(s).parent) def dive(into: Any, doing: Callable) -> Any: 'Transform a nested value by calling a func via depth-first recursion.' # support args in either order if callable(into): into, doing = doing, into return _dive_kv(None, into, doing) deepmap = dive dive1 = dive def divebin(x: Any, y: Any, doing: Callable) -> Any: 'Nested 2-value version of depth-first-recursive func dive.' # support args in either order if callable(x): x, y, doing = y, doing, x narg = required_arg_count(doing) if narg == 2: return dive(x, lambda a: dive(y, lambda b: doing(a, b))) if narg == 4: return dive(x, lambda i, a: dive(y, lambda j, b: doing(i, a, j, b))) raise Exception('divebin(...) only supports funcs with 2 or 4 args') bindive = divebin # diveboth = divebin # dualdive = divebin # duodive = divebin dive2 = divebin def _dive_kv(key: Any, into: Any, doing: Callable) -> Any: if isinstance(into, dict): return {k: _dive_kv(k, v, doing) for k, v in into.items()} if isinstance(into, Iterable) and not isinstance(into, str): return [_dive_kv(i, e, doing) for i, e in enumerate(into)] narg = required_arg_count(doing) return doing(key, into) if narg == 2 else doing(into) class DotCallable: 'Enable convenient dot-syntax calling of 1-input funcs.' def __init__(self, value: Any): self.value = value def __getattr__(self, key: str) -> Any: return DotCallable(globals()[key](self.value)) class Dottable: 'Enable convenient dot-syntax access to dictionary values.' def __getattr__(self, key: Any) -> Any: return self.__dict__[key] if key in self.__dict__ else None def __getitem__(self, key: Any) -> Any: return self.__dict__[key] if key in self.__dict__ else None def __iter__(self) -> Iterable: return iter(self.__dict__) def dotate(x: Any) -> Union[Dottable, Any]: 'Recursively ensure all dictionaries in a value are dot-accessible.' if isinstance(x, dict): d = Dottable() d.__dict__ = {k: dotate(v) for k, v in x.items()} return d if isinstance(x, list): return [dotate(e) for e in x] if isinstance(x, tuple): return tuple(dotate(e) for e in x) return x dotated = dotate dote = dotate doted = dotate dotified = dotate dotify = dotate dottified = dotate dottify = dotate # make dictionaries `physics`, `symbols`, and `units` easier to use phy = dotate(phy) physics = phy sym = dotate(sym) symbols = sym units = dotate(units) def drop(src: Any, *what) -> Any: ''' Either ignore all substrings occurrences, or ignore all keys given from an object, or even from a sequence of objects. ''' if isinstance(src, str): return strdrop(src, *what) return _itemsdrop(src, set(what)) dropped = drop # ignore = drop # ignored = drop def _itemsdrop(src: Any, what: Set) -> Any: if isinstance(src, dict): kv = {} for k, v in src.items(): if not (k in what): kv[k] = v return kv if isinstance(src, Iterable): return [_itemsdrop(e, what) for e in src] return None def each(src: Iterable, f: Callable) -> Any: ''' A generalization of built-in func map, which can also handle dictionaries and strings. ''' if callable(src): src, f = f, src if isinstance(src, dict): return mapkv(src, lambda k, _: k, f) if isinstance(src, str): s = StringIO() f = loopify(f) for i, c in enumerate(src): v = f(i, c) if not isinstance(v, Skip): s.write(str(v)) return s.getvalue() return tuple(f(i, v) for i, v in enumerate(src)) mapped = each def emptyish(x: Any) -> bool: ''' Check if a value can be considered empty, which includes non-empty strings which only have spaces in them. ''' def check(x: Any) -> bool: if not x: return True if isinstance(x, str): return bool(emptyish_re.match(x)) return False if check(x): return True if isinstance(x, Iterable): return all(check(e) for e in x) return False isemptyish = emptyish def endict(x: Any) -> Dict[str, Any]: 'Turn non-dictionary values into dictionaries with string keys.' if isinstance(x, dict): return {str(k): v for k, v in x.items()} if arrayish(x): return {str(e): e for e in x} return {str(x): x} dicted = endict endicted = endict indict = endict todict = endict def enfloat(x: Any, fallback: float = nan) -> float: try: return float(x) except Exception: return fallback enfloated = enfloat floated = enfloat floatify = enfloat floatize = enfloat tofloat = enfloat def enint(x: Any, fallback: Any = None) -> Any: try: return int(x) except Exception: return fallback eninted = enint inted = enint integered = enint intify = enint intize = enint toint = enint def enlist(x: Any) -> List[Any]: 'Turn non-list values into lists.' return list(x) if arrayish(x) else [x] # inlist = enlist enlisted = enlist listify = enlist listize = enlist tolist = enlist def entuple(x: Any) -> Tuple[Any, ...]: 'Turn non-tuple values into tuples.' return tuple(x) if arrayish(x) else (x, ) entupled = entuple ntuple = entuple ntupled = entuple tuplify = entuple tuplize = entuple toentuple = entuple tontuple = entuple totuple = entuple def error(message: Any) -> Exception: return Exception(str(message)) err = error def ext(s: str) -> str: 'Get a filepath\'s extension, if present.' name = Path(s).name i = name.rfind('.') return name[i:] if i >= 0 else '' filext = ext def fail(message: Any, error_code: int = 255) -> NoReturn: stdout.flush() print(f'\x1b[31m{message}\x1b[0m', file=stderr) quit(error_code) abort = fail bail = fail def fields(s: str) -> Iterable[str]: 'Split fields AWK-style from the string given.' return awk_sep_re.split(s.strip()) # items = fields splitfields = fields splititems = fields words = fields def first(items: SupportsIndex, fallback: Any = None) -> Any: return items[0] if len(items) > 0 else fallback def flappend(*args: Any) -> List[Any]: 'Turn arbitrarily-nested values/sequences into a single flat sequence.' flat = [] def dig(x: Any) -> None: if arrayish(x): for e in x: dig(e) elif isinstance(x, dict): for e in x.values(): dig(e) else: flat.append(x) for e in args: dig(e) return flat def flat(*args: Any) -> Iterable: 'Turn arbitrarily-nested values/sequences into a single flat sequence.' def _flat_rec(x: Any) -> Iterable: if x is None: return if isinstance(x, dict): yield from _flat_rec(x.values()) if isinstance(x, str): yield x return if isinstance(x, Iterable): for e in x: yield from _flat_rec(e) return yield x for x in args: yield from _flat_rec(x) flatten = flat flattened = flat def fromto(start, stop, f: Callable = identity) -> Iterable: 'Sequence all integers between the numbers given, end-value included.' return (f(e) for e in range(start, stop + 1)) def fuzz(x: Union[int, float]) -> Union[float, Dict[str, float]]: ''' Deapproximate numbers to their max range before approximation: the result is a dictionary with the guessed lower-bound number, the number given, and the guessed upper-bound number which can approximate to the original number given. NaNs and the infinities are returned as given, instead of resulting in a dictionary. ''' if isnan(x) or isinf(x): return x if x == 0: return {'-0.5': -0.5, '0': 0.0, '0.5': +0.5} if x % 1 != 0: # return surrounding integers when given non-integers a = floor(x) b = ceil(x) return {str(a): a, str(x): x, str(b): b} if x % 10 != 0: a = x - 0.5 b = x + 0.5 return {str(a): a, str(x): x, str(b): b} # find the integer log10 of the absolute value; 0 was handled previously y = int(abs(x)) p10 = 1 while True: if y % p10 != 0: p10 /= 10 break p10 *= 10 delta = p10 / 2 s = +1 if x > 0 else -1 ux = abs(x) a = s * ux - delta b = s * ux + delta return {str(a): a, str(x): x, str(b): b} def generated(src: Any) -> Any: 'Make tuples out of generators, or return non-generator values as given.' return tuple(src) if isinstance(src, (Generator, range)) else src concrete = generated concreted = generated concretize = generated concretized = generated degen = generated degenerate = generated degenerated = generated degenerator = generated gen = generated generate = generated synth = generated synthed = generated synthesize = generated synthesized = generated def group(src: Iterable, by: Callable = identity) -> Dict: ''' Separate transformed items into arrays, the final result being a dict whose keys are all the transformed values, and whose values are lists of all the original values which did transform to their group's key. ''' if callable(src): src, by = by, src by = loopify(by) kv = src.items() if isinstance(src, dict) else enumerate(src) groups = {} for k, v in kv: dk = by(k, v) if isinstance(dk, Skip) or isinstance(v, Skip): continue if dk in groups: groups[dk].append(v) else: groups[dk] = [v] return groups grouped = group def gire(src: Iterable[str], using: Iterable[str], fallback: Any = '') -> Dict: ''' Group matched items into arrays, the final result being a dict whose keys are all the matchable regexes given, and whose values are lists of all the original values which did case-insensitively match their group's key as a regex. ''' using = tuple(using) return group(src, lambda x: imatch(x, using, fallback)) gbire = gire groupire = gire def gre(src: Iterable[str], using: Iterable[str], fallback: Any = '') -> Dict: ''' Group matched items into arrays, the final result being a dict whose keys are all the matchable regexes given, and whose values are lists of all the original values which did regex-match their group's key. ''' using = tuple(using) return group(src, lambda x: match(x, using, fallback)) gbre = gre groupre = gre def gsub(s: str, what: str, repl: str) -> str: 'Replace all regex-matches with the string given.' return compile(what).sub(repl, s) def harden(f: Callable, fallback: Any = None) -> Callable: def _hardened_caller(*args): try: return f(*args) except Exception: return fallback return _hardened_caller hardened = harden insure = harden insured = harden def horner(coeffs: List[float], x: Union[int, float]) -> float: if isinstance(coeffs, (int, float)): coeffs, x = x, coeffs if len(coeffs) == 0: return 0 y = coeffs[0] for c in islice(coeffs, 1, None): y *= x y += c return y polyval = horner def idiota(n: int, f: Callable = identity) -> Dict[int, int]: 'ID (keys) version of func iota.' return { v: v for v in (f(e) for e in range(1, n + 1))} dictiota = idiota kviota = idiota def imatch(what: str, using: Iterable[str], fallback: str = '') -> str: 'Try to case-insensitively match a string with any of the regexes given.' if not isinstance(what, str): what, using = using, what for s in using: expr = compile(s, False) m = expr.search(what) if m: # return what[m.start():m.end()] return s return fallback def indices(x: Any) -> Iterable[Any]: 'List all indices/keys, or get an exclusive range from an int.' if isinstance(x, int): return range(x) if isinstance(x, dict): return x.keys() if isinstance(x, (str, list, tuple)): return range(len(x)) return tuple() keys = indices def ints(start, stop, f: Callable = identity) -> Iterable[int]: 'Sequence integers, end-value included.' if isnan(start) or isnan(stop) or isinf(start) or isinf(stop): return tuple() return (f(e) for e in range(int(ceil(start)), int(stop) + 1)) integers = ints def iota(n: int, f: Callable = identity) -> Iterable[int]: 'Sequence all integers from 1 up to (and including) the int given.' return (f(e) for e in range(1, n + 1)) def itemsafter(x: Iterable, what: Any) -> Iterable: ok = False check = predicate(what) for e in x: if ok: yield e elif check(e): ok = True def itemsafterlast(x: Iterable, what: Any) -> Iterable: rest: List[Any] = [] check = predicate(what) for e in x: if check(e): rest.clear() else: rest.append(e) for e in islice(rest, 1, len(rest)): yield e def itemsbefore(x: Iterable, what: Any) -> Iterable: check = predicate(what) for e in x: if check(e): return yield e def itemsbeforelast(x: Iterable, what: Any) -> Iterable: items = [] for e in x: items.append(e) i = -1 check = predicate(what) for j, e in enumerate(reversed(items)): if check(e): i = j break if i < 0: return items if i == 0: return tuple() for e in islice(items, 0, i): yield e def itemssince(x: Iterable, what: Any) -> Iterable: ok = False check = predicate(what) for e in x: ok = ok or check(e) if ok: yield e def itemssincelast(x: Iterable, what: Any) -> Iterable: rest: List[Any] = [] check = predicate(what) for e in x: if check(e): rest.clear() else: rest.append(e) return rest def itemsuntil(x: Iterable, what: Any) -> Iterable: check = predicate(what) for e in x: yield e if check(e): return def itemsuntillast(x: Iterable, what: Any) -> Iterable: items = [] for e in x: items.append(e) i = -1 check = predicate(what) for j, e in enumerate(reversed(items)): if check(e): i = j break if i < 0: return items for e in islice(items, 0, i + 1): yield e itemsuntilfinal = itemsuntillast def join(items: Iterable, sep: Union[str, Iterable] = ' ') -> Union[str, Dict]: ''' Join iterables using the separator-string given: its 2 arguments can come in either order, and are sorted out if needed. When given 2 non-string iterables, the result is an object whose keys are from the first argument, and whose values are from the second one. You can use it any of the following ways, where `keys` and `values` are sequences (lists, tuples, or generators), and `separator` is a string: join(values) join(values, separator) join(separator, values) join(keys, values) ''' if arrayish(items) and arrayish(sep): return {k: v for k, v in zip(items, sep)} if isinstance(items, str): items, sep = sep, items return sep.join(str(e) for e in items) def joined_paragraphs(lines: Iterable[str]) -> Iterable[Sequence[str]]: ''' Regroup lines into individual paragraphs, each of which can span multiple lines: such paragraphs have no empty lines in them, and never end with a trailing line-feed. ''' par: List[str] = [] for l in lines: if (not l) and par: yield '\n'.join(par) par.clear() else: par.append(l) if len(par) > 0: yield '\n'.join(par) def json0(x: Any) -> str: 'Encode value into a minimal single-line JSON string.' return dumps(x, separators=(',', ':'), allow_nan=False, indent=None) j0 = json0 def json2(x: Any) -> str: ''' Encode value into a (possibly multiline) JSON string, using 2 spaces for each indentation level. ''' return dumps(x, separators=(',', ': '), allow_nan=False, indent=2) j2 = json2 def jsonl(x: Any) -> Iterable: 'Turn value into multiple JSON-encoded strings, known as JSON Lines.' if x is None: yield dumps(x, allow_nan=False) elif isinstance(x, (bool, int, float, dict, str)): yield dumps(x, allow_nan=False) elif isinstance(x, Iterable): for e in x: yield dumps(e, allow_nan=False) else: yield dumps(str(x), allow_nan=False) jsonlines = jsonl tojsonl = jsonl tojsonlines = jsonl def keep(src: Iterable, pred: Any) -> Iterable: ''' A generalization of built-in func filter, which can also handle dicts and strings. ''' if callable(src): src, pred = pred, src pred = predicate(pred) pred = loopify(pred) if isinstance(src, str): out = StringIO() for i, c in enumerate(src): if pred(i, c): out.write(c) return out.getvalue() if isinstance(src, dict): return { k: v for k, v in src.items() if pred(k, v) } return (e for i, e in enumerate(src) if pred(i, e)) filtered = keep kept = keep def last(items: SupportsIndex, fallback: Any = None) -> Any: return items[-1] if len(items) > 0 else fallback def links(src: Any) -> Iterable: 'Auto-detect all (HTTP/HTTPS) hyperlink-like substrings.' if isinstance(src, str): for match in link_re.finditer(src): # yield src[match.start():match.end()] yield match.group(0) elif isinstance(src, dict): for k, v in src.items(): yield from k yield from links(v) elif isinstance(src, Iterable): for v in src: yield from links(v) def loopify(x: Callable) -> Callable: nargs = required_arg_count(x) if nargs == 2: return x elif nargs == 1: return lambda _, v: x(v) else: raise Exception('only funcs with 1 or 2 args are supported') def mapkv(src: Iterable, key: Callable, value: Callable = identity) -> Dict: ''' A map-like func for dictionaries, which uses 2 mapping funcs, the first for the keys, the second for the values. ''' if key is None: key = lambda k, _: k if callable(src): src, key, value = value, src, key if required_arg_count(key) != 2: oldkey = key key = lambda k, _: oldkey(k) key = loopify(key) value = loopify(value) # if isinstance(src, dict): # return { key(k, v): value(k, v) for k, v in src.items() } # return { key(i, v): value(i, v) for i, v in enumerate(src) } def add(k, v, to): dk = key(k, v) dv = value(k, v) if isinstance(dk, Skip) or isinstance(dv, Skip): return to[dk] = dv res = {} kv = src.items() if isinstance(src, dict) else enumerate(src) for k, v in kv: add(k, v, res) return res def match(what: str, using: Iterable[str], fallback: str = '') -> str: 'Try to match a string with any of the regexes given.' if not isinstance(what, str): what, using = using, what for s in using: expr = compile(s) m = expr.search(what) if m: # return what[m.start():m.end()] return s return fallback def maybe(f: Callable, x: Any) -> Any: ''' Try calling a func on a value, using the same value as a fallback result, in case of exceptions. ''' if not callable(f): f, x = x, f try: return f(x) except Exception: return x def mappend(*args) -> Dict: kv = {} for src in args: if isinstance(src, dict): for k, v in src.items(): kv[k] = v else: raise Exception('mappend only works with dictionaries') return kv def message(x: Any, result: Any = skip) -> Any: print(x, file=stderr) return result msg = message def must(cond: Any, errmsg: str = 'condition given not always true') -> None: 'Enforce conditions, raising an exception on failure.' if not cond: raise Exception(errmsg) demand = must enforce = must def nowdict() -> dict: v = datetime(2000, 1, 1).now() return { 'year': v.year, 'month': v.month, 'day': v.day, 'hour': v.hour, 'minute': v.minute, 'second': v.second, 'text': v.strftime('%Y-%m-%d %H:%M:%S %b %a'), 'weekday': v.strftime('%A'), } def number(x: Any) -> Union[int, float, Any]: ''' Try to turn the value given into a number, using a fallback value instead of raising exceptions. ''' if isinstance(x, float): return x try: return int(x) except Exception: return float(x) def numbers(src: Any) -> Iterable: 'Auto-detect all number-like substrings.' if isinstance(src, str): for match in number_re.finditer(src): yield match.group(0) # yield src[match.start():match.end()] elif isinstance(src, dict): for k, v in src.items(): yield from k yield from links(v) elif isinstance(src, Iterable): for v in src: yield from links(v) def numsign(x: Union[int, float]) -> Union[int, float]: 'Get a number\'s sign, or NaN if the number given is a NaN.' if isinstance(x, int): if x > 0: return +1 if x < 0: return -1 return 0 if isnan(x): return x if x > 0: return +1.0 if x < 0: return -1.0 return 0.0 def numstats(src: Any) -> Dict[str, Union[float, int]]: 'Gather several single-pass numeric statistics.' n = mean_sq = ln_sum = 0 least = +inf most = -inf total = mean = 0 prod = 1 nans = ints = pos = zero = neg = 0 def update_numstats(x: Any) -> None: nonlocal nans, n, ints, pos, neg, zero, least, most, total, prod nonlocal ln_sum, mean, mean_sq if not isinstance(x, (float, int)): return if isnan(x): nans += 1 return n += 1 ints += int(isinstance(x, int) or x == floor(x)) if x > 0: pos += 1 elif x < 0: neg += 1 else: zero += 1 least = min(least, x) most = max(most, x) # total += x prod *= x ln_sum += log(x) d1 = x - mean mean += d1 / n d2 = x - mean mean_sq += d1 * d2 def _numstats_rec(src: Any) -> None: if isinstance(src, dict): for e in src.values(): _numstats_rec(e) elif isinstance(src, Iterable) and not isinstance(src, str): for e in src: _numstats_rec(e) else: update_numstats(src) _numstats_rec(src) sd = nan geomean = nan if n > 0: sd = sqrt(mean_sq / n) geomean = exp(ln_sum / n) if not isinf(ln_sum) else nan total = n * mean return { 'n': n, 'nan': nans, 'min': least, 'max': most, 'sum': total, 'mean': mean, 'geomean': geomean, 'sd': sd, 'product': prod, 'integer': ints, 'positive': pos, 'zero': zero, 'negative': neg, } def once(x: Any, replacement: Any = None) -> Any: ''' Replace the first argument given after the first time this func has been given it: this is a deliberately stateful function, given its purpose. ''' if not (x in seen): seen.add(x) return x else: return replacement onced = once def pad(s: str, n: int, pad: str = ' ') -> str: l = len(s) return s if l >= n else s + int((n - l) / len(pad)) * pad def padcenter(s: str, n: int, pad: str = ' ') -> str: return s.center(n, pad) centerpad = padcenter centerpadded = padcenter cjust = padcenter cpad = padcenter padc = padcenter paddedcenter = padcenter def padend(s: str, n: int, pad: str = ' ') -> str: return s.rjust(n, pad) padr = padend padright = padend paddedend = padend paddedright = padend rpad = padend rightpad = padend rightpadded = padend def padstart(s: str, n: int, pad: str = ' ') -> str: return s.ljust(n, pad) lpad = padstart leftpad = padstart leftpadded = padstart padl = padstart padleft = padstart paddedleft = padstart paddedstart = padstart def panic(x: Any) -> None: raise Exception(x) def paragraphize(lines: Iterable[str]) -> Iterable[Sequence[str]]: ''' Regroup lines into individual paragraphs, each of which is a list of single-line strings, none of which never end with a trailing line-feed. ''' par: List[str] = [] for l in lines: if (not l) and par: yield par par.clear() else: par.append(l) if len(par) > 0: yield par paragraphed = paragraphize paragraphs = paragraphize paragroup = paragraphize pargroup = paragraphize def parse(s: str, fallback: Any = None) -> Any: 'Try to parse JSON, ignoring exceptions in favor of a fallback value.' try: return loads(s) except Exception: return fallback fromjson = parse parsed = parse loaded = parse unjson = parse def pick(src: Any, *what) -> Any: 'Pick only the keys given from an object, or even a sequence of objects.' if isinstance(src, dict): kv = {} for k in what: kv[k] = src[k] return kv if isinstance(src, Iterable): return [pick(e, *what) for e in src] return None picked = pick def plain(s: str) -> str: 'Ignore all ANSI-style sequences in a string.' return ansi_style_re.sub('', s) def predicate(x: Any) -> Callable: 'Helps various higher-order funcs, by standardizing `predicate` values.' if callable(x): return x if isinstance(x, float): if isnan(x): return lambda y: isinstance(y, float) and isnan(y) if isinf(x): return lambda y: isinstance(y, float) and isinf(y) return lambda y: x == y pred = predicate def quoted(s: str, quote: str = '"') -> str: 'Surround a string with quotes.' return f'{quote}{s}{quote}' def recover(*args) -> Any: ''' Catch exceptions using a lambda/callback func, in one of 6 ways recover(zero_args_func) recover(zero_args_func, exception_replacement_value) recover(zero_args_func, one_arg_exception_handling_func) recover(one_arg_func, arg) recover(one_arg_func, arg, exception_replacement_value) recover(one_arg_func, arg, one_arg_exception_handling_func) ''' if len(args) == 1: f = args[0] try: return f() except Exception: return None elif len(args) == 2: f, fallback = args[0], args[1] if callable(f) and callable(fallback): try: return f() except Exception as e: nargs = required_arg_count(fallback) return fallback(e) if nargs == 1 else fallback() else: try: return f() if required_arg_count(f) == 0 else f(args[1]) except Exception: return fallback elif len(args) == 3: f, x, fallback = args[0], args[1], args[2] if callable(f) and callable(fallback): try: return f(x) except Exception as e: nargs = required_arg_count(fallback) return fallback(e) if nargs == 1 else fallback() else: try: return f(x) except Exception: return fallback else: raise Exception('recover(...) only works with 1, 2, or 3 args') attempt = recover attempted = recover recovered = recover recoverred = recover rescue = recover rescued = recover trycall = recover def reject(src: Iterable, pred: Any) -> Iterable: ''' A generalization of built-in func filter, which uses predicate funcs the opposite way, and which can also handle dicts and strings. ''' if callable(src): src, pred = pred, src pred = predicate(pred) pred = loopify(pred) if isinstance(src, str): out = StringIO() for i, c in enumerate(src): if not pred(i, c): out.write(c) return out.getvalue() if isinstance(src, dict): return { k: v for k, v in src.items() if not pred(k, v) } return (e for i, e in enumerate(src) if not pred(i, e)) avoid = reject avoided = reject keepout = reject keptout = reject rejected = reject def retype(x: Any) -> Any: 'Try to narrow the type of the value given.' if isinstance(x, float): return int(x) if floor(x) == x else x if not isinstance(x, str): return x try: return loads(x) except Exception: pass try: return int(x) except Exception: pass try: return float(x) except Exception: pass return x autocast = retype mold = retype molded = retype narrow = retype narrowed = retype recast = retype recasted = retype remold = retype remolded = retype retyped = retype def revcompose(*what: Callable) -> Callable: def composite(x: Any) -> Any: for f in reversed(what): x = f(x) return x return composite rcompose = revcompose rcomposed = revcompose revcomposed = revcompose def revsort(iterable: Iterable, key: Optional[Callable] = None) -> Iterable: return sorted(iterable, key=key, reverse=True) revsorted = revsort # def revsortkv(src: Dict, key: Callable = None) -> Dict: # if not key: # key = lambda kv: (kv[1], kv[0]) # return sortkv(src, key, reverse=True) def revsortkv(src: Dict, key: Callable = None) -> Dict: if key is None: key = lambda x: x[1] return sortkv(src, key, reverse=True) revsortedkv = revsortkv def rstripdecs(s: str) -> str: ''' Ignore trailing zero decimals on number-like strings; even ignore the decimal dot if trailing. ''' try: f = float(s) if isnan(f) or isinf(f): return s dot = s.find('.') if dot < 0: return s s = s.rstrip('0') return s[:-1] if s.endswith('.') else s except Exception: return s chopdecs = rstripdecs def scale(x: float, x0: float, x1: float, y0: float, y1: float) -> float: 'Transform a value from a linear domain into another linear one.' return (y1 - y0) * (x - x0) / (x1 - x0) + y0 rescale = scale rescaled = scale scaled = scale def shortened(s: str, maxlen: int, trailer: str = '') -> str: 'Limit strings to the symbol-count given, including an optional trailer.' maxlen = max(maxlen, 0) return s if len(s) <= maxlen else s[:maxlen - len(trailer)] + trailer def shuffled(x: Any) -> Any: 'Return a shuffled copy of the list given.' y = copy(x) shuffle(y) return y def split(src: Union[str, Sequence], n: Union[str, int]) -> Iterable: 'Split/break a string/sequence into several chunks/parts.' if isinstance(src, str) and isinstance(n, str): return src.split(n) if not (isinstance(src, (str, Sequence)) and isinstance(n, int)): raise Exception('unsupported type-pair of arguments') if n < 1: return [] l = len(src) if l <= n: return src.split('') if isinstance(src, str) else src chunks = [] csize = int(ceil(l / n)) while len(src) > 0: chunks.append(src[:csize]) src = src[csize:] return chunks broken = split splitted = split splitten = split def strdrop(x: str, *what: str) -> str: 'Ignore all occurrences of all substrings given.' for s in what: x = x.replace(s, '') return x strignore = strdrop def stringify(x: Any) -> str: 'Fancy alias for func dumps, named after JavaScript\'s func.' return dumps(x, separators=(', ', ': '), allow_nan=False, indent=None) jsonate = stringify jsonify = stringify tojson = stringify def strafter(x: str, what: str) -> str: i = x.find(what) return '' if i < 0 else x[i+len(what):] def strafterlast(x: str, what: str) -> str: i = x.rfind(what) return '' if i < 0 else x[i+len(what):] def strbefore(x: str, what: str) -> str: i = x.find(what) return x if i < 0 else x[:i] def strbeforelast(x: str, what: str) -> str: i = x.rfind(what) return x if i < 0 else x[:i] def strsince(x: str, what: str) -> str: i = x.find(what) return '' if i < 0 else x[i:] def strsincelast(x: str, what: str) -> str: i = x.rfind(what) return '' if i < 0 else x[i:] def struntil(x: str, what: str) -> str: i = x.find(what) return x if i < 0 else x[:i+len(what)] def struntillast(x: str, what: str) -> str: i = x.rfind(what) return x if i < 0 else x[:i+len(what)] struntilfinal = struntillast def since(x: Union[str, Iterable], what: Any) -> Union[str, Iterable]: 'Start strings/sequences with a substring/value\'s appearance.' return (strsince if isinstance(x, str) else itemssince)(x, what) def sincelast(x: Union[str, Iterable], what: Any) -> Union[str, Iterable]: 'Start strings/sequences with a substring/value\'s last appearance.' return (strsincelast if isinstance(x, str) else itemssincelast)(x, what) sincefinal = sincelast def sortk(x: Dict, key: Callable = identity, reverse: bool = False) -> Dict: keys = sorted(x.keys(), key=key, reverse=reverse) return {k: x[k] for k in keys} sortkeys = sortk sortedkeys = sortk def sortkv(src: Dict, key: Callable = None, reverse: bool = False) -> Dict: if key is None: key = lambda x: x[1] kv = sorted(src.items(), key=key, reverse=reverse) return {k: v for (k, v) in kv} sortedkv = sortkv def squeeze(s: str) -> str: ''' A more aggressive way to rid strings of extra spaces which, unlike string method strip, also turns inner runs of multiple spaces into single ones. ''' s = s.strip() s = spaces_re.sub(' ', s) s = paddable_tab_re.sub('\t', s) return s squeezed = squeeze def stround(x: Union[int, float], decimals: int = 6) -> str: 'Format numbers into a string with the given decimal-digit count.' if decimals >= 0: return f'{x:.{decimals}f}' else: return f'{round(x, decimals):.0f}' def tally(src: Iterable, by: Callable = identity) -> Dict[Any, int]: ''' Count all distinct (transformed) values, the result being a dictionary whose keys are all the transformed values, and whose items are positive integers. ''' if callable(src): src, by = by, src tally: Dict[Any, int] = {} by = loopify(by) if isinstance(src, dict): for k, v in src.items(): dk = by(k, v) if dk in tally: tally[dk] += 1 else: tally[dk] = 1 else: for i, v in enumerate(src): dk = by(i, v) if dk in tally: tally[dk] += 1 else: tally[dk] = 1 return tally tallied = tally def transpose(src: Any) -> Any: 'Turn lists/objects inside-out like socks, so to speak.' if isinstance(src, dict): return { v: k for k, v in src.items() } if not arrayish(src): msg = 'transpose only supports objects or iterables of objects' raise ValueError(msg) kv: Dict[Any, Any] = {} seq: List[Any] = [] for e in src: if isinstance(e, dict): for k, v in e.items(): if k in kv: kv[k].append(v) else: kv[k] = [v] elif isinstance(e, Iterable): for i, v in enumerate(e): if i < len(seq): seq[i].append(v) else: seq.append([v]) else: msg = 'transpose(...): not all items are iterables/objects' raise ValueError(msg) if len(kv) > 0 and len(seq) > 0: msg = 'transpose(...): mix of iterables and objects not supported' raise ValueError(msg) return kv if len(seq) == 0 else seq tr = transpose transp = transpose transposed = transpose def trap(x: Callable, y: Union[Callable[[Exception], Any], Any] = None) -> Any: 'Try running a func, handing exceptions over to a fallback func.' try: return x() if callable(x) else x except Exception as e: if callable(y): nargs = required_arg_count(y) return y(e) if nargs == 1 else y() else: return y catch = trap catched = trap caught = trap noerr = trap noerror = trap noerrors = trap safe = trap save = trap saved = trap trapped = trap def tsv(x: str, fn: Union[Callable, None] = None) -> Any: if fn is None: return x.split('\t') if callable(x): x, fn = fn, x return fn(x.split('\t')) def typename(x: Any) -> str: if x is None: return 'null' if isinstance(x, bool): return 'boolean' if isinstance(x, str): return 'string' if isinstance(x, (int, float)): return 'number' if isinstance(x, (list, tuple)): return 'array' if isinstance(x, dict): return 'object' return type(x).__name__ def typeof(x: Any) -> str: 'Get a value\'s JS-like typeof type-string.' if callable(x): return 'function' return { bool: 'boolean', int: 'number', float: 'number', str: 'string', }.get(type(x), 'object') def unixify(s: str) -> str: ''' Make plain-text `unix-style`, ignoring a leading UTF-8 BOM if present, and turning any/all CRLF byte-pairs into line-feed bytes. ''' s = s.lstrip('\xef\xbb\xbf') return s.replace('\r\n', '\n') if '\r\n' in s else s def unquoted(s: str) -> str: 'Ignore surrounding quotes in a string.' if s.startswith('"') and s.endswith('"'): return s[1:-1] if s.startswith('\'') and s.endswith('\''): return s[1:-1] if s.startswith('`') and s.endswith('`'): return s[1:-1] if s.startswith('”') and s.endswith('“'): return s[1:-1] if s.startswith('“') and s.endswith('”'): return s[1:-1] return s dequote = unquoted dequoted = unquoted def until(x: Union[str, Iterable], what: Any) -> Union[str, Iterable]: 'End strings/sequences with a substring/value\'s appearance.' return (struntil if isinstance(x, str) else itemsuntil)(x, what) def untillast(x: Union[str, Iterable], what: Any) -> Union[str, Iterable]: 'End strings/sequences with a substring/value\'s last appearance.' return (struntillast if isinstance(x, str) else itemsuntillast)(x, what) untilfinal = untillast def wait(seconds: Union[int, float], result: Any) -> Any: 'Wait the given number of seconds, before returning its latter arg.' t = (int, float) if (not isinstance(seconds, t)) and isinstance(result, t): seconds, result = result, seconds sleep(seconds) return result delay = wait def wat(*args) -> None: 'What Are These (wat) shows help/doc messages for funcs given to it.' from pydoc import doc c = 0 w = stderr for e in args: if not callable(e): continue if c > 0: print(file=w) print(f'\x1b[48;5;253m\x1b[38;5;26m{e.__name__:80}\x1b[0m', file=w) doc(e, output=w) c += 1 return Skip() def wit(*args) -> None: 'What Is This (wit) shows help/doc messages for funcs given to it.' return wat(*args) def zoom(x: Any, *keys_indices) -> Any: for k in keys_indices: # allow int-indexing dicts the same way lists/tuples can be if isinstance(x, dict) and isinstance(k, int): l = len(x) if i < 0: i += l if i < 0 or i >= len(x): x = None continue for i, e in enumerate(x.values()): if i == k: x = e break continue # regular key/index access for dicts/lists/tuples x = x[k] return x no_input_opts = ( '=', '-None', '--None', '-nil', '--nil', '-noinput', '--noinput', '-no-input', '--no-input', '-none', '--none', '-null', '--null', '-null-input', '--null-input', '-nullinput', '--nullinput', '--n', ) json0_opts = ( '-c', '--c', '-compact', '--compact', '-j0', '--j0', '-json0', '--json0', '-json-0', '--json-0', ) traceback_opts = ( '-t', '--t', '-trace', '--trace', '-traceback', '--traceback', ) profile_opts = ('-p', '--p', '-prof', '--prof', '-profile', '--profile') dot_opts = ('-d', '--d', '-dot', '--dot', '-dots', '--dots') zoom_opts = ('-z', '--z', '-zj', '--zj', '-zoom', '--zoom') # no args or a leading help-option arg means show the help message and quit if len(argv) == 3: if argv[1] in ('-h', '--h', '-help', '--help') and argv[2] in zoom_opts: print(zj_info_msg.strip(), file=stderr) exit(0) args = argv[1:] load_input = True dot_input = False trace_exceptions = False profile_eval = False compact_output = False expression = None name = '' # handle all other leading options; the explicit help options are # handled earlier in the script while len(args) > 0: if args[0] in no_input_opts: load_input = False args = args[1:] elif args[0] in json0_opts: compact_output = True args = args[1:] elif args[0] in traceback_opts: trace_exceptions = True args = args[1:] elif args[0] in profile_opts: profile_eval = True args = args[1:] elif args[0] in dot_opts: dot_input = True args = args[1:] elif args[0] in zoom_opts: # if len(args) < 2: # print(zj_info_msg.strip(), file=stderr) # exit(0) try: z = load(stdin) z = zj_zoom(z, tuple(args[1:])) ind = None if compact_output else 2 seps = (',', ':') if compact_output else (',', ': ') dump(z, stdout, indent=ind, separators=seps, allow_nan=False) stdout.write('\n') except BrokenPipeError: # quit quietly, instead of showing a confusing error message stderr.close() except KeyboardInterrupt: exit(2) except Exception as e: if trace_exceptions: raise e s = str(e) s = s if s else '' print(f'\x1b[31m{s}\x1b[0m', file=stderr) exit(1) exit(0) else: break if len(args) > 2 and load_input: print('\x1b[31mmultiple inputs not allowed\x1b[0m', file=stderr) exit(1) if len(args) == 1: expression = args[0] args = args[1:] elif len(args) > 1: expression = args[0] name = args[1] args = args[1:] if expression is None: print(info.strip(), file=stderr) exit(0) glo = globals() for e in (physics, symbols, units): for _k, _v in e.__dict__.items(): if not _k in glo: glo[_k] = _v try: # load JSON into variable `v`, unless input was explicitly disabled v = None if load_input: try: if name == '' or name == '-': v = load(stdin) elif seems_url(name): from urllib.request import urlopen with urlopen(name) as inp: v = load(inp) else: with open(name, encoding='utf-8') as inp: v = load(inp) except Exception as e: raise Exception(f'JSON-input error: {e}') if dot_input: v = dotate(v) # offer several aliases for main variable `v`; the intuitive # `in` (short for `input`) is a keyword, so it's not available data = value = d = dat = val = v # transform data using the formula/expression given, handling # single dots as identity operations exec = disabled_exec open = disabled_open if not expression or expression == '.': expression = 'data' expression = compile_py(expression, expression, 'eval') if profile_eval: from cProfile import Profile # using a profiler in a `with` context adds many irrelevant # entries to its output prof = Profile() prof.enable() v = eval(expression) if result_needs_fixing(v): v = fix_result(v, data) prof.disable() prof.print_stats() else: v = eval(expression) if result_needs_fixing(v): v = fix_result(v, data) # emit result as JSON try: ind = None if compact_output else 2 seps = (',', ':') if compact_output else (',', ': ') dump(v, stdout, indent=ind, separators=seps, allow_nan=False) stdout.write('\n') except Exception as e: raise Exception(f'JSON-output error: {e}') except BrokenPipeError: # quit quietly, instead of showing a confusing error message stderr.close() except KeyboardInterrupt: exit(2) except Exception as e: if trace_exceptions: raise e s = str(e) s = s if s else '' print(f'\x1b[31m{s}\x1b[0m', file=stderr) exit(1)