#!/usr/bin/python3 # The MIT License (MIT) # # Copyright © 2024 pacman64 # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the “Software”), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. info = ''' tl [options...] [python expression] [filepaths/URIs...] Transform Lines runs a Python expression on each line of plain-text data: each expression given emits its result as its own line. Each input line is available to the expression as either `line`, or `l`. Lines are always stripped of any trailing end-of-line bytes/sequences. When the expression results in non-string iterable values, a sort of input `amplification` happens for the current input-line, where each item from the result is emitted on its own output line. Dictionaries emit their data as a single JSON line. When a formula's result is the None value, it emits no output line, which filters-out the current line, the same way empty-iterable results do. When in `all` mode, all input lines are read first into a list of strings, whose items are all stripped of any end-of-line sequences, and kept in the `lines` global variable: the expression given is then run only once. Similarly, if the argument before the expression is a single equals sign (a `=`, but without the quotes), no data are read/loaded: the expression is then run only once, effectively acting as a `pure` plain-text generator. Current-input names, depending on mode: names mode evaluation l, line each-line (default) for each input line lines all-lines once, after last input line b, block, p, par block/paragraph for each paragraph of lines v, value jsonl for each input line (no name) no-input once, without an input value Modes `each-line` (the default) and `block/paragraph` also define `i` as an integer which starts as 0, and which is incremented after each evaluation. Options, where leading double-dashes are also allowed, except for alias `=`: -a read all lines at once into a string-list called `lines` -all same as -a -lines same as -a -b read uninterrupted blocks/groups of lines as paragraphs -blocks same as -b -g same as -b -groups same as -b -p same as -b -par same as -b -para same as -b -paragraphs same as -b -h show this help message -help same as -h -jsonl transform JSON Lines into proper JSON -nil don't read any input, and run the expression only once -no-input same as -nil -noinput same as -nil -none same as -nil -null same as -nil -null-input same as -nil -nullinput same as -nil = same as -nil -p show a performance/time-profile of the full `task` run -prof same as -p -profile same as -p -s read each input as a whole string -str same as -s -string same as -s -w same as -s -whole same as -s -t show a full traceback of this script for exceptions -trace same as -t -traceback same as -t Extra Functions blue(s) color strings blue, using surrounding ANSI-style sequences gray(s) color strings gray, using surrounding ANSI-style sequences green(s) color strings green, using surrounding ANSI-style sequences highlight(s) highlight strings, using surrounding ANSI-style sequences hilite(s) same as func highlight orange(s) color strings orange, using surrounding ANSI-style sequences purple(s) color strings purple, using surrounding ANSI-style sequences red(s) color strings red, using surrounding ANSI-style sequences realign(x, gap=2) pad items across lines, so that all "columns" align after(x, y) ignore items until the one given; for strings and sequences afterfinal(x, y) backward counterpart of func after afterlast(x, y) same as func afterfinal arrayish(x) check if value is a list, a tuple, or a generator basename(s) get the final/file part of a pathname before(x, y) ignore items since the one given; for strings and sequences beforefinal(x, y) backward counterpart of func before beforelast(x, y) same as func beforefinal chunk(x, size) split/resequence items into chunks of the length given chunked(x, size) same as func chunk compose(*args) make a func which chain-calls all funcs given composed(*args) same as func compose cond(*args) expression-friendly fully-evaluated if-else chain debase64(s) decode base64 strings, including data-URIs dedup(x) ignore later (re)occurrences of values in a sequence dejson(x, f=None) safe parse JSON from strings denan(x, y) turn a floating-point NaN values into the fallback given denil(*args) return the first non-null/none value among those given denone(*args) same as func denil denull(*args) same as func denil dirname(s) get the folder/directory/parent part of a pathname dive(x, f) transform value in depth-first-recursive fashion divebin(x, y, f) binary (2-input) version of recursive-transform func dive drop(x, *what) ignore keys or substrings; for strings, dicts, dict-lists dropped(x, *v) same as func drop each(x, f) generalization of built-in func map endict(x) turn non-dictionary values into dicts with string keys enfloat(x, f=nan) turn values into floats, offering a fallback on failure enint(x, f=None) turn values into ints, offering a fallback on failure enlist(x) turn non-list values into lists entuple(x) turn non-tuple values into tuples ext(s) return the file-extension part of a pathname, if available fields(s) split fields AWK-style from the string given filtered(x, f) same as func keep flat(*args) flatten everything into an unnested sequence fromto(x, y, ?f) sequence integers, end-value included group(x, ?by) group values into dicts of lists; optional transform func grouped(x, ?by) same as func group harden(f, v) make funcs which return values instead of exceptions hardened(f, v) same as func harden countif(x, f) count how many values make the func given true-like idiota(x, ?f) dict-counterpart of func iota ints(x, y, ?f) make sequences of increasing integers, which include the end iota(x, ?f) make an integer sequence from 1 up to the number given join(x, y) join values into a string; make a dict from keys and values json0(x) turn a value into its smallest JSON-string representation json2(x) turn a value into a 2-space-indented multi-line JSON string jsonl(x) turn a value into a sequence of single-line (JSONL) strings keep(x, pred) generalization of built-in func filter kept(x, pred) same as func keep links(x) auto-detect all hyperlink-like (HTTP/HTTPS) substrings mapped(x, f) same as func each number(x) try to parse as an int, on failure try to parse as a float numbers(x) auto-detect all numbers in the value given numstats(x) calculate various `single-pass` numeric stats once(x, y=None) avoid returning the same value more than once; stateful func pick(x, *what) keep only the keys given; works on dicts, or dict-sequences picked(x, *what) same a func pick plain(s) ignore ANSI-style sequences in strings quoted(s, q='"') surround a string with the (optional) quoting-symbol given recover(*args) recover from exceptions with a fallback value reject(x, pred) generalization of built-in func filter, with opposite logic since(x, y) ignore items before the one given; for strings and sequences sincefinal(x, y) backward counterpart of func since sincelast(x, y) same as func sincefinal split(x, y) split string by separator; split sequence into several ones squeeze(s) strip/trim a string, squishing inner runs of spaces stround(x, d=6) format numbers into decimal-number strings tally(x, ?by) count/tally values, using an optional transformation func tallied(x, ?by) same as func tally trap(x, f=None) try running a func, handing exceptions to a fallback func trycall(*args) same as func recover unique(x) same as func dedup uniqued(x) same as func dedup unjson(x, f=None) same as func dejson unquoted(s) ignore surrounding quotes, if present until(x, y) ignore items after the one given; for strings and sequences untilfinal(x, y) backward counterpart of func until untillast(x, y) same as func untilfinal wait(seconds, x) wait the given number of seconds, before returning a value wat(*args) What Are These (wat) shows help/doc messages for funcs Examples # numbers from 0 to 5, each on its own output line; no input is read/used tl = 'range(6)' # all powers up to the 4th, using each input line auto-parsed into a `float` tl = 'range(1, 6)' | tl '(float(l)**p for p in range(1, 4+1))' # separate input lines with an empty line between each; global var `empty` # can be used to avoid bothering with nested shell-quoting tl = 'range(6)' | tl '["", l] if i > 0 else l' # keep only the last 2 lines from the input tl = 'range(1, 6)' | tl -all 'lines[-2:]' # join input lines into tab-separated lines of up to 3 items each; global # var named `tab` can be used to avoid bothering with nested shell-quoting tl = 'range(1, 8)' | tl -all '("\\t".join(c) for c in chunk(lines, 3))' # ignore all lines before the first one with just a '5' in it tl = 'range(8)' | tl -all 'since(lines, "5")' # ignore errors/exceptions, in favor of the original lines/values tl = '("abc", "123")' | tl 'safe(lambda: 2 * float(line), line)' # ignore errors/exceptions, calling a fallback func with the exception tl = '("abc", "123")' | tl 'safe(lambda: 2 * float(line), lambda e: str(e))' # filtering lines out via None values head -c 1024 /dev/urandom | strings | tl 'l if len(l) < 20 else None' # boolean-valued results are concise ways to filter lines out head -c 1024 /dev/urandom | strings | tl 'len(l) < 20' # function/callable results are automatically called on the current line head -c 1024 /dev/urandom | strings | tl len ''' from sys import argv, exit, stderr, stdin, stdout if __name__ != '__main__': print('don\'t import this script, run it directly instead', file=stderr) exit(1) # no args or a leading help-option arg means show the help message and quit if len(argv) < 2 or argv[1] in ('-h', '--h', '-help', '--help'): from sys import exit, stderr print(info.strip(), file=stderr) exit(0) from io import StringIO, TextIOWrapper from typing import \ AbstractSet, Annotated, Any, AnyStr, \ AsyncContextManager, AsyncGenerator, AsyncIterable, AsyncIterator, \ Awaitable, BinaryIO, ByteString, Callable, cast, \ ClassVar, Collection, Container, \ ContextManager, Coroutine, Deque, Dict, Final, \ final, ForwardRef, FrozenSet, Generator, Generic, get_args, get_origin, \ get_type_hints, Hashable, IO, ItemsView, \ Iterable, Iterator, KeysView, List, Literal, Mapping, \ MappingView, Match, MutableMapping, MutableSequence, MutableSet, \ NamedTuple, NewType, no_type_check, no_type_check_decorator, \ NoReturn, Optional, overload, \ Protocol, Reversible, \ runtime_checkable, Sequence, Set, Sized, SupportsAbs, \ SupportsBytes, SupportsComplex, SupportsFloat, SupportsIndex, \ SupportsInt, SupportsRound, Text, TextIO, Tuple, Type, \ TypedDict, TypeVar, \ TYPE_CHECKING, Union, ValuesView try: from typing import \ assert_never, assert_type, clear_overloads, Concatenate, \ dataclass_transform, get_overloads, is_typeddict, LiteralString, \ Never, NotRequired, ParamSpec, ParamSpecArgs, ParamSpecKwargs, \ Required, reveal_type, Self, TypeAlias, TypeGuard, TypeVarTuple, \ Unpack from typing import \ AwaitableGenerator, override, TypeAliasType, type_check_only except Exception: pass def conforms(x: Any) -> bool: ''' Check if a value is JSON-compatible, which includes checking values recursively, in case of composite/nestable values. ''' if x is None or isinstance(x, (bool, int, str)): return True if isinstance(x, float): return not (isnan(x) or isinf(x)) if isinstance(x, (list, tuple)): return all(conforms(e) for e in x) if isinstance(x, dict): return all(conforms(k) and conforms(v) for k, v in x.items()) return False def seems_url(s: str) -> bool: protocols = ('https://', 'http://', 'file://', 'ftp://', 'data:') return any(s.startswith(p) for p in protocols) def disabled_exec(*args, **kwargs) -> None: _ = args _ = kwargs raise Exception('built-in func `exec` is disabled') def fix_value(x: Any, default: Any) -> Any: 'Adapt a value so it can be output.' # true shows the current line as the current output; presumably # this is the result of calling a `condition-like` expression if x is True: return default # null and false show no output for the current input line if x is False: return None if x is type: return type(default).__name__ # if expression results in a func, auto-call it with the original data if callable(x) and not isinstance(x, Iterable): c = required_arg_count(x) if c == 1: x = x(default) else: m = f'func auto-call only works with 1-arg funcs (func wanted {c})' raise Exception(m) if x is None or isinstance(x, (bool, int, float, str)): return x rec = fix_value if isinstance(x, dict): return { rec(k, default): rec(v, default) for k, v in x.items() if not (isinstance(k, Skip) or isinstance(v, Skip)) } if isinstance(x, Iterable): return tuple(rec(e, default) for e in x if not isinstance(e, Skip)) if isinstance(x, Dottable): return rec(x.__dict__, default) if isinstance(x, DotCallable): return rec(x.value, default) if isinstance(x, Exception): raise x return None if isinstance(x, Skip) else str(x) def show_value(w, x: Any) -> None: 'Helper func used by func show_result.' # null shows no output for the current input line if x is None or isinstance(x, Skip): return if isinstance(x, dict): dump(x, w, separators=(', ', ': '), allow_nan=False, indent=None) w.write('\n') w.flush() elif isinstance(x, (bytes, bytearray)): w.write(x) w.flush() elif isinstance(x, Iterable) and not isinstance(x, str): dump(x, w, separators=(', ', ': '), allow_nan=False, indent=None) w.write('\n') w.flush() elif isinstance(x, DotCallable): print(x.value, file=w, flush=True) else: print(x, file=w, flush=True) def show_result(w, x: Any) -> None: if isinstance(x, (dict, str)): show_value(w, x) elif isinstance(x, Iterable): for e in x: if isinstance(e, Exception): raise e show_value(w, e) else: show_value(w, x) def make_open_utf8(open: Callable) -> Callable: 'Restrict the file-open func to a read-only utf-8 file-open func.' def open_utf8_readonly(name: str): 'A UTF-8 read-only file-open func overriding the built-in open func.' return open(name, encoding='utf-8') return open_utf8_readonly open_utf8 = make_open_utf8(open) open = open_utf8 def loop_lines_inputs(r, inputs: List[str], doing: Callable) -> None: ''' Act on multiple named inputs line-by-line, via the func given; when not given any named inputs, the default reader given is used instead. ''' main_input: List[str] = [] got_main_input = False dashes = inputs.count('-') if any(seems_url(e) for e in inputs): from urllib.request import urlopen def _adapt_lines(src) -> Iterable[str]: for j, line in enumerate(src): if j == 0: line = line.lstrip('\xef\xbb\xbf') yield line.rstrip('\r\n').rstrip('\n') for path in inputs: if path == '-': if dashes == 1: doing(_adapt_lines(r)) continue if not got_main_input: main_input = r.read().splitlines() got_main_input = True doing(_adapt_lines(main_input)) continue if seems_url(path): with urlopen(path) as inp: with TextIOWrapper(inp, encoding='utf-8') as txt: doing(_adapt_lines(txt)) continue with open_utf8(path) as inp: doing(_adapt_lines(inp)) if len(inputs) == 0: doing(_adapt_lines(r)) def loop_whole_inputs(r, inputs: List[str], doing: Callable) -> None: ''' Act on multiple named inputs, read as whole strings, via the func given; when not given any named inputs, the default reader given is used instead. ''' main_input: List[str] = [] got_main_input = False dashes = inputs.count('-') if any(seems_url(e) for e in inputs): from urllib.request import urlopen for path in inputs: if path == '-': if dashes == 1: doing(r.read()) continue if not got_main_input: main_input = r.read() got_main_input = True doing(main_input) continue if seems_url(path): with urlopen(path) as inp: with TextIOWrapper(inp, encoding='utf-8') as txt: doing(txt.read()) continue with open_utf8(path) as inp: doing(inp.read()) if len(inputs) == 0: doing(r.read()) def main_whole_strings(out, r, expression, inputs) -> None: def _each_string(out, src, expression: Any) -> None: # `comprehension` expressions seem to ignore local variables: even # lambda-based workarounds fail global s, t, text, v, value, w, whole, _ s = t = text = v = value = w = whole = src res = eval(expression) res = fix_value(res, src) show_result(out, res) _ = res loop_whole_inputs(r, inputs, lambda s: _each_string(out, s, expression)) def main_each_line(w, r, expression, inputs) -> None: def _each_line(w, src, expression: Any) -> None: # `comprehension` expressions seem to ignore local variables: even # lambda-based workarounds fail global i, nr, previous, prev, line, l, line, _ previous = '' prev = previous for line in src: l = line res = eval(expression) res = fix_value(res, line) show_result(w, res) i += 1 nr += 1 previous = line prev = previous _ = res loop_lines_inputs(r, inputs, lambda r: _each_line(w, r, expression)) def main_each_block(w, r, expression, inputs) -> None: def _each_block(w, r, expression) -> None: # `comprehension` expressions seem to ignore local variables: even # lambda-based workarounds fail global i, nr global previous, prev, lines, block, par, para, paragraph, _ for item in paragraphize(r): lines = block = par = para = paragraph = item res = eval(expression) if isinstance(res, Skip): previous = data prev = previous i += 1 nr += 1 continue res = fix_value(res, lines) show_result(w, res) i += 1 nr += 1 prev = previous = lines _ = res loop_lines_inputs(r, inputs, lambda r: _each_block(w, r, expression)) def main_all_lines(w, r, expression, inputs) -> None: # `comprehension` expressions seem to ignore local variables: even # lambda-based workarounds fail global line, lines, data, values, d, l, v, dat, val def _all_lines(w, r, expression) -> None: # `comprehension` expressions seem to ignore local variables: even # lambda-based workarounds fail global lines, line, l for e in r: line = l = e lines.append(line) lines = [] line = l = '' loop_lines_inputs(r, inputs, lambda r: _all_lines(w, r, expression)) data = values = d = v = dat = val = lines res = eval(expression) res = fix_value(res, lines) show_result(w, res) def main_all_bytes(w, r, expression, inputs) -> None: # `comprehension` expressions seem to ignore local variables: even # lambda-based workarounds fail global data, values, d, v, dat, val data = values = d = v = dat = val = r.buffer.read() res = eval(expression) res = fix_value(res, data) show_result(w, res) def main_no_input(w, r, expression, inputs) -> None: res = eval(expression) fix = lambda x: fix_value(x, None) f = str if res is None or isinstance(res, bool) else fix res = f(res) show_result(w, res) def main_json_lines(w, r, expression, inputs) -> None: def _jsonl2json(w, src, expression: Any) -> None: # `comprehension` expressions seem to ignore local variables: even # lambda-based workarounds fail global i, nr global line, l, data, d, value, v, dat, val, prev, previous, _ previous = None prev = previous for line in src: if emptyish_re.match(line) or commented_re.match(line): continue l = line data = value = d = v = dat = val = loads(line) res = eval(expression) if isinstance(res, Skip): previous = data prev = previous i += 1 nr += 1 continue res = fix_value(res, data) if callable(res): res = res(data) if not conforms(res): res = conform(res) dump(res, w) _ = res w.write('\n') previous = data prev = previous i += 1 nr += 1 loop_lines_inputs(r, inputs, lambda r: _jsonl2json(w, r, expression)) # opts2modes simplifies option-handling in func main opts2modes = { '=': 'no-input', '-nil': 'no-input', '-no-input': 'no-input', '-noinput': 'no-input', '-none': 'no-input', '-None': 'no-input', '-null': 'no-input', '-null-input': 'no-input', '-nullinput': 'no-input', '--n': 'no-input', '--nil': 'no-input', '--no-input': 'no-input', '--noinput': 'no-input', '--none': 'no-input', '--None': 'no-input', '--null': 'no-input', '--null-input': 'no-input', '--nullinput': 'no-input', '-a': 'all-lines', '-all': 'all-lines', '-lines': 'all-lines', '--a': 'all-lines', '--all': 'all-lines', '--lines': 'all-lines', '-b': 'each-block', '-blocks': 'each-block', '-g': 'each-block', '-groups': 'each-block', '-p': 'each-block', '-par': 'each-block', '-para': 'each-block', '-paragraphs': 'each-block', '--b': 'each-block', '--blocks': 'each-block', '--g': 'each-block', '--groups': 'each-block', '--p': 'each-block', '--par': 'each-block', '--para': 'each-block', '--paragraphs': 'each-block', '-bytes': 'bytes', '--bytes': 'bytes', '-jl': 'json-lines', '-jsonl': 'json-lines', '-jsonlines': 'json-lines', '-json-lines': 'json-lines', '--jl': 'json-lines', '--jsonl': 'json-lines', '--jsonlines': 'json-lines', '--json-lines': 'json-lines', '-s': 'whole-strings', '-str': 'whole-strings', '-string': 'whole-strings', '--s': 'whole-strings', '--str': 'whole-strings', '--string': 'whole-strings', '-w': 'whole-strings', '-whole': 'whole-strings', '--w': 'whole-strings', '--whole': 'whole-strings', } def blue(s: Any) -> str: 'Blue-style a plain string via ANSI-style sequences.' return f'\x1b[38;5;26m{s}\x1b[0m' def blueback(s: Any) -> str: 'Blue-background-style a plain string via ANSI-style sequences.' return f'\x1b[48;5;26m\x1b[38;5;255m{s}\x1b[0m' bluebg = blueback def bold(s: Any) -> str: 'Bold-style a plain string via ANSI-style sequences.' return f'\x1b[1m{s}\x1b[0m' def gbm(s: str, good: Any = False, bad: Any = False, meh: Any = False) -> str: ''' Good, Bad, Meh ANSI-styles a plain string via ANSI-style sequences, according to 1..3 conditions given as boolean(ish) values: these are checked in order, so the first truish one wins. ''' if good: return green(s) if bad: return red(s) if meh: return gray(s) return s def gray(s: Any) -> str: 'Gray-style a plain string via ANSI-style sequences.' return f'\x1b[38;5;248m{s}\x1b[0m' def grayback(s: Any) -> str: 'Gray-background-style a plain string via ANSI-style sequences.' return f'\x1b[48;5;253m{s}\x1b[0m' graybg = grayback def green(s: Any) -> str: 'Green-style a plain string via ANSI-style sequences.' return f'\x1b[38;5;29m{s}\x1b[0m' def greenback(s: Any) -> str: 'Green-background-style a plain string via ANSI-style sequences.' return f'\x1b[48;5;29m\x1b[38;5;255m{s}\x1b[0m' greenbg = greenback def highlight(s: Any) -> str: 'Highlight/reverse-style a plain string via ANSI-style sequences.' return f'\x1b[7m{s}\x1b[0m' hilite = highlight def magenta(s: Any) -> str: 'Magenta-style a plain string via ANSI-style sequences.' return f'\x1b[38;5;165m{s}\x1b[0m' def magentaback(s: Any) -> str: 'Magenta-background-style a plain string via ANSI-style sequences.' return f'\x1b[48;5;165m\x1b[38;5;255m{s}\x1b[0m' magback = magentaback magbg = magentaback magentabg = magentaback def orange(s: Any) -> str: 'Orange-style a plain string via ANSI-style sequences.' return f'\x1b[38;5;166m{s}\x1b[0m' def orangeback(s: Any) -> str: 'Orange-background-style a plain string via ANSI-style sequences.' return f'\x1b[48;5;166m\x1b[38;5;255m{s}\x1b[0m' orangebg = orangeback orback = orangeback orbg = orangeback def purple(s: Any) -> str: 'Purple-style a plain string via ANSI-style sequences.' return f'\x1b[38;5;99m{s}\x1b[0m' def purpleback(s: Any) -> str: 'Purple-background-style a plain string via ANSI-style sequences.' return f'\x1b[48;5;99m\x1b[38;5;255m{s}\x1b[0m' purback = purpleback purbg = purpleback purplebg = purpleback def red(s: Any) -> str: 'Red-style a plain string via ANSI-style sequences.' return f'\x1b[38;5;1m{s}\x1b[0m' def redback(s: Any) -> str: 'Red-background-style a plain string via ANSI-style sequences.' return f'\x1b[48;5;1m\x1b[38;5;255m{s}\x1b[0m' redbg = redback def underline(s: Any) -> str: 'Underline-style a plain string via ANSI-style sequences.' return f'\x1b[4m{s}\x1b[0m' def realign(lines: List[str], gap: int = 2) -> Iterable: ''' Pad lines so that their items align across/vertically: extra padding is put between such `columns`, using 2 spaces by default. ''' widths: List[int] = [] for l in lines: items = awk_sep_re.split(l.strip()) while len(widths) < len(items): widths.append(0) for i, s in enumerate(items): widths[i] = max(widths[i], len(s)) sb = StringIO() gap = max(gap, 0) for l in lines: sb.truncate(0) sb.seek(0) padding = 0 items = awk_sep_re.split(l.strip()) for s, w in zip(items, widths): sb.write(padding * ' ') sb.write(s) padding = max(w - len(s), 0) + gap yield sb.getvalue() def stop_normal(x: Any, exit_code: int = 0) -> NoReturn: show_result(stdout, fix_value(x, None)) exit(exit_code) def stop_json(x: Any, exit_code: int = 0) -> NoReturn: dump(x, stdout) stdout.write('\n') stdout.flush() exit(exit_code) from base64 import \ standard_b64encode, standard_b64decode, \ standard_b64encode as base64bytes, standard_b64decode as debase64bytes from collections import \ ChainMap, Counter, defaultdict, deque, namedtuple, OrderedDict, \ UserDict, UserList, UserString from copy import copy, deepcopy from datetime import \ MAXYEAR, MINYEAR, date, datetime, time, timedelta, timezone, tzinfo try: from datetime import now, UTC except Exception: now = lambda: datetime(2000, 1, 1).now() from decimal import Decimal, getcontext from difflib import \ context_diff, diff_bytes, Differ, get_close_matches, HtmlDiff, \ IS_CHARACTER_JUNK, IS_LINE_JUNK, ndiff, restore, SequenceMatcher, \ unified_diff from fractions import Fraction import functools from functools import \ cache, cached_property, cmp_to_key, get_cache_token, lru_cache, \ namedtuple, partial, partialmethod, recursive_repr, reduce, \ singledispatch, singledispatchmethod, total_ordering, update_wrapper, \ wraps from glob import glob, iglob try: from graphlib import CycleError, TopologicalSorter except Exception: pass from hashlib import \ file_digest, md5, pbkdf2_hmac, scrypt, sha1, sha224, sha256, sha384, \ sha512 from inspect import getfullargspec, getsource import itertools from itertools import \ accumulate, chain, combinations, combinations_with_replacement, \ compress, count, cycle, dropwhile, filterfalse, groupby, islice, \ permutations, product, repeat, starmap, takewhile, tee, zip_longest try: from itertools import pairwise from itertools import batched except Exception: pass from json import dump, dumps, loads import math Math = math from math import \ acos, acosh, asin, asinh, atan, atan2, atanh, ceil, comb, \ copysign, cos, cosh, degrees, dist, e, erf, erfc, exp, expm1, \ fabs, factorial, floor, fmod, frexp, fsum, gamma, gcd, hypot, inf, \ isclose, isfinite, isinf, isnan, isqrt, lcm, ldexp, lgamma, log, \ log10, log1p, log2, modf, nan, nextafter, perm, pi, pow, prod, \ radians, remainder, sin, sinh, sqrt, tan, tanh, tau, trunc, ulp try: from math import cbrt, exp2 except Exception: pass power = pow import operator from pathlib import Path from pprint import \ isreadable, isrecursive, pformat, pp, pprint, PrettyPrinter, saferepr from random import \ betavariate, choice, choices, expovariate, gammavariate, gauss, \ getrandbits, getstate, lognormvariate, normalvariate, paretovariate, \ randbytes, randint, random, randrange, sample, seed, setstate, \ shuffle, triangular, uniform, vonmisesvariate, weibullvariate compile_py = compile # keep built-in func compile for later from re import compile as compile_uncached, Pattern, IGNORECASE import statistics from statistics import \ bisect_left, bisect_right, fmean, \ geometric_mean, harmonic_mean, mean, median, \ median_grouped, median_high, median_low, mode, multimode, pstdev, \ pvariance, quantiles, stdev, variance try: from statistics import \ correlation, covariance, linear_regression, mul except Exception: pass import string from string import \ Formatter, Template, ascii_letters, ascii_lowercase, ascii_uppercase, \ capwords, digits, hexdigits, octdigits, printable, punctuation, \ whitespace alphabet = ascii_letters letters = ascii_letters lowercase = ascii_lowercase uppercase = ascii_uppercase from textwrap import dedent, fill, indent, shorten, wrap from time import \ altzone, asctime, \ ctime, daylight, get_clock_info, \ gmtime, localtime, mktime, monotonic, monotonic_ns, perf_counter, \ perf_counter_ns, process_time, process_time_ns, \ sleep, strftime, strptime, struct_time, thread_time, thread_time_ns, \ time, time_ns, timezone, tzname try: from time import \ clock_getres, clock_gettime, clock_gettime_ns, clock_settime, \ clock_settime_ns, pthread_getcpuclockid, tzset except Exception: pass from unicodedata import \ bidirectional, category, combining, decimal, decomposition, digit, \ east_asian_width, is_normalized, lookup, mirrored, name, normalize, \ numeric from urllib.parse import \ parse_qs, parse_qsl, quote, quote_from_bytes, quote_plus, unquote, \ unquote_plus, unquote_to_bytes, unwrap, urldefrag, urlencode, urljoin, \ urlparse, urlsplit, urlunparse, urlunsplit class Skip: 'Custom type which some funcs type-check to skip values in containers.' def __init__(self, *args) -> None: pass # skip is a ready-to-use value which some funcs filter against: this way # filtering values becomes a special case of transforming values skip = Skip() # re_cache is used by custom func compile to cache previously-compiled # regular-expressions, which makes them quicker to (re)use in formulas re_cache: Dict[str, Pattern] = {} # ire_cache is like re_cache, except it's for case-insensitive regexes ire_cache: Dict[str, Pattern] = {} # ansi_style_re detects the most commonly-used ANSI-style sequences, and # is used in func plain ansi_style_re = compile_uncached('\x1b\\[([0-9;]+m|[0-9]*[A-HJKST])') # number_re detects numbers, and is used in func numbers number_re = compile_uncached('\\W-?[0-9]+(\\.[0-9]*)?\\W') # link_re detects web links, and is used in func links link_re_src = 'https?://[A-Za-z0-9+_.:%-]+(/[A-Za-z0-9+_.%/,#?&=-]*)*' link_re = compile_uncached(link_re_src) # paddable_tab_re detects single tabs and possible runs of spaces around # them, and is used in func squeeze paddable_tab_re = compile_uncached(' *\t *') # seen remembers values already given to func `once` seen = set() # commented_re detects strings/lines which start as unix-style comments commented_re = compile_uncached('^ *#') # emptyish_re detects empty/emptyish strings/lines, the latter being strings # with only spaces in them emptyish_re = compile_uncached('^ *\r?\n?$') # spaces_re detects runs of 2 or more spaces, and is used in func squeeze spaces_re = compile_uncached(' +') # awk_sep_re splits like AWK does by default, and is used in func fields awk_sep_re = compile_uncached(' *\t *| +') # some convenience aliases to commonly-used values false = False true = True nil = None nihil = None none = None null = None s = '' months = [ 'January', 'February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December', ] monweek = [ 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday', ] sunweek = [ 'Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', ] phy = { 'kilo': 1_000, 'mega': 1_000_000, 'giga': 1_000_000_000, 'tera': 1_000_000_000_000, 'peta': 1_000_000_000_000_000, 'exa': 1_000_000_000_000_000_000, 'zetta': 1_000_000_000_000_000_000_000, 'c': 299_792_458, 'kcd': 683, 'na': 602214076000000000000000, 'femto': 1e-15, 'pico': 1e-12, 'nano': 1e-9, 'micro': 1e-6, 'milli': 1e-3, 'e': 1.602176634e-19, 'f': 96_485.33212, 'h': 6.62607015e-34, 'k': 1.380649e-23, 'mu': 1.66053906892e-27, 'ge': 9.7803267715, 'gn': 9.80665, } physics = phy # using literal strings on the cmd-line is often tricky/annoying: some of # these aliases can help get around multiple levels of string-quoting; no # quotes are needed as the script will later make these values accessible # via the property/dot syntax sym = { 'amp': '&', 'ampersand': '&', 'ansiclear': '\x1b[0m', 'ansinormal': '\x1b[0m', 'ansireset': '\x1b[0m', 'apo': '\'', 'apos': '\'', 'ast': '*', 'asterisk': '*', 'at': '@', 'backquote': '`', 'backslash': '\\', 'backtick': '`', 'ball': '●', 'bang': '!', 'bigsigma': 'Σ', 'block': '█', 'bquo': '`', 'bquote': '`', 'bslash': '\\', 'btick': '`', 'bullet': '•', 'caret': '^', 'cdot': '·', 'circle': '●', 'colon': ':', 'comma': ',', 'cr': '\r', 'crlf': '\r\n', 'cross': '×', 'cs': ', ', 'dash': '—', 'dollar': '$', 'dot': '.', 'dquo': '"', 'dquote': '"', 'emark': '!', 'emdash': '—', 'empty': '', 'endash': '–', 'eq': '=', 'et': '&', 'euro': '€', 'ge': '≥', 'geq': '≥', 'gt': '>', 'hellip': '…', 'hole': '○', 'hyphen': '-', 'infinity': '∞', 'lcurly': '{', 'ldquo': '“', 'ldquote': '“', 'le': '≤', 'leq': '≤', 'lf': '\n', 'lt': '<', 'mdash': '—', 'mdot': '·', 'miniball': '•', 'minus': '-', 'ndash': '–', 'neq': '≠', 'perc': '%', 'percent': '%', 'period': '.', 'plus': '+', 'qmark': '?', 'que': '?', 'rcurly': '}', 'rdquo': '”', 'rdquote': '”', 'sball': '•', 'semi': ';', 'semicolon': ';', 'sharp': '#', 'slash': '/', 'space': ' ', 'square': '■', 'squo': '\'', 'squote': '\'', 'tab': '\t', 'tilde': '~', 'underscore': '_', 'uscore': '_', 'utf8bom': '\xef\xbb\xbf', 'utf16be': '\xfe\xff', 'utf16le': '\xff\xfe', } symbols = sym units = { 'cup2l': 0.23658824, 'floz2l': 0.0295735295625, 'floz2ml': 29.5735295625, 'ft2m': 0.3048, 'gal2l': 3.785411784, 'in2cm': 2.54, 'lb2kg': 0.45359237, 'mi2km': 1.609344, 'mpg2kpl': 0.425143707, 'nmi2km': 1.852, 'oz2g': 28.34952312, 'psi2pa': 6894.757293168, 'ton2kg': 907.18474, 'yd2m': 0.9144, 'mol': 602214076000000000000000, 'mole': 602214076000000000000000, 'hour': 3_600, 'day': 86_400, 'week': 604_800, 'hr': 3_600, 'wk': 604_800, 'kb': 1024, 'mb': 1024**2, 'gb': 1024**3, 'tb': 1024**4, 'pb': 1024**5, } # some convenience aliases to various funcs from the python stdlib geomean = geometric_mean harmean = harmonic_mean sd = stdev popsd = pstdev var = variance popvar = pvariance randbeta = betavariate randexp = expovariate randgamma = gammavariate randlognorm = lognormvariate randnorm = normalvariate randweibull = weibullvariate capitalize = str.capitalize casefold = str.casefold center = str.center # count = str.count decode = bytes.decode encode = str.encode endswith = str.endswith expandtabs = str.expandtabs find = str.find format = str.format index = str.index isalnum = str.isalnum isalpha = str.isalpha isascii = str.isascii isdecimal = str.isdecimal isdigit = str.isdigit isidentifier = str.isidentifier islower = str.islower isnumeric = str.isnumeric isprintable = str.isprintable isspace = str.isspace istitle = str.istitle isupper = str.isupper # join = str.join ljust = str.ljust lower = str.lower lowered = str.lower lstrip = str.lstrip maketrans = str.maketrans partition = str.partition removeprefix = str.removeprefix removesuffix = str.removesuffix replace = str.replace rfind = str.rfind rindex = str.rindex rjust = str.rjust rpartition = str.rpartition rsplit = str.rsplit rstrip = str.rstrip # split = str.split splitlines = str.splitlines startswith = str.startswith strip = str.strip swapcase = str.swapcase title = str.title translate = str.translate upper = str.upper uppered = str.upper zfill = str.zfill every = all rev = reversed reverse = reversed some = any length = len blowtabs = str.expandtabs hasprefix = str.startswith hassuffix = str.endswith ltrim = str.lstrip stripstart = str.lstrip trimspace = str.strip trimstart = str.lstrip rtrim = str.rstrip stripend = str.rstrip trimend = str.rstrip stripped = str.strip trim = str.strip trimmed = str.strip trimprefix = str.removeprefix trimsuffix = str.removesuffix def required_arg_count(f: Callable) -> int: if isinstance(f, type): return 1 meta = getfullargspec(f) n = len(meta.args) if meta.defaults: n -= len(meta.defaults) return n def identity(x: Any) -> Any: ''' Return the value given: this is the default transformer for several higher-order funcs, which effectively keeps original items as given. ''' return x idem = identity iden = identity def after(x: Union[str, Iterable], what: Any) -> Union[str, Iterable]: 'Skip parts of strings/sequences up to the substring/value given.' return (strafter if isinstance(x, str) else itemsafter)(x, what) def afterlast(x: Union[str, Iterable], what: Any) -> Union[str, Iterable]: 'Skip parts of strings/sequences up to the last substring/value given.' return (strafterlast if isinstance(x, str) else itemsafterlast)(x, what) afterfinal = afterlast def arrayish(x: Any) -> bool: 'Check if a value is array-like enough.' return isinstance(x, (list, tuple, range, Generator)) isarrayish = arrayish def base64(x): return base64bytes(str(x).encode()).decode() def basename(s: str) -> str: 'Get a filepath\'s last part, if present.' return Path(s).name def before(x: Union[str, Iterable], what: Any) -> Union[str, Iterable]: 'End strings/sequences right before a substring/value\'s appearance.' return (strbefore if isinstance(x, str) else itemsbefore)(x, what) def beforelast(x: Union[str, Iterable], what: Any) -> Union[str, Iterable]: 'End strings/sequences right before a substring/value\'s last appearance.' return (strbeforelast if isinstance(x, str) else itemsbeforelast)(x, what) beforefinal = beforelast def cases(x: Any, *args: Any) -> Any: ''' Simulate a switch statement on a value, using matches/result pairs from the arguments given; when given an even number of extra args, None is used as a final fallback result; when given an odd number of extra args, the last argument is used as a final `default` value, if needed. ''' for i in range(0, len(args) - len(args) % 2, 2): test, res = args[i], args[i+1] if isinstance(test, (list, tuple)) and x in test: return res if isinstance(test, float) and isnan(test) and isnan(x): return res if x == test: return res return None if len(args) % 2 == 0 else args[-1] switch = cases def chunk(items: Iterable, chunk_size: int) -> Iterable: 'Break iterable into chunks, each with up to the item-count given.' if isinstance(items, str): n = len(items) while n >= chunk_size: yield items[:chunk_size] items = items[chunk_size:] n -= chunk_size if n > 0: yield items return if not isinstance(chunk_size, int): raise Exception('non-integer chunk-size') if chunk_size < 1: raise Exception('non-positive chunk-size') it = iter(items) while True: head = tuple(islice(it, chunk_size)) if not head: return yield head chunked = chunk def commented(s: str) -> bool: 'Check if a string starts as a unix-style comment.' return commented_re.match(s) != None iscommented = commented def compile(s: str, case_sensitive: bool = True) -> Pattern: 'Cached regex `compiler`, so it\'s quicker to (re)use in formulas.' cache = re_cache if case_sensitive else ire_cache options = 0 if case_sensitive else IGNORECASE if s in cache: return cache[s] e = compile_uncached(s, options) cache[s] = e return e def compose(*what: Callable) -> Callable: def composite(x: Any) -> Any: for f in what: x = f(x) return x return composite composed = compose lcompose = compose lcomposed = compose def cond(*args: Any) -> Any: ''' Simulate a chain of if-else statements, using condition/result pairs from the arguments given; when given an even number of args, None is used as a final fallback result; when given an odd number of args, the last argument is used as a final `else` value, if needed. ''' for i in range(0, len(args) - len(args) % 2, 2): if args[i]: return args[i+1] return None if len(args) % 2 == 0 else args[-1] def conform(x: Any, denan: Any = None, deinf: Any = None, fn = str) -> Any: 'Make values JSON-compatible.' if isinstance(x, float): # turn NaNs and Infinities into the replacement values given if isnan(x): return denan if isinf(x): return deinf return x if isinstance(x, (bool, int, float, str)): return x if isinstance(x, dict): return { str(k): conform(v) for k, v in x.items() if not (isinstance(k, Skip) or isinstance(v, Skip)) } if isinstance(x, Iterable): return [conform(e) for e in x if not isinstance(e, Skip)] if isinstance(x, DotCallable): return x.value return fn(x) fix = conform def countif(src: Iterable, check: Callable) -> int: ''' Count how many values make the func given true-like. This func works with sequences, dictionaries, and strings. ''' if callable(src): src, check = check, src check = predicate(check) total = 0 if isinstance(src, dict): for v in src.values(): if check(v): total += 1 else: for v in src: if check(v): total += 1 return total # def debase64(x): # return debase64bytes(str(x).encode()).decode() def debase64(s: str) -> bytes: 'Convert away from base64 encoding, including data-URIs.' if s.startswith('data:'): i = s.find(',') if i >= 0: return standard_b64decode(s[i + 1:]) return standard_b64decode(s) unbase64 = debase64 def dedup(v: Iterable) -> Iterable: 'Ignore reappearing items from iterables, after their first occurrence.' got = set() for e in v: if not e in got: got.add(e) yield e dedupe = dedup deduped = dedup deduplicate = dedup deduplicated = dedup undup = dedup undupe = dedup unduped = dedup unduplicate = dedup unduplicated = dedup unique = dedup uniqued = dedup def defunc(x: Any) -> Any: 'Call if value is a func, or return it back as given.' return x() if callable(x) else x callmemaybe = defunc defunct = defunc unfunc = defunc unfunct = defunc def dejson(x: Any, catch: Union[Callable[[Exception], Any], Any] = None) -> Any: 'Safely parse JSON from strings.' try: return loads(x) if isinstance(x, str) else x except Exception as e: return catch(e) if callable(catch) else catch unjson = dejson def denan(x: Any, fallback: Any = None) -> Any: 'Replace floating-point NaN with the alternative value given.' return x if not (isinstance(x, float) and isnan(x)) else fallback def denil(*args: Any) -> Any: 'Avoid None values, if possible: first value which isn\'t None wins.' for e in args: if e != None: return e return None denone = denil denull = denil def dirname(s: str) -> str: 'Ignore the last part of a filepath.' return str(Path(s).parent) def dive(into: Any, doing: Callable) -> Any: 'Transform a nested value by calling a func via depth-first recursion.' # support args in either order if callable(into): into, doing = doing, into return _dive_kv(None, into, doing) deepmap = dive dive1 = dive def divebin(x: Any, y: Any, doing: Callable) -> Any: 'Nested 2-value version of depth-first-recursive func dive.' # support args in either order if callable(x): x, y, doing = y, doing, x narg = required_arg_count(doing) if narg == 2: return dive(x, lambda a: dive(y, lambda b: doing(a, b))) if narg == 4: return dive(x, lambda i, a: dive(y, lambda j, b: doing(i, a, j, b))) raise Exception('divebin(...) only supports funcs with 2 or 4 args') bindive = divebin # diveboth = divebin # dualdive = divebin # duodive = divebin dive2 = divebin def _dive_kv(key: Any, into: Any, doing: Callable) -> Any: if isinstance(into, dict): return {k: _dive_kv(k, v, doing) for k, v in into.items()} if isinstance(into, Iterable) and not isinstance(into, str): return [_dive_kv(i, e, doing) for i, e in enumerate(into)] narg = required_arg_count(doing) return doing(key, into) if narg == 2 else doing(into) class DotCallable: 'Enable convenient dot-syntax calling of 1-input funcs.' def __init__(self, value: Any): self.value = value def __getattr__(self, key: str) -> Any: return DotCallable(globals()[key](self.value)) class Dottable: 'Enable convenient dot-syntax access to dictionary values.' def __getattr__(self, key: Any) -> Any: return self.__dict__[key] if key in self.__dict__ else None def __getitem__(self, key: Any) -> Any: return self.__dict__[key] if key in self.__dict__ else None def __iter__(self) -> Iterable: return iter(self.__dict__) def dotate(x: Any) -> Union[Dottable, Any]: 'Recursively ensure all dictionaries in a value are dot-accessible.' if isinstance(x, dict): d = Dottable() d.__dict__ = {k: dotate(v) for k, v in x.items()} return d if isinstance(x, list): return [dotate(e) for e in x] if isinstance(x, tuple): return tuple(dotate(e) for e in x) return x dotated = dotate dote = dotate doted = dotate dotified = dotate dotify = dotate dottified = dotate dottify = dotate # make dictionaries `physics`, `symbols`, and `units` easier to use phy = dotate(phy) physics = phy sym = dotate(sym) symbols = sym units = dotate(units) def drop(src: Any, *what) -> Any: ''' Either ignore all substrings occurrences, or ignore all keys given from an object, or even from a sequence of objects. ''' if isinstance(src, str): return strdrop(src, *what) return _itemsdrop(src, set(what)) dropped = drop # ignore = drop # ignored = drop def _itemsdrop(src: Any, what: Set) -> Any: if isinstance(src, dict): kv = {} for k, v in src.items(): if not (k in what): kv[k] = v return kv if isinstance(src, Iterable): return [_itemsdrop(e, what) for e in src] return None def each(src: Iterable, f: Callable) -> Any: ''' A generalization of built-in func map, which can also handle dictionaries and strings. ''' if callable(src): src, f = f, src if isinstance(src, dict): return mapkv(src, lambda k, _: k, f) if isinstance(src, str): s = StringIO() f = loopify(f) for i, c in enumerate(src): v = f(i, c) if not isinstance(v, Skip): s.write(str(v)) return s.getvalue() return tuple(f(i, v) for i, v in enumerate(src)) mapped = each def emptyish(x: Any) -> bool: ''' Check if a value can be considered empty, which includes non-empty strings which only have spaces in them. ''' def check(x: Any) -> bool: if not x: return True if isinstance(x, str): return bool(emptyish_re.match(x)) return False if check(x): return True if isinstance(x, Iterable): return all(check(e) for e in x) return False isemptyish = emptyish def endict(x: Any) -> Dict[str, Any]: 'Turn non-dictionary values into dictionaries with string keys.' if isinstance(x, dict): return {str(k): v for k, v in x.items()} if arrayish(x): return {str(e): e for e in x} return {str(x): x} dicted = endict endicted = endict indict = endict todict = endict def enfloat(x: Any, fallback: float = nan) -> float: try: return float(x) except Exception: return fallback enfloated = enfloat floated = enfloat floatify = enfloat floatize = enfloat tofloat = enfloat def enint(x: Any, fallback: Any = None) -> Any: try: return int(x) except Exception: return fallback eninted = enint inted = enint integered = enint intify = enint intize = enint toint = enint def enlist(x: Any) -> List[Any]: 'Turn non-list values into lists.' return list(x) if arrayish(x) else [x] # inlist = enlist enlisted = enlist listify = enlist listize = enlist tolist = enlist def entuple(x: Any) -> Tuple[Any, ...]: 'Turn non-tuple values into tuples.' return tuple(x) if arrayish(x) else (x, ) entupled = entuple ntuple = entuple ntupled = entuple tuplify = entuple tuplize = entuple toentuple = entuple tontuple = entuple totuple = entuple def error(message: Any) -> Exception: return Exception(str(message)) err = error def ext(s: str) -> str: 'Get a filepath\'s extension, if present.' name = Path(s).name i = name.rfind('.') return name[i:] if i >= 0 else '' filext = ext def fail(message: Any, error_code: int = 255) -> NoReturn: stdout.flush() print(f'\x1b[31m{message}\x1b[0m', file=stderr) quit(error_code) abort = fail bail = fail def fields(s: str) -> Iterable[str]: 'Split fields AWK-style from the string given.' return awk_sep_re.split(s.strip()) # items = fields splitfields = fields splititems = fields words = fields def first(items: SupportsIndex, fallback: Any = None) -> Any: return items[0] if len(items) > 0 else fallback def flappend(*args: Any) -> List[Any]: 'Turn arbitrarily-nested values/sequences into a single flat sequence.' flat = [] def dig(x: Any) -> None: if arrayish(x): for e in x: dig(e) elif isinstance(x, dict): for e in x.values(): dig(e) else: flat.append(x) for e in args: dig(e) return flat def flat(*args: Any) -> Iterable: 'Turn arbitrarily-nested values/sequences into a single flat sequence.' def _flat_rec(x: Any) -> Iterable: if x is None: return if isinstance(x, dict): yield from _flat_rec(x.values()) if isinstance(x, str): yield x return if isinstance(x, Iterable): for e in x: yield from _flat_rec(e) return yield x for x in args: yield from _flat_rec(x) flatten = flat flattened = flat def fromto(start, stop, f: Callable = identity) -> Iterable: 'Sequence all integers between the numbers given, end-value included.' return (f(e) for e in range(start, stop + 1)) def fuzz(x: Union[int, float]) -> Union[float, Dict[str, float]]: ''' Deapproximate numbers to their max range before approximation: the result is a dictionary with the guessed lower-bound number, the number given, and the guessed upper-bound number which can approximate to the original number given. NaNs and the infinities are returned as given, instead of resulting in a dictionary. ''' if isnan(x) or isinf(x): return x if x == 0: return {'-0.5': -0.5, '0': 0.0, '0.5': +0.5} if x % 1 != 0: # return surrounding integers when given non-integers a = floor(x) b = ceil(x) return {str(a): a, str(x): x, str(b): b} if x % 10 != 0: a = x - 0.5 b = x + 0.5 return {str(a): a, str(x): x, str(b): b} # find the integer log10 of the absolute value; 0 was handled previously y = int(abs(x)) p10 = 1 while True: if y % p10 != 0: p10 /= 10 break p10 *= 10 delta = p10 / 2 s = +1 if x > 0 else -1 ux = abs(x) a = s * ux - delta b = s * ux + delta return {str(a): a, str(x): x, str(b): b} def generated(src: Any) -> Any: 'Make tuples out of generators, or return non-generator values as given.' return tuple(src) if isinstance(src, (Generator, range)) else src concrete = generated concreted = generated concretize = generated concretized = generated degen = generated degenerate = generated degenerated = generated degenerator = generated gen = generated generate = generated synth = generated synthed = generated synthesize = generated synthesized = generated def group(src: Iterable, by: Callable = identity) -> Dict: ''' Separate transformed items into arrays, the final result being a dict whose keys are all the transformed values, and whose values are lists of all the original values which did transform to their group's key. ''' if callable(src): src, by = by, src by = loopify(by) kv = src.items() if isinstance(src, dict) else enumerate(src) groups = {} for k, v in kv: dk = by(k, v) if isinstance(dk, Skip) or isinstance(v, Skip): continue if dk in groups: groups[dk].append(v) else: groups[dk] = [v] return groups grouped = group def gire(src: Iterable[str], using: Iterable[str], fallback: Any = '') -> Dict: ''' Group matched items into arrays, the final result being a dict whose keys are all the matchable regexes given, and whose values are lists of all the original values which did case-insensitively match their group's key as a regex. ''' using = tuple(using) return group(src, lambda x: imatch(x, using, fallback)) gbire = gire groupire = gire def gre(src: Iterable[str], using: Iterable[str], fallback: Any = '') -> Dict: ''' Group matched items into arrays, the final result being a dict whose keys are all the matchable regexes given, and whose values are lists of all the original values which did regex-match their group's key. ''' using = tuple(using) return group(src, lambda x: match(x, using, fallback)) gbre = gre groupre = gre def gsub(s: str, what: str, repl: str) -> str: 'Replace all regex-matches with the string given.' return compile(what).sub(repl, s) def harden(f: Callable, fallback: Any = None) -> Callable: def _hardened_caller(*args): try: return f(*args) except Exception: return fallback return _hardened_caller hardened = harden insure = harden insured = harden def horner(coeffs: List[float], x: Union[int, float]) -> float: if isinstance(coeffs, (int, float)): coeffs, x = x, coeffs if len(coeffs) == 0: return 0 y = coeffs[0] for c in islice(coeffs, 1, None): y *= x y += c return y polyval = horner def idiota(n: int, f: Callable = identity) -> Dict[int, int]: 'ID (keys) version of func iota.' return { v: v for v in (f(e) for e in range(1, n + 1))} dictiota = idiota kviota = idiota def imatch(what: str, using: Iterable[str], fallback: str = '') -> str: 'Try to case-insensitively match a string with any of the regexes given.' if not isinstance(what, str): what, using = using, what for s in using: expr = compile(s, False) m = expr.search(what) if m: # return what[m.start():m.end()] return s return fallback def indices(x: Any) -> Iterable[Any]: 'List all indices/keys, or get an exclusive range from an int.' if isinstance(x, int): return range(x) if isinstance(x, dict): return x.keys() if isinstance(x, (str, list, tuple)): return range(len(x)) return tuple() keys = indices def ints(start, stop, f: Callable = identity) -> Iterable[int]: 'Sequence integers, end-value included.' if isnan(start) or isnan(stop) or isinf(start) or isinf(stop): return tuple() return (f(e) for e in range(int(ceil(start)), int(stop) + 1)) integers = ints def iota(n: int, f: Callable = identity) -> Iterable[int]: 'Sequence all integers from 1 up to (and including) the int given.' return (f(e) for e in range(1, n + 1)) def itemsafter(x: Iterable, what: Any) -> Iterable: ok = False check = predicate(what) for e in x: if ok: yield e elif check(e): ok = True def itemsafterlast(x: Iterable, what: Any) -> Iterable: rest: List[Any] = [] check = predicate(what) for e in x: if check(e): rest.clear() else: rest.append(e) for e in islice(rest, 1, len(rest)): yield e def itemsbefore(x: Iterable, what: Any) -> Iterable: check = predicate(what) for e in x: if check(e): return yield e def itemsbeforelast(x: Iterable, what: Any) -> Iterable: items = [] for e in x: items.append(e) i = -1 check = predicate(what) for j, e in enumerate(reversed(items)): if check(e): i = j break if i < 0: return items if i == 0: return tuple() for e in islice(items, 0, i): yield e def itemssince(x: Iterable, what: Any) -> Iterable: ok = False check = predicate(what) for e in x: ok = ok or check(e) if ok: yield e def itemssincelast(x: Iterable, what: Any) -> Iterable: rest: List[Any] = [] check = predicate(what) for e in x: if check(e): rest.clear() else: rest.append(e) return rest def itemsuntil(x: Iterable, what: Any) -> Iterable: check = predicate(what) for e in x: yield e if check(e): return def itemsuntillast(x: Iterable, what: Any) -> Iterable: items = [] for e in x: items.append(e) i = -1 check = predicate(what) for j, e in enumerate(reversed(items)): if check(e): i = j break if i < 0: return items for e in islice(items, 0, i + 1): yield e itemsuntilfinal = itemsuntillast def join(items: Iterable, sep: Union[str, Iterable] = ' ') -> Union[str, Dict]: ''' Join iterables using the separator-string given: its 2 arguments can come in either order, and are sorted out if needed. When given 2 non-string iterables, the result is an object whose keys are from the first argument, and whose values are from the second one. You can use it any of the following ways, where `keys` and `values` are sequences (lists, tuples, or generators), and `separator` is a string: join(values) join(values, separator) join(separator, values) join(keys, values) ''' if arrayish(items) and arrayish(sep): return {k: v for k, v in zip(items, sep)} if isinstance(items, str): items, sep = sep, items return sep.join(str(e) for e in items) def joined_paragraphs(lines: Iterable[str]) -> Iterable[Sequence[str]]: ''' Regroup lines into individual paragraphs, each of which can span multiple lines: such paragraphs have no empty lines in them, and never end with a trailing line-feed. ''' par: List[str] = [] for l in lines: if (not l) and par: yield '\n'.join(par) par.clear() else: par.append(l) if len(par) > 0: yield '\n'.join(par) def json0(x: Any) -> str: 'Encode value into a minimal single-line JSON string.' return dumps(x, separators=(',', ':'), allow_nan=False, indent=None) j0 = json0 def json2(x: Any) -> str: ''' Encode value into a (possibly multiline) JSON string, using 2 spaces for each indentation level. ''' return dumps(x, separators=(',', ': '), allow_nan=False, indent=2) j2 = json2 def jsonl(x: Any) -> Iterable: 'Turn value into multiple JSON-encoded strings, known as JSON Lines.' if x is None: yield dumps(x, allow_nan=False) elif isinstance(x, (bool, int, float, dict, str)): yield dumps(x, allow_nan=False) elif isinstance(x, Iterable): for e in x: yield dumps(e, allow_nan=False) else: yield dumps(str(x), allow_nan=False) jsonlines = jsonl tojsonl = jsonl tojsonlines = jsonl def keep(src: Iterable, pred: Any) -> Iterable: ''' A generalization of built-in func filter, which can also handle dicts and strings. ''' if callable(src): src, pred = pred, src pred = predicate(pred) pred = loopify(pred) if isinstance(src, str): out = StringIO() for i, c in enumerate(src): if pred(i, c): out.write(c) return out.getvalue() if isinstance(src, dict): return { k: v for k, v in src.items() if pred(k, v) } return (e for i, e in enumerate(src) if pred(i, e)) filtered = keep kept = keep def last(items: SupportsIndex, fallback: Any = None) -> Any: return items[-1] if len(items) > 0 else fallback def links(src: Any) -> Iterable: 'Auto-detect all (HTTP/HTTPS) hyperlink-like substrings.' if isinstance(src, str): for match in link_re.finditer(src): # yield src[match.start():match.end()] yield match.group(0) elif isinstance(src, dict): for k, v in src.items(): yield from k yield from links(v) elif isinstance(src, Iterable): for v in src: yield from links(v) def loopify(x: Callable) -> Callable: nargs = required_arg_count(x) if nargs == 2: return x elif nargs == 1: return lambda _, v: x(v) else: raise Exception('only funcs with 1 or 2 args are supported') def mapkv(src: Iterable, key: Callable, value: Callable = identity) -> Dict: ''' A map-like func for dictionaries, which uses 2 mapping funcs, the first for the keys, the second for the values. ''' if key is None: key = lambda k, _: k if callable(src): src, key, value = value, src, key if required_arg_count(key) != 2: oldkey = key key = lambda k, _: oldkey(k) key = loopify(key) value = loopify(value) # if isinstance(src, dict): # return { key(k, v): value(k, v) for k, v in src.items() } # return { key(i, v): value(i, v) for i, v in enumerate(src) } def add(k, v, to): dk = key(k, v) dv = value(k, v) if isinstance(dk, Skip) or isinstance(dv, Skip): return to[dk] = dv res = {} kv = src.items() if isinstance(src, dict) else enumerate(src) for k, v in kv: add(k, v, res) return res def match(what: str, using: Iterable[str], fallback: str = '') -> str: 'Try to match a string with any of the regexes given.' if not isinstance(what, str): what, using = using, what for s in using: expr = compile(s) m = expr.search(what) if m: # return what[m.start():m.end()] return s return fallback def maybe(f: Callable, x: Any) -> Any: ''' Try calling a func on a value, using the same value as a fallback result, in case of exceptions. ''' if not callable(f): f, x = x, f try: return f(x) except Exception: return x def mappend(*args) -> Dict: kv = {} for src in args: if isinstance(src, dict): for k, v in src.items(): kv[k] = v else: raise Exception('mappend only works with dictionaries') return kv def message(x: Any, result: Any = skip) -> Any: print(x, file=stderr) return result msg = message def must(cond: Any, errmsg: str = 'condition given not always true') -> None: 'Enforce conditions, raising an exception on failure.' if not cond: raise Exception(errmsg) demand = must enforce = must def nowdict() -> dict: v = datetime(2000, 1, 1).now() return { 'year': v.year, 'month': v.month, 'day': v.day, 'hour': v.hour, 'minute': v.minute, 'second': v.second, 'text': v.strftime('%Y-%m-%d %H:%M:%S %b %a'), 'weekday': v.strftime('%A'), } def number(x: Any) -> Union[int, float, Any]: ''' Try to turn the value given into a number, using a fallback value instead of raising exceptions. ''' if isinstance(x, float): return x try: return int(x) except Exception: return float(x) def numbers(src: Any) -> Iterable: 'Auto-detect all number-like substrings.' if isinstance(src, str): for match in number_re.finditer(src): yield match.group(0) # yield src[match.start():match.end()] elif isinstance(src, dict): for k, v in src.items(): yield from k yield from links(v) elif isinstance(src, Iterable): for v in src: yield from links(v) def numsign(x: Union[int, float]) -> Union[int, float]: 'Get a number\'s sign, or NaN if the number given is a NaN.' if isinstance(x, int): if x > 0: return +1 if x < 0: return -1 return 0 if isnan(x): return x if x > 0: return +1.0 if x < 0: return -1.0 return 0.0 def numstats(src: Any) -> Dict[str, Union[float, int]]: 'Gather several single-pass numeric statistics.' n = mean_sq = ln_sum = 0 least = +inf most = -inf total = mean = 0 prod = 1 nans = ints = pos = zero = neg = 0 def update_numstats(x: Any) -> None: nonlocal nans, n, ints, pos, neg, zero, least, most, total, prod nonlocal ln_sum, mean, mean_sq if not isinstance(x, (float, int)): return if isnan(x): nans += 1 return n += 1 ints += int(isinstance(x, int) or x == floor(x)) if x > 0: pos += 1 elif x < 0: neg += 1 else: zero += 1 least = min(least, x) most = max(most, x) # total += x prod *= x ln_sum += log(x) d1 = x - mean mean += d1 / n d2 = x - mean mean_sq += d1 * d2 def _numstats_rec(src: Any) -> None: if isinstance(src, dict): for e in src.values(): _numstats_rec(e) elif isinstance(src, Iterable) and not isinstance(src, str): for e in src: _numstats_rec(e) else: update_numstats(src) _numstats_rec(src) sd = nan geomean = nan if n > 0: sd = sqrt(mean_sq / n) geomean = exp(ln_sum / n) if not isinf(ln_sum) else nan total = n * mean return { 'n': n, 'nan': nans, 'min': least, 'max': most, 'sum': total, 'mean': mean, 'geomean': geomean, 'sd': sd, 'product': prod, 'integer': ints, 'positive': pos, 'zero': zero, 'negative': neg, } def once(x: Any, replacement: Any = None) -> Any: ''' Replace the first argument given after the first time this func has been given it: this is a deliberately stateful function, given its purpose. ''' if not (x in seen): seen.add(x) return x else: return replacement onced = once def pad(s: str, n: int, pad: str = ' ') -> str: l = len(s) return s if l >= n else s + int((n - l) / len(pad)) * pad def padcenter(s: str, n: int, pad: str = ' ') -> str: return s.center(n, pad) centerpad = padcenter centerpadded = padcenter cjust = padcenter cpad = padcenter padc = padcenter paddedcenter = padcenter def padend(s: str, n: int, pad: str = ' ') -> str: return s.rjust(n, pad) padr = padend padright = padend paddedend = padend paddedright = padend rpad = padend rightpad = padend rightpadded = padend def padstart(s: str, n: int, pad: str = ' ') -> str: return s.ljust(n, pad) lpad = padstart leftpad = padstart leftpadded = padstart padl = padstart padleft = padstart paddedleft = padstart paddedstart = padstart def panic(x: Any) -> None: raise Exception(x) def paragraphize(lines: Iterable[str]) -> Iterable[Sequence[str]]: ''' Regroup lines into individual paragraphs, each of which is a list of single-line strings, none of which never end with a trailing line-feed. ''' par: List[str] = [] for l in lines: if (not l) and par: yield par par.clear() else: par.append(l) if len(par) > 0: yield par paragraphed = paragraphize paragraphs = paragraphize paragroup = paragraphize pargroup = paragraphize def parse(s: str, fallback: Any = None) -> Any: 'Try to parse JSON, ignoring exceptions in favor of a fallback value.' try: return loads(s) except Exception: return fallback fromjson = parse parsed = parse loaded = parse unjson = parse def pick(src: Any, *what) -> Any: 'Pick only the keys given from an object, or even a sequence of objects.' if isinstance(src, dict): kv = {} for k in what: kv[k] = src[k] return kv if isinstance(src, Iterable): return [pick(e, *what) for e in src] return None picked = pick def plain(s: str) -> str: 'Ignore all ANSI-style sequences in a string.' return ansi_style_re.sub('', s) def predicate(x: Any) -> Callable: 'Helps various higher-order funcs, by standardizing `predicate` values.' if callable(x): return x if isinstance(x, float): if isnan(x): return lambda y: isinstance(y, float) and isnan(y) if isinf(x): return lambda y: isinstance(y, float) and isinf(y) return lambda y: x == y pred = predicate def quoted(s: str, quote: str = '"') -> str: 'Surround a string with quotes.' return f'{quote}{s}{quote}' def recover(*args) -> Any: ''' Catch exceptions using a lambda/callback func, in one of 6 ways recover(zero_args_func) recover(zero_args_func, exception_replacement_value) recover(zero_args_func, one_arg_exception_handling_func) recover(one_arg_func, arg) recover(one_arg_func, arg, exception_replacement_value) recover(one_arg_func, arg, one_arg_exception_handling_func) ''' if len(args) == 1: f = args[0] try: return f() except Exception: return None elif len(args) == 2: f, fallback = args[0], args[1] if callable(f) and callable(fallback): try: return f() except Exception as e: nargs = required_arg_count(fallback) return fallback(e) if nargs == 1 else fallback() else: try: return f() if required_arg_count(f) == 0 else f(args[1]) except Exception: return fallback elif len(args) == 3: f, x, fallback = args[0], args[1], args[2] if callable(f) and callable(fallback): try: return f(x) except Exception as e: nargs = required_arg_count(fallback) return fallback(e) if nargs == 1 else fallback() else: try: return f(x) except Exception: return fallback else: raise Exception('recover(...) only works with 1, 2, or 3 args') attempt = recover attempted = recover recovered = recover recoverred = recover rescue = recover rescued = recover trycall = recover def reject(src: Iterable, pred: Any) -> Iterable: ''' A generalization of built-in func filter, which uses predicate funcs the opposite way, and which can also handle dicts and strings. ''' if callable(src): src, pred = pred, src pred = predicate(pred) pred = loopify(pred) if isinstance(src, str): out = StringIO() for i, c in enumerate(src): if not pred(i, c): out.write(c) return out.getvalue() if isinstance(src, dict): return { k: v for k, v in src.items() if not pred(k, v) } return (e for i, e in enumerate(src) if not pred(i, e)) avoid = reject avoided = reject keepout = reject keptout = reject rejected = reject def retype(x: Any) -> Any: 'Try to narrow the type of the value given.' if isinstance(x, float): return int(x) if floor(x) == x else x if not isinstance(x, str): return x try: return loads(x) except Exception: pass try: return int(x) except Exception: pass try: return float(x) except Exception: pass return x autocast = retype mold = retype molded = retype narrow = retype narrowed = retype recast = retype recasted = retype remold = retype remolded = retype retyped = retype def revcompose(*what: Callable) -> Callable: def composite(x: Any) -> Any: for f in reversed(what): x = f(x) return x return composite rcompose = revcompose rcomposed = revcompose revcomposed = revcompose def revsort(iterable: Iterable, key: Optional[Callable] = None) -> Iterable: return sorted(iterable, key=key, reverse=True) revsorted = revsort # def revsortkv(src: Dict, key: Callable = None) -> Dict: # if not key: # key = lambda kv: (kv[1], kv[0]) # return sortkv(src, key, reverse=True) def revsortkv(src: Dict, key: Callable = None) -> Dict: if key is None: key = lambda x: x[1] return sortkv(src, key, reverse=True) revsortedkv = revsortkv def rstripdecs(s: str) -> str: ''' Ignore trailing zero decimals on number-like strings; even ignore the decimal dot if trailing. ''' try: f = float(s) if isnan(f) or isinf(f): return s dot = s.find('.') if dot < 0: return s s = s.rstrip('0') return s[:-1] if s.endswith('.') else s except Exception: return s chopdecs = rstripdecs def scale(x: float, x0: float, x1: float, y0: float, y1: float) -> float: 'Transform a value from a linear domain into another linear one.' return (y1 - y0) * (x - x0) / (x1 - x0) + y0 rescale = scale rescaled = scale scaled = scale def shortened(s: str, maxlen: int, trailer: str = '') -> str: 'Limit strings to the symbol-count given, including an optional trailer.' maxlen = max(maxlen, 0) return s if len(s) <= maxlen else s[:maxlen - len(trailer)] + trailer def shuffled(x: Any) -> Any: 'Return a shuffled copy of the list given.' y = copy(x) shuffle(y) return y def split(src: Union[str, Sequence], n: Union[str, int]) -> Iterable: 'Split/break a string/sequence into several chunks/parts.' if isinstance(src, str) and isinstance(n, str): return src.split(n) if not (isinstance(src, (str, Sequence)) and isinstance(n, int)): raise Exception('unsupported type-pair of arguments') if n < 1: return [] l = len(src) if l <= n: return src.split('') if isinstance(src, str) else src chunks = [] csize = int(ceil(l / n)) while len(src) > 0: chunks.append(src[:csize]) src = src[csize:] return chunks broken = split splitted = split splitten = split def strdrop(x: str, *what: str) -> str: 'Ignore all occurrences of all substrings given.' for s in what: x = x.replace(s, '') return x strignore = strdrop def stringify(x: Any) -> str: 'Fancy alias for func dumps, named after JavaScript\'s func.' return dumps(x, separators=(', ', ': '), allow_nan=False, indent=None) jsonate = stringify jsonify = stringify tojson = stringify def strafter(x: str, what: str) -> str: i = x.find(what) return '' if i < 0 else x[i+len(what):] def strafterlast(x: str, what: str) -> str: i = x.rfind(what) return '' if i < 0 else x[i+len(what):] def strbefore(x: str, what: str) -> str: i = x.find(what) return x if i < 0 else x[:i] def strbeforelast(x: str, what: str) -> str: i = x.rfind(what) return x if i < 0 else x[:i] def strsince(x: str, what: str) -> str: i = x.find(what) return '' if i < 0 else x[i:] def strsincelast(x: str, what: str) -> str: i = x.rfind(what) return '' if i < 0 else x[i:] def struntil(x: str, what: str) -> str: i = x.find(what) return x if i < 0 else x[:i+len(what)] def struntillast(x: str, what: str) -> str: i = x.rfind(what) return x if i < 0 else x[:i+len(what)] struntilfinal = struntillast def since(x: Union[str, Iterable], what: Any) -> Union[str, Iterable]: 'Start strings/sequences with a substring/value\'s appearance.' return (strsince if isinstance(x, str) else itemssince)(x, what) def sincelast(x: Union[str, Iterable], what: Any) -> Union[str, Iterable]: 'Start strings/sequences with a substring/value\'s last appearance.' return (strsincelast if isinstance(x, str) else itemssincelast)(x, what) sincefinal = sincelast def sortk(x: Dict, key: Callable = identity, reverse: bool = False) -> Dict: keys = sorted(x.keys(), key=key, reverse=reverse) return {k: x[k] for k in keys} sortkeys = sortk sortedkeys = sortk def sortkv(src: Dict, key: Callable = None, reverse: bool = False) -> Dict: if key is None: key = lambda x: x[1] kv = sorted(src.items(), key=key, reverse=reverse) return {k: v for (k, v) in kv} sortedkv = sortkv def squeeze(s: str) -> str: ''' A more aggressive way to rid strings of extra spaces which, unlike string method strip, also turns inner runs of multiple spaces into single ones. ''' s = s.strip() s = spaces_re.sub(' ', s) s = paddable_tab_re.sub('\t', s) return s squeezed = squeeze def stround(x: Union[int, float], decimals: int = 6) -> str: 'Format numbers into a string with the given decimal-digit count.' if decimals >= 0: return f'{x:.{decimals}f}' else: return f'{round(x, decimals):.0f}' def tally(src: Iterable, by: Callable = identity) -> Dict[Any, int]: ''' Count all distinct (transformed) values, the result being a dictionary whose keys are all the transformed values, and whose items are positive integers. ''' if callable(src): src, by = by, src tally: Dict[Any, int] = {} by = loopify(by) if isinstance(src, dict): for k, v in src.items(): dk = by(k, v) if dk in tally: tally[dk] += 1 else: tally[dk] = 1 else: for i, v in enumerate(src): dk = by(i, v) if dk in tally: tally[dk] += 1 else: tally[dk] = 1 return tally tallied = tally def transpose(src: Any) -> Any: 'Turn lists/objects inside-out like socks, so to speak.' if isinstance(src, dict): return { v: k for k, v in src.items() } if not arrayish(src): msg = 'transpose only supports objects or iterables of objects' raise ValueError(msg) kv: Dict[Any, Any] = {} seq: List[Any] = [] for e in src: if isinstance(e, dict): for k, v in e.items(): if k in kv: kv[k].append(v) else: kv[k] = [v] elif isinstance(e, Iterable): for i, v in enumerate(e): if i < len(seq): seq[i].append(v) else: seq.append([v]) else: msg = 'transpose(...): not all items are iterables/objects' raise ValueError(msg) if len(kv) > 0 and len(seq) > 0: msg = 'transpose(...): mix of iterables and objects not supported' raise ValueError(msg) return kv if len(seq) == 0 else seq tr = transpose transp = transpose transposed = transpose def trap(x: Callable, y: Union[Callable[[Exception], Any], Any] = None) -> Any: 'Try running a func, handing exceptions over to a fallback func.' try: return x() if callable(x) else x except Exception as e: if callable(y): nargs = required_arg_count(y) return y(e) if nargs == 1 else y() else: return y catch = trap catched = trap caught = trap noerr = trap noerror = trap noerrors = trap safe = trap save = trap saved = trap trapped = trap def tsv(x: str, fn: Union[Callable, None] = None) -> Any: if fn is None: return x.split('\t') if callable(x): x, fn = fn, x return fn(x.split('\t')) def typename(x: Any) -> str: if x is None: return 'null' if isinstance(x, bool): return 'boolean' if isinstance(x, str): return 'string' if isinstance(x, (int, float)): return 'number' if isinstance(x, (list, tuple)): return 'array' if isinstance(x, dict): return 'object' return type(x).__name__ def typeof(x: Any) -> str: 'Get a value\'s JS-like typeof type-string.' if callable(x): return 'function' return { bool: 'boolean', int: 'number', float: 'number', str: 'string', }.get(type(x), 'object') def unixify(s: str) -> str: ''' Make plain-text `unix-style`, ignoring a leading UTF-8 BOM if present, and turning any/all CRLF byte-pairs into line-feed bytes. ''' s = s.lstrip('\xef\xbb\xbf') return s.replace('\r\n', '\n') if '\r\n' in s else s def unquoted(s: str) -> str: 'Ignore surrounding quotes in a string.' if s.startswith('"') and s.endswith('"'): return s[1:-1] if s.startswith('\'') and s.endswith('\''): return s[1:-1] if s.startswith('`') and s.endswith('`'): return s[1:-1] if s.startswith('”') and s.endswith('“'): return s[1:-1] if s.startswith('“') and s.endswith('”'): return s[1:-1] return s dequote = unquoted dequoted = unquoted def until(x: Union[str, Iterable], what: Any) -> Union[str, Iterable]: 'End strings/sequences with a substring/value\'s appearance.' return (struntil if isinstance(x, str) else itemsuntil)(x, what) def untillast(x: Union[str, Iterable], what: Any) -> Union[str, Iterable]: 'End strings/sequences with a substring/value\'s last appearance.' return (struntillast if isinstance(x, str) else itemsuntillast)(x, what) untilfinal = untillast def wait(seconds: Union[int, float], result: Any) -> Any: 'Wait the given number of seconds, before returning its latter arg.' t = (int, float) if (not isinstance(seconds, t)) and isinstance(result, t): seconds, result = result, seconds sleep(seconds) return result delay = wait def wat(*args) -> None: 'What Are These (wat) shows help/doc messages for funcs given to it.' from pydoc import doc c = 0 w = stderr for e in args: if not callable(e): continue if c > 0: print(file=w) print(f'\x1b[48;5;253m\x1b[38;5;26m{e.__name__:80}\x1b[0m', file=w) doc(e, output=w) c += 1 return Skip() def wit(*args) -> None: 'What Is This (wit) shows help/doc messages for funcs given to it.' return wat(*args) def zoom(x: Any, *keys_indices) -> Any: for k in keys_indices: # allow int-indexing dicts the same way lists/tuples can be if isinstance(x, dict) and isinstance(k, int): l = len(x) if i < 0: i += l if i < 0 or i >= len(x): x = None continue for i, e in enumerate(x.values()): if i == k: x = e break continue # regular key/index access for dicts/lists/tuples x = x[k] return x # args is the `proper` list of arguments given to the script args = argv[1:] run_mode = '' trace_exceptions = False profile_run = False if len(args) == 0: # show help message when given no arguments print(info.strip(), file=stderr) exit(0) trace_opts = ( '-t', '--t', '-trace', '--trace', '-traceback', '--traceback', ) profile_opts = ('-p', '--p', '-prof', '--prof', '-profile', '--profile') # handle all other leading options; the explicit help options are # handled earlier in the script while len(args) > 0: if args[0] in trace_opts: trace_exceptions = True args = args[1:] continue if args[0] in profile_opts: profile_run = True args = args[1:] continue s = opts2modes.get(args[0], '') if not s: break run_mode = s args = args[1:] inputs = [] expression = '' if len(args) > 0: expression = args[0] inputs = args[1:] if not run_mode: run_mode = 'each-line' if not expression and not (run_mode in ('json-lines', 'each-line')): # show help message when given no expression print(info.strip(), file=stderr) exit(0) glo = globals() for e in (physics, symbols, units): for k, v in e.__dict__.items(): if not k in glo: glo[k] = v exec = disabled_exec try: # compile the expression to speed it up, since they're all (re)run # for each line from standard input; also, handle a single-dot as # an identity expression, using the current line as is if expression in ('', '.'): expression = { 'all-lines': 'lines', 'all-bytes': 'data', 'each-block': 'block', 'each-line': 'line', 'json-lines': 'data', 'no-input': 'info.strip()', 'whole-strings': 'value', }[run_mode] expression = compile_py(expression, expression, 'eval') # `comprehension` expressions seem to ignore local variables: even # lambda-based workarounds fail i = 0 c = 1 nr = 1 _ = None fn = { 'each-line': stop_normal, 'each-block': stop_normal, 'all-lines': stop_normal, 'all-bytes': stop_normal, 'json-lines': stop_json, 'no-input': stop_normal, 'whole-strings': stop_normal, }[run_mode] glo['halt'] = fn glo['stop'] = fn fn = { 'each-line': main_each_line, 'each-block': main_each_block, 'all-lines': main_all_lines, 'all-bytes': main_all_bytes, 'json-lines': main_json_lines, 'no-input': main_no_input, 'whole-strings': main_whole_strings, }[run_mode] if fn is None: raise Exception(f'internal error: invalid run-mode {run_mode}') if profile_run: from cProfile import Profile # using a profiler in a `with` context adds many irrelevant # entries to its output prof = Profile() prof.enable() fn(stdout, stdin, expression, inputs) prof.disable() prof.print_stats() else: fn(stdout, stdin, expression, inputs) except BrokenPipeError: # quit quietly, instead of showing a confusing error message stderr.close() except KeyboardInterrupt: exit(2) except Exception as e: if trace_exceptions: raise e s = str(e) s = s if s else '' print(f'\x1b[31m{s}\x1b[0m', file=stderr) exit(1)