File: timer.py
   1 #!/usr/bin/python3
   2 
   3 # The MIT License (MIT)
   4 #
   5 # Copyright © 2024 pacman64
   6 #
   7 # Permission is hereby granted, free of charge, to any person obtaining a copy
   8 # of this software and associated documentation files (the “Software”), to deal
   9 # in the Software without restriction, including without limitation the rights
  10 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11 # copies of the Software, and to permit persons to whom the Software is
  12 # furnished to do so, subject to the following conditions:
  13 #
  14 # The above copyright notice and this permission notice shall be included in
  15 # all copies or substantial portions of the Software.
  16 #
  17 # THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  20 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  23 # SOFTWARE.
  24 
  25 
  26 from datetime import datetime
  27 from sys import argv, exit, stderr, stdin, stdout
  28 from threading import Lock, Thread
  29 from time import sleep
  30 
  31 
  32 info = '''
  33 timer
  34 
  35 Show a live timer on stderr, which updates `in-place` by not emitting
  36 line-feeds, and which shows both the time elapsed since its start, as
  37 as well as the current date/time.
  38 
  39 If anything comes from stdin, this will show its lines verbatim on
  40 stdout; piping data from stdin is optional.
  41 '''
  42 
  43 if len(argv) > 1 and argv[1] in ('-h', '--h', '-help', '--help'):
  44     print(info.strip(), file=stderr)
  45     exit(0)
  46 
  47 if len(argv) > 1:
  48     msg = f'\x1b[31m{argv[0]}: no args are supported/allowed\x1b[0m'
  49     print(msg, file=stderr)
  50     exit(1)
  51 
  52 # clear hides previous contents on a line of text, while staying on the
  53 # same line; this is used to visually-clear the timer line on stderr
  54 clear = f'\r{" " * 60}\r'
  55 
  56 # mutex serializes output from the 2 concurrent tasks, avoiding any
  57 # collisions
  58 mutex = Lock()
  59 
  60 # done tells the timer task when it's time to quit; always use while
  61 # mutex is `engaged`, both when checking/changing it
  62 done = False
  63 
  64 
  65 def timer_done() -> bool:
  66     'Check safely if the timer task should end right away.'
  67     with mutex:
  68         return done
  69 
  70 
  71 def emit_time(started: datetime, now: datetime) -> None:
  72     'This func does what it says, without using mutexes.'
  73     dur = now - started
  74     h, rest = divmod(dur.seconds, 3600)
  75     m, s = divmod(rest, 60)
  76     fmt = '%Y-%m-%d %H:%M:%S %b %a'
  77     print(f'{h:02}:{m:02}:{s:02}    {now.strftime(fmt)}', end='', file=stderr)
  78     stderr.flush()
  79 
  80 
  81 def track_time(started: datetime) -> None:
  82     'Handle the separate task which live-updates the timer line.'
  83 
  84     while not timer_done():
  85         # show an updated timer line on stderr
  86         with mutex:
  87             now = datetime.now()
  88             print(clear, end='', file=stderr)
  89             emit_time(started, now)
  90 
  91         # wait a second, while checking for the quit-condition more
  92         # frequently than that, to make script end right away
  93         for _ in range(20):
  94             if timer_done():
  95                 break
  96             sleep(1/20)
  97 
  98     # when script is done, end with a final timer line on stderr
  99     track_last_time(started)
 100 
 101 
 102 def track_last_time(started: datetime) -> None:
 103     'Show/update the timer line for the last time.'
 104 
 105     with mutex:
 106         now = datetime.now()
 107         # visually-clear stderr contents on its incomplete line
 108         print(clear, end='', file=stderr)
 109         # after visual clearing, update timer message on stderr
 110         emit_time(started, now)
 111         dur = now - started
 112         decs = int(dur.microseconds / 1000)
 113         # complete the final stderr line with the total time elapsed
 114         print(f'    {dur.seconds}.{decs:03} seconds', file=stderr)
 115 
 116 
 117 def handle_lines(w, src) -> None:
 118     'Emit all lines, as they come from the input-source given.'
 119 
 120     for line in src:
 121         with mutex:
 122             now = datetime.now()
 123             # visually-clear stderr contents on its incomplete line
 124             print(clear, end='', file=stderr)
 125             # emit latest line from stdin, ignoring carriage-returns
 126             print(line.rstrip('\r\n').rstrip('\n'), file=w)
 127             # after visual clearing, update timer message on stderr
 128             emit_time(started, now)
 129 
 130 
 131 try:
 132     code = 0
 133     started = datetime.now()
 134 
 135     # start timer task separately from the main one, which handles
 136     # lines from stdin
 137     t = Thread(target=track_time, args=[started], daemon=False)
 138     t.start()
 139 
 140     # quit when/if the stdin is over; the timer task is otherwise
 141     # supposed to continue until force-quit by the user
 142     # handle stdin, ignoring trailing carriage-returns in input lines
 143     handle_lines(stdout, stdin)
 144 except (BrokenPipeError, KeyboardInterrupt):
 145     code = 1
 146     # quit quietly, instead of showing a confusing error message
 147     track_last_time(started)
 148     stderr.close()
 149 except Exception as e:
 150     code = 1
 151     with mutex:
 152         now = datetime.now()
 153         print(clear, end='', file=stderr)
 154         print(f'\x1b[31m{e}\x1b[0m', file=stderr)
 155 
 156 # tell the timer task to quit, so the whole script can quit
 157 with mutex:
 158     done = True
 159 
 160 t.join()
 161 exit(code)