File: seto.py
   1 #!/usr/bin/python3
   2 
   3 # The MIT License (MIT)
   4 #
   5 # Copyright © 2024 pacman64
   6 #
   7 # Permission is hereby granted, free of charge, to any person obtaining a copy
   8 # of this software and associated documentation files (the “Software”), to deal
   9 # in the Software without restriction, including without limitation the rights
  10 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11 # copies of the Software, and to permit persons to whom the Software is
  12 # furnished to do so, subject to the following conditions:
  13 #
  14 # The above copyright notice and this permission notice shall be included in
  15 # all copies or substantial portions of the Software.
  16 #
  17 # THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  20 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  23 # SOFTWARE.
  24 
  25 
  26 # Note: string slicing is a major source of inefficiencies in this script,
  27 # making it viable only for small inputs; it's not clear what the stdlib
  28 # offers to loop over sub-strings without copying data, which is really
  29 # needed in this case.
  30 #
  31 # In the end the code has become much uglier by using explicit index-pairs,
  32 # which are used/updated all over to avoid copying sub-strings. Standard
# output is already line-buffered by default, which makes writing to it
# already fairly fast.
  35 
  36 
  37 from io import TextIOWrapper
  38 from sys import argv, exit, stderr, stdin, stdout
  39 from typing import Iterator, List
  40 
  41 
  42 info = '''
  43 seto [command] [file/URI] [file/URI] [files/URIs...]
  44 
  45 
  46 SET Operations uses a named set operation, followed by 2 or more named input
  47 sources to read lines from, emitting the result. When given more than 2 input
  48 paths/URIs, the operation will be repeated using the latest result as the 1st
  49 set, and each additional input as the 2nd set to use.
  50 
  51 The set operations are
  52 
  53     seto and          [file/URI] [file/URI] [files/URIs...]
  54     seto int          same as `seto and`
  55     seto intersect    same as `seto and`
  56     seto intersection same as `seto and`
  57 
  58     seto sub         [file/URI] [file/URI] [files/URIs...]
  59     seto dif          same as `seto sub`
  60     seto diff         same as `seto sub`
  61     seto difference   same as `seto sub`
  62     seto subtract     same as `seto sub`
  63     seto subtraction  same as `seto sub`
  64 
  65     seto or           [file/URI] [file/URI] [files/URIs...]
  66     seto union        same as `seto or`
  67 
  68     seto xor          [file/URI] [file/URI] [files/URIs...]
  69 '''
  70 
  71 # handle standard help cmd-line options, quitting right away in that case
  72 if len(argv) < 2 or argv[1] in ('-h', '--h', '-help', '--help'):
  73     print(info.strip(), file=stderr)
  74     exit(0)
  75 
  76 
  77 def handle_lines(src: Iterator[str]) -> Iterator[str]:
  78     for i, line in enumerate(src):
  79         if i == 0:
  80             line = line.lstrip('\xef\xbb\xbf')
  81         line = line.rstrip('\r\n').rstrip('\n')
  82         yield line
  83 
  84 
  85 def seems_url(s: str) -> bool:
  86     protocols = ('https://', 'http://', 'file://', 'ftp://', 'data:')
  87     return any(s.startswith(p) for p in protocols)
  88 
  89 
  90 def handle_input(path: str) -> Iterator[str]:
  91     if path == '-':
  92         yield from handle_lines(stdin)
  93     elif seems_url(path):
  94         with urlopen(path) as inp:
  95             with TextIOWrapper(inp, encoding='utf-8') as txt:
  96                 yield from handle_lines(txt)
  97     else:
  98         with open(path, encoding='utf-8') as inp:
  99             yield from handle_lines(inp)
 100 
 101 
 102 def intersect(inputs: List[str]) -> None:
 103     if len(inputs) < 2:
 104         raise Exception('intersect: not given enough inputs')
 105 
 106     lines = []
 107     seen = [set() for _ in range(len(inputs))]
 108 
 109     for i, path in enumerate(inputs):
 110         for line in handle_input(path):
 111             if line in seen[i]:
 112                 continue
 113             if i == 0:
 114                 lines.append(line)
 115             seen[i].add(line)
 116 
 117     for line in lines:
 118         if all(line in s for s in seen):
 119             print(line)
 120 
 121 
 122 def union(inputs: List[str]) -> None:
 123     if len(inputs) < 2:
 124         raise Exception('union: not given enough inputs')
 125 
 126     seen = set()
 127     for path in inputs:
 128         for line in handle_input(path):
 129             if line in seen:
 130                 continue
 131             print(line)
 132             seen.add(line)
 133 
 134 
 135 def difference(inputs: List[str]) -> None:
 136     if len(inputs) < 2:
 137         raise Exception('difference: not given enough inputs')
 138 
 139     lines = []
 140     seen = set()
 141     avoid = set()
 142 
 143     for i, path in enumerate(inputs):
 144         for line in handle_input(path):
 145             if i == 0:
 146                 if line in seen:
 147                     continue
 148                 lines.append(line)
 149                 seen.add(line)
 150             else:
 151                 avoid.add(line)
 152 
 153     for l in lines:
 154         if not any(l in a for a in avoid):
 155             print(l)
 156 
 157 
 158 def xor(inputs: List[str]) -> None:
 159     if len(inputs) < 2:
 160         raise Exception('xor: not given enough inputs')
 161 
 162     lines = []
 163     tally = {}
 164 
 165     for i, path in enumerate(inputs):
 166         for line in handle_input(path):
 167             lines.append(line)
 168             if line in tally:
 169                 tally[line] += 1
 170             else:
 171                 tally[line] = 1
 172 
 173     for l in lines:
 174         if tally[l] == 1:
 175             print(l)
 176 
 177 
 178 funcs = {
 179     'intersect': intersect,
 180     'inter': intersect,
 181     'int': intersect,
 182     'and': intersect,
 183 
 184     'union': union,
 185     'or': union,
 186 
 187     'difference': difference,
 188     'diff': difference,
 189     'dif': difference,
 190     'subtraction': difference,
 191     'sub': difference,
 192 
 193     'xor': xor,
 194 }
 195 
 196 
 197 try:
 198     if argv.count('-') > 1:
 199         msg = 'reading from `-` (standard input) more than once not allowed'
 200         raise ValueError(msg)
 201 
 202     if len(argv) < 4:
 203         raise Exception('this tool needs at least 2 inputs')
 204 
 205     fn = funcs[argv[1]]
 206     if fn is None:
 207         raise Exception('unknown set operation')
 208 
 209     if any(seems_url(e) for e in argv):
 210         from urllib.request import urlopen
 211 
 212     fn(argv[2:])
 213 except BrokenPipeError:
 214     # quit quietly, instead of showing a confusing error message
 215     stderr.close()
 216 except KeyboardInterrupt:
 217     exit(2)
 218 except Exception as e:
 219     print(info.strip(), file=stderr)
 220     print(f'\x1b[31m{e}\x1b[0m', file=stderr)
 221     exit(1)