# Static microblog page generator: parses a plain-text post file and renders
# paginated HTML timelines, per-tag pages, and optional webring metadata.
import sys, os, traceback
|
|
import dateutil.parser
|
|
from time import strftime, localtime
|
|
|
|
# returns html-formatted string
def make_buttons(btn_dict, msg_id):
    """Render {label: url} pairs as an HTML div of anchor "buttons".

    A url ending in '=' is treated as an unfinished query string and has
    msg_id appended (e.g. "reply.html?id=" -> "reply.html?id=42").

    Fix over the original: uses str.endswith, so an empty url no longer
    raises IndexError from url[-1].
    """
    buttons = "<div class=\"buttons\">"
    fmt = "<a href=\"%s\">[%s]</a>"
    for key, url in btn_dict.items():
        if url.endswith('='):
            # then interpret it as a query string
            url += str(msg_id)
        buttons += fmt % (url, key)
    buttons += "</div>"
    return buttons
|
|
|
|
# apply div classes for use with .css
def make_post(num, timestamp, conf, msg):
    """Fill the configured post template for one post.

    conf["format"] is a str.format template using the placeholders
    __timestamp__, __num__, __msg__ and __btn__. Buttons are rendered
    only when a "buttons" table exists in conf.
    """
    buttons = make_buttons(conf["buttons"], num) if "buttons" in conf else ""
    template = conf["format"]
    return template.format(
        __timestamp__=timestamp,
        __num__=num,
        __msg__=msg,
        __btn__=buttons,
    )
|
|
|
|
def make_gallery(indices, w, conf=None):
    """Build HTML gallery markup for image words found in a line.

    indices: ascending positions in w holding image filenames.
    w: the caller's word list -- MUTATED: each referenced word is pop()ed out.
    conf: optional dict with "path_to_thumb" / "path_to_fullsize" prefixes.
    Returns a list of HTML fragments; empty list when indices is empty.
    """
    tag = []
    if indices == []:
        return tag
    template = '''
<div class=\"panel\">
<a href=\"%s\"><img src=\"%s\" class=\"embed\"></a>
</div>
'''
    tag.append("<div class=\"gallery\">")
    # iterate highest index first so earlier indices remain valid as we pop
    for index in reversed(indices):
        image = w.pop(index)
        # a word starting with '.' or '/' is treated as an explicit path
        is_path = image[0] == '.' or image[0] == '/'
        if conf and not is_path:
            # bare filename + configured prefixes: thumbnail links to fullsize
            thumb = "%s/%s" % (conf["path_to_thumb"], image)
            full = "%s/%s" % (conf["path_to_fullsize"], image)
            tag.append(template % (full,thumb))
            continue
        elif not conf and not is_path:
            # no prefixes configured: warn, then fall through and use the
            # bare name for both the link and the img src
            msg = ("Warning: no path defined for image %s!" % image)
            print(msg,file=sys.stderr)
        else:
            pass
        tag.append(template % (image, image))
    tag.append("</div>")
    return tag
|
|
|
|
# apply basic HTML formatting - only div class here is gallery
|
|
from html import escape
|
|
def markup(message, config):
    """Convert a post's raw lines to HTML.

    Links https:// words, wraps #hashtags in spans (collecting them),
    and groups image filenames into galleries via make_gallery().
    Returns (html_string, list_of_tags).
    """
    def is_image(s, image_formats):
        # True when s has an extension listed in image_formats
        l = s.rsplit('.', maxsplit=1)
        if len(l) < 2:
            return False
        # Python 3.10.5
        # example result that had to be filtered:
        # string: started.
        # result: ['started', '']
        if l[1] == str(''):
            return False
        #print(s, l, file=sys.stderr)
        if l[1] in image_formats:
            return True
        return False

    # NOTE(review): result and tagged are never used below -- leftovers?
    result = 0
    tagged = ""
    # support multiple images (gallery style)
    tags = [] # list of strings
    output = []
    gallery = []
    ptags = config["tag_paragraphs"]
    sep = ""
    if "line_separator" in config:
        sep = config["line_separator"]
    for line in message:
        images = [] # list of integers
        words = line.split()
        for i in range(len(words)):
            word = words[i]
            # don't help people click http
            if word.find("src=") == 0 or word.find("href=") == 0:
                continue
            elif word.find("https://") != -1:
                # linkify; escape the url for both href and link text
                w = escape(word)
                new_word = ("<a href=\"%s\">%s</a>") % (w, w)
                words[i] = new_word
            elif word.find("#") != -1 and len(word) > 1:
                # split by unicode blank character if present
                # allows tagging such as #fanfic|tion
                w = word.split(chr(8206))
                # w[0] is the portion closest to the #
                tags.append(w[0])
                new_word = "<span class=\"hashtag\">%s</span>" % (w[0])
                if len(w) > 1:
                    new_word += w[1]
                words[i] = new_word
            elif is_image(word, config["accepted_images"]):
                # remember position; actual removal happens in make_gallery
                images.append(i)
        if len(images) > 0:
            # function invokes pop() which modifies list 'words'
            gc = config["gallery"] if "gallery" in config else None
            gallery = make_gallery(images, words, gc)
        if ptags and len(words) > 0:
            words.insert(0,"<p>")
            words.append("</p>")
        output.append(" ".join(words))
        # avoid paragraph with an image gallery
        if len(gallery) > 0:
            output.append("".join(gallery))
            gallery = []
    return sep.join(output), tags
|
|
|
|
class Post:
    """One microblog entry: a timestamp string plus its message lines."""

    def __init__(self, ts, msg):
        # timestamp keeps its text form; surrounding whitespace stripped
        self.timestamp = ts.strip()  # string
        # raw message lines, exactly as read from the file
        self.message = msg  # list

    # format used for sorting
    def get_epoch_time(self):
        """Return the timestamp as integer seconds since the epoch."""
        parsed = dateutil.parser.parse(self.timestamp)
        return int(parsed.timestamp())

    # format used for display
    def get_short_time(self):
        """Return the timestamp rendered as '%y %b %d' (e.g. '24 Jan 05')."""
        parsed = dateutil.parser.parse(self.timestamp)
        return parsed.strftime("%y %b %d")
|
|
|
|
def parse_txt(filename):
    """Parse a posts file into a list of Post objects (file order kept).

    Expected layout: one ignored leading line, then repeating groups of
    a timestamp line, one or more message lines, and a blank line that
    terminates the post.

    Fix over the original: a final post that is not followed by a blank
    line (file ends right after its message) is no longer dropped.
    """
    content = []
    with open(filename, 'r') as f:
        content = f.readlines()
    posts = [] # list of posts - same order as file
    message = [] # list of lines
    # {-1 = init; 0 = timestamp is next, 1 = message is next}
    state = -1
    timestamp = ""
    for line in content:
        if state == -1:
            # skip the file's leading line break
            state = 0
            continue
        elif state == 0:
            timestamp = line
            state = 1
        elif state == 1:
            if len(line) > 1:
                message.append(line)
            else:
                # blank line ends the current post
                p = Post(timestamp, message)
                posts.append(p)
                # reset
                message = []
                state = 0
    # flush a trailing post when the file lacks a final blank line
    if state == 1 and message:
        posts.append(Post(timestamp, message))
    return posts
|
|
|
|
def get_posts(posts, config):
    """Render every post and collect hashtag statistics.

    Returns (timeline, tagcloud, tagged): timeline is a list of HTML
    strings in the same order as posts; tagcloud maps tag -> occurrence
    count; tagged maps tag -> list of post numbers that used the tag.
    """
    tagcloud = dict()  # (tag, count)
    tagged = dict()    # (tag, index of message)
    timeline = []
    # posts are numbered downward: the first post gets len(posts)-1, the
    # last gets 0
    remaining = len(posts)
    for post in posts:
        rendered, tags = markup(post.message, config)
        remaining -= 1
        timeline.append(
            make_post(remaining, post.get_short_time(), config, rendered)
        )
        for tag in tags:
            tagcloud[tag] = tagcloud.get(tag, 0) + 1
            tagged.setdefault(tag, []).append(remaining)
    return timeline, tagcloud, tagged
|
|
|
|
def make_tagcloud(d, rell):
    """Render a tag->count dict as HTML spans, most-used tag first.

    rell is a %-format link template that receives the tag name without
    its leading '#' character.

    Bug fix: the original built sorted_d but then iterated the unsorted
    input dict, so the count ordering was silently discarded; we now
    iterate the sorted dict.
    """
    sorted_d = {k: v for k,
                v in sorted(d.items(),
                            key=lambda item: -item[1])}
    output = []
    fmt = "<span class=\"hashtag\"><a href=\"%s\">%s(%i)</a></span>"
    #fmt = "<span class=\"hashtag\">%s(%i)</span>"
    for key in sorted_d.keys():
        link = rell % key[1:]
        output.append(fmt % (link, key, sorted_d[key]))
    return output
|
|
|
|
class Paginator:
    """Splits a rendered timeline into numbered HTML pages under SUBDIR,
    recording every file written in self.written."""

    def __init__(self, post_count, ppp, loc=None):
        # ppp = posts per page; loc = output subdirectory (default "pages").
        # Raises a bare Exception for an empty timeline, and lets
        # ZeroDivisionError escape when ppp == 0 (caller catches both).
        if post_count <= 0:
            raise Exception
        if not loc:
            loc = "pages"
        if loc and not os.path.exists(loc):
            os.mkdir(loc)
        self.TOTAL_POSTS = post_count
        self.PPP = ppp
        # NOTE(review): integer division floors, so the trailing
        # post_count % ppp posts never land on a numbered page -- they are
        # presumably only visible on the "latest" page; confirm intended.
        self.TOTAL_PAGES = int(post_count/self.PPP)
        self.SUBDIR = loc
        self.FILENAME = "%i.html"
        self.written = []

    def toc(self, current_page=None, path=None): #style 1
        # Table of contents: numbered links, highest page first; the
        # current page is rendered bold instead of linked.
        if self.TOTAL_PAGES < 1:
            return "[no pages]"
        if path == None:
            path = self.SUBDIR
        # For page 'n' do not create an anchor tag
        fmt = "<a href=\"%s\">[%i]</a>" #(filename, page number)
        anchors = []
        for i in reversed(range(self.TOTAL_PAGES)):
            if i != current_page:
                x = path + "/" + (self.FILENAME % i)
                anchors.append(fmt % (x, i))
            else:
                anchors.append("<b>[%i]</b>" % i)
        return "\n".join(anchors)

    # makes one page
    def singlepage(self, template, tagcloud, timeline_, i=None, p=None):
        # Fill the page template; placeholder names in the template are
        # postcount, tags, pages and timeline.
        tc = "\n".join(tagcloud)
        tl = "\n\n".join(timeline_)
        toc = self.toc(i, p)
        return template.format(
            postcount=self.TOTAL_POSTS, tags=tc, pages=toc, timeline=tl
        )

    def paginate(self, template, tagcloud, timeline, is_tagline=False):
        # Write TOTAL_PAGES files named 0.html .. N-1.html under SUBDIR.
        # Page 0 holds the oldest posts; each page's slice is re-reversed
        # so posts within a page stay newest-first.
        # NOTE(review): reverses the caller's timeline list IN PLACE, and
        # is_tagline is currently unused.
        outfile = "%s/%s" % (self.SUBDIR, self.FILENAME)
        timeline.reverse() # reorder from oldest to newest
        start = 0
        for i in range(start, self.TOTAL_PAGES):
            fn = outfile % i
            with open(fn, 'w') as f:
                self.written.append(fn)
                prev = self.PPP * i
                curr = self.PPP * (i+1)
                sliced = timeline[prev:curr]
                sliced.reverse()
                f.write(self.singlepage(template, tagcloud, sliced, i, "."))
        return
|
|
|
|
import argparse
|
|
if __name__ == "__main__":
|
|
def sort(filename):
    """Sort the content file by post date (newest first) and write the
    result to '<filename>.sorted'."""
    def export(new_content, new_filename):
        # Mirror the on-disk layout: a leading blank line, then for each
        # post its timestamp line followed by its message lines.
        with open(new_filename, 'w') as f:
            print(file=f)
            for post in new_content:
                print(post.timestamp, file=f)
                print("".join(post.message), file=f)
        return

    posts = parse_txt(filename)
    # ascending sort, then reversed() below for newest-first output
    posts.sort(key=lambda e: e.get_epoch_time())
    outfile = ("%s.sorted" % filename)
    print("Sorted text written to ", outfile)
    export(reversed(posts), outfile)
|
|
|
|
def get_args():
    """Parse command-line arguments and return (template, content).

    With --sort, sorts the content file and exits without generating
    any pages.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("template", help="an html template file")
    parser.add_argument("content", help="text file for microblog content")
    parser.add_argument(
        "--sort",
        help="sorts content from oldest to newest"
        " (this is a separate operation from page generation)",
        action="store_true")
    args = parser.parse_args()
    if args.sort:
        sort(args.content)
        exit()
    return args.template, args.content
|
|
|
|
# assume relative path
def demote_css(template, css_list, level=1):
    """Rewrite relative css references for pages living 'level'
    directories below the site root.

    level == 1 prefixes each css path with '.'; deeper levels prefix
    one '../' per level.
    """
    prefix = '.' if level == 1 else "../" * level
    result = template
    for css in css_list:
        result = result.replace(css, "%s%s" % (prefix, css))
    return result
|
|
|
|
# needs review / clean-up
# ideally relate 'lvl' with sub dir instead of hardcoding
def writepage(template, timeline, tagcloud, config, subdir = None):
    """Render one timeline into paginated HTML files.

    subdir selects the mode: None = the main top-level timeline (the
    landing page goes to stdout), "placeholder" = the webring page
    (writes webring.html), anything else = a per-tag timeline written
    under that subdirectory. Returns the list of files written.
    """
    html = ""
    with open(template,'r') as f:
        html = f.read()
    try:
        count = len(timeline)
        p = config["postsperpage"]
        # Paginator raises ZeroDivisionError when p == 0 and a bare
        # Exception when the timeline is empty
        pagectrl = Paginator(count, p, subdir)
    except ZeroDivisionError as e:
        print("error: ",e, ". check 'postsperpage' in config", file=sys.stderr)
        exit()
    except Exception as e:
        print("error: ",e, ("(number of posts = %i)" % count), file=sys.stderr)
        exit()
    # the newest PPP posts shown on the landing/"latest" page
    latest = timeline if count <= pagectrl.PPP else timeline[:pagectrl.PPP]
    if subdir == None: # if top level page
        lvl = 1
        tcloud = make_tagcloud(tagcloud, "./tags/%s/latest.html")
        # landing page goes to stdout -- presumably redirected by the
        # caller's shell; confirm
        print(pagectrl.singlepage(html, tcloud, latest))
        # numbered pages live one level down, so tag links need ../
        tcloud = make_tagcloud(tagcloud, "../tags/%s/latest.html")
        pagectrl.paginate(
            demote_css(html, config["relative_css"], lvl),
            tcloud, timeline
        )
    elif subdir == "placeholder":
        lvl = 1
        tcloud = make_tagcloud(tagcloud, "./tags/%s/latest.html")
        with open ("webring.html", 'w') as f:
            print(pagectrl.singlepage(html, tcloud, timeline),file=f)
    else: # if timelines per tag
        is_tagline = True
        lvl = 2
        newhtml = demote_css(html, config["relative_css"], lvl)
        tcloud = make_tagcloud(tagcloud, "../%s/latest.html")
        fn = "%s/latest.html" % subdir
        with open(fn, 'w') as f:
            pagectrl.written.append(fn)
            f.write(
                pagectrl.singlepage(newhtml, tcloud, latest, p=".")
            )
        pagectrl.paginate(newhtml, tcloud, timeline, is_tagline)
    return pagectrl.written
|
|
|
|
import toml
|
|
def load_settings():
    """Read settings.toml from the working directory.

    Returns the parsed settings as a dict, or None when the file is
    absent.
    """
    filename = "settings.toml"
    if not os.path.exists(filename):
        return None
    with open(filename, 'r') as f:
        return toml.loads(f.read())
|
|
|
|
import json
|
|
# generate json based on profile
def export_profile(post_count, last_update, config):
    """Write a small JSON webring profile to config["file_output"].

    Does nothing when config has no "profile" table; a profile missing
    "username" or "url" only produces a warning on stderr.
    """
    if "profile" not in config:
        return
    prof = config["profile"]
    if "username" not in prof or "url" not in prof:
        print("Warning: no profile exported", file=sys.stderr)
        return
    profile = {
        "username": prof["username"],
        "url": prof["url"],
        "last-updated": last_update,
        "short-bio": prof["short_bio"],
        "post-count": post_count,
    }
    # avatar is optional
    if "avatar" in prof:
        profile["avatar"] = prof["avatar"]
    with open(config["file_output"], 'w') as f:
        print(json.dumps(profile), file=f)
|
|
|
|
import urllib.request as http
|
|
def fn1(webring_config): # come up with better name later
    """Fetch followed webring profiles and render them as HTML snippets.

    webring_config["list"] holds profile URLs; webring_config["format"]
    is a str.format template. Returns rendered strings, most recently
    updated first (empty list when nothing could be fetched).

    Fixes over the original: catches URLError (parent of HTTPError) so
    connection-level failures stay best-effort instead of crashing, and
    uses .get() for optional/missing profile keys -- export_profile only
    writes "avatar" when configured, so profile["avatar"] could KeyError.
    """
    def fetch(follow_list):
        # best-effort: skip anyone whose profile can't be fetched/parsed
        other_people = []
        for someone in follow_list:
            try:
                with http.urlopen(someone) as response:
                    data = response.read()
                    other_people.append(json.loads(data))
            except http.URLError as e:
                print(e,": ", someone, file=sys.stderr)
        return other_people

    def last_updated(profile):
        # tolerate missing or non-integer timestamps
        ts = profile.get("last-updated")
        return ts if isinstance(ts, int) else 0

    list_of_json_objs = fetch(webring_config["list"])
    list_of_json_objs.sort(key=last_updated, reverse=True)
    if list_of_json_objs == []:
        print("no remote profiles loaded", file=sys.stderr)
        return []
    rendered = []
    template = webring_config["format"]
    SHORT_BIO_LIMIT = 150
    for profile in list_of_json_objs:
        epoch_timestamp = last_updated(profile)

        post_count = profile.get("post-count")
        if not isinstance(post_count, int):
            post_count = 0

        # truncate over-long bios
        self_desc = profile.get("short-bio", "")
        if len(self_desc) >= SHORT_BIO_LIMIT:
            self_desc = self_desc[:SHORT_BIO_LIMIT] + "..."

        rendered.append(template.format(
            __avatar__=escape(profile.get("avatar", "")),
            __handle__=escape(profile.get("username", "")),
            __url__=escape(profile.get("url", "")),
            __post_count__ = post_count,
            __shortbio__= escape(self_desc),
            __lastupdated__= strftime(
                "%Y %b %d", localtime(epoch_timestamp))
        ))
    return rendered
|
|
|
|
def main():
    """Generate the whole site: main timeline, per-tag timelines, webring
    page and profile, and a manifest of every file written."""
    tpl, content = get_args()
    cfg = load_settings()
    if cfg == None:
        print("exit: no settings.toml found.", file=sys.stderr)
        return
    if "post" not in cfg:
        print("exit: table 'post' absent in settings.toml", file=sys.stderr)
        return
    if "page" not in cfg:
        print("exit: table 'page' absent in settings.toml", file=sys.stderr)
        return
    p = parse_txt(content)
    tl, tc, tg = get_posts(p, cfg["post"])
    if tl == []:
        return
    # main timeline
    updated = []
    updated += writepage(tpl, tl, tc, cfg["page"])
    # timeline per tag
    if tc != dict() and tg != dict():
        if not os.path.exists("tags"):
            os.mkdir("tags")
        for key in tg.keys():
            tagline = []
            for index in tg[key]:
                tagline.append(tl[index])
            # [1:] means to omit hashtag from dir name
            updated += writepage(
                tpl, tagline, tc, cfg["page"], \
                subdir="tags/%s" % key[1:] \
            )
    if "webring" in cfg:
        if cfg["webring"]["enabled"] == True:
            # p[0] is assumed to be the newest post -- TODO confirm the
            # content file is maintained newest-first
            export_profile(
                len(p), p[0].get_epoch_time(), cfg["webring"] )
            if "following" in cfg["webring"]:
                fellows = fn1(cfg["webring"]["following"] )
                if fellows != []:
                    updated += writepage(
                        tpl, fellows, tc, cfg["page"], subdir="placeholder" )
    # manifest of everything written this run (e.g. for upload scripts)
    with open("updatedfiles.txt", 'w') as f:
        for filename in updated:
            print(filename, file=f) # sys.stderr)
        if "latestpage" in cfg:
            print(cfg["latestpage"], file=f)
        if "latestpages" in cfg:
            for page in cfg["latestpages"]:
                print(page, file=f)
|
|
# Entry point: run main() and translate the most common failure modes into
# human-readable hints (the full traceback is still printed first).
try:
    main()
except KeyError as e:
    # typically a key missing from settings.toml (or a fetched profile)
    traceback.print_exc()
    print("\n\tA key may be missing from your settings file.", file=sys.stderr)
except dateutil.parser._parser.ParserError:
    # a timestamp line in the content file could not be parsed
    traceback.print_exc()
    print("\n\tFailed to interpret a date from string..",
        "\n\tYour file of posts may be malformed.",
        "\n\tCheck if your file starts with a line break.", file=sys.stderr)
except toml.decoder.TomlDecodeError:
    # settings.toml exists but is not valid TOML
    traceback.print_exc()
    # NOTE(review): unlike the other handlers this prints to stdout -- confirm
    print("\n\tYour configuration file is malformed.")
|