296 lines
9.5 KiB
Python
Executable file
296 lines
9.5 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import argparse
|
|
import notify
|
|
from signal import signal, SIGINT
|
|
import sys
|
|
from time import sleep
|
|
from os.path import expanduser, join
|
|
from datetime import datetime, timedelta
|
|
|
|
# Storage locations, both in the user's home directory: TIME_FILE receives
# one CSV row per finished day, TODAY_FILE holds the record of the day that
# is still in progress.
_HOME_DIR = expanduser("~")
TIME_FILE = join(_HOME_DIR, "timekeeping.csv")
TODAY_FILE = join(_HOME_DIR, ".timekeeping")

# State read by the SIGINT handler while a break is running.
is_on_break = False
today_fields = []

# Moment the script was launched; the commands use it as "now".
start_time = datetime.now()
|
|
|
|
|
|
def die(*args):
    """Print an error message to stderr and terminate with exit status 1.

    All positional arguments are converted to text and joined with single
    spaces, then prefixed with "Error: ".  Calling it without arguments
    prints the bare prefix instead of failing while formatting.

    Raises:
        SystemExit: always, with code 1.
    """
    message = " ".join(str(arg) for arg in args)
    print("Error: {}".format(message), file=sys.stderr)
    sys.exit(1)
|
|
|
|
|
|
def log(*args):
    """Write the given values to stderr on one line, separated by spaces."""
    sys.stderr.write(" ".join(map(str, args)) + "\n")
|
|
|
|
|
|
def signal_handler(signal, frame):
    """Handle SIGINT (Ctrl-C) by exiting, recording a running break first.

    When no break is in progress the process simply exits with status 0.
    Otherwise the break is considered finished now: its duration is logged,
    its start (the script's launch time) and end are appended to
    ``today_fields``, and the whole record is rewritten to TODAY_FILE before
    exiting with status 0.

    Args:
        signal: signal number passed by the runtime (unused).
        frame: interrupted stack frame passed by the runtime (unused).
    """
    global today_fields
    interrupted_at = datetime.now()
    if is_on_break:
        end_label = interrupted_at.strftime("%H:%M")
        break_length = interrupted_at - start_time
        log("\nBreak ended at {} and lasted {}".format(
            end_label, td_format(break_length)))
        # A break is stored as a pair of timestamps: its start and its end.
        today_fields.extend([start_time.strftime("%H:%M"), end_label])
        with open(TODAY_FILE, "w") as f:
            f.write(",".join(today_fields))
    sys.exit(0)
|
|
|
|
|
|
def td_format(td):
    """Render a timedelta as ``[-]H:MM``.

    Hours are not wrapped at 24 and leftover seconds are dropped.  Negative
    durations are rendered from their absolute value with a leading "-".
    """
    sign = "-" if td < timedelta(0) else ""
    magnitude = abs(td)
    one_hour = timedelta(hours=1)
    whole_hours = magnitude // one_hour
    whole_minutes = (magnitude % one_hour) // timedelta(minutes=1)
    return "{}{:d}:{:02d}".format(sign, whole_hours, whole_minutes)
|
|
|
|
|
|
def get_today_fields():
    """Return the fields of today's record from TODAY_FILE.

    The file is scanned line by line for a record starting with the date on
    which the script was launched (``YYYY-MM-DD``); that record is returned
    split on commas.  Every line after the first that is reached during the
    scan triggers a warning about a leftover incomplete day.

    Returns:
        list[str]: the record's fields, or an empty list when the file does
        not exist or holds no record for today.
    """
    today_prefix = start_time.strftime("%Y-%m-%d")
    try:
        with open(TODAY_FILE, "r") as f:
            for line_number, record in enumerate(f, start=1):
                if line_number > 1:
                    log("Warning: incomplete day leftover in {}".format(TODAY_FILE))
                if record.startswith(today_prefix):
                    return record.strip().split(",")
    except FileNotFoundError:
        pass
    return []
|
|
|
|
|
|
def _parse_duration(text):
    """Convert an ``[-]H:MM`` string, as written by td_format, to a timedelta.

    Unlike ``datetime.strptime(text, "%H:%M")`` this accepts negative values
    and hour counts of 24 or more.

    Raises:
        ValueError: if the text is not of the form ``[-]H:MM``.
    """
    sign = -1 if text.startswith("-") else 1
    hours, minutes = text.lstrip("-").split(":")
    hours, minutes = int(hours), int(minutes)
    if not 0 <= minutes < 60:
        raise ValueError("invalid duration: {!r}".format(text))
    return sign * timedelta(hours=hours, minutes=minutes)


def work(args):
    """Print overall and current-month statistics, then today's progress.

    For every record in TIME_FILE the worked time (fifth field) is summed.
    Two lines are printed, one for all records and one for the records of
    the current calendar month: the mean worked time per day, and the
    balance against a target of eight hours per day.

    If TODAY_FILE holds a record for today, the time worked so far and the
    estimated leaving time (start + 8 hours + breaks taken) are printed too.

    A missing TIME_FILE is treated as an empty history, blank lines are
    ignored and malformed records are skipped with a warning.

    Args:
        args: parsed command-line arguments (unused).
    """
    total_delta = timedelta()
    month_delta = timedelta()
    count = 0
    month_count = 0
    current_month = (start_time.year, start_time.month)
    try:
        with open(TIME_FILE, "r") as f:
            for line in f:
                record = line.strip()
                if not record:
                    continue
                fields = record.split(",", 5)
                try:
                    date = datetime.strptime(fields[0], "%Y-%m-%d")
                    delta = _parse_duration(fields[4])
                except (IndexError, ValueError):
                    log("Warning: skipping malformed record '{}' in {}".format(
                        record, TIME_FILE))
                    continue
                # Compare the year as well, so that the same month of an
                # earlier year does not leak into this month's statistics.
                if (date.year, date.month) == current_month:
                    month_delta += delta
                    month_count += 1
                total_delta += delta
                count += 1
    except FileNotFoundError:
        # Nothing recorded yet: report empty statistics instead of crashing.
        pass

    mean_time = total_delta / count if count > 0 else total_delta
    minutes = total_delta - timedelta(hours=8) * count
    month_mean_time = month_delta / month_count if month_count > 0 else month_delta
    month_min = month_delta - timedelta(hours=8) * month_count
    print("stats: {}, {}".format(
        td_format(mean_time),
        td_format(minutes)))
    print("month stats: {}, {}".format(
        td_format(month_mean_time),
        td_format(month_min)))

    # Try to calculate remaining time
    fields = get_today_fields()
    if not fields:
        print("No work ongoing")
        return
    # Treat "now" (the launch time) as the provisional end of the day.
    fields.append(start_time.strftime("%H:%M"))
    # Test for even number of timestamps (but fields[0] is the date)
    if len(fields) < 3:
        die("not enough fields in {}".format(TODAY_FILE))
    elif len(fields) % 2 == 0:
        # Break in progress
        log("Break in progress")
        return

    # Timestamps alternate between the start and the end of a work period.
    begin_time = None
    worked_time = timedelta()
    for field in fields[1:]:
        try:
            stamp = datetime.strptime(field, "%H:%M")
        except ValueError:
            die("couldn't parse field '{}' in {}".format(
                field, TODAY_FILE))
        if begin_time is None:
            begin_time = stamp
        else:
            worked_time += stamp - begin_time
            begin_time = None

    day_start = datetime.strptime(fields[1], "%H:%M")
    day_end = datetime.strptime(fields[-1], "%H:%M")
    break_time = (day_end - day_start) - worked_time
    day_end_estimation = day_start + timedelta(hours=8) + break_time
    print("Worked {} already today. Estimated leave at {}".format(
        td_format(worked_time),
        day_end_estimation.strftime("%H:%M")))
|
|
|
|
|
|
|
|
def work_start(args):
    """Start today's record in TODAY_FILE.

    Appends ``<date>,<HH:MM>`` to TODAY_FILE, where the date is the day the
    script was launched and the time comes from ``args.time``.

    Args:
        args: parsed command-line arguments; ``args.time`` is the starting
            time in ``HH:MM`` format.

    Exits through ``die`` when the time cannot be parsed or when a record
    for today already exists.
    """
    fields = get_today_fields()
    try:
        hour = datetime.strptime(args.time, "%H:%M")
    except ValueError:
        # die() adds the "Error: " prefix itself.
        die("{} is not a valid time".format(args.time))
    if fields:
        die("You already started working")

    # Records are written without a trailing newline, so a leftover record
    # from an unfinished day may end mid-line.  Start today's record on its
    # own line, otherwise it is glued to the leftover and never found again.
    try:
        with open(TODAY_FILE, "r") as f:
            leftover = f.read()
    except FileNotFoundError:
        leftover = ""
    separator = "\n" if leftover and not leftover.endswith("\n") else ""

    with open(TODAY_FILE, "a") as f:
        f.write("{}{},{}".format(
            separator,
            start_time.strftime("%Y-%m-%d"),
            hour.strftime("%H:%M")))
    log("Started working at {}".format(hour.strftime("%H:%M")))
|
|
|
|
|
|
def work_pause(args):
    """Start a break and send a reminder every five minutes until interrupted.

    Marks the break as running and loads today's record into the module
    state read by the SIGINT handler, which records the break when the user
    presses Ctrl-C.  This function itself never returns normally.

    Args:
        args: parsed command-line arguments (unused).

    Exits through ``die`` when there is no record for today.
    """
    global is_on_break
    global today_fields
    is_on_break = True
    today_fields = get_today_fields()
    if not today_fields:
        die("no work to take a break from")
    log("Taking a break at {}".format(start_time.strftime("%H:%M")))

    # NOTE(review): `notify` is an external module; presumably it raises
    # desktop notifications -- confirm.
    notify.init("work")
    # Wait to be stopped by a Ctrl-C
    reminder_interval = 5  # In minutes
    elapsed_minutes = 0
    while True:
        sleep(reminder_interval * 60)
        elapsed_minutes += reminder_interval
        notify.send("Pause reminder",
                    "It has now been {} minutes".format(elapsed_minutes))
|
|
|
|
|
|
def work_end(args):
    """Close today's record and append the finished day to TIME_FILE.

    The launch time is taken as the end of the day.  The worked time is the
    sum of the work periods (pairs of timestamps) and the break time is the
    remainder of the span between the first and the last timestamp.  The row
    written to TIME_FILE is::

        <date>,<start>,<end>,<break>,<worked>,"<description>"

    Afterwards TODAY_FILE is emptied.

    Args:
        args: parsed command-line arguments; ``args.description`` is the
            free-text description of the day.

    Exits through ``die`` when there is no record for today or when the
    record is malformed.
    """
    fields = get_today_fields()
    hour = start_time.strftime("%H:%M")
    if not fields:
        die("Why try to leave when you haven't even started")
    fields.append(hour)
    if len(fields) < 3:
        die("not enough fields in {}".format(TODAY_FILE))
    # Test for even number of timestamps (but fields[0] is the date)
    elif len(fields) % 2 == 0:
        die("odd number of timestamps in {}".format(TODAY_FILE))

    # Timestamps alternate between the start and the end of a work period.
    begin_time = None
    worked_time = timedelta()
    for field in fields[1:]:
        try:
            stamp = datetime.strptime(field, "%H:%M")
        except ValueError:
            die("couldn't parse field '{}' in {}".format(
                field, TODAY_FILE))
        if begin_time is None:
            begin_time = stamp
        else:
            worked_time += stamp - begin_time
            begin_time = None

    day_start = datetime.strptime(fields[1], "%H:%M")
    day_end = datetime.strptime(fields[-1], "%H:%M")
    break_time = (day_end - day_start) - worked_time

    # Keep the row well-formed: one record per line, and embedded double
    # quotes doubled as is conventional for quoted CSV fields.
    description = " ".join(args.description.splitlines()).replace('"', '""')
    with open(TIME_FILE, "a") as f:
        f.write('{},{},{},{},{},"{}"\n'.format(
            fields[0],
            day_start.strftime("%H:%M"),
            hour,
            td_format(break_time),
            td_format(worked_time),
            description))

    # Erase TODAY_FILE (opening for writing truncates it).
    with open(TODAY_FILE, "w"):
        pass
    log("Finished working at {} after working {}".format(
        hour, td_format(worked_time)))
|
|
|
|
|
|
def work_export(args):
    """Placeholder for the ``export`` command: only prints "export".

    Args:
        args: parsed command-line arguments (unused).
    """
    print("export")
|
|
|
|
|
|
def work_parse(args):
    """Import day records from ``args.file`` and append them to TIME_FILE.

    Each input line is expected to look like::

        <date>,<HH:MM>,<HH:MM>[,<HH:MM>,<HH:MM>...],<description>

    where the timestamps alternate between the start and the end of a work
    period.  Fields are read as timestamps until the first one that does not
    parse; everything after that is the description.  For every record the
    worked time and the break time are computed and a row in the TIME_FILE
    layout (date, start, end, break, worked, description) is produced.

    Lines with fewer than six fields, or without any timestamp, are skipped
    with a log message.

    Args:
        args: parsed command-line arguments; ``args.file`` is the path of
            the file to import.

    Exits through ``die`` when the input file does not exist or a record
    has an uneven number of timestamps.
    """
    new_lines = []
    try:
        with open(args.file, "r") as f:
            for line in f:
                record = line.strip()
                fields = record.split(",")
                if len(fields) < 6:
                    log("Record: '{}' hasn't got enough fields ({})".format(
                        record, len(fields)))
                    continue

                # Leading fields are timestamps; stop at the first that is not.
                times = []
                for field in fields[1:]:
                    try:
                        times.append(datetime.strptime(field, "%H:%M"))
                    except ValueError:
                        break
                if not times:
                    # Without this guard the first/last lookups below fail.
                    log("Record: '{}' has no timestamps".format(record))
                    continue
                if len(times) % 2 != 0:
                    # die() adds the "Error: " prefix itself.
                    die("uneven number of timestamps ({}) in line '{}'".format(
                        len(times), record))

                desc = ','.join(fields[len(times) + 1:]).strip()

                worked_time = timedelta()
                for begin, end in zip(times[::2], times[1::2]):
                    worked_time += end - begin

                morning = times[0]
                evening = times[-1]
                break_time = (evening - morning) - worked_time
                new_lines.append('{},{},{},{},{},{}'.format(
                    fields[0],
                    morning.strftime("%H:%M"),
                    evening.strftime("%H:%M"),
                    td_format(break_time),
                    td_format(worked_time),
                    desc))
    except FileNotFoundError:
        die("file not found: {}".format(args.file))

    # TODO do sanity checking, like if a day already exists
    with open(TIME_FILE, "a") as f:
        for new_line in new_lines:
            f.write("{}\n".format(new_line))
    log("Written {} new entries to {}".format(len(new_lines), TIME_FILE))
|
|
|
|
|
|
if __name__ == "__main__":
    # Handle Ctrl-C
    signal(SIGINT, signal_handler)

    parser = argparse.ArgumentParser()
    # Without a sub-command, print the statistics.
    parser.set_defaults(func=work)
    commands = parser.add_subparsers(dest="command")

    # Register every sub-command together with the function implementing it.
    subparsers = {}
    for command_name, handler in (
        ("start", work_start),
        ("pause", work_pause),
        ("end", work_end),
        ("export", work_export),
        ("parse", work_parse),
    ):
        subparsers[command_name] = commands.add_parser(command_name)
        subparsers[command_name].set_defaults(func=handler)

    # Command-specific positional arguments.
    subparsers["start"].add_argument(
        "time", nargs="?", default=datetime.now().strftime("%H:%M"))
    subparsers["end"].add_argument("description")
    subparsers["parse"].add_argument("file")

    args = parser.parse_args()
    args.func(args)
|