#!/usr/bin/python3

# The MIT License (MIT)
#
# Copyright © 2024 pacman64
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the “Software”), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


# Note: string slicing is a major source of inefficiencies in this script,
# making it viable only for small inputs; it's not clear what the stdlib
# offers to loop over sub-strings without copying data, which is really
# needed in this case.
#
# In the end the code has become much uglier by using explicit index-pairs,
# which are used/updated all over to avoid copying sub-strings. Standard
# output is already line-buffered by default, which makes writing to it
# already fairly fast.


from io import TextIOWrapper
from sys import argv, exit, stderr, stdin, stdout
from typing import Iterable, Iterator, List


info = '''
seto [command] [file/URI] [file/URI] [files/URIs...]


SET Operations uses a named set operation, followed by 2 or more named input
sources to read lines from, emitting the result. When given more than 2 input
paths/URIs, the operation will be repeated using the latest result as the 1st
set, and each additional input as the 2nd set to use.

The set operations are

    seto and [file/URI] [file/URI] [files/URIs...]
    seto int              same as `seto and`
    seto intersect        same as `seto and`
    seto intersection     same as `seto and`

    seto sub [file/URI] [file/URI] [files/URIs...]
    seto dif              same as `seto sub`
    seto diff             same as `seto sub`
    seto difference       same as `seto sub`
    seto subtract         same as `seto sub`
    seto subtraction      same as `seto sub`

    seto or [file/URI] [file/URI] [files/URIs...]
    seto union            same as `seto or`

    seto xor [file/URI] [file/URI] [files/URIs...]
'''

# leading byte-order marks to drop from the first line of an input: the real
# decoded BOM, and the 3-char sequence a UTF-8 BOM turns into when the bytes
# were decoded using a latin-1-like encoding
_BOMS = ('\ufeff', '\xef\xbb\xbf')

# URI prefixes which make an input be fetched, instead of opened as a file
_PROTOCOLS = ('https://', 'http://', 'file://', 'ftp://', 'data:')


def handle_lines(src: Iterable[str]) -> Iterator[str]:
    '''
    Yield each line from `src` without its trailing line-terminators; the
    first line also loses a leading byte-order mark, if it has one.
    '''

    for i, line in enumerate(src):
        if i == 0:
            # remove the BOM as an exact prefix: str.lstrip would treat its
            # argument as a set of chars, eating legitimate leading symbols
            for bom in _BOMS:
                if line.startswith(bom):
                    line = line[len(bom):]
                    break
        yield line.rstrip('\r\n')


def seems_url(s: str) -> bool:
    'Check if the string given starts with one of the supported protocols.'
    return s.startswith(_PROTOCOLS)


def handle_input(path: str) -> Iterator[str]:
    '''
    Yield all lines from the input named: `-` means standard input, URIs
    are fetched, and anything else is opened as a UTF-8 text file.
    '''

    if path == '-':
        yield from handle_lines(stdin)
    elif seems_url(path):
        # imported only when needed, so this func never depends on what
        # the top-level code has/hasn't imported
        from urllib.request import urlopen
        with urlopen(path) as inp:
            with TextIOWrapper(inp, encoding='utf-8') as txt:
                yield from handle_lines(txt)
    else:
        with open(path, encoding='utf-8') as inp:
            yield from handle_lines(inp)


def _unique_lines(path: str) -> List[str]:
    'Load the distinct lines from an input, in order of 1st appearance.'

    lines = []
    seen = set()
    for line in handle_input(path):
        if line in seen:
            continue
        seen.add(line)
        lines.append(line)
    return lines


def intersect(inputs: List[str]) -> None:
    '''
    Print the distinct lines from the 1st input which are also in all the
    other inputs, keeping their original order.

    Raises an exception when given fewer than 2 inputs.
    '''

    if len(inputs) < 2:
        raise Exception('intersect: not given enough inputs')

    lines = _unique_lines(inputs[0])
    for path in inputs[1:]:
        # every input is still read in full, even when nothing is left
        present = set(handle_input(path))
        lines = [line for line in lines if line in present]

    for line in lines:
        print(line)


def union(inputs: List[str]) -> None:
    '''
    Print the distinct lines from all inputs, in order of 1st appearance.

    Raises an exception when given fewer than 2 inputs.
    '''

    if len(inputs) < 2:
        raise Exception('union: not given enough inputs')

    seen = set()
    for path in inputs:
        for line in handle_input(path):
            if line in seen:
                continue
            print(line)
            seen.add(line)


def difference(inputs: List[str]) -> None:
    '''
    Print the distinct lines from the 1st input which aren't in any of the
    other inputs, keeping their original order.

    Raises an exception when given fewer than 2 inputs.
    '''

    if len(inputs) < 2:
        raise Exception('difference: not given enough inputs')

    lines = _unique_lines(inputs[0])

    avoid = set()
    for path in inputs[1:]:
        avoid.update(handle_input(path))

    for line in lines:
        # whole-line membership: a line which merely is a sub-string of an
        # avoided line must still be emitted
        if line not in avoid:
            print(line)


def xor(inputs: List[str]) -> None:
    '''
    Print the lines which end up in the symmetric difference of all inputs,
    in order of 1st appearance. Each input counts as a set, so lines repeated
    in the same input count only once; with more than 2 inputs, the result
    matches repeating the operation pair-wise, which keeps the lines found
    in an odd number of inputs.

    Raises an exception when given fewer than 2 inputs.
    '''

    if len(inputs) < 2:
        raise Exception('xor: not given enough inputs')

    # tally counts how many inputs have each line: dictionaries remember
    # insertion order, which gives the output order for free
    tally = {}

    for path in inputs:
        seen = set()
        for line in handle_input(path):
            if line in seen:
                continue
            seen.add(line)
            tally[line] = tally.get(line, 0) + 1

    for line, count in tally.items():
        if count % 2 == 1:
            print(line)


# funcs has all the command names mentioned in the help message
funcs = {
    'intersection': intersect,
    'intersect': intersect,
    'inter': intersect,
    'int': intersect,
    'and': intersect,

    'union': union,
    'or': union,

    'difference': difference,
    'diff': difference,
    'dif': difference,
    'subtraction': difference,
    'subtract': difference,
    'sub': difference,

    'xor': xor,
}


def main() -> None:
    '''
    Run the command-line app: the exit code is 0 on success or when showing
    the help message, 1 on errors, and 2 when interrupted.
    '''

    # handle standard help cmd-line options, quitting right away in that case
    if len(argv) < 2 or argv[1] in ('-h', '--h', '-help', '--help'):
        print(info.strip(), file=stderr)
        exit(0)

    try:
        if argv[2:].count('-') > 1:
            msg = 'reading from `-` (standard input) more than once not allowed'
            raise ValueError(msg)

        if len(argv) < 4:
            raise Exception('this tool needs at least 2 inputs')

        # a plain dictionary lookup would fail with a KeyError, whose
        # message is just the unknown name, which is confusing
        fn = funcs.get(argv[1])
        if fn is None:
            raise Exception('unknown set operation')

        fn(argv[2:])
    except BrokenPipeError:
        # quit quietly, instead of showing a confusing error message
        stderr.close()
    except KeyboardInterrupt:
        exit(2)
    except Exception as e:
        print(info.strip(), file=stderr)
        print(f'\x1b[31m{e}\x1b[0m', file=stderr)
        exit(1)


if __name__ == '__main__':
    main()