#!/usr/bin/env python # creator: Silas Jelley # created: 2020-08-11 09:52:32 # updated: 2026-03-27 00:00:00 # version: 4.3 # Imports from collections import Counter, defaultdict from hashlib import md5 from pathlib import Path from subprocess import run from typing import List, Dict, Any, Tuple, Optional import asyncio import datetime import logging import os import random import re import time import signal import tomllib import sys from bs4 import BeautifulSoup from jinja2 import Environment, FileSystemLoader, StrictUndefined from urllib.parse import urlparse from . import incremental_build from .attribution import process_source_information from .collection import process_collection from .models import SiteMetadata, DocumentMetadata, AssetMetadata from .asset_processing import process_assets, terminate_workers from .error_reporting import report_error, validate_toml_file from .terminal import Progress, step, note, build_summary # Load configuration validate_toml_file(Path("config.toml")) with open("config.toml", "rb") as config_file: config = tomllib.load(config_file) # Constants ASSET_DIR = Path(config["paths"]["asset_dir"]) MANIFEST_DIR = Path(config["paths"]["manifest_dir"]) TEMPLATE_DIR = Path(config["paths"]["template_dir"]) TEMPLATE_FEED = config["templates"]["feed"] TEMPLATE_FEED_XSL = config["templates"]["feed_xsl"] TEMPLATE_SITEMAP = config["templates"]["sitemap"] # Allow overriding the output dir via env var, used by `site compare` OUTPUT_DIR = Path(os.environ.get("SITE_OUTPUT_DIR", config["paths"]["output_dir"])) if "notes_dir" in config["paths"]: NOTES_DIRS = [Path(config["paths"]["notes_dir"])] else: NOTES_DIRS = [Path(p) for p in config["paths"]["notes_dirs"]] LINK_REPORT_PATH = Path(config["paths"]["link_report_path"]) INIT_DIR = os.getcwd() # Keep a minimal logger for actual errors/warnings that should go to stderr logging.basicConfig(level=logging.WARNING, format="%(levelname)s: %(message)s") logger = logging.getLogger(__name__) def init_site(): site_config = config["site"] return SiteMetadata( name=site_config["name"], created=site_config["created"], url=site_config["url"], baseurl=site_config["baseurl"], uid=site_config["uid"], description=site_config["description"], creator=site_config["creator"], ) def preprocess_asset_metadata( uid: str, asset_data: Dict[str, Any], manifest_path: Path ) -> Dict[str, Any]: """Preprocess asset metadata to ensure it meets AssetMetadata requirements.""" processed = asset_data.copy() # Handle dates for date_field in ["created", "updated", "available"]: if isinstance(processed.get(date_field), str): processed[date_field] = _parse_date(processed[date_field]) elif isinstance(processed.get(date_field), datetime.datetime): processed[date_field] = processed[date_field].replace(tzinfo=None) else: processed[date_field] = datetime.datetime.now() # Set required fields with defaults if not present processed.setdefault("uid", uid) return processed def load_assets() -> Dict[str, AssetMetadata]: """Load asset manifests and convert them to AssetMetadata instances.""" assets = {} asset_manifests = list(MANIFEST_DIR.glob("*.toml")) for manifest in asset_manifests: with open(manifest, "rb") as f: manifest_data = tomllib.load(f) for uid, asset_data in manifest_data.items(): try: processed_data = preprocess_asset_metadata(uid, asset_data, manifest) processed_data["filepath"] = ASSET_DIR / processed_data["filepath"] assets[uid] = AssetMetadata(**processed_data) except Exception as e: if isinstance(e, TypeError): import dataclasses # Rerun to get 
processed_data for error reporting processed_data_for_error = preprocess_asset_metadata( uid, asset_data, manifest ) known_fields = {f.name for f in dataclasses.fields(AssetMetadata)} unknown_fields = set(processed_data_for_error.keys()) - known_fields error_msg = ( f"Error processing asset '{uid}' from {manifest}\n\n {e}" ) if unknown_fields: error_msg += ( f"\n Unknown field(s): {', '.join(unknown_fields)}" ) raise ValueError(error_msg) from e else: raise ValueError( f"Error processing asset {uid} from {manifest}\n\n {e}" ) from e return assets def setup_jinja_environment(): file_loader = FileSystemLoader(TEMPLATE_DIR) # autoescape=True ensures all template output is HTML-escaped by default; # intentional raw HTML must be marked with | safe explicitly. env = Environment(loader=file_loader, undefined=StrictUndefined, autoescape=True) # Add custom filters env.filters["shuffle"] = lambda seq: random.sample(seq, len(seq)) env.filters["time_local"] = lambda value, format="%-I:%M%p": value.strftime( format ).lower() env.filters["year"] = lambda value, format="%Y": value.strftime(format) env.filters["month"] = lambda value, format="%m": value.strftime(format) env.filters["day"] = lambda value, format="%d": value.strftime(format) env.filters["year_month"] = lambda value, format="%Y/%m": value.strftime(format) env.filters["year_month_day"] = lambda value, format="%Y/%m/%d": value.strftime( format ) env.filters["iso_date"] = lambda value, format="%Y-%m-%d": value.strftime(format) env.filters["date_long_short_month"] = lambda value, format="%b %e, %Y": ( value.strftime(format) ) env.filters["datetime_w3c"] = lambda value, format="%Y-%m-%dT%H:%M:%S": ( value.strftime(format) ) env.filters["date_long_full_month"] = lambda value, format="%B %e, %Y": ( value.strftime(format) ) env.filters["timedate_long"] = lambda value, format="%-I:%M%p %B %e, %Y": ( value.strftime(format) ) return env def get_files() -> List[Path]: all_files = [] for notes_dir in NOTES_DIRS: # Typst documents (.typ) — must contain '#metadata(' and 'available:' to be valid # Only read the first 512 bytes since metadata is always at the top of the file all_files.extend( [ f for f in notes_dir.glob("**/*.typ") if (t := f.read_text(errors="ignore")[:512]) and "#metadata(" in t and "available:" in t ] ) return all_files async def process_document( filepath: Path, site: SiteMetadata ) -> Tuple[str, DocumentMetadata]: """Process a document file and return its UID and metadata.""" from .formats import typst as typst_format raw_metadata, _ = await typst_format.load_document_async(filepath) document = typst_format.preprocess_metadata(filepath, raw_metadata) return document.uid, document async def ingest_documents( site: SiteMetadata, cached_documents: Dict[str, DocumentMetadata] | None = None, changed_files: set[Path] | None = None, ) -> Dict[str, Any]: file_list = get_files() documents = {} slug_to_title_lookup = {} slug_to_uid_lookup = {} uuid_collision_lookup = [] # Start with cached documents if available if cached_documents: documents = cached_documents.copy() # Rebuild lookups from cached documents for uid, doc in documents.items(): slug_to_title_lookup[doc.slug] = doc.title slug_to_uid_lookup[doc.slug] = uid site.categories.add(doc.category) site.secondaries.add(doc.secondary) site.tags.update(doc.tags) uuid_collision_lookup.append(uid) # Only process changed/new files files_to_process = [] if changed_files is not None: files_to_process = list(changed_files) else: # No cache available, process all files files_to_process = file_list if 
files_to_process: label = "Ingesting" if not cached_documents else "Re-ingesting" progress = Progress(label, total=len(files_to_process)) async def _process_with_progress(filepath): try: result = await process_document(filepath, site) progress.update(str(filepath.name)) return filepath, result, None except Exception as exc: progress.update(str(filepath.name)) return filepath, None, exc tasks = [_process_with_progress(filepath) for filepath in files_to_process] raw_results = await asyncio.gather(*tasks) progress.finish(f"{label} {len(raw_results)} files") # Collect all ingestion errors and report them all at once ingestion_errors = [(fp, exc) for fp, _, exc in raw_results if exc is not None] if ingestion_errors: import sys lines = [ f"\n \u2717 {len(ingestion_errors)} document(s) failed to ingest:\n" ] for fp, exc in ingestion_errors: lines.append(f" File: {fp}\n {exc}\n") print("".join(lines), file=sys.stderr) sys.exit(1) results = [result for _, result, _ in raw_results] for uid, doc in results: # Check for exact UID duplicates if uid in documents: existing = documents[uid] # Allow re-processing the same file (incremental rebuild of a changed file) if existing.filepath == doc.filepath: del documents[uid] uuid_collision_lookup.remove(uid) else: raise ValueError( f"\n\n ERROR: Duplicate UID found:\n" f" UID: {uid}\n\n" f" File 1: {existing.filepath}\n" f" Title: {existing.title}\n\n" f" File 2: {doc.filepath}\n" f" Title: {doc.title}\n" ) # Check for slug duplicates during ingestion if doc.slug in slug_to_uid_lookup: existing_uid = slug_to_uid_lookup[doc.slug] existing = documents.get(existing_uid) # Allow re-processing the same file (incremental rebuild of a changed file) if existing is not None and existing.filepath != doc.filepath: raise ValueError( f"\n\n ERROR: Duplicate slug found:\n" f" Slug: {doc.slug}\n\n" f" File 1: {existing.filepath if existing else existing_uid}\n" f" UID: {existing_uid}\n" f" Title: {existing.title if existing else '?'}\n\n" f" File 2: {doc.filepath}\n" f" UID: {uid}\n" f" Title: {doc.title}\n" ) documents[uid] = doc slug_to_title_lookup[doc.slug] = doc.title slug_to_uid_lookup[doc.slug] = uid site.categories.add(doc.category) site.secondaries.add(doc.secondary) site.tags.update(doc.tags) if uid not in uuid_collision_lookup: uuid_collision_lookup.append(uid) site.slug_to_uid_lookup = slug_to_uid_lookup site.slug_to_title_lookup = slug_to_title_lookup check_uuid_collisions(uuid_collision_lookup) site.pagecount = len(documents) return documents def _parse_date(date_str: str) -> datetime.datetime: """Parses a date string into a datetime object, handling both date and datetime inputs.""" try: return datetime.datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S%z").replace( tzinfo=None ) except ValueError: return datetime.datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=None) def preprocess_metadata(filepath: Path, metadata: Dict[str, Any]) -> DocumentMetadata: """Preprocesses metadata for a document and converts it to a DocumentMetadata instance.""" # Create a working copy to avoid modifying the input processed = metadata.copy() # Parse date fields for date_field in ["available", "created", "updated"]: if isinstance(processed.get(date_field), str): processed[date_field] = _parse_date(processed[date_field]) elif isinstance(processed.get(date_field), datetime.datetime): processed[date_field] = processed[date_field].replace(tzinfo=None) # Set default updated time if not provided processed.setdefault("updated", processed.get("available")) # Process source information 
if present if "source" in processed: processed["attribution"] = process_source_information( processed["source"], processed.get("via", {}) ) else: processed["attribution"] = {} processed["source"] = {} if "via" not in processed: processed["via"] = {} # Handle draft status if processed.get("status") == "draft": processed["slug"] = f"drafts/{processed['uid']}" # Add filepath as it's required but comes from function parameter processed["filepath"] = filepath # Determine title processed["title"] = ( processed.get("title") or processed.get("attribution", {}).get("plain") or processed["available"].strftime("%B %e, %Y %-I.%M%p") ) # Create and return DocumentMetadata instance try: return DocumentMetadata(**processed) except TypeError as e: import dataclasses known_fields = {f.name for f in dataclasses.fields(DocumentMetadata)} unknown_fields = set(processed.keys()) - known_fields error_msg = f"Error processing metadata for {filepath}\n\n {e}" if unknown_fields: error_msg += ( f"\n Unknown field(s) in front matter: {', '.join(unknown_fields)}" ) raise ValueError(error_msg) from e def check_uuid_collisions(uuid_list): prefixes = [uuid[:8] for uuid in uuid_list] if len(set(prefixes)) != len(prefixes): collisions = [ prefix for prefix, count in Counter(prefixes).items() if count > 1 ] raise ValueError( f"CRITICAL ERROR: UUID prefix collision for: {', '.join(collisions)}" ) async def generate_html(documents, assets=None, changed_uids: set[str] | None = None): # If no changed_uids provided, process all documents if changed_uids is None: docs_to_process = documents.keys() else: docs_to_process = changed_uids from .formats import typst as typst_format typst_docs = list(docs_to_process) # Compile all Typst documents concurrently if typst_docs: async def _compile_typst(key): page = documents[key] try: html = await typst_format.render_to_html_async( page.filepath, page, documents, assets or {} ) # For quotes, the compiled body is the source text — route it to # source["html"] so the template wraps it in
. if page.secondary == "quotes": page.source["html"] = html.strip() else: page.content["html"] = html return page.filepath, None except Exception as exc: return page.filepath, exc compile_results = await asyncio.gather( *[_compile_typst(key) for key in typst_docs] ) compile_errors = [(fp, exc) for fp, exc in compile_results if exc is not None] if compile_errors: import sys lines = [ f"\n \u2717 {len(compile_errors)} document(s) failed HTML compilation:\n" ] for fp, exc in compile_errors: lines.append(f" File: {fp}\n {exc}\n") print("".join(lines), file=sys.stderr) sys.exit(1) # Aggregate Typst external links into site-wide set (including source/via URLs) for key in typst_docs: page = documents[key] if page.status == "draft": continue site.links["external"].update(page.links["external"]) # Also include source.url and via.url for url_field in (page.source.get("url", ""), page.via.get("url", "")): if url_field and url_field.startswith("http"): parsed = urlparse(url_field) if parsed.netloc.lower() != "silasjelley.com": page.links["external"].add(url_field) site.links["external"].add(url_field) # Generate descriptions for all processed docs for key in docs_to_process: page = documents[key] if not page.description: html_content = page.content.get("html") or page.source.get("html") if html_content: soup = BeautifulSoup(html_content, "html.parser") text = soup.get_text(separator=" ", strip=True) # Normalize whitespace (collapse multiple spaces) text = " ".join(text.split()) page.description = text[:200] if not page.description: logger.warn( f"DEBUG: Description is empty for {page.filepath} after extraction. HTML content len: {len(html_content)}" ) # Aggregate site-wide counts from per-page counts site.words["self"] = sum( p.words["self"] for p in documents.values() if p.status != "draft" ) site.words["drafts"] = sum( p.words["self"] for p in documents.values() if p.status == "draft" ) site.words["references"] = sum(p.words["references"] for p in documents.values()) site.words["code"]["lines"] = sum( p.words["code"]["lines"] for p in documents.values() ) site.words["code"]["words"] = sum( p.words["code"]["words"] for p in documents.values() ) site.words["total"] = ( site.words["self"] + site.words["drafts"] + site.words["references"] ) def build_backlinks(documents, site): HTML_LINK_RE = re.compile(r'href="/([^"]*)"') interlink_count = 0 for key, page in documents.items(): if "nobacklinks" in page.options or page.status == "draft": continue logger.debug(page.filepath) html_content = page.content.get("html", "") + page.source.get("html", "") if not html_content: continue interlinks = set() for raw_slug in HTML_LINK_RE.findall(html_content): # Strip fragment (#...) and query string (?...) 
to get bare path slug = raw_slug.split("#")[0].split("?")[0] try: link_uid = site.slug_to_uid_lookup[slug] interlinks.add(link_uid) interlink_count += 1 except KeyError: if should_ignore_slug(slug): continue logger.warning(f"\nKeyError in {page.title} ({key}): {slug}") documents[key].links["internal"] = sorted(interlinks) for interlink_key in interlinks: documents[interlink_key].links["backlinks"].add(key) """ TODO: REMOVE SITE.BACKLINKS in favour a 'stats' or 'count' (templates will need updating """ site.backlinks += interlink_count def should_ignore_slug(slug): return ( slug.startswith(("feeds/", "images/", "$")) or slug.endswith((".jpg", ".webp", ".png", ".svg", ".pdf", ".gif", ".html")) or slug in ["publickey", "humans.txt", "build.py", "links.txt"] ) def build_collections( documents: Dict[str, DocumentMetadata], site: SiteMetadata ) -> Tuple[Dict[str, List[Dict[str, Any]]], List[Dict[str, Any]]]: collections = { category: [] for category in list(site.categories) + list(site.secondaries) + list(site.tags) + ["everything", "main", "cd68b918-ac5f-4d6c-abb5-a55a0318846b"] } sitemap = [] for key, page in sorted( documents.items(), key=lambda k_v: k_v[1].available, reverse=True ): if page.status == "draft": collections["cd68b918-ac5f-4d6c-abb5-a55a0318846b"].append(page) continue elif page.status == "hidden": continue elif "nofeed" in page.options: sitemap.append(page) continue else: sitemap.append(page) collections["everything"].append(page) collections[page.category].append(page) collections[page.secondary].append(page) for tag in page.tags: collections[tag].append(page) if page.secondary in [ "essays", "wandering", "rambling", "pearls", ]: collections["main"].append(page) # Deduplicate each collection by UID — a document can be added more than # once if its tag name matches its category or secondary name. 
for key in collections: seen = set() deduped = [] for page in collections[key]: if page.uid not in seen: seen.add(page.uid) deduped.append(page) collections[key] = deduped return collections, sitemap def output_html( assets: Dict[str, AssetMetadata], documents: Dict[str, DocumentMetadata], collections: Dict[str, List[Dict[str, Any]]], site: SiteMetadata, env: Environment, output_dir: Path, ) -> None: """HTML output with incremental builds.""" incremental_build.output_html_incremental( assets=assets, documents=documents, collections=collections, site=site, env=env, output_dir=output_dir, ) def output_feeds(collections, site, env, output_dir) -> int: feed_list = list(site.categories) + list(site.secondaries) + ["everything", "main"] for entry in feed_list: feed = render_feed(entry, collections, site, env) write_feed(feed, output_dir) logger.debug(f" {entry} >> {feed['path']}") output_feed_stylesheet(site, env, output_dir) return len(feed_list) def render_feed(feed_name, collections, site, env): slug = f"feeds/{feed_name}" feed_path = f"{slug}/index.xml" template = env.get_template(TEMPLATE_FEED) feed_content = template.render( site=site, slug=slug, collection=feed_name, feed=collections[feed_name], ) return {"name": feed_name, "output": feed_content, "path": feed_path} def write_feed(feed, output_dir): feed_path = output_dir / feed["path"] feed_path.parent.mkdir(parents=True, exist_ok=True) feed_path.write_text(feed["output"]) def output_link_report(site, link_report_path): with open(link_report_path, "w") as file: for link in sorted(site.links["external"]): file.write(f"{link}\n") def output_feed_stylesheet(site, env, output_dir): template = env.get_template(TEMPLATE_FEED_XSL) output_path = output_dir / "feed.xsl" output = template.render(site=site) output_path.write_text(output) def output_sitemap(sitemap, site, env, output_dir): template = env.get_template(TEMPLATE_SITEMAP) output = template.render(sitemap=sitemap, site=site) output_path = output_dir / "sitemap.xml" output_path.write_text(output) def process_pending_collections( site: SiteMetadata, documents: Dict[str, DocumentMetadata], collections: Dict[str, List[Dict[str, Any]]], env: Environment, ): """ Renders and inserts collections for all documents that used the @collection shortcode. This runs iteratively to handle nested collections. """ if not site.docs_with_collections: return max_passes = 10 # Safeguard against infinite loops for i in range(max_passes): replacements_this_pass = 0 # Process all documents that have pending collections for uid in list(site.docs_with_collections): page = documents[uid] if not page.pending_collections: continue remaining_pending = [] for pending in page.pending_collections: params = pending["params"] placeholder = ( f"

COLLECTION-PLACEHOLDER-{pending['placeholder_id']}

" ) if placeholder not in page.content.get("html", ""): continue # 1. Process collection to get final list of items # We pass the current page's UID to handle self-exclusion sliced_items = process_collection(params, documents, collections, uid) # 2. Check for unresolved dependencies. If style is 'body', # we must ensure all sub-collections are rendered first. can_render = True if params.style == "body": for item in sliced_items: if "COLLECTION-PLACEHOLDER" in item.content.get("html", ""): can_render = False break if not can_render: remaining_pending.append(pending) # Keep for the next pass continue # 3. Render collection_template = env.get_template("collection") rendered_html = collection_template.render( site=site, documents=documents, collection=sliced_items, style=params.style, include_title=params.include_title, show_date=params.show_date, numbered=params.numbered, ) # 4. Replace placeholder page.content["html"] = page.content["html"].replace( placeholder, rendered_html ) replacements_this_pass += 1 page.pending_collections = remaining_pending logger.debug( f"Collection Pass {i + 1}: {replacements_this_pass} replacements made." ) if replacements_this_pass == 0: logger.debug("Collection processing complete.") break else: logger.warning( "Max passes reached for collection processing. " "There might be unresolved collections or a circular dependency." ) # Final check for any unresolved collections unresolved_docs = [ documents[uid].title for uid in site.docs_with_collections if documents[uid].pending_collections ] if unresolved_docs: logger.warning( f"Could not resolve all collections. Unresolved documents: {', '.join(unresolved_docs)}" ) def process_pending_embeds( documents: Dict[str, DocumentMetadata], assets: Dict[str, AssetMetadata], ): """ Resolves site-embed placeholders inserted during Typst HTML post-processing. Embeds are deferred because the referenced document may not yet have compiled HTML when the embedding document is processed concurrently. """ from .resolver import find_any for page in documents.values(): if not page.pending_embeds: continue html = page.content.get("html", "") for embed in page.pending_embeds: placeholder = f"

EMBED-PLACEHOLDER-{embed['placeholder_id']}"
            if placeholder not in html:
                continue
            try:
                _, ref = find_any(embed["ref_id"], documents, assets)
            except ValueError:
                logger.warning(
                    f"Cannot resolve embed '{embed['ref_id']}' in {page.filepath}"
                )
                html = html.replace(placeholder, "")
                continue

            variant = embed["variant"]
            ref_content = (
                ref.content.get("html", "") if hasattr(ref, "content") else ""
            )

            if variant == "aside":
                ref_title = ref.title
                ref_available = ref.available
                location_string = ""
                if (
                    hasattr(ref, "location")
                    and ref.location
                    and ref.location.get("city")
                ):
                    location_string = (
                        f" ⚕ {ref.location['city']}, {ref.location['country']}"
                    )
                # Minimal aside markup sketch: the exact element and classes are
                # illustrative and should match the site's templates.
                embed_html = (
                    f'<aside class="embed">'
                    f'<a href="/{ref.slug}">{ref_title}</a>'
                    f", {ref_available.strftime('%B %e, %Y')}{location_string}"
                    f"{ref_content}"
                    "</aside>"
                )
            else:
                embed_html = ref_content

            html = html.replace(placeholder, embed_html)

        page.content["html"] = html
        page.pending_embeds = []


def process_pending_quotes(
    documents: Dict[str, DocumentMetadata],
):
    """
    Resolves site-quote placeholders inserted during Typst HTML post-processing.
    Quotes are deferred because the referenced quote document may not yet have its
    source["html"] populated when the embedding document is processed concurrently.
    """
    from .resolver import find_document

    for page in documents.values():
        if not page.pending_quotes:
            continue
        html = page.content.get("html", "")
        for quote in page.pending_quotes:
            placeholder = f"

QUOTE-PLACEHOLDER-{quote['placeholder_id']}"
            if placeholder not in html:
                continue
            try:
                _, ref = find_document(quote["ref_id"], documents)
            except ValueError:
                logger.warning(
                    f"Cannot resolve quote '{quote['ref_id']}' in {page.filepath}"
                )
                html = html.replace(placeholder, "")
                continue

            ref_slug = f"/{ref.slug}"
            attr_plain = ref.attribution.get("plain", "")
            # Attribution linked to the quote's own page.
            attr_linked = f'— <a href="{ref_slug}">{attr_plain}</a>'
            ref_text = ref.source.get("html", "")
            variant = quote["variant"]

            if variant == "inline":
                quote_html = f'"{ref_text}" {attr_linked}'
            elif variant == "compact":
                quote_html = f"<blockquote>{ref_text} {attr_linked}</blockquote>"
            else:
                # Full variant: keep the newline between the closing blockquote
                # and the attribution, since we're doing string replacement
                # rather than BeautifulSoup parsing.
                quote_html = f"<blockquote>{ref_text}</blockquote>\n{attr_linked}
" html = html.replace(placeholder, quote_html) page.content["html"] = html page.pending_quotes = [] def validate_slug_uniqueness( documents: Dict[str, DocumentMetadata], assets: Dict[str, AssetMetadata] ) -> None: """Validate that all slugs are unique across documents and assets.""" slug_to_items = defaultdict(list) for doc in documents.values(): slug_to_items[doc.slug].append( ("document", doc.uid, doc.title, str(doc.filepath)) ) for asset in assets.values(): slug_to_items[asset.slug].append( ("asset", asset.uid, asset.title, str(asset.filepath)) ) collisions = { slug: items for slug, items in slug_to_items.items() if len(items) > 1 } if collisions: error_lines = ["\n\n ERROR: Duplicate slugs found:"] for slug, items in collisions.items(): for item_type, uid, title, filepath in items: error_lines.extend( [ f" {item_type.capitalize()}: {uid}", f" Title: {title}", f" File: {filepath}", f" Slug: {slug}", "", ] ) error_msg = "\n".join(error_lines) raise ValueError(error_msg) async def main(): import sys global site site = init_site() t_start = time.perf_counter() # Set up Jinja environment env = setup_jinja_environment() stylesheet_content = "" stylesheet_dir = Path(config["paths"]["stylesheet_dir"]) if stylesheet_dir.is_dir(): for css_file in sorted(stylesheet_dir.glob("*.css")): stylesheet_content += css_file.read_text() + "\n" output_stylesheet_path = OUTPUT_DIR / "style.css" output_stylesheet_path.parent.mkdir(parents=True, exist_ok=True) output_stylesheet_path.write_text(stylesheet_content) site.data["stylesheet_hash"] = md5( stylesheet_content.encode("utf-8") ).hexdigest() # Check for full rebuild flag force_rebuild = "--full-rebuild" in sys.argv if force_rebuild: note("Full rebuild requested, clearing cache") incremental_build.clear_build_cache(OUTPUT_DIR) # Try to load cached state cached_state = None changed_files = None cached_documents = None cached_assets = None file_hashes = {} if not force_rebuild: cached_state = incremental_build.load_state_cache(OUTPUT_DIR) # Get list of content files (needed for change detection and ingestion) file_list = get_files() if cached_state: cached_documents, cached_assets, cached_site, file_hashes = cached_state note("Loaded state from cache") # Identify changed files changed_files, new_file_hashes = incremental_build.get_changed_files( file_list, file_hashes ) file_hashes = new_file_hashes if changed_files: note(f"Detected {len(changed_files)} changed file(s)") # Restore site metadata from cache where appropriate site.data = cached_site.data site.links["external"] = cached_site.links.get("external", set()) else: note("No cache found, full build") # Compute file hashes for the cache we'll save at the end _, file_hashes = incremental_build.get_changed_files(file_list, {}) # Load assets (always reload to catch new/changed assets) assets = load_assets() # Process assets await asyncio.to_thread(process_assets, assets, ASSET_DIR, OUTPUT_DIR) # Ingest and process documents (using cache when available) documents = await ingest_documents(site, cached_documents, changed_files) # Validate slug uniqueness validate_slug_uniqueness(documents, assets) # Track which documents need processing changed_uids = set() if changed_files: # Map changed file paths to UIDs for filepath in changed_files: for uid, doc in documents.items(): if doc.filepath == filepath: changed_uids.add(uid) break # Generate HTML only for changed documents t_html = step("Generating HTML") if changed_uids: await generate_html(documents, assets, changed_uids) elif not cached_state: await 
generate_html(documents, assets) else: # No changes, but still need to aggregate word counts from cached data site.words["self"] = sum( p.words["self"] for p in documents.values() if p.status != "draft" ) site.words["drafts"] = sum( p.words["self"] for p in documents.values() if p.status == "draft" ) site.words["references"] = sum( p.words["references"] for p in documents.values() ) site.words["code"]["lines"] = sum( p.words["code"]["lines"] for p in documents.values() ) site.words["code"]["words"] = sum( p.words["code"]["words"] for p in documents.values() ) site.words["total"] = ( site.words["self"] + site.words["drafts"] + site.words["references"] ) t_html.done() # Resolve deferred embeds and quotes before building backlinks so that any # internal links injected by these placeholders are visible to build_backlinks. process_pending_embeds(documents, assets) process_pending_quotes(documents) # Build backlinks and collections (always rebuild to handle cross-document references) t_bl = step("Building backlinks") collections, sitemap = build_collections(documents, site) site.backlinks = 0 build_backlinks(documents, site) t_bl.done() # Make final order of 'backlinks' deterministic before collection rendering for key, page in documents.items(): # Sort interlinks based on published dates documents[key].links["internal"] = sorted( documents[key].links["internal"], key=lambda x: documents[x].available, reverse=True, # Most recent first ) # Sort backlinks based on published dates documents[key].links["backlinks"] = sorted( documents[key].links["backlinks"], key=lambda x: documents[x].available, reverse=True, # Most recent first ) # Register any Typst documents that have pending_collections populated # during HTML post-processing (site-collection elements). for uid, page in documents.items(): if page.pending_collections: site.docs_with_collections.add(uid) # Now render collections with complete backlink data t_col = step("Rendering collections") process_pending_collections(site, documents, collections, env) t_col.done() # Re-derive descriptions for pages whose description still contains placeholder text # (placeholders are resolved after the initial description pass, so collection/embed # pages would otherwise end up with placeholder strings in their meta tags). for page in documents.values(): if page.description and "PLACEHOLDER" in page.description: page.description = None if not page.description: html_content = page.content.get("html") or page.source.get("html") if html_content: soup = BeautifulSoup(html_content, "html.parser") text = soup.get_text(separator=" ", strip=True) page.description = " ".join(text.split())[:200] # Save state cache before output incremental_build.save_state_cache(documents, assets, site, file_hashes, OUTPUT_DIR) # Output HTML, feeds, and sitemap output_html(assets, documents, collections, site, env, OUTPUT_DIR) t_other = step("Writing feeds and sitemap") output_link_report(site, LINK_REPORT_PATH) feed_count = output_feeds(collections, site, env, OUTPUT_DIR) output_sitemap(sitemap, site, env, OUTPUT_DIR) t_other.done(detail=f"{feed_count} feeds") # Change back to the initial directory os.chdir(INIT_DIR) build_summary( pages=site.pagecount, words=site.words["total"], links_internal=site.backlinks, links_external=len(site.links["external"]), elapsed=time.perf_counter() - t_start, ) def build_styles(): """Concatenate CSS source files into the output stylesheet. 
This is a fast path for CSS-only changes — it skips all document processing and just re-bundles the stylesheet, making the round-trip from edit to browser refresh as short as possible. """ stylesheet_dir = Path(config["paths"]["stylesheet_dir"]) output_stylesheet_path = OUTPUT_DIR / "style.css" if not stylesheet_dir.is_dir(): print(f"Stylesheet directory not found: {stylesheet_dir}", file=sys.stderr) sys.exit(1) stylesheet_content = "" for css_file in sorted(stylesheet_dir.glob("*.css")): stylesheet_content += css_file.read_text() + "\n" output_stylesheet_path.parent.mkdir(parents=True, exist_ok=True) output_stylesheet_path.write_text(stylesheet_content) print(f"Stylesheet written to {output_stylesheet_path}") def _sigint_handler(signum, frame): # Fires on the main thread immediately when Ctrl+C is pressed, before # asyncio gets a chance to cancel tasks. This is the only reliable place # to terminate the multiprocessing pool, since process_assets runs in a # to_thread() worker that never receives KeyboardInterrupt. terminate_workers() signal.signal(signal.SIGINT, signal.SIG_DFL) signal.raise_signal(signal.SIGINT) if __name__ == "__main__": if "--styles-only" in sys.argv: build_styles() else: signal.signal(signal.SIGINT, _sigint_handler) try: asyncio.run(main()) except KeyboardInterrupt: print("\nInterrupted") sys.exit(130) except Exception as e: report_error(e, "Site Build Failed")
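# Usage notes (sketch).
#
# Flags and environment variables recognized above; the wrapper command name
# depends on the site's own CLI (the `site` tool referenced in the comments):
#
#   --full-rebuild    clear the incremental-build cache and rebuild everything
#   --styles-only     re-bundle the stylesheets and exit
#   SITE_OUTPUT_DIR   environment variable overriding paths.output_dir
#
# config.toml must supply at least the keys read at import time. The values
# below are hypothetical placeholders, not the real site's configuration:
#
#   [paths]
#   asset_dir = "assets"
#   manifest_dir = "manifests"
#   template_dir = "templates"
#   output_dir = "public"
#   notes_dir = "notes"              # or notes_dirs = ["notes", "journal"]
#   link_report_path = "public/links.txt"
#   stylesheet_dir = "styles"
#
#   [templates]
#   feed = "feed.xml"
#   feed_xsl = "feed.xsl"
#   sitemap = "sitemap.xml"
#
#   [site]
#   name = "Example Site"
#   created = 2020-01-01
#   url = "https://example.com"
#   baseurl = "https://example.com/"
#   uid = "00000000-0000-0000-0000-000000000000"
#   description = "An example site"
#   creator = "Example Author"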