#!/usr/bin/env python3 # -*- coding: utf-8 -*- import argparse import notify from signal import signal, SIGINT import sys from time import sleep from os.path import expanduser, join from datetime import datetime, timedelta TIME_FILE = join(expanduser("~"), "timekeeping.csv") TODAY_FILE = join(expanduser("~"), ".timekeeping") is_on_break = False today_fields = [] start_time = datetime.now() def die(*args): print("Error: {}".format(*args), file=sys.stderr) sys.exit(1) def log(*args): print(*args, file=sys.stderr) def signal_handler(signal, frame): global today_fields now = datetime.now() hour = now.strftime("%H:%M") if not is_on_break: sys.exit(0) else: break_time = now - start_time log("\nBreak ended at {} and lasted {}".format( hour, td_format(break_time))) today_fields.append(start_time.strftime("%H:%M")) today_fields.append(hour) with open(TODAY_FILE, "w") as f: f.write(",".join(today_fields)) sys.exit(0) def td_format(td): prefix = "" if td < timedelta(0): td = -td prefix = "-" hours, remainder = divmod(td.total_seconds(), 3600) minutes, seconds = divmod(remainder, 60) return '{}{:d}:{:02d}'.format(prefix, int(hours), int(minutes)) def get_today_fields(): try: with open(TODAY_FILE, "r") as f: line_count = 0 for line in f: line_count += 1 if line_count > 1: log("Warning: incomplete day leftover in {}".format(TODAY_FILE)) if line.startswith(start_time.strftime("%Y-%m-%d")): return line.strip().split(",") except FileNotFoundError: return [] return [] def work(args): total_delta = timedelta() month_delta = timedelta() count = 0 month_count = 0 with open(TIME_FILE, "r") as f: for line in f: fields = line.strip().split(",", 5) date = datetime.strptime(fields[0], "%Y-%m-%d") t = datetime.strptime(fields[4], "%H:%M") delta = timedelta(hours=t.hour, minutes=t.minute) if date.month == start_time.month: month_delta += delta month_count += 1 total_delta += delta count += 1 eight_h = timedelta(hours=8) * count mean_time = total_delta / count if count > 0 else total_delta minutes = total_delta - eight_h month_mean_time = month_delta / month_count if month_count > 0 else month_delta month_min = month_delta - timedelta(hours=(8 * month_count)) print("stats: {}, {}".format( td_format(mean_time), td_format(minutes))) print("month stats: {}, {}".format( td_format(month_mean_time), td_format(month_min))) # Try to calculate remaining time fields = get_today_fields() hour = start_time.strftime("%H:%M") if not fields: print("No work ongoing") return fields.append(hour) # Test for even number of timestamp (but fields[0] is the date) if len(fields) < 3: die("not enough fields in {}".format(TODAY_FILE)) elif len(fields) % 2 == 0: # Break in progress log("Break in progress") return begin_time = None worked_time = timedelta() for field in fields[1:]: try: if begin_time is None: begin_time = datetime.strptime(field, "%H:%M") else: end_time = datetime.strptime(field, "%H:%M") worked_time += end_time - begin_time begin_time = None except ValueError: die("couldn't parse field '{}' in {}".format( field, TODAY_FILE)) day_start = datetime.strptime(fields[1], "%H:%M") day_end = datetime.strptime(fields[-1], "%H:%M") total_time = day_end - day_start break_time = total_time - worked_time day_end_estimation = day_start + timedelta(hours=8) + break_time print("Worked {} already today. Estimated leave at {}".format( td_format(worked_time), day_end_estimation.strftime("%H:%M"))) def work_start(args): fields = get_today_fields() try: hour = datetime.strptime(args.time, "%H:%M") except ValueError: die("Error: {} is not a valid time".format(args.time)) if fields: die("You already started working") with open(TODAY_FILE, "a") as f: f.write("{},{}".format( start_time.strftime("%Y-%m-%d"), hour.strftime("%H:%M"))) log("Started working at {}".format(hour.strftime("%H:%M"))) def work_pause(args): global is_on_break global today_fields is_on_break = True today_fields = get_today_fields() hour = start_time.strftime("%H:%M") if not today_fields: die("no work to take a break from") log("Taking a break at {}".format(hour)) notify.init("work") # Wait to be stopped by a Ctrl-C reminder_interval = 5 # In minutes count = 0 while True: sleep(reminder_interval * 60) count += 1 notify.send("Pause reminder", "It has now been {} minutes".format(count * reminder_interval)) def work_end(args): fields = get_today_fields() hour = start_time.strftime("%H:%M") if not fields: die("Why try to leave when you haven't even started") fields.append(hour) if len(fields) < 3: die("not enough fields in {}".format(TODAY_FILE)) # Test for even number of timestamp (but fields[0] is the date) elif len(fields) % 2 == 0: die("odd number of timestamps in {}".format(TODAY_FILE)) begin_time = None worked_time = timedelta() for field in fields[1:]: try: if begin_time is None: begin_time = datetime.strptime(field, "%H:%M") else: end_time = datetime.strptime(field, "%H:%M") worked_time += end_time - begin_time begin_time = None except ValueError: die("couldn't parse field '{}' in {}".format( field, TODAY_FILE)) day_start = datetime.strptime(fields[1], "%H:%M") day_end = datetime.strptime(fields[-1], "%H:%M") total_time = day_end - day_start break_time = total_time - worked_time with open(TIME_FILE, "a") as f: f.write('{},{},{},{},{},"{}"\n'.format( fields[0], day_start.strftime("%H:%M"), start_time.strftime("%H:%M"), td_format(break_time), td_format(worked_time), args.description)) # Erase TODAY_FILE with open(TODAY_FILE, "w") as f: f.write("") f.flush() log("Finished working at {} after working {}".format( hour, td_format(worked_time))) def work_export(args): print("export") def work_parse(args): new_lines = [] try: with open(args.file, "r") as f: for line in f: fields = line.strip().split(",") if len(fields) < 6: log("Record: '{}' hasn't got enough fields ({})".format(line, len(fields))) continue times = [] for field in fields[1:]: try: time = datetime.strptime(field, "%H:%M") times.append(time) except ValueError: break if len(times) % 2 != 0: die("Error: uneven number of timestamps ({}) in line '{}'".format(len(times), line)) desc = ','.join(fields[len(times)+1:]).strip() worked_time = timedelta() for i in range(int(len(times) / 2)): worked_time += times[i * 2 + 1] - times[i * 2] morning = times[0] evening = times[-1] full_day = evening - morning break_time = full_day - worked_time new_lines.append('{},{},{},{},{},{}'.format( fields[0], morning.strftime("%H:%M"), evening.strftime("%H:%M"), td_format(break_time), td_format(worked_time), desc)) except FileNotFoundError: die("file not found: {}".format(args.file)) # TODO do sanity checking, like if a day already exists with open(TIME_FILE, "a") as f: for line in new_lines: f.write("{}\n".format(line)) log("Written {} new entries to {}".format(len(new_lines), TIME_FILE)) if __name__ == "__main__": # Handle Ctrl-C signal(SIGINT, signal_handler) parser = argparse.ArgumentParser() parser.set_defaults(func=work) commands = parser.add_subparsers(dest="command") start_parser = commands.add_parser("start") pause_parser = commands.add_parser("pause") end_parser = commands.add_parser("end") export_parser = commands.add_parser("export") parse_parser = commands.add_parser("parse") start_parser.set_defaults(func=work_start) start_parser.add_argument("time", nargs="?", default=datetime.now().strftime("%H:%M")) pause_parser.set_defaults(func=work_pause) end_parser.set_defaults(func=work_end) end_parser.add_argument("description") export_parser.set_defaults(func=work_export) parse_parser.set_defaults(func=work_parse) parse_parser.add_argument("file") args = parser.parse_args() args.func(args) #now = datetime.now() #with open(TIME_FILE, "r") as f: # for line in f: # print(line.strip())