File: coby.py
   1 #!/usr/bin/python3
   2 
   3 # The MIT License (MIT)
   4 #
   5 # Copyright © 2020-2025 pacman64
   6 #
   7 # Permission is hereby granted, free of charge, to any person obtaining a copy
   8 # of this software and associated documentation files (the “Software”), to deal
   9 # in the Software without restriction, including without limitation the rights
  10 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11 # copies of the Software, and to permit persons to whom the Software is
  12 # furnished to do so, subject to the following conditions:
  13 #
  14 # The above copyright notice and this permission notice shall be included in
  15 # all copies or substantial portions of the Software.
  16 #
  17 # THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  20 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  23 # SOFTWARE.
  24 
  25 
  26 from multiprocessing import Pool
  27 from sys import argv, exit, stderr, stdin, stdout
  28 from typing import Dict
  29 
  30 
  31 info = '''
  32 coby [filepaths/URIs...]
  33 
  34 COunt BYtes finds various byte-related stats for the files/URIs given. When
  35 given no named inputs, it uses standard input by default.
  36 '''
  37 
  38 # a leading help-option arg means show the help message and quit
  39 if len(argv) == 2 and argv[1] in ('-h', '--h', '-help', '--help'):
  40     print(info.strip())
  41     exit(0)
  42 
  43 
  44 def fail(msg, code: int = 1) -> None:
  45     'Show the error message given, and quit the app right away.'
  46     print(f'\x1b[31m{msg}\x1b[0m', file=stderr)
  47     exit(code)
  48 
  49 
  50 def count_bytes(src) -> Dict[str, int]:
  51     crlf = 0
  52     trails = 0
  53     tally = [0] * 256
  54 
  55     # counting lines with trailing spaces needs remembering the previous 2
  56     # bytes, as the last trailing space in a line can come either before a
  57     # single line-feed byte, or a CRLF byte-pair
  58     prev2 = 0
  59     prev1 = 0
  60 
  61     for chunk in src:
  62         for b in chunk:
  63             tally[b] += 1
  64             lf = b == 10 # 10 is ord('\n')
  65             crlf += lf and prev1 == 13 # 13 is ord('\r')
  66             trails += (lf and prev1 == 32) or (prev2 == 32 and prev1 == 13)
  67             # notice how the last 2 bytes are remembered even across chunks
  68             prev2 = prev1
  69             prev1 = b
  70 
  71     n = sum(tally)
  72     lines = tally[ord('\n')]
  73     if lines == 0 and n > 0:
  74         lines += 1
  75     trails += (prev1 == 32) or (prev2 == 32 and prev1 == 13)
  76 
  77     return {
  78         'n': sum(tally),
  79         'lines': lines,
  80         'lf': tally[ord('\n')],
  81         'crlf': crlf,
  82         'tabs': tally[ord('\t')],
  83         'spaces': tally[ord(' ')],
  84         'trails': trails,
  85         'nulls': tally[0],
  86         'fulls': tally[255],
  87         'highs': sum(tally[128:]),
  88     }
  89 
  90 
  91 def seems_url(s: str) -> bool:
  92     protocols = ('https://', 'http://', 'file://', 'ftp://', 'data:')
  93     return any(s.startswith(p) for p in protocols)
  94 
  95 
  96 def handle_named_input(path: str) -> Dict[str, int]:
  97     if path == '-':
  98         return count_bytes(stdin.buffer)
  99 
 100     if seems_url(path):
 101         with urlopen(path) as inp:
 102             return count_bytes(inp)
 103 
 104     with open(path, mode='rb') as inp:
 105         return count_bytes(inp)
 106 
 107 
 108 header = '''
 109 name\tbytes\tlines\tlf\tcrlf\ttabs\tspaces\ttrails\tnulls\tfulls\thighs
 110 '''.strip()
 111 
 112 
 113 def show_results(name, counts) -> None:
 114     stdout.write(name)
 115     stdout.write('\t')
 116     stdout.write(str(counts['n']))
 117     for k in header.split('\t')[2:]:
 118         stdout.write('\t')
 119         stdout.write(str(counts[k]))
 120     stdout.write('\n')
 121 
 122 
 123 try:
 124     args = argv[1:]
 125     if args.count('-') > 1:
 126         msg = 'reading from `-` (standard input) more than once not allowed'
 127         raise ValueError(msg)
 128 
 129     if any(seems_url(e) for e in args):
 130         from urllib.request import urlopen
 131 
 132     # given no named inputs, just use stdin
 133     if len(args) == 0:
 134         args = ['-']
 135 
 136     # show header line right away, to reassure users something's happening
 137     stdout.write(header)
 138     stdout.write('\n')
 139 
 140     if len(args) == 1:
 141         # don't bother starting multiple interpreters for a single input
 142         results = [handle_named_input(args[0])]
 143     else:
 144         # vastly speed-up script by handling multiple inputs concurrently
 145         with Pool(processes=min(4, len(args))) as pool:
 146             results = pool.map(handle_named_input, args)
 147 
 148     for name, counts in zip(args, results):
 149         show_results(name, counts)
 150 except BrokenPipeError:
 151     # quit quietly, instead of showing a confusing error message
 152     stderr.close()
 153 except KeyboardInterrupt:
 154     exit(2)
 155 except Exception as e:
 156     fail(e, 1)