File: timer.py
   1 #!/usr/bin/python3
   2 
   3 # The MIT License (MIT)
   4 #
   5 # Copyright © 2024 pacman64
   6 #
   7 # Permission is hereby granted, free of charge, to any person obtaining a copy
   8 # of this software and associated documentation files (the “Software”), to deal
   9 # in the Software without restriction, including without limitation the rights
  10 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11 # copies of the Software, and to permit persons to whom the Software is
  12 # furnished to do so, subject to the following conditions:
  13 #
  14 # The above copyright notice and this permission notice shall be included in
  15 # all copies or substantial portions of the Software.
  16 #
  17 # THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  20 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  23 # SOFTWARE.
  24 
  25 
  26 from datetime import datetime
  27 from sys import argv, exit, stderr, stdin, stdout
  28 from threading import Lock, Thread
  29 from time import sleep
  30 
  31 
  32 info = '''
  33 timer
  34 
  35 Show a live timer on stderr, which updates `in-place` by not emitting
  36 line-feeds, and which shows both the time elapsed since its start, as
  37 as well as the current date/time.
  38 
  39 If anything comes from stdin, this will show its lines verbatim on
  40 stdout; piping data from stdin is optional.
  41 '''
  42 
  43 # handle standard help cmd-line options, quitting right away in that case
  44 if len(argv) == 2 and argv[1] in ('-h', '--h', '-help', '--help'):
  45     print(info.strip(), file=stderr)
  46     exit(0)
  47 
  48 # clear hides previous contents on a line of text, while staying on the
  49 # same line; this is used to visually-clear the timer line on stderr
  50 clear = f'\r{" " * 60}\r'
  51 
  52 # mutex serializes output from the 2 concurrent tasks, avoiding any
  53 # collisions
  54 mutex = Lock()
  55 
  56 # done tells the timer task when it's time to quit; always use while
  57 # mutex is `engaged`, both when checking/changing it
  58 done = False
  59 
  60 
  61 def timer_done() -> bool:
  62     '''Check safely if the timer task should end right away.'''
  63     with mutex:
  64         return done
  65 
  66 
  67 def emit_time(started: datetime, now: datetime) -> None:
  68     '''This func does what it says, without using mutexes.'''
  69     dur = now - started
  70     h, rest = divmod(dur.seconds, 3600)
  71     m, s = divmod(rest, 60)
  72     fmt = '%Y-%m-%d %H:%M:%S %b %a'
  73     print(f'{h:02}:{m:02}:{s:02}    {now.strftime(fmt)}', end='', file=stderr)
  74     stderr.flush()
  75 
  76 
  77 def track_time(started: datetime) -> None:
  78     '''Handle the separate task which live-updates the timer line.'''
  79 
  80     while not timer_done():
  81         # show an updated timer line on stderr
  82         with mutex:
  83             now = datetime.now()
  84             print(clear, end='', file=stderr)
  85             emit_time(started, now)
  86 
  87         # wait a second, while checking for the quit-condition more
  88         # frequently than that, to make script end right away
  89         for _ in range(20):
  90             if timer_done():
  91                 break
  92             sleep(1/20)
  93 
  94     # when script is done, end with a final timer line on stderr
  95     track_last_time(started)
  96 
  97 
  98 def track_last_time(started: datetime) -> None:
  99     '''Show/update the timer line for the last time'''
 100 
 101     with mutex:
 102         now = datetime.now()
 103         # visually-clear stderr contents on its incomplete line
 104         print(clear, end='', file=stderr)
 105         # after visual clearing, update timer message on stderr
 106         emit_time(started, now)
 107         dur = now - started
 108         decs = int(dur.microseconds / 1000)
 109         # complete the final stderr line with the total time elapsed
 110         print(f'    {dur.seconds}.{decs:03} seconds', file=stderr)
 111         stderr.flush()
 112 
 113 
 114 def handle_lines(w, src) -> None:
 115     '''Emit all lines, as they come from the input-source given.'''
 116 
 117     for line in src:
 118         with mutex:
 119             now = datetime.now()
 120             # visually-clear stderr contents on its incomplete line
 121             print(clear, end='', file=stderr)
 122             # emit latest line from stdin, ignoring carriage-returns
 123             print(line.rstrip('\r\n').rstrip('\n'), file=w)
 124             # after visual clearing, update timer message on stderr
 125             emit_time(started, now)
 126 
 127 
 128 try:
 129     started = datetime.now()
 130 
 131     # start timer task separately from the main one, which handles
 132     # lines from stdin
 133     Thread(target=track_time, args=[started], daemon=False).start()
 134 
 135     # quit when/if the stdin is over; the timer task is otherwise
 136     # supposed to continue until force-quit by the user
 137     # handle stdin, ignoring trailing carriage-returns in input lines
 138     handle_lines(stdout, stdin)
 139 except Exception as e:
 140     with mutex:
 141         now = datetime.now()
 142         print(clear, end='', file=stderr)
 143         print(f'\x1b[31m{e}\x1b[0m', file=stderr)
 144         stderr.flush()
 145     exit(1)
 146 except (BrokenPipeError, KeyboardInterrupt):
 147     # quit quietly, instead of showing a confusing error message
 148     track_last_time(started)
 149     stderr.flush()
 150     stderr.close()
 151 finally:
 152     # tell the timer task to quit, so the whole script can quit
 153     with mutex:
 154         done = True