File: timer.py
   1 #!/usr/bin/python3
   2 
   3 # The MIT License (MIT)
   4 #
   5 # Copyright © 2024 pacman64
   6 #
   7 # Permission is hereby granted, free of charge, to any person obtaining a copy
   8 # of this software and associated documentation files (the “Software”), to deal
   9 # in the Software without restriction, including without limitation the rights
  10 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11 # copies of the Software, and to permit persons to whom the Software is
  12 # furnished to do so, subject to the following conditions:
  13 #
  14 # The above copyright notice and this permission notice shall be included in
  15 # all copies or substantial portions of the Software.
  16 #
  17 # THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  20 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  23 # SOFTWARE.
  24 
  25 
  26 # timer
  27 #
  28 # Show a live timer on stderr, which updates `in-place` by not emitting
  29 # line-feeds, and which shows both the time elapsed since its start, as
  30 # well as the current date/time.
  31 #
  32 # If anything comes from stdin, this will show its lines verbatim on
  33 # stdout; piping data from stdin is optional.
  34 
  35 
  36 from datetime import datetime
  37 from threading import Lock, Thread
  38 from time import sleep, strftime
  39 from sys import argv, exit, stderr, stdin, stdout
  40 
  41 
  42 # info is the help message shown when asked to
  43 info = '''
  44 timer
  45 
  46 Show a live timer on stderr, which updates `in-place` by not emitting
  47 line-feeds, and which shows both the time elapsed since its start, as
  48 as well as the current date/time.
  49 
  50 If anything comes from stdin, this will show its lines verbatim on
  51 stdout; piping data from stdin is optional.
  52 '''.strip()
  53 
  54 # handle standard help cmd-line options, quitting right away in that case
  55 if len(argv) == 2 and argv[1] in ('-h', '--h', '-help', '--help'):
  56     print(info, file=stderr)
  57     exit(0)
  58 
  59 # clear hides previous contents on a line of text, while staying on the
  60 # same line; this is used to visually-clear the timer line on stderr
  61 clear = f'\r{" " * 60}\r'
  62 
  63 # mutex serializes output from the 2 concurrent tasks, avoiding any
  64 # collisions
  65 mutex = Lock()
  66 
  67 # done tells the timer task when it's time to quit; always use while
  68 # mutex is `engaged`, both when checking/changing it
  69 done = False
  70 
  71 
  72 def timer_done() -> bool:
  73     '''Check safely if the timer task should end right away.'''
  74     with mutex:
  75         return done
  76 
  77 
  78 def emit_time(started: datetime, now: datetime) -> None:
  79     '''This func does what it says, without using mutexes.'''
  80     dur = now - started
  81     h, rest = divmod(dur.seconds, 3600)
  82     m, s = divmod(rest, 60)
  83     fmt = '%Y-%m-%d %H:%M:%S %b %a'
  84     print(f'{h:02}:{m:02}:{s:02}    {now.strftime(fmt)}', end='', file=stderr)
  85     stderr.flush()
  86 
  87 
  88 def track_time(started: datetime) -> None:
  89     '''Handle the separate task which live-updates the timer line.'''
  90 
  91     while not timer_done():
  92         # show an updated timer line on stderr
  93         with mutex:
  94             now = datetime.now()
  95             print(clear, end='', file=stderr)
  96             emit_time(started, now)
  97 
  98         # wait a second, while checking for the quit-condition more
  99         # frequently than that, to make script end right away
 100         for _ in range(20):
 101             if timer_done():
 102                 break
 103             sleep(1/20)
 104 
 105     # when script is done, end with a final timer line on stderr
 106     track_last_time(started)
 107 
 108 
 109 def track_last_time(started: datetime) -> None:
 110     '''Show/update the timer line for the last time'''
 111 
 112     with mutex:
 113         now = datetime.now()
 114         # visually-clear stderr contents on its incomplete line
 115         print(clear, end='', file=stderr)
 116         # after visual clearing, update timer message on stderr
 117         emit_time(started, now)
 118         dur = now - started
 119         decs = int(dur.microseconds / 1000)
 120         # complete the final stderr line with the total time elapsed
 121         print(f'    {dur.seconds}.{decs:03} seconds', file=stderr)
 122         stderr.flush()
 123 
 124 
 125 def handle_lines(w, src) -> None:
 126     '''Emit all lines, as they come from the input-source given.'''
 127 
 128     for line in src:
 129         with mutex:
 130             now = datetime.now()
 131             # visually-clear stderr contents on its incomplete line
 132             print(clear, end='', file=stderr)
 133             # emit latest line from stdin, ignoring carriage-returns
 134             print(line.rstrip('\r\n').rstrip('\n'), file=w)
 135             # after visual clearing, update timer message on stderr
 136             emit_time(started, now)
 137 
 138 
 139 try:
 140     started = datetime.now()
 141     stdout.reconfigure(newline='\n', encoding='utf-8')
 142 
 143     # start timer task separately from the main one, which handles
 144     # lines from stdin
 145     Thread(target=track_time, args=[started], daemon=False).start()
 146 
 147     # quit when/if the stdin is over; the timer task is otherwise
 148     # supposed to continue until force-quit by the user
 149     # handle stdin, ignoring trailing carriage-returns in input lines
 150     handle_lines(stdout, stdin)
 151 except Exception as e:
 152     with mutex:
 153         now = datetime.now()
 154         print(clear, end='', file=stderr)
 155         print(f'\x1b[31m{e}\x1b[0m', file=stderr)
 156         stderr.flush()
 157     exit(1)
 158 except (BrokenPipeError, KeyboardInterrupt):
 159     # quit quietly, instead of showing a confusing error message
 160     track_last_time(started)
 161     stderr.flush()
 162     stderr.close()
 163 finally:
 164     # tell the timer task to quit, so the whole script can quit
 165     with mutex:
 166         done = True