101 lines
2.8 KiB
Python
101 lines
2.8 KiB
Python
import os
|
|
import argparse
|
|
import logging
|
|
import xml.etree.ElementTree as ET
|
|
import requests
|
|
from datetime import datetime, timezone
|
|
|
|
# Module-level logger: records at INFO and above are emitted through a
# stream handler using a timestamped "time - name - level - message" layout.
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

ch = logging.StreamHandler()
ch.setFormatter(formatter)
ch.setLevel(logging.INFO)

LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(ch)
LOGGER.setLevel(logging.INFO)
|
|
|
|
|
|
def tag(tagname: str) -> str:
    """Return *tagname* prefixed with the Atom namespace in ``{uri}name`` form.

    This is the qualified-name spelling ElementTree uses when looking up
    namespaced elements with ``find``/``findall``.
    """
    return ''.join(('{http://www.w3.org/2005/Atom}', tagname))
|
|
|
|
|
|
class Entry:
    """One Atom ``<entry>`` reduced to the fields needed for a notification.

    Attributes:
        title: Text of the entry's ``title`` element.
        url: Text of the entry's ``id`` element (used as the post link).
        author: Text of the ``name`` element inside ``author``.
        date: Timezone-aware publication time from ``published``.
    """

    def __init__(self, entry):
        """Extract title, URL, author and publication date from *entry*.

        Args:
            entry: ElementTree element for an Atom ``<entry>``.

        Raises:
            AttributeError: If a required child element is missing or
                ``published`` has no text.
            ValueError: If ``published`` is not an ISO 8601 timestamp.
        """
        self.title = entry.find(tag('title')).text
        self.url = entry.find(tag('id')).text
        self.author = entry.find(tag('author')).find(tag('name')).text
        self.date = self._parse_timestamp(entry.find(tag('published')).text)

    @staticmethod
    def _parse_timestamp(text: str) -> datetime:
        """Parse an ISO 8601 timestamp into a timezone-aware datetime."""
        stamp = text.strip()
        # datetime.fromisoformat() only accepts the "Z" UTC designator from
        # Python 3.11 on; spell it as an explicit offset so older
        # interpreters parse it as well.
        if stamp[-1:] in ('Z', 'z'):
            stamp = stamp[:-1] + '+00:00'
        parsed = datetime.fromisoformat(stamp)
        # A naive value cannot be compared with the aware cut-off time used
        # by the caller (TypeError).  Assumes offset-less stamps mean UTC.
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    def __str__(self):
        """Return the two-line notification text for this entry."""
        return f'New post by {self.author}: {self.title}\n{self.url}'
|
|
|
|
|
|
def get_last_update_time(file: str) -> datetime:
    """Return when *file* was last modified, as a timezone-aware datetime.

    The modification time is expressed in the local timezone.  If the file
    does not exist, the current time in UTC is returned instead.
    """
    try:
        modified = os.stat(file).st_mtime
    except FileNotFoundError:
        return datetime.now(timezone.utc)

    # Borrow the tzinfo of "now" to get the system's local timezone.
    local_zone = datetime.now().astimezone().tzinfo
    return datetime.fromtimestamp(modified, local_zone)
|
|
|
|
|
|
def update_rss_file(file, feed, timeout=30):
    """Download *feed* and store the response body in *file*.

    Args:
        file: Path of the local file to create or overwrite.
        feed: URL of the feed to fetch.
        timeout: Seconds to wait on the server before giving up.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            server answers with an HTTP error status.
    """
    # Using the response as a context manager releases the streamed
    # connection even if writing the file fails part-way.
    with requests.get(feed, stream=True, timeout=timeout) as r:
        # Check the status before opening the file: an error page must not
        # replace the cached feed or refresh its modification time.
        r.raise_for_status()
        # Store the raw bytes.  With decode_unicode=True, iter_content()
        # yields bytes whenever the response declares no charset, which a
        # text-mode file rejects; the XML parser reads the encoding from the
        # document itself anyway.
        with open(file, 'wb') as fd:
            for chunk in r.iter_content(chunk_size=1024):
                fd.write(chunk)
|
|
|
|
|
|
def parse_rss_file(file) -> ET.Element:
    """Parse the XML document in *file* and return its root element.

    *file* is handed to ``ET.parse`` unchanged.
    """
    return ET.parse(file).getroot()
|
|
|
|
|
|
def get_new_posts(time, root) -> list[Entry]:
    """Return the entries under *root* whose date is later than *time*.

    Each direct ``entry`` child of *root* is wrapped in an ``Entry``; the
    result keeps the order in which the entries appear in the feed.
    """
    # Lazy generator: every Entry is built and compared before the next one
    # is constructed.
    candidates = (Entry(node) for node in root.findall(tag('entry')))
    return [post for post in candidates if post.date > time]
|
|
|
|
|
|
def send_message(entry, url, timeout=30) -> int:
    """POST the text form of *entry* to the webhook at *url*.

    Args:
        entry: Object whose ``str()`` is the message to send.
        url: Webhook URL that receives the message.
        timeout: Seconds to wait on the server before giving up; without a
            timeout a stalled connection would hang the script.

    Returns:
        The HTTP status code of the webhook response.

    Raises:
        requests.RequestException: If the request cannot be completed.
    """
    payload = {'content': str(entry)}
    # Sent as a form-encoded body, made explicit with the ``data`` keyword.
    r = requests.post(url, data=payload, timeout=timeout)
    return r.status_code
|
|
|
|
# Check the last-modified date of the current RSS file.
# Download a new version of the file.
# Iterate over the entries, storing the ones with a newer date in a list.
# Send a POST request to the Discord webhook.
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: note when the cached feed was last written, fetch a
    # fresh copy, then announce every entry published since that time.
    parser = argparse.ArgumentParser(
        prog='discussfeed',
        description='Checks for new entries from an atom feed and sends them to a discord webhook')
    parser.add_argument('-w', '--webhook',
                        help='webhook URL to post new entries to; '
                             'without it new entries are only logged')
    parser.add_argument('feed_url', help='URL of the atom feed to check')
    args = parser.parse_args()

    # The cache file is named after the last path component of the URL.
    # Strip a trailing slash first, otherwise the name would be empty.
    filename = os.path.basename(args.feed_url.rstrip('/'))
    # Must be read before the download below rewrites the file.
    last_updated = get_last_update_time(filename)
    LOGGER.info('last updated: %s', last_updated)

    update_rss_file(filename, args.feed_url)
    root = parse_rss_file(filename)
    posts = get_new_posts(last_updated, root)
    LOGGER.info('found %d new posts', len(posts))

    for post in posts:
        if not args.webhook:
            # --webhook is optional; posting to a missing URL would raise.
            LOGGER.warning('no webhook given, not sending: %s', post.url)
            continue
        status = send_message(post, args.webhook)
        if status >= 300:
            LOGGER.error('Response code %s', status)
        else:
            LOGGER.info('response code %s', status)
|