discussfeed/discussfeed.py

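"""Check an Atom feed for new entries and post them to a Discord webhook.

Example invocation (placeholder URLs; substitute your own webhook and feed):

    python discussfeed.py -w https://discord.com/api/webhooks/<id>/<token> \
        https://example.com/atom.xml
"""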

import os
import argparse
import logging
import xml.etree.ElementTree as ET
import requests
from datetime import datetime, timezone

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
LOGGER.addHandler(ch)

def tag(tagname: str) -> str:
    # Qualify a tag name with the Atom XML namespace.
    el_prefix = '{http://www.w3.org/2005/Atom}'
    return el_prefix + tagname

class Entry:
    """One Atom feed entry."""

    def __init__(self, entry):
        self.title = entry.find(tag('title')).text
        self.url = entry.find(tag('id')).text
        self.author = entry.find(tag('author')).find(tag('name')).text
        # Atom timestamps are ISO 8601; normalize a trailing 'Z' so
        # fromisoformat() also parses them on Python < 3.11.
        published = entry.find(tag('published')).text
        self.date = datetime.fromisoformat(published.replace('Z', '+00:00'))

    def __str__(self):
        return f'New post by {self.author}: {self.title}\n{self.url}'

def get_last_update_time(file: str) -> datetime:
    """Return the mtime of the cached feed file, or now if it doesn't exist."""
    try:
        stat = os.stat(file)
        return datetime.fromtimestamp(stat.st_mtime, timezone.utc)
    except FileNotFoundError:
        # No previous download: treat everything before now as already seen.
        return datetime.now(timezone.utc)

def update_rss_file(file, feed):
    """Download the feed and overwrite the local cached copy."""
    r = requests.get(feed, stream=True)
    r.raise_for_status()
    # Write raw bytes; decode_unicode can yield bytes when the server
    # sends no charset, which would break a text-mode file handle.
    with open(file, 'wb') as fd:
        for chunk in r.iter_content(chunk_size=1024):
            fd.write(chunk)

def parse_rss_file(file) -> ET.Element:
    tree = ET.parse(file)
    return tree.getroot()

def get_new_posts(time, root) -> list[Entry]:
    """Collect entries published after the given time."""
    posts = []
    for entry in root.findall(tag('entry')):
        e = Entry(entry)
        if e.date > time:
            posts.append(e)
    return posts

def send_message(entry, url) -> int:
    obj = {
        'content': str(entry)
    }
    # Discord webhooks accept a JSON body with a 'content' field.
    r = requests.post(url, json=obj)
    return r.status_code

# Check the last-modified date of the cached feed file,
# download the new version, collect entries published since
# the last check, and POST each one to the Discord webhook.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog='discussfeed',
        description='Checks for new entries in an Atom feed and sends them to a Discord webhook')
    parser.add_argument('-w', '--webhook', required=True)
    parser.add_argument('feed_url')
    args = parser.parse_args()

    # The feed is cached under its basename; the file's mtime records
    # when the feed was last checked.
    filename = os.path.basename(args.feed_url)
    last_updated = get_last_update_time(filename)
    LOGGER.info(f'last updated: {last_updated}')

    update_rss_file(filename, args.feed_url)
    root = parse_rss_file(filename)

    posts = get_new_posts(last_updated, root)
    posts.sort(key=lambda post: post.date)
    LOGGER.info(f'found {len(posts)} new posts')

    for post in posts:
        status = send_message(post, args.webhook)
        if status >= 300:
            LOGGER.error(f'response code {status}')
        else:
            LOGGER.info(f'response code {status}')
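
# The script's only state is the cached feed file's mtime, so it is meant
# to be run periodically. A sketch (assumed paths; adjust to your setup),
# e.g. via cron every 15 minutes:
#
#   */15 * * * * /usr/bin/python3 /path/to/discussfeed.py -w <webhook-url> <feed-url>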