#!/usr/bin/env python # creator: Silas Jelley # created: 2020-08-11 09:52:32 # updated: 2024-09-22 15:38:57 # version: 3.0 # /// script # requires-python = ">=3.12" # dependencies = [ # "Pillow ", # "filelock", # "jinja2", # "urllib3", # "pygments", # "pillow_heif", # "pillow_avif-plugin", # ] # /// # Imports from collections import Counter from dataclasses import dataclass, field, asdict from hashlib import md5 from pathlib import Path from shutil import copyfile from subprocess import run from typing import List, Dict, Any, Tuple, Set from typing_extensions import TypedDict import asyncio import datetime import logging import multiprocessing import os import random import re import tomllib from PIL import Image, ImageOps from filelock import FileLock from jinja2 import Environment, FileSystemLoader from pillow_heif import register_heif_opener import pillow_avif from pygments import highlight from pygments.formatters import HtmlFormatter from pygments.lexer import RegexLexer, bygroups from pygments.lexers import get_lexer_by_name, guess_lexer from pygments.token import * from pygments.util import ClassNotFound from urllib.parse import urlparse register_heif_opener() # Load configuration with open("config.toml", "rb") as config_file: config = tomllib.load(config_file) # Constants ASSET_DIR = Path(config["paths"]["asset_dir"]) TEMPLATE_DIR = Path(config["paths"]["template_dir"]) STYLESHEET = Path(config["paths"]["stylesheet"]) OUTPUT_DIR = Path(config["paths"]["output_dir"]) NOTES_DIR = Path(config["paths"]["notes_dir"]) TEMPLATE_FEED = config["templates"]["feed"] TEMPLATE_FEED_XSL = config["templates"]["feed_xsl"] TEMPLATE_SITEMAP = config["templates"]["sitemap"] TEMPLATE_DEFAULT = config["templates"]["default"] INIT_DIR = os.getcwd() # Set up logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) # Dataclasses @dataclass class SiteMetadata: name: str created: str url: str baseurl: str uid: str description: str creator: Dict[str, str] backlinks: int = 0 words: Dict[str, Any] = field( default_factory=lambda: { "self": 0, "drafts": 0, "code": { "lines": 0, "words": 0, }, "references": 0, } ) links: Dict[str, Any] = field( default_factory=lambda: { "internal": list(), "backlinks": list(), "external": set(), } ) pagecount: int = 0 references: int = 0 categories: Set = field(default_factory=set) secondaries: Set = field(default_factory=set) tags: Set = field(default_factory=set) data: Dict[str, Any] = field(default_factory=dict) stylesheet_hash: str = "" slug_to_uid_lookup: Dict[str, str] = field(default_factory=dict) slug_to_title_lookup: Dict[str, str] = field(default_factory=dict) class LinksDict(TypedDict): internal: list[str] external: list[str] backlinks: list[str] @dataclass class DocumentMetadata: filepath: Path uid: str slug: str title: str category: str secondary: str available: datetime.datetime created: datetime.datetime updated: datetime.datetime creator: str = "" note: str = "" favourite: bool = False parent: str = "" description: str = "" layout: str = TEMPLATE_DEFAULT source: Dict = field(default_factory=dict) via: Dict = field(default_factory=dict) location: Dict[str, Any] = field( default_factory=lambda: { "continent": "", "country": "", "region": "", "city": "", "note": "", "lat": int, "lng": int, } ) collection: Dict[str, Any] = field( default_factory=lambda: { "style": "title", "order": "chronological", "include": [], } ) attribution: Dict[str, str] = field( default_factory=lambda: { "plain": "", "djot": "", "html": "", } ) media: str = "application/toml" words: Dict[str, Any] = field( default_factory=lambda: { "self": 0, "code": { "lines": 0, "words": 0, }, "references": 0, } ) status: str = "" links: LinksDict = field( default_factory=lambda: { "internal": list(), "external": list(), "backlinks": list(), } ) options: List[str] = field(default_factory=list) tags: List[str] = field(default_factory=list) styles: str = "" content: Dict[str, str] = field(default_factory=dict) def __post_init__(self): # Validate links dictionary structure required_link_types = {"internal", "external", "backlinks"} if ( not isinstance(self.links, dict) or set(self.links.keys()) != required_link_types ): raise ValueError( f"links must be a dictionary with exactly these keys: {required_link_types}" ) for key in self.links: if not isinstance(self.links[key], set): self.links[key] = set(self.links[key]) @dataclass class AssetMetadata: filepath: Path media: str uid: str slug: str title: str available: datetime.datetime available: datetime.datetime created: datetime.datetime updated: datetime.datetime creator: str = "" note: str = "" favourite: bool = False source: Dict = field(default_factory=dict) via: Dict = field(default_factory=dict) hash: str = "" output_width: int = 0 output_height: int = 0 location: Dict[str, Any] = field( default_factory=lambda: { "continent": "", "country": "", "region": "", "city": "", "note": "", "lat": int, "lng": int, } ) attribution: Dict[str, str] = field( default_factory=lambda: { "plain": "", "djot": "", "html": "", } ) words: Dict[str, Any] = field( default_factory=lambda: { "self": 0, "code": { "lines": 0, "words": 0, }, "references": 0, } ) links: LinksDict = field( default_factory=lambda: { "internal": list(), "external": list(), "backlinks": list(), } ) tags: List[str] = field(default_factory=list) content: Dict[str, str] = field(default_factory=dict) def __post_init__(self): # Validate links dictionary structure required_link_types = {"internal", "external", "backlinks"} if ( not isinstance(self.links, dict) or set(self.links.keys()) != required_link_types ): raise ValueError( f"links must be a dictionary with exactly these keys: {required_link_types}" ) for key in self.links: if not isinstance(self.links[key], set): self.links[key] = set(self.links[key]) def init_site(): site_config = config["site"] return SiteMetadata( name=site_config["name"], created=site_config["created"], url=site_config["url"], baseurl=site_config["baseurl"], uid=site_config["uid"], description=site_config["description"], creator=site_config["creator"], stylesheet_hash=md5(STYLESHEET.read_bytes()).hexdigest(), ) def preprocess_asset_metadata( uid: str, asset_data: Dict[str, Any], manifest_path: Path ) -> Dict[str, Any]: """Preprocess asset metadata to ensure it meets AssetMetadata requirements.""" processed = asset_data.copy() # Handle dates for date_field in ["created", "updated", "available"]: if isinstance(processed.get(date_field), str): processed[date_field] = _parse_date(processed[date_field]) elif isinstance(processed.get(date_field), datetime.datetime): processed[date_field] = processed[date_field].replace(tzinfo=None) else: processed[date_field] = datetime.datetime.now() # Set required fields with defaults if not present processed.setdefault("uid", uid) return processed def load_assets() -> Dict[str, AssetMetadata]: """Load asset manifests and convert them to AssetMetadata instances.""" assets = {} asset_manifests = list(ASSET_DIR.glob("manifests/*.toml")) for manifest in asset_manifests: with open(manifest, "rb") as f: manifest_data = tomllib.load(f) for uid, asset_data in manifest_data.items(): try: processed_data = preprocess_asset_metadata(uid, asset_data, manifest) processed_data["filepath"] = ASSET_DIR / processed_data["filepath"] assets[uid] = AssetMetadata(**processed_data) except Exception as e: logger.error( f"Error processing asset {uid}\n{' ' * 26}{manifest}\n{' ' * 26}{str(e)}" ) continue return assets def setup_jinja_environment(): file_loader = FileSystemLoader(TEMPLATE_DIR) env = Environment(loader=file_loader) # Add custom filters env.filters["shuffle"] = lambda seq: random.sample(seq, len(seq)) env.filters["time_local"] = lambda value, format="%-I:%M%p": value.strftime( format ).lower() env.filters["year"] = lambda value, format="%Y": value.strftime(format) env.filters["month"] = lambda value, format="%m": value.strftime(format) env.filters["day"] = lambda value, format="%d": value.strftime(format) env.filters["year_month"] = lambda value, format="%Y/%m": value.strftime(format) env.filters["year_month_day"] = lambda value, format="%Y/%m/%d": value.strftime( format ) env.filters["date_long_short_month"] = ( lambda value, format="%b %e, %Y": value.strftime(format) ) env.filters["datetime_w3c"] = ( lambda value, format="%Y-%m-%dT%H:%M:%S": value.strftime(format) ) env.filters["date_long_full_month"] = ( lambda value, format="%B %e, %Y": value.strftime(format) ) env.filters["timedate_long"] = ( lambda value, format="%-I:%M%p %B %e, %Y": value.strftime(format) ) env.filters["highlight_code"] = highlight_code return env def get_files() -> List[Path]: return [f for f in NOTES_DIR.glob("**/*.md") if "available = " in f.read_text()] def extract_external_links(text: str, site) -> List: url_pattern = r"(https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+(?:/[^)\s]*)?)" matches = re.findall(url_pattern, text) # Convert to set immediately external_links = set() for url in matches: parsed_url = urlparse(url) if parsed_url.netloc.lower() != "silasjelley.com": external_links.add(url) site.links["external"].add(url) return sorted(external_links) async def process_document( filepath: Path, site: SiteMetadata ) -> Tuple[str, DocumentMetadata]: """Process a document file and return its UID and metadata.""" with open(filepath, "rb") as f: try: parsed_toml = tomllib.load(f) except: logger.error(f"Error while processing document: {filepath}") import sys sys.exit(1) # The UID is now the top-level table name uid = parsed_toml["uid"] # Process metadata into DocumentMetadata instance document = preprocess_metadata(filepath, parsed_toml) # Extract external links from the plain text content try: plain_text = ( document.content.get("plain", "") + " " + document.source.get("url", "") + " " + document.via.get("url", "") ) external_links = extract_external_links(plain_text, site) document.links["external"] = external_links except KeyError: logger.warn(f"KeyError while compiling external links from {document.filepath}") pass return uid, document async def ingest_documents(site: SiteMetadata) -> Dict[str, Any]: logger.info("Ingesting files") file_list = get_files() documents = {} slug_to_title_lookup = {} slug_to_uid_lookup = {} uuid_collision_lookup = [] tasks = [process_document(filepath, site) for filepath in file_list] results = await asyncio.gather(*tasks) for uid, doc in results: documents[uid] = doc slug_to_title_lookup[doc.slug] = doc.title slug_to_uid_lookup[doc.slug] = uid site.categories.add(doc.category) site.secondaries.add(doc.secondary) site.tags.update(doc.tags) uuid_collision_lookup.append(uid) site.slug_to_uid_lookup = slug_to_uid_lookup site.slug_to_title_lookup = slug_to_title_lookup check_uuid_collisions(uuid_collision_lookup) site.pagecount = len(documents) logger.info(f"Ingested {site.pagecount} files") return documents def process_image_parallel(input_data: Tuple[Path, Path, int, AssetMetadata]) -> None: workaround_import = pillow_avif.AvifImagePlugin input_image, output_path, output_width, asset_metadata = input_data lock_path = output_path.with_suffix(".lock") lock = FileLock(str(lock_path)) # Define AVIF output path avif_output_path = output_path.with_suffix(".avif") # Check if AVIF support is available avif_available = "AVIF" in Image.SAVE if output_path.exists() and avif_output_path.exists(): return try: with lock: os.makedirs(output_path.parent, exist_ok=True) with Image.open(input_image) as im: original_format = im.format im = ImageOps.exif_transpose(im) output_height = int(im.size[1] * (output_width / im.size[0])) asset_metadata.output_width = output_width asset_metadata.output_height = output_height logger.debug(f"Output width parameter: {output_width}") logger.debug(f"Image size before resize calculation: {im.size}") output_height = int(im.size[1] * (output_width / im.size[0])) logger.debug(f"Calculated output height: {output_height}") with im.resize( (output_width, output_height), Image.Resampling.LANCZOS ) as output_image: # Save JPEG version if ( original_format != "JPEG" and str(output_path).endswith("jpg") and output_image.mode in ("RGBA", "P") ): output_image = output_image.convert("RGB") output_image.save(output_path, quality=85, optimize=True) # Save AVIF version only if support is available if avif_available: try: if output_image.mode in ("RGBA", "P"): avif_image = output_image.convert("RGB") else: avif_image = output_image.copy() avif_image.save( avif_output_path, format="AVIF", quality=60, # Lower quality for better compression, still maintains good visual quality speed=5, # Slowest speed = best compression (0 is slowest, 10 is fastest) bits=10, # Use 10-bit color depth for better quality-to-size ratio compress_level=8, # Highest compression level (range 0-8) color_space="bt709", # Use YUV BT.709 color space chroma=0, # 4:4:4 chroma sampling (0=4:4:4, 1=4:2:0, 2=4:2:2) num_threads=0, # Use all available CPU threads for encoding ) logger.debug( f"Processed image: {input_image} -> {output_path} and {avif_output_path}" ) except Exception as e: logger.error( f"Error saving AVIF version of {input_image}: {e}" ) else: logger.error( "AVIF support not available. Skipping AVIF conversion." ) logger.debug(f"Processed image: {input_image} -> {output_path}") except OSError as e: logger.error(f"OS error processing {input_image}: {e}") except Exception as e: logger.error(f"Error processing {input_image}: {e}") finally: if lock_path.exists(): try: lock_path.unlink() except OSError: pass def process_assets( assets: Dict[str, AssetMetadata], asset_dir: Path, output_dir: Path ) -> None: logger.info("Processing assets") manifest_images = [] for asset_identifier, asset_metadata in assets.items(): source_path = Path(asset_metadata.filepath) output_path = output_dir / asset_metadata.slug os.makedirs(output_path.parent, exist_ok=True) if not source_path.exists(): raise FileNotFoundError( f"Missing asset: {asset_identifier} at {source_path}" ) if source_path.suffix == ".gpx": with open(source_path, "rb") as file_to_hash: asset_metadata.hash = md5(file_to_hash.read()).hexdigest() copyfile(source_path, output_path) elif output_path.exists(): continue elif source_path.suffix in (".jpg", ".png", ".heic", ".webp"): width = 3000 if "PANO" in str(output_path) else 1600 manifest_images.append((source_path, output_path, width, asset_metadata)) else: copyfile(source_path, output_path) for asset in list(asset_dir.glob("*")): if asset.is_file(): output_path = output_dir / asset.relative_to(asset_dir) os.makedirs(output_path.parent, exist_ok=True) copyfile(asset, output_path) with multiprocessing.Pool() as pool: pool.map(process_image_parallel, manifest_images) logger.info("Finished processing assets") def _parse_date(date_str: str) -> datetime.datetime: """Parses a date string into a datetime object, handling both date and datetime inputs.""" try: return datetime.datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S%z").replace( tzinfo=None ) except ValueError: return datetime.datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=None) def preprocess_metadata(filepath: Path, metadata: Dict[str, Any]) -> DocumentMetadata: """Preprocesses metadata for a document and converts it to a DocumentMetadata instance.""" # Create a working copy to avoid modifying the input processed = metadata.copy() # Parse date fields for date_field in ["available", "created", "updated"]: if isinstance(processed.get(date_field), str): processed[date_field] = _parse_date(processed[date_field]) elif isinstance(processed.get(date_field), datetime.datetime): processed[date_field] = processed[date_field].replace(tzinfo=None) # Set default updated time if not provided processed.setdefault("updated", processed.get("available")) # Process source information if present if "source" in processed: processed["attribution"] = process_source_information( processed["source"], processed.get("via", {}) ) else: processed["attribution"] = {} processed["source"] = {} if "via" not in processed: processed["via"] = {} # Handle draft status if processed.get("status") == "draft": processed["slug"] = f"drafts/{processed['uid']}" # Add filepath as it's required but comes from function parameter processed["filepath"] = filepath # Determine title processed["title"] = ( processed.get("title") or processed.get("attribution", {}).get("plain") or processed["available"].strftime("%B %e, %Y %-I.%M%p") ) # Create and return DocumentMetadata instance return DocumentMetadata(**processed) def process_source_information(source: Dict[str, Any], via) -> Dict[str, str]: creator = source.get("creator") or source.get("director") title = source.get("title") or ( " ♫ " + str(source.get("track")) if source.get("track") else source.get("description") ) date = source.get("published") or source.get("year") or source.get("created") volume = source.get("volume") chapter = source.get("chapter") pages = source.get("pages") url = source.get("url", "") speaker = source.get("speaker") or source.get("character") edition = source.get("edition") publisher = source.get("publisher") partsplain = [] partsdjot = [] partshtml = [] partsshared = [] partsvia = "" if speaker: speaker = f"{speaker} in " else: speaker = "" if edition: edition = f"{edition} edition" else: edition = "" if creator: if title: partsplain.append(f"{creator}, {title}") partsdjot.append(f"{creator}, {{_{title}_}}") if url: partshtml.append(f"{creator}, [{{_{title}_}}]({escape_url(url)})") else: partshtml.append(f"{creator}, {{_{title}_}}") else: partsplain.append(creator) partsdjot.append(creator) if url: partshtml.append(f"[{creator}]({escape_url(url)})") else: partshtml.append(f"{creator}") elif title: partsplain.append(title) partsdjot.append(f"{{_{title}_}}") if url: partshtml.append(f"[{{_{title}_}}]({escape_url(url)})") else: partshtml.append(f"{{_{title}_}}") else: logger.error(f"No creator or title {source}") if "album" in source: partsshared.append(source["album"]) if "show" in source: partsshared.append(source["show"]) if "season" in source: partsshared.append(f"season {source['season']}") if "episode" in source: partsshared.append(f"episode {source['episode']}") if "publication" in source: partsshared.append(source["publication"]) if publisher and edition: partsshared.append(f"{publisher} ({edition})") elif publisher: partsshared.append(publisher) elif edition: partsshared.append(edition) if chapter: partsshared.append(f"Ch. {chapter}") if volume: partsshared.append(f"Vol. {volume}") if pages: partsshared.append(f"p. {pages}") if date: partsshared.append(str(date.year if isinstance(date, datetime.date) else date)) if via: via_url = via.get("url", "") if not via_url == "": partsvia = f" ([via]({escape_url(via['url'])}))" return { "plain": f"{speaker}{', '.join(partsplain + partsshared)}", "djot": f"{speaker}{', '.join(partsdjot + partsshared)}", "html": format_rich_attribution( " — " + f"{speaker}{', '.join(partshtml + partsshared) + partsvia}" ), } def escape_url(url: str) -> str: return url.replace(")", "%29") def format_rich_attribution(attribution: str) -> str: return run( "jotdown", input=attribution, text=True, capture_output=True ).stdout.strip() def check_uuid_collisions(uuid_list): prefixes = [uuid[:8] for uuid in uuid_list] if len(set(prefixes)) != len(prefixes): collisions = [ prefix for prefix, count in Counter(prefixes).items() if count > 1 ] raise ValueError( f"CRITICAL ERROR: UUID prefix collision for: {', '.join(collisions)}" ) def insert_substitutions( documents: Dict[str, DocumentMetadata], assets: Dict[str, AssetMetadata], site: SiteMetadata, ) -> None: logger.info("Performing substitutions") REF_LINK_RE = re.compile(r"!?\[([^\]]*?)\](\((.*?::)([^)]+)\))") REF_SLUG_RE = re.compile(r"(? str: for match in regex.finditer(text): ref_type, ref_short_id = match.groups() full_match = match.group(0) ref_id = next((k for k in merged_data if k.startswith(ref_short_id)), None) if ref_id: try: replacement = f"/{merged_data[ref_id].slug}" except AttributeError: replacement = f"/{merged_data[ref_id].slug}" text = text.replace(full_match, replacement) return text def replace_title_references( text: str, regex: re.Pattern, merged_data: Dict[str, DocumentMetadata] ) -> str: for match in regex.finditer(text): opening, ref_type, ref_short_id, comment, closing = match.groups() full_match = match.group(0) ref_id = next((k for k in merged_data if k.startswith(ref_short_id)), None) if ref_id: replacement = merged_data[ref_id].title text = text.replace(full_match, replacement) return text def replace_cite_references( text: str, regex: re.Pattern, merged_data: Dict[str, DocumentMetadata] ) -> str: for match in regex.finditer(text): opening, ref_type, ref_short_id, comment, closing = match.groups() full_match = match.group(0) ref_id = next((k for k in merged_data if k.startswith(ref_short_id)), None) if ref_id: replacement = f"[{merged_data[ref_id].attribution['djot']}](/{merged_data[ref_id].slug})" text = text.replace(full_match, replacement) return text def replace_import_references( text: str, regex: re.Pattern, merged_data: Dict[str, DocumentMetadata], key: str, page: DocumentMetadata, ) -> str: for match in regex.finditer(text): opening, ref_type, ref_short_id, comment, closing = match.groups() full_match = match.group(0) ref_id = next((k for k in merged_data if k.startswith(ref_short_id)), None) if ref_id: ref_text = merged_data[ref_id].content["plain"] if ref_type == "import::": replacement = ref_text elif ref_type == "aside::": ref_title = merged_data[ref_id].title ref_slug = merged_data[ref_id].slug ref_location = merged_data[ref_id].location location_string = ( " ⚕ " + ref_location["city"] + ", " + ref_location["country"] or "" ) replacement = ( f"{{.aside}}\n{':' * 78}\n" f"{ref_text}\n" f'``{{=html}}\n" f"{':' * 78}" ) else: raise ValueError(f"Unrecognised reference type: {ref_type}") if not page.status == "draft": merged_data[ref_id].links["backlinks"].add(key) text = text.replace(full_match, replacement) return text def process_reference_links( text: str, regex: re.Pattern, merged_data: Dict[str, Any], key: str ) -> str: for ref_text_match, _, ref_type, ref_short_id in regex.findall(text): match = f"[{ref_text_match}]({ref_type}{ref_short_id})" ref_id = next( (k for k in merged_data.keys() if k.startswith(ref_short_id)), None ) if ref_id is None: logger.error(f"No match found for {ref_short_id}") if not ref_id: raise ValueError( f"Unmatched UUID reference in document {key}: {ref_short_id}" ) if ref_type not in ["link::", "img::", "video::", "quote::"]: raise ValueError( f"Unexpected Internal Reference type '{ref_type}' in document {key}: {match}" ) ref_text = get_reference_text(ref_text_match, merged_data[ref_id]) ref_slug = f"/{merged_data[ref_id].slug}" if ref_type == "link::": try: # Double quotes within a page title are escaped so that they don't break the HTML 'title' element ref_title = merged_data[ref_id].title.replace('"', '\\"') if merged_data[ref_id].category != "references": ref_title += f" | {merged_data[ref_id].available.strftime('%B %Y')}" except AttributeError: ref_title = merged_data[ref_id].title.replace('"', '\\"') replacement = f'[{ref_text}]({ref_slug}){{title="{ref_title}"}}' elif ref_type == "img::": match = f"![{ref_text_match}]({ref_type}{ref_short_id})" if ref_slug.endswith("svg"): replacement = f'```=html\n{ref_text}\n```' if ref_slug.endswith("png"): replacement = ( f"```=html\n" f" \n" f' \n' f' {ref_text}\n' f" \n" f"```\n" ) else: if merged_data[ref_id].output_height != 0: dimensions = f'height="{merged_data[ref_id].output_height}" width="{merged_data[ref_id].output_width}"' else: dimensions = "" replacement = ( f"```=html\n" f" \n" f' \n' f' {ref_text}\n' f" \n" f"```\n" ) elif ref_type == "quote::": replacement, replacement_wordcount = create_quote_replacement( merged_data[ref_id], ref_slug ) merged_data[key].words["references"] += replacement_wordcount elif ref_type == "video::": replacement = create_video_replacement(ref_slug) else: continue text = text.replace(match, replacement) return text def get_reference_text(ref_text_match: str, ref_data) -> str: if ref_text_match.startswith("::") or ref_text_match == "": return ref_data.title return ref_text_match def create_quote_replacement( ref_data: DocumentMetadata, ref_slug: str ) -> Tuple[str, int]: ref_src = ref_data.attribution["djot"] try: ref_text = ref_data.source["text"].replace("\n\n", "\n> \n> ").strip() ref_text_len = len(ref_text.split()) replacement = f"> {ref_text}\n> \n> `
`{{=html}}\n> — [{ref_src}]({ref_slug})\n> `
`{{=html}}\n" return replacement, ref_text_len except: logger.error(f"Error creating quote replacement: {ref_data.uid}") import sys sys.exit() def create_video_replacement(ref_slug: str) -> str: return f"""```=html ```""" def generate_html(documents): logger.info("Generating HTML") for key, page in documents.items(): if page.content.get("plain"): page.content["html"] = run_jotdown(page.content["plain"], page) if page.source.get("text"): page.source["html"] = run_jotdown(page.source["text"], page) site.words["total"] = ( site.words["drafts"] + site.words["references"] + site.words["self"] ) class LedgerLexer(RegexLexer): name = "Ledger" aliases = ["ledger"] filenames = ["*.ledger", "*.journal"] tokens = { "root": [ (r";.*$", Comment.Single), ( r"^(\d{4}[-/]\d{2}[-/]\d{2})([ \t]+)([^;\n]*)", bygroups(Name.Tag, Text, Name.Entity), ), ( r"^([ \t]+)([A-Za-z][A-Za-z0-9: ]+)([ \t]+)([<\[]?[-+$€£¥]?[\d.,]+\s*[A-Za-z]*[>\]]?)", bygroups(Text, Name.Variable, Text, Number), ), ( r"^([ \t]+)([A-Za-z][A-Za-z0-9: ]+)[ \t]*$", bygroups(Text, Name.Variable), ), (r"(=[ \t]*[-+$€£¥]?[\d.,]+)", Number.Float), # Fixed the nested brackets (r"^[ \t]+[!@#$%^&*()]{1}.*$", Name.Attribute), (r".+\n", Text), ] } def highlight_code(code: str, language: str) -> str: """ Highlight code using Pygments with specified or guessed language. """ try: if language and language.lower() == "ledger": lexer = LedgerLexer() elif language: lexer = get_lexer_by_name(language.lower()) else: lexer = guess_lexer(code) formatter = HtmlFormatter( style=config["syntax_highlighting"]["style"], linenos="table" if config["syntax_highlighting"].get("line_numbers", False) else False, cssclass="highlight", ) return highlight(code, lexer, formatter) except ClassNotFound: # If language isn't found, return code wrapped in pre tags logger.error(f"Lexer not found for lang: {language}") logger.error(f"{code}\n") return f"
{code}
" def run_jotdown(plaintext: str, page) -> str: """ Modified to handle code blocks with syntax highlighting. Fixed to properly handle both raw HTML and HTML code blocks. """ CODE_BLOCK_RE = re.compile( r"( *)````*(=html|\s*(?:(\w+)\n))?(.*?)( *)````*", re.DOTALL ) code_blocks = [] marker_template = "§CODE_BLOCK_{}§" def save_code_block(match): leading_space = match.group(1) raw_html_marker = match.group(2) language = match.group(3) code = match.group(4).rstrip() trailing_space = match.group(5) code_words = len(code.split()) code_lines = len(code.splitlines()) page.words["code"]["lines"] += code_lines page.words["code"]["words"] += code_words site.words["code"]["lines"] += code_lines site.words["code"]["words"] += code_words # Remove the wordcount of codeblocks from the prose wordcounts page.words["self"] -= code_words site.words["self"] -= code_words # Check if this is a raw HTML block if raw_html_marker == "=html": return f"{leading_space}```=html\n{code}\n{trailing_space}```" # For all other cases, including 'html' language, highlight the code highlighted = highlight_code(code, language) marker = marker_template.format(len(code_blocks)) code_blocks.append(highlighted) return f"{leading_space}```=html\n{marker}\n{trailing_space}```" # First, replace all code blocks with markers processed_text = CODE_BLOCK_RE.sub(save_code_block, plaintext) """ TODO: Exclude codeblocks from wordcounts! """ # Run through jotdown html = run("jotdown", input=processed_text, text=True, capture_output=True).stdout prose_wordcount = len(html.split()) # Replace markers with actual highlighted code for i, code in enumerate(code_blocks): marker = marker_template.format(i) html = html.replace(marker, code) return html def build_backlinks(documents, site): logger.info("Building backlinks") INLINE_LINK_RE = re.compile( r"\[[^\]]*(?:\[[^\]]*\][^\]]*)*\]\(\/([^)#]*)\)", re.DOTALL ) FOOTNOTE_LINK_URL_RE = re.compile(r"\[.+?\]:\s\/(.*)", re.DOTALL) interlink_count = 0 for key, page in documents.items(): if "nobacklinks" in page.options or page.status == "draft": continue logger.debug(page.filepath) text = page.content.get("plain") # Skip if no main content if not text: continue interlinks = set(documents[key].links["internal"]) combined_refs = INLINE_LINK_RE.findall(text) + FOOTNOTE_LINK_URL_RE.findall( text ) for slug in combined_refs: try: link_uid = site.slug_to_uid_lookup[slug] interlinks.add(link_uid) interlink_count += 1 except KeyError: if should_ignore_slug(slug): continue logger.warning(f"\nKeyError in {page.title} ({key}): {slug}") documents[key].links["internal"] = sorted(interlinks) for interlink_key in interlinks: documents[interlink_key].links["backlinks"].add(key) """ TODO: REMOVE SITE.BACKLINKS in favour a 'stats' or 'count' (templates will need updating """ site.backlinks += interlink_count def should_ignore_slug(slug): return ( slug.startswith(("feeds/", "images/", "$")) or slug.endswith((".jpg", ".webp", ".png", ".svg", ".pdf", ".gif", ".html")) or slug in ["publickey", "humans.txt", "build.py"] ) def build_collections( documents: Dict[str, DocumentMetadata], site: SiteMetadata ) -> Tuple[Dict[str, List[Dict[str, Any]]], List[Dict[str, Any]]]: collections = { category: [] for category in list(site.categories) + list(site.secondaries) + list(site.tags) + ["everything", "main", "cd68b918-ac5f-4d6c-abb5-a55a0318846b"] } sitemap = [] for key, page in sorted( documents.items(), key=lambda k_v: k_v[1].available, reverse=True ): if page.status == "draft": collections["cd68b918-ac5f-4d6c-abb5-a55a0318846b"].append(page) continue elif page.status == "hidden": continue elif "nofeed" in page.options: sitemap.append(page) continue else: sitemap.append(page) collections["everything"].append(page) collections[page.category].append(page) collections[page.secondary].append(page) for tag in page.tags: collections[tag].append(page) if page.secondary in [ "essays", "wandering", "rambling", "dialog", "pearls", ]: collections["main"].append(page) return collections, sitemap def output_html( assets: Dict[str, AssetMetadata], documents: Dict[str, DocumentMetadata], collections: Dict[str, List[Dict[str, Any]]], site: SiteMetadata, env: Environment, output_dir: Path, ) -> None: logger.info("Generating Hypertext") for key, page in documents.items(): template_file = page.layout template = env.get_template(template_file) collection = build_page_collection(page, collections) output = template.render( documents=documents, assets=assets, collections=collections, collection=collection, page=asdict(page), site=site, ) output_path = output_dir / page.slug / "index.html" output_path.parent.mkdir(parents=True, exist_ok=True) with open(output_path, "w") as f: f.write(output) logger.debug(f" {page.filepath} >> {output_path}") def build_page_collection(page, collections): try: collection = [ item for include in page.collection["include"] for item in collections[include] ] return sorted(collection, key=lambda x: x.available, reverse=True) except KeyError: logger.error(f"Failed collection for {page.filepath}") return [] def output_feeds(collections, site, env, output_dir): logger.info("Generating Feeds") feed_list = list(site.categories) + list(site.secondaries) + ["everything", "main"] for entry in feed_list: feed = render_feed(entry, collections, site, env) write_feed(feed, output_dir) logger.debug(f" {entry} >> {feed['path']}") output_feed_stylesheet(site, env, output_dir) def render_feed(feed_name, collections, site, env): slug = f"feeds/{feed_name}" feed_path = f"{slug}/index.xml" template = env.get_template(TEMPLATE_FEED) feed_content = template.render( site=site, slug=slug, collection=feed_name, feed=collections[feed_name], ) return {"name": feed_name, "output": feed_content, "path": feed_path} def write_feed(feed, output_dir): feed_path = output_dir / feed["path"] feed_path.parent.mkdir(parents=True, exist_ok=True) feed_path.write_text(feed["output"]) def output_link_report(site, output_dir): logger.info("Creating plaintext link files") output_path = output_dir / "links.txt" with open(output_path, "w") as file: for link in sorted(site.links["external"]): file.write(f"{link}\n") logger.debug(f" {output_path}") def output_feed_stylesheet(site, env, output_dir): logger.info("Creating XSL Stylesheet") template = env.get_template(TEMPLATE_FEED_XSL) output_path = output_dir / "feed.xsl" output = template.render(site=site) output_path.write_text(output) logger.debug(f" {output_path}") def output_sitemap(sitemap, site, env, output_dir): logger.info("Generating Sitemap") template = env.get_template(TEMPLATE_SITEMAP) output = template.render(sitemap=sitemap, site=site) output_path = output_dir / "sitemap.xml" output_path.write_text(output) logger.debug(f" {output_path}") async def main(): # Initialize site and load assets global site site = init_site() assets = load_assets() # Set up Jinja environment env = setup_jinja_environment() # Process assets await asyncio.to_thread(process_assets, assets, ASSET_DIR, OUTPUT_DIR) # Ingest and process documents documents = await ingest_documents(site) insert_substitutions(documents, assets, site) generate_html(documents) # Build backlinks and collections build_backlinks(documents, site) collections, sitemap = build_collections(documents, site) # Attempting to make final order of 'backlinks' deterministic for key, page in documents.items(): # Sort interlinks based on published dates documents[key].links["internal"] = sorted( documents[key].links["internal"], key=lambda x: documents[x].available, reverse=True, # Most recent first ) # Sort backlinks based on published dates documents[key].links["backlinks"] = sorted( documents[key].links["backlinks"], key=lambda x: documents[x].available, reverse=True, # Most recent first ) # Output HTML, feeds, and sitemap output_html(assets, documents, collections, site, env, OUTPUT_DIR) output_link_report(site, OUTPUT_DIR) output_feeds(collections, site, env, OUTPUT_DIR) output_sitemap(sitemap, site, env, OUTPUT_DIR) # Change back to the initial directory os.chdir(INIT_DIR) # Print summary logger.info("Build complete!") logger.info(f"Pages: {site.pagecount}") logger.info(f"Words: {site.words['total']}") logger.info(f"Internal links: {site.backlinks}") logger.info(f"External links: {len(site.links['external'])}") if __name__ == "__main__": asyncio.run(main())