import sys import socket import re import logging import pytermgui as ptg from typing import Tuple PORT = 1900 logger = logging.getLogger('nex') logger.setLevel(logging.INFO) logger.addHandler(logging.FileHandler('nex.log')) def parse_url(url: str) -> Tuple[str, str]: clean_url = url.replace('nex://', '') split_url = clean_url.split('/', 1) if len(split_url) == 1: return clean_url, '' return split_url[0], split_url[1] class Page: def __init__(self, url): self.url = url self.host, self.selector = parse_url(url) self.raw_content = '' self.content = self.raw_content self.links = ['' for i in range(10)] def request(self): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: try: sock.connect((self.host, PORT)) except ConnectionError as e: logger.error(e) self.content = f'[error]Could not connect to {self.host}' return selector = self.selector + '\r\n' sock.sendall(selector.encode('ascii')) buf = bytearray() while True: data = sock.recv(4096) if len(data) == 0: break buf.extend(data) try: self.raw_content = buf.decode('ascii') except UnicodeDecodeError as e: logger.error(e) self.content = '[warning]Content could not be decoded' return self.content = self.raw_content self._parse_links() for i, l in enumerate(self.links): self.content = re.sub(l, f'[bold primary]\[{i}][/] {l}', self.content, 1) @property def title(self): prefix = 'nex://' if self.url.startswith(prefix): return self.url[len(prefix):] return self.url def _parse_links(self): pattern = re.compile(r'=>\s*([\w/:\.~-]*)\s*') self.links = pattern.findall(self.raw_content) def create_landing_page() -> Page: landing_page = Page('about:home') landing_page.raw_content = 'Welcome to PyNex' landing_page.content = landing_page.raw_content return landing_page class Session: def __init__(self): home_page = create_landing_page() self.pages = {home_page.url: home_page} self.history = [home_page.url] self.curr_index = 0 @property def current_page(self): return self.pages[self.history[self.curr_index]] def prev(self): if self.curr_index > 0: self.curr_index -= 1 return self.current_page def next(self): if self.curr_index < len(self.history) - 1: self.curr_index += 1 return self.current_page def new_page(self, url): self.curr_index += 1 if self.curr_index >= len(self.history): self.history.append(url) else: self.history[self.curr_index] = url page = Page(url) page.request() self.pages[url] = page return self.current_page class Browser(ptg.WindowManager): def __init__(self, session=Session(), *args): super().__init__() self.layout = ptg.Layout() self.session = session self.page_container = ptg.Container(self.session.current_page.content) self.body = ptg.Window(self.page_container, overflow=ptg.Overflow.SCROLL, vertical_align=ptg.VerticalAlignment.TOP) self.footer = ptg.Window( ptg.Splitter( ["(B)ack", self.back], ["(F)orward", self.forward], ["(R)eload", self.reload], ["(Q)uit", lambda *_: self.stop()] ), box="EMPTY", is_persistant=True ) self._create_layout() self._create_key_bindings() self.update() def _create_key_bindings(self): self.bind(ptg.keys.DOWN, lambda *_: self.body.scroll(5)) self.bind(ptg.keys.UP, lambda *_: self.body.scroll(-5)) self.bind('j', lambda *_: self.body.scroll(5)) self.bind('k', lambda *_: self.body.scroll(-5)) self.bind('b', self.back) self.bind('f', self.forward) self.bind('r', self.reload) self.bind('q', lambda *_: self.stop()) for i in range(10): self.bind(f'{i}', lambda widget, key: self.follow_link(key)) def _create_layout(self): self.layout.add_slot("Body") self.layout.add_break() self.layout.add_slot("Footer", height=1) self.add(self.body) self.add(self.footer) def update(self): page = self.session.current_page self.page_container.set_widgets([page.content]) self.body.set_title(f'[bold tertiary]{page.title}', 1) def back(self, *args): self.session.prev() self.update() def forward(self, *args): self.session.next() self.update() def reload(self, *args): self.session.current_page.request() self.update() def go(self, url): self.session.new_page(url) self.update() def follow_link(self, key): index = int(key) url = self.session.current_page.links[index] if url.startswith('nex://'): self.session.new_page(url) else: url = '/'.join([self.session.current_page.url.rstrip('/'), url]) self.session.new_page(url) self.update() def main(url=None): with Browser() as browser: if url is not None: browser.go(url) if __name__ == '__main__': if len(sys.argv) > 1: url = sys.argv[1] main(url)