#!/usr/bin/python3

# The MIT License (MIT)
#
# Copyright © 2024 pacman64
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the “Software”), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


from io import BufferedReader, BytesIO
from sys import argv, exit, stderr, stdin, stdout


info = '''
j0 [filepath/URI...]

Json-0 converts/fixes JSON/pseudo-JSON input into minimal JSON output.
Besides minimizing bytes, this tool also adapts almost-JSON input into
valid JSON, since it ignores comments and trailing commas, neither of
which are supported in JSON, but which are still commonly used.

It also turns single-quoted strings into proper double-quoted ones, as
well as change invalid 2-digit `\\x` hexadecimal escapes into JSON's
4-digit `\\u` hexadecimal escapes.

When backslashes in strings are followed by an invalid escape letter, the
backslash is ignored.

Output is always a single line of valid JSON, ending with a line-feed.
'''


# values keeping track of the input-position, shown in case of errors;
# both are updated by func `read`, which all input-consuming funcs rely on
pos = 1
linenum = 1

# bytes skipped between tokens: tab, line-feed, vertical-tab, form-feed,
# carriage-return, and space
WHITESPACE = b'\t\n\x0b\x0c\r '

# byte-values allowed in hexadecimal escapes
HEX_DIGITS = frozenset(b'0123456789abcdefABCDEF')

# escape letters/symbols which are valid JSON when following a backslash,
# and which are copied as they are: `\/` is deliberately left out, since
# dropping its backslash gives the same string using fewer bytes
SIMPLE_ESCAPES = frozenset(b'"\\bfnrt')

BAD_HEX_MSG = 'invalid hexadecimal symbols'
EARLY_END_MSG = 'input data ended while still in quoted string'
NO_DIGITS_MSG = 'expected numeric digits, but found none'


# note: using regexes doesn't seem to speed-up number/string-handling


def read(r, size: int) -> bytes:
    '''
    Read up to `size` bytes from reader `r`, keeping the global line/position
    counters in sync with what was consumed. An empty result means the input
    is over.
    '''

    global pos, linenum

    chunk = r.read(size)
    if not chunk:
        return chunk

    last_lf = chunk.rfind(b'\n')
    if last_lf < 0:
        pos += len(chunk)
    else:
        linenum += chunk.count(b'\n')
        # a line-feed resets the position to 1, and each byte after the
        # last line-feed advances it by 1
        pos = len(chunk) - last_lf
    return chunk


def skip_byte(r) -> None:
    'Consume a single byte (if any is left), updating the input position.'
    read(r, 1)


def peek_byte(r) -> int:
    'Return the next byte-value without consuming it, or -1 at end of input.'
    chunk = r.peek(1)
    return chunk[0] if chunk else -1


def handle_array(w, r) -> None:
    '''
    Copy an array, whose opening bracket is the next byte, emitting its items
    separated by single commas, and ignoring whitespace, comments, and
    trailing commas.

    Raises ValueError when the input ends before the closing bracket, or when
    a comma is missing between items.
    '''

    early_end_msg = 'unexpected end of input data, before "]"'
    n = 0

    skip_byte(r)
    w.write(b'[')

    while True:
        # whitespace/comments may precede the next item/comma
        seek_next_token(r)
        b = peek_byte(r)
        if b < 0:
            raise ValueError(early_end_msg)

        comma = b == 44  # ord(',')
        if comma:
            skip_byte(r)
            # whitespace/comments may follow the comma
            seek_next_token(r)
            b = peek_byte(r)
            if b < 0:
                raise ValueError(early_end_msg)

        if b == 93:  # ord(']')
            skip_byte(r)
            w.write(b']')
            return

        if n > 0:
            if not comma:
                raise ValueError('missing a comma between array values')
            w.write(b',')

        # b can't be negative here, so even a null byte gets dispatched
        # (to the invalid-byte handler), instead of being silently ignored
        handlers[b](w, r)
        n += 1


def handle_double_quoted_string(w, r) -> None:
    'Copy a double-quoted string, whose opening quote is the next byte.'
    skip_byte(r)
    w.write(b'"')
    handle_inner_string(w, r, 34)  # ord('"')
    w.write(b'"')


def handle_single_quoted_string(w, r) -> None:
    'Turn a single-quoted string into a double-quoted one.'
    skip_byte(r)
    w.write(b'"')
    handle_inner_string(w, r, 39)  # ord('\'')
    w.write(b'"')


def handle_dot(w, r) -> None:
    'Copy a number starting with a decimal dot, which is the next byte.'
    skip_byte(r)
    # precede the leading decimal dot with a 0
    w.write(b'0.')
    # handle decimals, which in this case aren't optional, as a leading
    # dot is what led to this point
    if copy_digits(w, r) < 1:
        raise ValueError(NO_DIGITS_MSG)
    copy_exponent(w, r)


def handle_false(w, r) -> None:
    'Copy the keyword `false`, failing if the input has anything else.'
    demand(r, b'false')
    w.write(b'false')


def handle_null(w, r) -> None:
    'Copy the keyword `null`, failing if the input has anything else.'
    demand(r, b'null')
    w.write(b'null')


def handle_true(w, r) -> None:
    'Copy the keyword `true`, failing if the input has anything else.'
    demand(r, b'true')
    w.write(b'true')


def handle_invalid(w, r) -> None:
    'Always raise a ValueError describing the byte which can\'t start a value.'
    b = peek_byte(r)
    if b < 0:
        raise ValueError('unexpected end of input data')
    if 32 < b <= 126:
        msg = f'unexpected symbol {chr(b)}'
    else:
        msg = f'unexpected byte-value {b}'
    raise ValueError(msg)


def copy_unsigned_number(w, r) -> None:
    'Copy a number right after its sign: it may start with a decimal dot.'
    if peek_byte(r) == 46:  # ord('.')
        handle_dot(w, r)
    else:
        handle_number(w, r)


def handle_negative(w, r) -> None:
    'Copy a negative number, whose minus sign is the next byte.'
    skip_byte(r)
    w.write(b'-')
    copy_unsigned_number(w, r)


def handle_positive(w, r) -> None:
    'Copy a number starting with a plus sign, which isn\'t emitted.'
    # do nothing with the leading plus sign: strictly-speaking, JSON numbers
    # can't start with a positive sign, and this tool's output is supposed
    # to be `JSON-0` (minimized) anyway
    skip_byte(r)
    copy_unsigned_number(w, r)


def handle_number(w, r) -> None:
    '''
    Copy a number starting with a digit: integer part, optional decimals,
    and optional exponent. Raises ValueError when required digits are missing.
    '''

    # handle integer part
    if copy_digits(w, r) < 1:
        raise ValueError(NO_DIGITS_MSG)

    # handle optional decimals
    if peek_byte(r) == 46:  # ord('.')
        skip_byte(r)
        w.write(b'.')
        if copy_digits(w, r) < 1:
            # follow a trailing decimal dot with a 0
            w.write(b'0')

    copy_exponent(w, r)


def copy_exponent(w, r) -> None:
    '''
    Copy the optional exponent part of a number, such as `e5`, `E-3`, or
    `e+10`: a plus sign is dropped, since it's redundant. Nothing happens
    when the next byte isn't an exponent letter.
    '''

    if peek_byte(r) not in (69, 101):  # ord('E'), ord('e')
        return
    w.write(read(r, 1))

    sign = peek_byte(r)
    if sign == 45:  # ord('-')
        skip_byte(r)
        w.write(b'-')
    elif sign == 43:  # ord('+')
        skip_byte(r)

    if copy_digits(w, r) < 1:
        raise ValueError(NO_DIGITS_MSG)


def handle_object(w, r) -> None:
    '''
    Copy an object, whose opening brace is the next byte, emitting its
    key-value pairs separated by single commas, and ignoring whitespace,
    comments, and trailing commas.

    Raises ValueError when the input ends before the closing brace, when a
    comma is missing between pairs, or when a key isn't a quoted string
    followed by a colon.
    '''

    early_end_msg = 'unexpected end of input data, before "}"'
    num_pairs = 0

    skip_byte(r)
    w.write(b'{')

    while True:
        # whitespace/comments may precede the next item/comma
        seek_next_token(r)
        b = peek_byte(r)
        if b < 0:
            raise ValueError(early_end_msg)

        comma = b == 44  # ord(',')
        if comma:
            skip_byte(r)
            # whitespace/comments may follow the comma
            seek_next_token(r)
            b = peek_byte(r)
            if b < 0:
                raise ValueError(early_end_msg)

        if b == 125:  # ord('}')
            skip_byte(r)
            w.write(b'}')
            return

        if num_pairs > 0:
            if not comma:
                raise ValueError('missing a comma between key-value pairs')
            w.write(b',')

        demand_string(w, r)
        # whitespace/comments may follow the key
        seek_next_token(r)
        demand(r, b':')
        w.write(b':')
        # whitespace/comments may follow the colon
        seek_next_token(r)

        b = peek_byte(r)
        if b < 0:
            raise ValueError(early_end_msg)
        handlers[b](w, r)
        num_pairs += 1


def demand_string(w, r) -> None:
    'Copy a quoted string, raising a ValueError if the next byte isn\'t a quote.'
    quote = peek_byte(r)
    if quote < 0:
        msg = 'unexpected end of input, instead of a string quote'
        raise ValueError(msg)

    if quote == 34:  # ord('"')
        handle_double_quoted_string(w, r)
        return

    if quote == 39:  # ord('\'')
        handle_single_quoted_string(w, r)
        return

    if 32 < quote <= 126:  # ord(' '), ord('~')
        msg = f'expected ", or even \', but got {chr(quote)} instead'
    else:
        msg = f'expected ", or even \', but got byte {quote} instead'
    raise ValueError(msg)


def plain_run_length(chunk: bytes, quote: int) -> int:
    '''
    Count the leading bytes in a chunk which can be copied as they are,
    which means all bytes before the first closing quote, backslash, or
    double quote.
    '''

    end = len(chunk)
    # the closing quote is searched first, since it's usually close by,
    # which keeps the other searches short
    for special in (quote, 92, 34):  # 92 is ord('\\'), 34 is ord('"')
        i = chunk.find(special, 0, end)
        if i >= 0:
            end = i
    return end


def read_hex_digits(r, n: int) -> bytes:
    'Read exactly n hexadecimal digits, raising a ValueError otherwise.'
    digits = read(r, n)
    if len(digits) != n:
        raise ValueError(EARLY_END_MSG)
    if not all(d in HEX_DIGITS for d in digits):
        raise ValueError(BAD_HEX_MSG)
    return digits


def copy_escape(w, r) -> None:
    '''
    Handle what follows a backslash in a string: the backslash itself is
    supposed to be already consumed. Valid JSON escapes are kept, non-JSON
    escapes which have a JSON equivalent are converted, and anything else
    is emitted without its backslash.
    '''

    esc = read(r, 1)
    if not esc:
        raise ValueError(EARLY_END_MSG)
    b = esc[0]

    if b == 120:  # ord('x')
        # turn 2-digit hex escapes into JSON's 4-digit ones
        digits = read_hex_digits(r, 2)
        w.write(b'\\u00')
        w.write(digits.lower())
    elif b == 117:  # ord('u')
        digits = read_hex_digits(r, 4)
        w.write(b'\\u')
        w.write(digits)
    elif b == 118:  # ord('v')
        # JSON has no vertical-tab escape, so use its unicode escape
        w.write(b'\\u000b')
    elif b in SIMPLE_ESCAPES:
        w.write(b'\\')
        w.write(esc)
    else:
        # invalid escapes lose their backslash: this includes escaped
        # single quotes, which need no escaping in the output
        w.write(esc)


def handle_inner_string(w, r, quote: int) -> None:
    '''
    Copy the content of a string up to its closing quote, which is consumed
    but not emitted: `quote` is the byte-value of the quote to look for.

    Double quotes found inside single-quoted strings are escaped, so the
    output is always valid inside double quotes. Raises ValueError when the
    input ends before the closing quote, or on invalid hexadecimal escapes.
    '''

    while True:
        chunk = r.peek(1)
        if not chunk:
            raise ValueError(EARLY_END_MSG)

        run = plain_run_length(chunk, quote)
        if run > 0:
            # emit normal string-bytes
            w.write(read(r, run))
            continue

        # the next byte is either the closing quote, a backslash, or a
        # double quote inside a single-quoted string
        b = chunk[0]
        skip_byte(r)
        if b == quote:
            return
        if b == 92:  # ord('\\')
            copy_escape(w, r)
        else:
            w.write(b'\\"')


def build_handlers() -> tuple:
    'Make the byte-driven func-dispatch table, which has 256 entries.'
    table = [handle_invalid] * 256
    for digit in b'0123456789':
        table[digit] = handle_number
    table[ord('+')] = handle_positive
    table[ord('-')] = handle_negative
    table[ord('.')] = handle_dot
    table[ord('"')] = handle_double_quoted_string
    table[ord('\'')] = handle_single_quoted_string
    table[ord('f')] = handle_false
    table[ord('n')] = handle_null
    table[ord('t')] = handle_true
    table[ord('[')] = handle_array
    table[ord('{')] = handle_object
    return tuple(table)


# handlers is the immutable byte-driven func-dispatch table
handlers = build_handlers()


def copy_digits(w, r) -> int:
    'Returns how many digits were copied/handled.'

    copied = 0
    while True:
        chunk = r.peek(64)
        if not chunk:
            return copied

        n = find_digits_end_index(chunk)
        if n < 0:
            # the whole chunk is digits, and more may follow it
            w.write(read(r, len(chunk)))
            copied += len(chunk)
            continue

        if n > 0:
            w.write(read(r, n))
        return copied + n


def find_digits_end_index(chunk: bytes) -> int:
    '''
    Return the index of the first non-digit byte in a chunk, or -1 when all
    its bytes (if any) are digits, which means no end was found.
    '''

    n = len(chunk) - len(chunk.lstrip(b'0123456789'))
    return n if n < len(chunk) else -1


def seek_next_token(r) -> None:
    'Skip an arbitrarily-long mix of whitespace and comments.'

    while True:
        chunk = r.peek(1024)
        if not chunk:
            # input is over, and this func doesn't consider that an error
            return

        skipped = len(chunk) - len(chunk.lstrip(WHITESPACE))
        if skipped > 0:
            read(r, skipped)
        if skipped == len(chunk):
            # more whitespace may follow in the next chunk
            continue

        if chunk[skipped] != 47:  # ord('/')
            # found start of next token
            return
        demand_comment(r)


def skip_line(r) -> None:
    'Skip all bytes up to and including the next line-feed, if any.'

    while True:
        chunk = r.peek(1024)
        if not chunk:
            return

        i = chunk.find(b'\n')
        if i >= 0:
            read(r, i + 1)
            return
        read(r, len(chunk))


def skip_general_comment(r) -> None:
    '''
    Skip the rest of a (potentially) multi-line comment, including its
    ending: the opening `/*` is supposed to be already consumed. Raises
    ValueError when the input ends before the comment does.
    '''

    while True:
        chunk = r.peek(1024)
        if not chunk:
            raise ValueError('input data ended before an expected */')

        i = chunk.find(b'*')
        if i < 0:
            # no */ in this chunk, so skip it and try with the next one
            read(r, len(chunk))
            continue

        # skip right past the * just found, then check if a / follows it
        read(r, i + 1)
        if peek_byte(r) == 47:  # ord('/')
            # got */, the end of this comment
            skip_byte(r)
            return


def demand(r, what: bytes) -> None:
    'Consume the exact bytes given, raising a ValueError on anything else.'

    lead = read(r, len(what))
    if lead != what:
        # unexpected bytes may not be valid UTF-8, and must not hide the
        # actual error behind a decoding one
        got = lead.decode('utf-8', errors='replace')
        expected = what.decode('utf-8')
        raise ValueError(f'expected {expected}, but got {got} instead')


def demand_comment(r) -> None:
    '''
    Skip a whole comment, whose leading slash is the next byte. Raises
    ValueError when what follows the slash doesn't start a comment.
    '''

    demand(r, b'/')
    b = peek_byte(r)
    if b < 0:
        raise ValueError('unexpected end of input data')

    if b == 47:  # ord('/')
        # handle single-line comment
        skip_line(r)
        return

    if b == 42:  # ord('*')
        # handle (potentially) multi-line comment: the opening * is skipped
        # first, so it can't be mistaken for part of the ending */
        skip_byte(r)
        skip_general_comment(r)
        return

    raise ValueError('expected * or another /, after a /')


def json0(w, src, end) -> None:
    '''
    Convert a single JSON/pseudo-JSON value from binary input `src` into
    minimal JSON written to `w`: func `end` is called with `w` right after
    the value is emitted, and before checking for trailing data.

    Raises ValueError when the input is empty(ish), invalid, or has anything
    but whitespace/comments after its value.
    '''

    r = BufferedReader(src)

    # skip leading UTF-8 BOM (byte-order mark)
    if r.peek(3)[:3] == b'\xef\xbb\xbf':
        read(r, 3)

    # skip leading whitespace/comments
    seek_next_token(r)

    b = peek_byte(r)
    if b < 0:
        # treat empty(ish) input as invalid JSON
        raise ValueError('can\'t turn empty(ish) input into JSON')
    handlers[b](w, r)
    end(w)

    # check against trailing non-whitespace/non-comment bytes
    seek_next_token(r)
    if len(r.peek(1)) > 0:
        raise ValueError('unexpected trailing bytes in JSON data')


def seems_url(s: str) -> bool:
    'Check if a string starts with one of the supported URI protocols.'
    protocols = ('https://', 'http://', 'file://', 'ftp://', 'data:')
    return s.startswith(protocols)


def handle_json(w, r) -> None:
    'Emit a single output line, ending with a line-feed.'
    json0(w, r, lambda w: w.write(b'\n'))


def handle_json_lines(w, r) -> None:
    '''
    Turn JSON-Lines input into a single-line JSON array, where each item
    comes from a non-empty input line: lines which are empty(ish), or which
    start with a `//` comment, are ignored.
    '''

    global pos, linenum

    items = 0
    line_count = 0
    w.write(b'[')

    while True:
        line = r.readline()
        if not line:
            # only the end of the input gives empty lines, as even blank
            # lines have their line-feeds
            break

        # set the input position explicitly, since each line is read/parsed
        # via its own reader
        line_count += 1
        linenum = line_count
        pos = 1

        stripped = line.strip()
        if not stripped or stripped.startswith(b'//'):
            continue

        if items > 0:
            w.write(b',')
        items += 1

        # trailing whitespace is dropped, so line-feeds don't mess with the
        # line number shown in case of errors
        json0(w, BytesIO(line.rstrip()), lambda w: None)

    w.write(b']\n')


def main() -> None:
    'Run the command-line app.'

    global pos, linenum

    # handle standard help cmd-line options, quitting right away in that case
    if len(argv) == 2 and argv[1] in ('-h', '--h', '-help', '--help'):
        print(info.strip(), file=stderr)
        exit(0)

    start_args = 1
    handle_input = handle_json
    if len(argv) > 1 and argv[1] in ('-jl', '--jl', '-jsonl', '--jsonl'):
        start_args = 2
        handle_input = handle_json_lines

    if len(argv) - 1 > start_args:
        print('\x1b[31mmultiple inputs not allowed\x1b[0m', file=stderr)
        exit(1)

    w = stdout.buffer
    name = argv[start_args] if len(argv) > start_args else '-'

    pos = 1
    linenum = 1

    try:
        if name == '-':
            handle_input(w, stdin.buffer)
        elif seems_url(name):
            from urllib.request import urlopen
            with urlopen(name) as inp:
                handle_input(w, inp)
        else:
            with open(name, mode='rb') as inp:
                handle_input(w, inp)
    except BrokenPipeError:
        # quit quietly, instead of showing a confusing error message
        stderr.close()
    except KeyboardInterrupt:
        exit(2)
    except Exception as e:
        stdout.flush()
        print(f'\x1b[31mline {linenum}, pos {pos} : {e}\x1b[0m', file=stderr)
        exit(1)


if __name__ == '__main__':
    main()