"""Check an Atom feed for new entries and post them to a Discord webhook.

Workflow:
  1. Check the last-modified time of the locally cached feed file.
  2. Download a fresh copy of the feed.
  3. Collect entries published after the cached file's timestamp.
  4. POST each new entry to the Discord webhook.
"""

import argparse
import logging
import os
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from operator import attrgetter

import requests

# Seconds before an HTTP request is abandoned; without an explicit
# timeout, requests can block forever on an unresponsive server.
REQUEST_TIMEOUT = 30

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
LOGGER.addHandler(ch)


def tag(tagname: str) -> str:
    """Qualify *tagname* with the Atom XML namespace for ElementTree lookups."""
    return '{http://www.w3.org/2005/Atom}' + tagname


class Entry:
    """One Atom feed entry: title, permalink, author, and publication date."""

    def __init__(self, entry: ET.Element):
        # Assumes a well-formed Atom entry; a missing required element
        # raises AttributeError (find() returning None).
        self.title = entry.find(tag('title')).text
        self.url = entry.find(tag('id')).text
        self.author = entry.find(tag('author')).find(tag('name')).text
        self.date = datetime.fromisoformat(entry.find(tag('published')).text)

    def __str__(self) -> str:
        return f'New post by {self.author}: {self.title}\n{self.url}'


def get_last_update_time(file: str) -> datetime:
    """Return the mtime of *file* as an aware datetime in the local timezone.

    If the file does not exist yet (first run), return the current UTC
    time so the script does not flood the webhook with the feed's
    entire history.
    """
    try:
        stat = os.stat(file)
        return datetime.fromtimestamp(stat.st_mtime, datetime.now().astimezone().tzinfo)
    except FileNotFoundError:
        return datetime.now(timezone.utc)


def update_rss_file(file: str, feed: str) -> None:
    """Download *feed* and overwrite *file* with its raw contents.

    Raises requests.HTTPError on a non-2xx response so a failed
    download does not silently clobber the cached copy (and its
    mtime) with an error page.
    """
    r = requests.get(feed, stream=True, timeout=REQUEST_TIMEOUT)
    r.raise_for_status()
    # Write raw bytes: avoids re-encoding the feed through the
    # platform's default text encoding.
    with open(file, 'wb') as fd:
        for chunk in r.iter_content(chunk_size=1024):
            fd.write(chunk)


def parse_rss_file(file: str) -> ET.Element:
    """Parse the cached feed file and return the document root element."""
    return ET.parse(file).getroot()


def get_new_posts(time: datetime, root: ET.Element) -> list[Entry]:
    """Return the entries under *root* published strictly after *time*."""
    return [e for e in map(Entry, root.findall(tag('entry'))) if e.date > time]


def send_message(entry: Entry, url: str) -> int:
    """POST *entry* as form data to the webhook *url*; return the HTTP status."""
    r = requests.post(url, {'content': str(entry)}, timeout=REQUEST_TIMEOUT)
    return r.status_code


def main() -> None:
    """Command-line entry point: fetch the feed and forward new posts."""
    parser = argparse.ArgumentParser(
        prog='discussfeed',
        description='Checks for new entries from an atom feed and sends them to a discord webhook')
    parser.add_argument('-w', '--webhook')
    parser.add_argument('feed_url')
    args = parser.parse_args()

    filename = os.path.basename(args.feed_url)
    last_updated = get_last_update_time(filename)
    LOGGER.info('last updated: %s', last_updated)

    update_rss_file(filename, args.feed_url)
    root = parse_rss_file(filename)

    # Oldest first, so the webhook channel reads in chronological order.
    posts = get_new_posts(last_updated, root)
    posts.sort(key=attrgetter('date'))
    LOGGER.info('found %d new posts', len(posts))

    for post in posts:
        status = send_message(post, args.webhook)
        if status >= 300:
            LOGGER.error('Response code %d', status)
        else:
            LOGGER.info('response code %d', status)


if __name__ == "__main__":
    main()