diff --git a/uvgotmail_buggy_idle.py b/uvgotmail_buggy_idle.py new file mode 100755 index 0000000..9156c8d --- /dev/null +++ b/uvgotmail_buggy_idle.py @@ -0,0 +1,303 @@ +#!/usr/bin/env python3 +import argparse +import sys +import os +import atexit + +def parseYN(s, default = False): + if default == False: + return s.lower() in ['true', '1', 't', 'y', 'yes', 'yeah', 'yup', 'certainly', 'uh-huh', 'go for it'] + else: return not s.lower() in ['false', '0', 'f', 'n', 'no', 'nah', 'nope', 'certainly not', 'nuh-huh', 'bugger that'] + +def getYN(b): + return "yes" if b else "no" + +def parseConfig(args=None,config_path=None): + # default settings + if args: + config = { "no_count": args.no_count, "mailbox_symbol": "📬", "mutt": args.mutt, "ssl": True, "server": "", "port": 0, "user": "", "password": "" } + if not config_path: + config_path = args.config + filename = os.path.expanduser(config_path) + # if we have a config file, parse it + if os.path.exists(filename): + with open(filename) as f: + for line in f: + line = line.strip() + if len(line) == 0: continue + if line[0] == "#": continue # skip commented lines + kv = line.strip().split("=",1); + kv[0] = kv[0].replace("-","_"); + if len(kv)==1: + continue + try: + # load types appropriately + if type(config[kv[0]]) == type(True): + config[kv[0]] = (kv[1] == 'True') # cast to boolean + elif type(config[kv[0]]) == type(1): + config[kv[0]] = int(kv[1]) + else: + config[kv[0]] = kv[1] + except: + print("uvgotmail: unknown option: "+kv[0]) + # sort booleans + # otherwise, prompt the user to create one + elif(args.daemon and not args.mutt): + if parseYN(input("uvgotmail: no config file exists. create now? [Y/n] "),default=True): + createConfig(config_path, config,args) + return + else: + print("uvgotmail: failed to start.") + exit(1) + + # if no port is specified, use default + if config["port"] == 0: + config["port"] = 993 if config["ssl"] else 143 + + # read muttrc if we should do that + if config["mutt"] == True: + try: # try both locations of dotfile + with open(os.path.expanduser("~/.muttrc")) as f: + muttrc = f.readlines(); + except Exception as e: + try: + with open(os.path.expanduser("~/.mutt/muttrc")) as f: + muttrc = f.readlines(); + except Exception as f: + print(e) + print(f) + print("uvgotmail: could not read your muttrc. does it exist?") + exit(1) + for line in muttrc: + line = line.strip() + if len(line) == 0: + continue + if line[0] == "#": continue # skip commented lines + if "set imap_user" in line: + config["user"]=line.split("=",1)[1].strip().replace('"','') + if "set imap_pass" in line: + # voodoo to remove "s only from the start and end of password, if present + config["password"]="".join(line.split("=",1)[1].strip().replace('"','',1).rsplit('"',1)) + if "set folder" in line: + folder = line.split("=",1)[1].strip().replace('"','') + if folder.startswith("imaps"): + config["ssl"] = True + server_port = folder.split("//")[1].split(":") + config["server"] = server_port[0] + config["port"] = server_port[1] + + # make sure at least a server and a username are set + if config["server"] == "" or config["user"] == "": + print("uvgotmail: failed to start. no imap server or username specified.") + return config + +def createConfig(config_path, config,args): + # helper function to deal with setting each item + def getInput(prompt, existing): + boolean = type(existing) == type(True) + existing_str = getYN(existing) if boolean else str(existing) + inp = input(prompt + " [" + existing_str + "]: ") + if inp=="": + return existing + if boolean: + inp = parseYN(inp) + return inp + + import datetime + config_path = getInput("where do you want to put your config file? (directory tree will be created if necessary)",config_path) + config["mailbox_symbol"] = getInput("what symbol do you want to display when there are unread messages?",config["mailbox_symbol"]) + config["no_count"] = not getInput("do you want to display the number of unread messages?",not config["no_count"]) + config["mutt"] = getInput("do you want to use your muttrc for the server settings?",config["mutt"]) + if not config["mutt"]: + config["server"] = getInput("what's your IMAP server?",config["server"]) + config["ssl"] = getInput("are you using ssl? it's " + str(datetime.date.today().year) +" so you really should be",config["ssl"]) + if(config["port"] == ""): + config["port"] = 993 if config["ssl"] else 143 + config["port"] = getInput("what's the IMAP port?",config["port"]) + config["user"]= getInput("what's your IMAP username?",config["user"]) + config["password"] = getInput("what's your IMAP password?",config["password"]) + # confirm all this is correct; if not go round again + if(not parseYN(input("is this correct? [y/N] "))): + createConfig(config_path,config) + return + else: + from pathlib import Path + while True: + path = Path(os.path.expanduser(config_path)) + try: + path.parent.mkdir(parents=True, exist_ok=True,mode=0o700) + path.touch(mode=0o600,exist_ok=False) + with path.open(mode="w") as f: + for i,key in enumerate(config): + f.write(key+"="+str(config[key])+("\n" if i < len(config)-1 else "")) + print("config file created; starting daemon") + parseConfig(config_path = config_path,args=args) + break + except Exception as e: + if not parseYN(input("failed to create config file: " + str(e) +"... retry? [Y/n] ")): + exit(1) + config_path = getInput("where do you want to put your config file? (directory tree will be created if necessary)",config_path) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--daemon", help="run in daemon mode (put this at end of .bashrc)", action="store_true") + parser.add_argument("--config", help="specify config file (default: ~/.uvgotmail/config)", default="~/.uvgotmail/config") + parser.add_argument("--mutt", help="use existing mutt configuration (experimental)", action="store_true") + parser.add_argument("--check", help="run in check mode (put this in PS1)", action="store_true") + parser.add_argument("--no-count", help="don't display how many unread messages there are", action="store_true") + parser.add_argument("--mailbox-symbol", help="specify a string instead of the mailbox emoji",default="") + parser.add_argument("--debug", help="print errors",action="store_true") + args = parser.parse_args() + + + global config + config = parseConfig(args); + + config["debug"] = args.debug + + if(args.daemon == args.check): + parser.print_help() + sys.exit(0) + if(args.daemon): + daemon() + else: + check() + +def check(): + filename = os.path.expanduser('~/.uvgotmail/unread') + if os.path.exists(filename): + with open(filename) as f: + unread = f.read(); + if unread != '': + unread = int(unread) + else: + unread = 0 + if unread == 0: + sys.exit(0) + pre = "" + if(unread > 1 and config["no_count"] == False): + pre = "("+str(unread)+") " + print(pre+config["mailbox_symbol"],end=' ') + # if there's no unread count file, just exit quietly + +def writeUnreadFile(unseen): + try: + with open(os.path.expanduser('~/.uvgotmail/unread'),'w') as f: + f.write(str(len(unseen))) + except Exception as e: + if config["debug"]: print(e) + print("uvgotmail: could not write to unread file") + +# we're implementing this in a disgusting way. i'm sorry. +import time +import socket +import re + +def idleFunction(s,unseen): + s._startIdle = time.time() + if 'IDLE' not in s.capabilities: + raise s.error('Server does not support IDLE') + idle_tag = s._command('IDLE') # start idling + s._get_response() + while line := s._get_line(): + if b'FETCH' in line: + l = str(line) + msg = re.findall(r'\d+',l)[0] + if config["debug"]: print(msg, unseen) + if "seen" in l.lower(): + unseen.remove(msg) + else: + unseen.append(msg) + print(unseen) + writeUnreadFile(unseen) + else: + # we only want to idle for ten minutes maximum. we will use the keep-alive messages to keep track in a single thread. + if(time.time() - s.start > 600): + s.send(b'DONE' + imaplib.CRLF) + return s._command_complete('IDLE', idle_tag) + + +# thanks to trentbuck on github for this awful idea + +def daemon(): + import imaplib + imaplib.Commands['IDLE'] = ('AUTH', 'SELECTED') + class IMAP4_SSL_plus_IDLE(imaplib.IMAP4_SSL): + def idle(self, unseen): + idleFunction(self,unseen) + class IMAP4_plus_IDLE(imaplib.IMAP4): + def idle(self, unseen): + idleFunction(self,unseen) + + IMAP4_SSL = IMAP4_SSL_plus_IDLE + IMAP4 = IMAP4_plus_IDLE + socket.setdefaulttimeout(120) + + + + # first check if daemon is already running + pidfile = os.path.expanduser('~/.uvgotmail/.run.pid') + if os.path.exists(pidfile): + with open(pidfile) as f: + pid = f.read() + if pid != '': + pid = int(pid) + else: + pid = 0 + running_cmd = os.popen("ps -p" + str(pid) +" -o command").read() + current_cmd = os.popen("ps -p" + str(os.getpid()) +" -o command").read() + if running_cmd == current_cmd or os.path.basename(sys.argv[0]) in running_cmd: + # the process is running; we don't need to start up at all + exit(0) + try: + with open(pidfile,'w') as f: + f.write(str(os.getpid())) + def delPid(pidfile): + os.remove(pidfile) + atexit.register(delPid,pidfile) + except: + print("uvgotmail: failed to write to PID file") + exit(1) + + # main loop + import time + start_time = time.time(); + IMAP4_func = IMAP4_SSL if config["ssl"] else IMAP4 + while True: + try: + with IMAP4_func(config["server"],config["port"]) as conn: + conn.login(user=config["user"],password=config["password"]) + conn.select(mailbox='INBOX',readonly=True) + typ,msgnums = conn.search(None, "UNSEEN"); + unseen = [n.decode() for n in msgnums if n != b''] + if len(unseen)>0: + unseen=unseen[0].split(' ') + else: + unseen=[] + if config["debug"]: print(unseen) + writeUnreadFile(unseen) + #try: + resp = conn.idle(unseen) + #except Exception as e: + if config["debug"]: print(e) + if time.time() - start_time <= 10: + print("uvgotmail: too many imap errors. exiting.") + sys.exit(1) + start_time = time.time(); + + except Exception as e: + if config["debug"]: + print(e) + print("uvgotmail: failed to log in to server. dumping config.") + print(config); + exit(1) + +# handle ctrl+c gracefully +if __name__ == '__main__': + try: + main() + except KeyboardInterrupt: + print('Interrupted') + exit(130) +