broken idle version
This commit is contained in:
		
							parent
							
								
									0a56d15a83
								
							
						
					
					
						commit
						b5cce2a049
					
				
							
								
								
									
										303
									
								
								uvgotmail_buggy_idle.py
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										303
									
								
								uvgotmail_buggy_idle.py
									
									
									
									
									
										Executable file
									
								
							| @ -0,0 +1,303 @@ | ||||
| #!/usr/bin/env python3 | ||||
| import argparse | ||||
| import sys | ||||
| import os | ||||
| import atexit | ||||
| 
 | ||||
| def parseYN(s, default = False): | ||||
|     if default == False: | ||||
|         return s.lower() in ['true', '1', 't', 'y', 'yes', 'yeah', 'yup', 'certainly', 'uh-huh', 'go for it'] | ||||
|     else: return not s.lower() in ['false', '0', 'f', 'n', 'no', 'nah', 'nope', 'certainly not', 'nuh-huh', 'bugger that'] | ||||
| 
 | ||||
| def getYN(b): | ||||
|     return "yes" if b else "no" | ||||
| 
 | ||||
| def parseConfig(args=None,config_path=None): | ||||
|     # default settings | ||||
|     if args: | ||||
|         config = { "no_count": args.no_count, "mailbox_symbol": "📬", "mutt": args.mutt, "ssl": True, "server": "", "port": 0, "user": "", "password": "" } | ||||
|     if not config_path: | ||||
|         config_path = args.config | ||||
|     filename = os.path.expanduser(config_path) | ||||
|     # if we have a config file, parse it | ||||
|     if os.path.exists(filename): | ||||
|         with open(filename) as f: | ||||
|             for line in f: | ||||
|                 line = line.strip() | ||||
|                 if len(line) == 0: continue | ||||
|                 if line[0] == "#": continue # skip commented lines | ||||
|                 kv = line.strip().split("=",1); | ||||
|                 kv[0] = kv[0].replace("-","_"); | ||||
|                 if len(kv)==1: | ||||
|                     continue | ||||
|                 try: | ||||
|                     # load types appropriately | ||||
|                     if type(config[kv[0]]) == type(True): | ||||
|                         config[kv[0]] = (kv[1] == 'True') # cast to boolean | ||||
|                     elif type(config[kv[0]]) == type(1): | ||||
|                         config[kv[0]] = int(kv[1]) | ||||
|                     else: | ||||
|                         config[kv[0]] = kv[1] | ||||
|                 except: | ||||
|                     print("uvgotmail: unknown option: "+kv[0]) | ||||
|             # sort booleans | ||||
|     # otherwise, prompt the user to create one | ||||
|     elif(args.daemon and not args.mutt): | ||||
|         if parseYN(input("uvgotmail: no config file exists. create now? [Y/n] "),default=True): | ||||
|             createConfig(config_path, config,args) | ||||
|             return | ||||
|         else: | ||||
|             print("uvgotmail: failed to start.") | ||||
|             exit(1) | ||||
| 
 | ||||
|     # if no port is specified, use default | ||||
|     if config["port"] == 0: | ||||
|         config["port"] = 993 if config["ssl"] else 143 | ||||
| 
 | ||||
|     # read muttrc if we should do that | ||||
|     if config["mutt"] == True: | ||||
|         try: # try both locations of dotfile | ||||
|             with open(os.path.expanduser("~/.muttrc")) as f: | ||||
|                 muttrc = f.readlines(); | ||||
|         except Exception as e: | ||||
|             try: | ||||
|                 with open(os.path.expanduser("~/.mutt/muttrc")) as f: | ||||
|                     muttrc = f.readlines(); | ||||
|             except Exception as f: | ||||
|                 print(e) | ||||
|                 print(f) | ||||
|                 print("uvgotmail: could not read your muttrc. does it exist?") | ||||
|                 exit(1) | ||||
|         for line in muttrc: | ||||
|             line = line.strip() | ||||
|             if len(line) == 0: | ||||
|                 continue | ||||
|             if line[0] == "#": continue # skip commented lines | ||||
|             if "set imap_user" in line: | ||||
|                 config["user"]=line.split("=",1)[1].strip().replace('"','') | ||||
|             if "set imap_pass" in line: | ||||
|                 # voodoo to remove "s only from the start and end of password, if present | ||||
|                 config["password"]="".join(line.split("=",1)[1].strip().replace('"','',1).rsplit('"',1)) | ||||
|             if "set folder" in line: | ||||
|                 folder = line.split("=",1)[1].strip().replace('"','') | ||||
|                 if folder.startswith("imaps"): | ||||
|                     config["ssl"] = True | ||||
|                 server_port = folder.split("//")[1].split(":") | ||||
|                 config["server"] = server_port[0] | ||||
|                 config["port"] = server_port[1] | ||||
| 
 | ||||
|     # make sure at least a server and a username are set | ||||
|     if config["server"] == "" or config["user"] == "": | ||||
|         print("uvgotmail: failed to start. no imap server or username specified.") | ||||
|     return config | ||||
| 
 | ||||
| def createConfig(config_path, config,args): | ||||
|     # helper function to deal with setting each item | ||||
|     def getInput(prompt, existing): | ||||
|         boolean = type(existing) == type(True) | ||||
|         existing_str = getYN(existing) if boolean else str(existing) | ||||
|         inp = input(prompt + " [" + existing_str + "]: ") | ||||
|         if inp=="": | ||||
|             return existing | ||||
|         if boolean: | ||||
|             inp = parseYN(inp) | ||||
|         return inp | ||||
| 
 | ||||
|     import datetime | ||||
|     config_path = getInput("where do you want to put your config file? (directory tree will be created if necessary)",config_path) | ||||
|     config["mailbox_symbol"] = getInput("what symbol do you want to display when there are unread messages?",config["mailbox_symbol"]) | ||||
|     config["no_count"] = not getInput("do you want to display the number of unread messages?",not config["no_count"]) | ||||
|     config["mutt"] = getInput("do you want to use your muttrc for the server settings?",config["mutt"]) | ||||
|     if not config["mutt"]: | ||||
|         config["server"] = getInput("what's your IMAP server?",config["server"]) | ||||
|         config["ssl"] = getInput("are you using ssl? it's " + str(datetime.date.today().year) +" so you really should be",config["ssl"]) | ||||
|         if(config["port"] == ""): | ||||
|             config["port"] = 993 if config["ssl"] else 143 | ||||
|         config["port"] = getInput("what's the IMAP port?",config["port"]) | ||||
|         config["user"]= getInput("what's your IMAP username?",config["user"]) | ||||
|         config["password"] = getInput("what's your IMAP password?",config["password"]) | ||||
|     # confirm all this is correct; if not go round again | ||||
|     if(not parseYN(input("is this correct? [y/N] "))): | ||||
|         createConfig(config_path,config) | ||||
|         return | ||||
|     else: | ||||
|         from pathlib import Path | ||||
|         while True: | ||||
|             path =  Path(os.path.expanduser(config_path)) | ||||
|             try: | ||||
|                 path.parent.mkdir(parents=True, exist_ok=True,mode=0o700) | ||||
|                 path.touch(mode=0o600,exist_ok=False) | ||||
|                 with path.open(mode="w") as f: | ||||
|                     for i,key in enumerate(config): | ||||
|                         f.write(key+"="+str(config[key])+("\n" if i < len(config)-1 else "")) | ||||
|                 print("config file created; starting daemon") | ||||
|                 parseConfig(config_path = config_path,args=args) | ||||
|                 break | ||||
|             except Exception as e: | ||||
|                 if not parseYN(input("failed to create config file: " + str(e) +"... retry? [Y/n] ")): | ||||
|                     exit(1) | ||||
|                 config_path = getInput("where do you want to put your config file? (directory tree will be created if necessary)",config_path) | ||||
| 
 | ||||
| 
 | ||||
| def main(): | ||||
|     parser = argparse.ArgumentParser() | ||||
|     parser.add_argument("--daemon", help="run in daemon mode (put this at end of .bashrc)", action="store_true") | ||||
|     parser.add_argument("--config", help="specify config file (default: ~/.uvgotmail/config)", default="~/.uvgotmail/config") | ||||
|     parser.add_argument("--mutt", help="use existing mutt configuration (experimental)", action="store_true") | ||||
|     parser.add_argument("--check", help="run in check mode (put this in PS1)", action="store_true") | ||||
|     parser.add_argument("--no-count", help="don't display how many unread messages there are", action="store_true") | ||||
|     parser.add_argument("--mailbox-symbol", help="specify a string instead of the mailbox emoji",default="") | ||||
|     parser.add_argument("--debug", help="print errors",action="store_true") | ||||
|     args = parser.parse_args() | ||||
| 
 | ||||
| 
 | ||||
|     global config | ||||
|     config = parseConfig(args); | ||||
| 
 | ||||
|     config["debug"] = args.debug | ||||
| 
 | ||||
|     if(args.daemon == args.check): | ||||
|         parser.print_help() | ||||
|         sys.exit(0) | ||||
|     if(args.daemon): | ||||
|         daemon() | ||||
|     else: | ||||
|         check() | ||||
| 
 | ||||
| def check(): | ||||
|     filename = os.path.expanduser('~/.uvgotmail/unread') | ||||
|     if os.path.exists(filename): | ||||
|         with open(filename) as f: | ||||
|             unread = f.read(); | ||||
|             if unread != '': | ||||
|                 unread = int(unread) | ||||
|             else: | ||||
|                 unread = 0 | ||||
|         if unread == 0: | ||||
|             sys.exit(0) | ||||
|         pre = "" | ||||
|         if(unread > 1 and config["no_count"] == False): | ||||
|             pre = "("+str(unread)+") " | ||||
|         print(pre+config["mailbox_symbol"],end=' ') | ||||
|     # if there's no unread count file, just exit quietly | ||||
| 
 | ||||
| def writeUnreadFile(unseen): | ||||
|     try: | ||||
|         with open(os.path.expanduser('~/.uvgotmail/unread'),'w') as f: | ||||
|             f.write(str(len(unseen))) | ||||
|     except Exception as e: | ||||
|         if config["debug"]: print(e) | ||||
|         print("uvgotmail: could not write to unread file") | ||||
| 
 | ||||
| # we're implementing this in a disgusting way. i'm sorry. | ||||
| import time | ||||
| import socket | ||||
| import re | ||||
| 
 | ||||
| def idleFunction(s,unseen): | ||||
|     s._startIdle = time.time() | ||||
|     if 'IDLE' not in s.capabilities: | ||||
|         raise s.error('Server does not support IDLE') | ||||
|     idle_tag = s._command('IDLE')  # start idling | ||||
|     s._get_response() | ||||
|     while line := s._get_line(): | ||||
|         if b'FETCH' in line: | ||||
|             l = str(line) | ||||
|             msg = re.findall(r'\d+',l)[0] | ||||
|             if config["debug"]: print(msg, unseen) | ||||
|             if "seen" in l.lower(): | ||||
|                 unseen.remove(msg) | ||||
|             else: | ||||
|                 unseen.append(msg) | ||||
|             print(unseen) | ||||
|             writeUnreadFile(unseen) | ||||
|         else: | ||||
|             # we only want to idle for ten minutes maximum. we will use the keep-alive messages to keep track in a single thread. | ||||
|             if(time.time() - s.start > 600): | ||||
|                 s.send(b'DONE' + imaplib.CRLF) | ||||
|                 return s._command_complete('IDLE', idle_tag) | ||||
| 
 | ||||
| 
 | ||||
| # thanks to trentbuck on github for this awful idea | ||||
| 
 | ||||
| def daemon(): | ||||
|     import imaplib | ||||
|     imaplib.Commands['IDLE'] = ('AUTH', 'SELECTED') | ||||
|     class IMAP4_SSL_plus_IDLE(imaplib.IMAP4_SSL): | ||||
|         def idle(self, unseen): | ||||
|             idleFunction(self,unseen) | ||||
|     class IMAP4_plus_IDLE(imaplib.IMAP4): | ||||
|         def idle(self, unseen): | ||||
|             idleFunction(self,unseen) | ||||
| 
 | ||||
|     IMAP4_SSL = IMAP4_SSL_plus_IDLE | ||||
|     IMAP4 = IMAP4_plus_IDLE | ||||
|     socket.setdefaulttimeout(120) | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
|     # first check if daemon is already running | ||||
|     pidfile = os.path.expanduser('~/.uvgotmail/.run.pid') | ||||
|     if os.path.exists(pidfile): | ||||
|         with open(pidfile) as f: | ||||
|             pid = f.read() | ||||
|             if pid != '': | ||||
|                 pid = int(pid) | ||||
|             else: | ||||
|                 pid = 0 | ||||
|             running_cmd = os.popen("ps -p" + str(pid) +" -o command").read() | ||||
|             current_cmd = os.popen("ps -p" + str(os.getpid()) +" -o command").read() | ||||
|             if running_cmd == current_cmd or os.path.basename(sys.argv[0]) in running_cmd: | ||||
|                 # the process is running; we don't need to start up at all | ||||
|                 exit(0) | ||||
|     try: | ||||
|         with open(pidfile,'w') as f: | ||||
|             f.write(str(os.getpid())) | ||||
|         def delPid(pidfile): | ||||
|             os.remove(pidfile) | ||||
|         atexit.register(delPid,pidfile) | ||||
|     except: | ||||
|         print("uvgotmail: failed to write to PID file") | ||||
|         exit(1) | ||||
| 
 | ||||
|     # main loop | ||||
|     import time | ||||
|     start_time = time.time(); | ||||
|     IMAP4_func = IMAP4_SSL if config["ssl"] else IMAP4 | ||||
|     while True: | ||||
|         try: | ||||
|             with IMAP4_func(config["server"],config["port"]) as conn: | ||||
|                 conn.login(user=config["user"],password=config["password"]) | ||||
|                 conn.select(mailbox='INBOX',readonly=True) | ||||
|                 typ,msgnums = conn.search(None, "UNSEEN"); | ||||
|                 unseen = [n.decode() for n in msgnums if n != b''] | ||||
|                 if len(unseen)>0: | ||||
|                     unseen=unseen[0].split(' ') | ||||
|                 else: | ||||
|                     unseen=[] | ||||
|                 if config["debug"]: print(unseen) | ||||
|                 writeUnreadFile(unseen) | ||||
|                 #try: | ||||
|                 resp = conn.idle(unseen) | ||||
|                 #except Exception as e: | ||||
|                 if config["debug"]: print(e) | ||||
|                 if time.time() - start_time <= 10: | ||||
|                     print("uvgotmail: too many imap errors. exiting.") | ||||
|                     sys.exit(1) | ||||
|                 start_time = time.time(); | ||||
| 
 | ||||
|         except Exception as e: | ||||
|             if config["debug"]: | ||||
|                 print(e) | ||||
|             print("uvgotmail: failed to log in to server. dumping config.") | ||||
|             print(config); | ||||
|             exit(1) | ||||
| 
 | ||||
| # handle ctrl+c gracefully | ||||
| if __name__ == '__main__': | ||||
|     try: | ||||
|         main() | ||||
|     except KeyboardInterrupt: | ||||
|         print('Interrupted') | ||||
|         exit(130) | ||||
| 
 | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user