File: coby.py
   1 #!/usr/bin/python3
   2 
   3 # The MIT License (MIT)
   4 #
   5 # Copyright © 2024 pacman64
   6 #
   7 # Permission is hereby granted, free of charge, to any person obtaining a copy
   8 # of this software and associated documentation files (the “Software”), to deal
   9 # in the Software without restriction, including without limitation the rights
  10 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11 # copies of the Software, and to permit persons to whom the Software is
  12 # furnished to do so, subject to the following conditions:
  13 #
  14 # The above copyright notice and this permission notice shall be included in
  15 # all copies or substantial portions of the Software.
  16 #
  17 # THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  20 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  23 # SOFTWARE.
  24 
  25 
  26 from multiprocessing import Pool
  27 from sys import argv, exit, stderr, stdin, stdout
  28 from typing import Dict
  29 
  30 
  31 info = '''
  32 coby [filepaths/URIs...]
  33 
  34 COunt BYtes finds various byte-related stats for the files/URIs given. When
  35 given no named inputs, it uses standard input by default.
  36 '''
  37 
  38 # a leading help-option arg means show the help message and quit
  39 if len(argv) == 2 and argv[1] in ('-h', '--h', '-help', '--help'):
  40     print(info.strip())
  41     exit(0)
  42 
  43 
  44 def fail(msg, code: int = 1) -> None:
  45     'Show the error message given, and quit the app right away.'
  46     print(f'\x1b[31m{msg}\x1b[0m', file=stderr)
  47     exit(code)
  48 
  49 
  50 def count_bytes(src) -> Dict[str, int]:
  51     runes = 0
  52     crlf = 0
  53     trails = 0
  54     tally = [0] * 256
  55 
  56     # counting lines with trailing spaces needs remembering the previous 2
  57     # bytes, as the last trailing space in a line can come either before a
  58     # single line-feed byte, or a CRLF byte-pair
  59     prev2 = 0
  60     prev1 = 0
  61 
  62     for chunk in src:
  63         for b in chunk:
  64             tally[b] += 1
  65             lf = b == 10 # 10 is ord('\n')
  66             runes += (b & 0xc0) != 0x80
  67             crlf += lf and prev1 == 13 # 13 is ord('\r')
  68             trails += (lf and prev1 == 32) or (prev2 == 32 and prev1 == 13)
  69             # notice how the last 2 bytes are remembered even across chunks
  70             prev2 = prev1
  71             prev1 = b
  72 
  73     n = sum(tally)
  74     lines = tally[ord('\n')]
  75     if lines == 0 and n > 0:
  76         lines += 1
  77     trails += (prev1 == 32) or (prev2 == 32 and prev1 == 13)
  78 
  79     return {
  80         'n': sum(tally),
  81         'runes': runes,
  82         'lines': lines,
  83         'lf': tally[ord('\n')],
  84         'crlf': crlf,
  85         'tabs': tally[ord('\t')],
  86         'spaces': tally[ord(' ')],
  87         'trails': trails,
  88         'nulls': tally[0],
  89         'fulls': tally[255],
  90         'highs': sum(tally[128:]),
  91     }
  92 
  93 
  94 def seems_url(s: str) -> bool:
  95     protocols = ('https://', 'http://', 'file://', 'ftp://', 'data:')
  96     return any(s.startswith(p) for p in protocols)
  97 
  98 
  99 def handle_named_input(path: str) -> Dict[str, int]:
 100     if path == '-':
 101         return count_bytes(stdin.buffer)
 102 
 103     if seems_url(path):
 104         with urlopen(path) as inp:
 105             return count_bytes(inp)
 106 
 107     with open(path, mode='rb') as inp:
 108         return count_bytes(inp)
 109 
 110 
 111 header = '''
 112 name\tbytes\trunes\tlines\tlf\tcrlf\ttabs\tspaces\ttrails\tnulls\tfulls\thighs
 113 '''.strip()
 114 
 115 
 116 def show_results(name, counts) -> None:
 117     stdout.write(name)
 118     stdout.write('\t')
 119     stdout.write(str(counts['n']))
 120     for k in header.split('\t')[2:]:
 121         stdout.write('\t')
 122         stdout.write(str(counts[k]))
 123     stdout.write('\n')
 124 
 125 
 126 try:
 127     args = argv[1:]
 128     if args.count('-') > 1:
 129         msg = 'reading from `-` (standard input) more than once not allowed'
 130         raise ValueError(msg)
 131 
 132     if any(seems_url(e) for e in args):
 133         from urllib.request import urlopen
 134 
 135     # given no named inputs, just use stdin
 136     if len(args) == 0:
 137         args = ['-']
 138 
 139     # show header line right away, to reassure users something's happening
 140     stdout.write(header)
 141     stdout.write('\n')
 142 
 143     if len(args) == 1:
 144         # don't bother starting multiple interpreters for a single input
 145         results = [handle_named_input(args[0])]
 146     else:
 147         # vastly speed-up script by handling multiple inputs concurrently
 148         with Pool(processes=min(4, len(args))) as pool:
 149             results = pool.map(handle_named_input, args)
 150 
 151     for name, counts in zip(args, results):
 152         show_results(name, counts)
 153 except BrokenPipeError:
 154     # quit quietly, instead of showing a confusing error message
 155     stderr.close()
 156 except KeyboardInterrupt:
 157     exit(2)
 158 except Exception as e:
 159     fail(e, 1)