#!/usr/bin/env python
# creator: Silas Jelley
# created: 2020-08-11 09:52:32
# updated: 2024-09-22 15:38:57
# version: 3.0
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "Pillow",
#     "filelock",
#     "jinja2",
#     "urllib3",
#     "pygments",
#     "pillow_heif",
#     "pillow_avif-plugin",
#     "typing-extensions",
#     "beautifulsoup4",
#     "csscompressor",
# ]
# ///

# Imports
from collections import Counter, defaultdict
from hashlib import md5
from pathlib import Path
from subprocess import run
from typing import List, Dict, Any, Tuple, Set, Optional
import asyncio
import datetime
import logging
import os
import random
import re
import tomllib
import sys

from csscompressor import compress
from bs4 import BeautifulSoup
from jinja2 import Environment, FileSystemLoader, StrictUndefined
from urllib.parse import urlparse

import incremental_build
from shortcodes import process_shortcodes
from shortcodes.collection import process_collection
from attribution import process_source_information
from code_highlight import highlight_code
from models import SiteMetadata, DocumentMetadata, AssetMetadata
from asset_processing import process_assets
from error_reporting import report_error, validate_toml_file

# Load configuration
validate_toml_file(Path("config.toml"))
with open("config.toml", "rb") as config_file:
    config = tomllib.load(config_file)

# Constants
ASSET_DIR = Path(config["paths"]["asset_dir"])
MANIFEST_DIR = Path(config["paths"]["manifest_dir"])
TEMPLATE_DIR = Path(config["paths"]["template_dir"])
TEMPLATE_FEED = config["templates"]["feed"]
TEMPLATE_FEED_XSL = config["templates"]["feed_xsl"]
TEMPLATE_SITEMAP = config["templates"]["sitemap"]
OUTPUT_DIR = Path(config["paths"]["output_dir"])
NOTES_DIR = Path(config["paths"]["notes_dir"])
LINK_REPORT_PATH = Path(config["paths"]["link_report_path"])
INIT_DIR = os.getcwd()

# Set up logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


def init_site():
    site_config = config["site"]
    return SiteMetadata(
        name=site_config["name"],
        created=site_config["created"],
        url=site_config["url"],
        baseurl=site_config["baseurl"],
        uid=site_config["uid"],
        description=site_config["description"],
        creator=site_config["creator"],
    )


def preprocess_asset_metadata(
    uid: str, asset_data: Dict[str, Any], manifest_path: Path
) -> Dict[str, Any]:
    """Preprocess asset metadata to ensure it meets AssetMetadata requirements."""
    processed = asset_data.copy()

    # Handle dates
    for date_field in ["created", "updated", "available"]:
        if isinstance(processed.get(date_field), str):
            processed[date_field] = _parse_date(processed[date_field])
        elif isinstance(processed.get(date_field), datetime.datetime):
            processed[date_field] = processed[date_field].replace(tzinfo=None)
        else:
            processed[date_field] = datetime.datetime.now()

    # Set required fields with defaults if not present
    processed.setdefault("uid", uid)

    return processed


def load_assets() -> Dict[str, AssetMetadata]:
    """Load asset manifests and convert them to AssetMetadata instances."""
    assets = {}
    asset_manifests = list(MANIFEST_DIR.glob("*.toml"))
    for manifest in asset_manifests:
        with open(manifest, "rb") as f:
            manifest_data = tomllib.load(f)
        for uid, asset_data in manifest_data.items():
            try:
                processed_data = preprocess_asset_metadata(uid, asset_data, manifest)
                processed_data["filepath"] = ASSET_DIR / processed_data["filepath"]
                assets[uid] = AssetMetadata(**processed_data)
            except Exception as e:
                if isinstance(e, TypeError):
                    import dataclasses

                    # Rerun to get processed_data for error reporting
                    processed_data_for_error = preprocess_asset_metadata(
                        uid, asset_data, manifest
                    )
                    known_fields = {f.name for f in dataclasses.fields(AssetMetadata)}
                    unknown_fields = set(processed_data_for_error.keys()) - known_fields
                    error_msg = (
                        f"Error processing asset '{uid}' from {manifest}\n\n {e}"
                    )
                    if unknown_fields:
                        error_msg += (
                            f"\n Unknown field(s): {', '.join(unknown_fields)}"
                        )
                    raise ValueError(error_msg) from e
                else:
                    raise ValueError(
                        f"Error processing asset {uid} from {manifest}\n\n {e}"
                    ) from e
    return assets


def setup_jinja_environment():
    file_loader = FileSystemLoader(TEMPLATE_DIR)
    env = Environment(loader=file_loader, undefined=StrictUndefined)

    # Add custom filters
    env.filters["shuffle"] = lambda seq: random.sample(seq, len(seq))
    env.filters["time_local"] = lambda value, format="%-I:%M%p": value.strftime(
        format
    ).lower()
    env.filters["year"] = lambda value, format="%Y": value.strftime(format)
    env.filters["month"] = lambda value, format="%m": value.strftime(format)
    env.filters["day"] = lambda value, format="%d": value.strftime(format)
    env.filters["year_month"] = lambda value, format="%Y/%m": value.strftime(format)
    env.filters["year_month_day"] = lambda value, format="%Y/%m/%d": value.strftime(
        format
    )
    env.filters["iso_date"] = lambda value, format="%Y-%m-%d": value.strftime(format)
    env.filters["date_long_short_month"] = (
        lambda value, format="%b %e, %Y": value.strftime(format)
    )
    env.filters["datetime_w3c"] = (
        lambda value, format="%Y-%m-%dT%H:%M:%S": value.strftime(format)
    )
    env.filters["date_long_full_month"] = (
        lambda value, format="%B %e, %Y": value.strftime(format)
    )
    env.filters["timedate_long"] = (
        lambda value, format="%-I:%M%p %B %e, %Y": value.strftime(format)
    )
    env.filters["highlight_code"] = highlight_code
    return env


def get_files() -> List[Path]:
    return [f for f in NOTES_DIR.glob("**/*.md") if "available = " in f.read_text()]


def extract_markdown_links(text: str) -> List[Tuple[str, str]]:
    """Extract markdown links, properly handling parentheses in URLs."""
    links = []
    pattern = r"\[([^\]]+)\]\((https?://)"
    for match in re.finditer(pattern, text):
        link_text = match.group(1)
        url_start = match.end()
        # Count parentheses to find the real end
        paren_count = 1  # We've seen the opening (
        pos = url_start
        while pos < len(text) and paren_count > 0:
            if text[pos] == "(":
                paren_count += 1
            elif text[pos] == ")":
                paren_count -= 1
            pos += 1
        url = match.group(2) + text[url_start : pos - 1]
        links.append((link_text, url))
    return links


def extract_external_links(
    text: str, site, status
) -> Tuple[List, Optional[Tuple[str, str]]]:
    """Extract external links from text.

    Returns:
        Tuple of (sorted external links list, optional (link_text, url) tuple
        if a problematic link is found)
    """
    # Characters that must be percent-encoded in URLs (RFC 3986)
    UNENCODED_CHARS = {
        "(": "%28",
        ")": "%29",
        " ": "%20",
        "<": "%3C",
        ">": "%3E",
        '"': "%22",
        "\\": "%5C",
        "^": "%5E",
        "`": "%60",
        "{": "%7B",
        "}": "%7D",
        "|": "%7C",
    }

    # Extract markdown links properly handling parens
    markdown_matches = extract_markdown_links(text)

    # Check for markdown links with characters that should be encoded
    for link_text, url in markdown_matches:
        for char in UNENCODED_CHARS.keys():
            if char in url:
                return [], (link_text, url)

    # Pattern that matches properly encoded URLs
    url_pattern = (
        r"https?://"  # scheme
        r"(?:[-\w.]|(?:%[\da-fA-F]{2}))+"  # domain
        r"(?:"  # optional path and query
        r"/(?:[^\s\"'<>)\|\\^`{}]|(?:%[\da-fA-F]{2}))*"  # path segments
        r")?"
    )

    matches = re.findall(url_pattern, text)

    external_links = set()
    for url in matches:
        # Clean up any trailing punctuation that might have been caught
        url = url.rstrip(".,;:!?")

        # Skip URLs with unencoded problematic characters
        if any(char in url for char in UNENCODED_CHARS.keys()):
            continue

        parsed_url = urlparse(url)
        if parsed_url.netloc.lower() != "silasjelley.com":
            external_links.add(url)
            # Only add to site.links if not a draft
            if status != "draft":
                site.links["external"].add(url)

    return sorted(external_links), None


async def process_document(
    filepath: Path, site: SiteMetadata
) -> Tuple[str, DocumentMetadata]:
    """Process a document file and return its UID and metadata."""
    # Validate TOML file first for better error reporting
    validate_toml_file(filepath)
    with open(filepath, "rb") as f:
        parsed_toml = tomllib.load(f)

    # The UID is now the top-level table name
    uid = parsed_toml["uid"]

    # Process metadata into DocumentMetadata instance
    document = preprocess_metadata(filepath, parsed_toml)

    # Extract external links from the plain text content
    try:
        plain_text = (
            document.content.get("plain", "")
            + " "
            + document.source.get("url", "")
            + " "
            + document.via.get("url", "")
        )
        status = document.status if document.status else ""
        external_links, problematic = extract_external_links(plain_text, site, status)
        if problematic:
            link_text, url = problematic
            # Simply encode parentheses and other problematic characters
            encoded_url = url.replace("(", "%28").replace(")", "%29")
            # Add other character replacements as needed
            encoded_url = encoded_url.replace(" ", "%20")
            encoded_url = encoded_url.replace("<", "%3C").replace(">", "%3E")
            encoded_url = encoded_url.replace('"', "%22")
            encoded_url = encoded_url.replace("\\", "%5C")
            encoded_url = encoded_url.replace("^", "%5E")
            encoded_url = encoded_url.replace("`", "%60")
            encoded_url = encoded_url.replace("{", "%7B").replace("}", "%7D")
            encoded_url = encoded_url.replace("|", "%7C")
            raise ValueError(
                f"\n\n ERROR: Document contains URL with improperly encoded characters:\n"
                f" Title: {document.title}\n"
                f" File: {filepath}\n"
                f" Link: [{link_text}]({url})\n\n"
                f" Replace with:\n"
                f" [{link_text}]({encoded_url})\n\n"
            )
        document.links["external"] = external_links
    except KeyError:
        logger.warning(
            f"KeyError while compiling external links from {document.filepath}"
        )

    return uid, document


async def ingest_documents(
    site: SiteMetadata,
    cached_documents: Dict[str, DocumentMetadata] | None = None,
    changed_files: set[Path] | None = None,
) -> Dict[str, Any]:
    logger.info("Ingesting files")
    file_list = get_files()
    documents = {}
    slug_to_title_lookup = {}
    slug_to_uid_lookup = {}
    uuid_collision_lookup = []

    # Start with cached documents if available
    if cached_documents:
        logger.info(f"Loading {len(cached_documents)} documents from cache")
        documents = cached_documents.copy()
        # Rebuild lookups from cached documents
        for uid, doc in documents.items():
            slug_to_title_lookup[doc.slug] = doc.title
            slug_to_uid_lookup[doc.slug] = uid
            site.categories.add(doc.category)
            site.secondaries.add(doc.secondary)
            site.tags.update(doc.tags)
            uuid_collision_lookup.append(uid)

    # Only process changed/new files
    files_to_process = []
    if changed_files is not None:
        files_to_process = list(changed_files)
        logger.info(f"Processing {len(files_to_process)} changed/new files")
    else:
        # No cache available, process all files
        files_to_process = file_list

    if files_to_process:
        tasks = [process_document(filepath, site) for filepath in files_to_process]
        results = await asyncio.gather(*tasks)

        for uid, doc in results:
            documents[uid] = doc
            slug_to_title_lookup[doc.slug] = doc.title
            slug_to_uid_lookup[doc.slug] = uid
            site.categories.add(doc.category)
            site.secondaries.add(doc.secondary)
            site.tags.update(doc.tags)
            if uid not in uuid_collision_lookup:
                uuid_collision_lookup.append(uid)

    site.slug_to_uid_lookup = slug_to_uid_lookup
    site.slug_to_title_lookup = slug_to_title_lookup
    check_uuid_collisions(uuid_collision_lookup)
    site.pagecount = len(documents)
    logger.info(f"Total documents: {site.pagecount}")
    return documents


def _parse_date(date_str: str) -> datetime.datetime:
    """Parses a date string into a datetime object, handling both date and datetime inputs."""
    try:
        return datetime.datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S%z").replace(
            tzinfo=None
        )
    except ValueError:
        return datetime.datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=None)


def preprocess_metadata(filepath: Path, metadata: Dict[str, Any]) -> DocumentMetadata:
    """Preprocesses metadata for a document and converts it to a DocumentMetadata instance."""
    # Create a working copy to avoid modifying the input
    processed = metadata.copy()

    # Parse date fields
    for date_field in ["available", "created", "updated"]:
        if isinstance(processed.get(date_field), str):
            processed[date_field] = _parse_date(processed[date_field])
        elif isinstance(processed.get(date_field), datetime.datetime):
            processed[date_field] = processed[date_field].replace(tzinfo=None)

    # Set default updated time if not provided
    processed.setdefault("updated", processed.get("available"))

    # Process source information if present
    if "source" in processed:
        processed["attribution"] = process_source_information(
            processed["source"], processed.get("via", {})
        )
    else:
        processed["attribution"] = {}
        processed["source"] = {}
    if "via" not in processed:
        processed["via"] = {}

    # Handle draft status
    if processed.get("status") == "draft":
        processed["slug"] = f"drafts/{processed['uid']}"

    # Add filepath as it's required but comes from function parameter
    processed["filepath"] = filepath

    # Determine title
    processed["title"] = (
        processed.get("title")
        or processed.get("attribution", {}).get("plain")
        or processed["available"].strftime("%B %e, %Y %-I.%M%p")
    )

    # Create and return DocumentMetadata instance
    try:
        return DocumentMetadata(**processed)
    except TypeError as e:
        import dataclasses

        known_fields = {f.name for f in dataclasses.fields(DocumentMetadata)}
        unknown_fields = set(processed.keys()) - known_fields
        error_msg = f"Error processing metadata for {filepath}\n\n {e}"
        if unknown_fields:
            error_msg += (
                f"\n Unknown field(s) in front matter: {', '.join(unknown_fields)}"
            )
        raise ValueError(error_msg) from e


def check_uuid_collisions(uuid_list):
    prefixes = [uuid[:8] for uuid in uuid_list]
    if len(set(prefixes)) != len(prefixes):
        collisions = [
            prefix for prefix, count in Counter(prefixes).items() if count > 1
        ]
        raise ValueError(
            f"CRITICAL ERROR: UUID prefix collision for: {', '.join(collisions)}"
        )


def generate_html(documents, changed_uids: set[str] | None = None):
    logger.info("Generating HTML")

    # If no changed_uids provided, process all documents
    if changed_uids is None:
        docs_to_process = documents.keys()
        logger.info(f"Generating HTML for all {len(docs_to_process)} documents")
    else:
        docs_to_process = changed_uids
        logger.info(f"Generating HTML for {len(docs_to_process)} changed documents")

    for key in docs_to_process:
        page = documents[key]
        if page.content.get("plain"):
            page.content["html"] = run_jotdown(page.content["plain"], page)
        if page.source.get("text"):
            page.source["html"] = run_jotdown(page.source["text"], page)
        if not page.description:
            html_content = page.content.get("html") or page.source.get("html")
            if html_content:
                soup = BeautifulSoup(html_content, "html.parser")
                text = soup.get_text(separator=" ", strip=True)
                # Normalize whitespace (collapse multiple spaces)
                text = " ".join(text.split())
                page.description = text[:200]

    # Aggregate site-wide counts from per-page counts
    site.words["self"] = sum(
        p.words["self"] for p in documents.values() if p.status != "draft"
    )
    site.words["drafts"] = sum(
        p.words["self"] for p in documents.values() if p.status == "draft"
    )
    site.words["references"] = sum(p.words["references"] for p in documents.values())
    site.words["code"]["lines"] = sum(
        p.words["code"]["lines"] for p in documents.values()
    )
    site.words["code"]["words"] = sum(
        p.words["code"]["words"] for p in documents.values()
    )
    site.words["total"] = (
        site.words["self"] + site.words["drafts"] + site.words["references"]
    )


def run_jotdown(plaintext: str, page) -> str:
    """
    Process djot-formatted plaintext into HTML while also making accurate counts
    of words of prose, references, and code.
    """
    CODE_BLOCK_RE = re.compile(
        r"( *)````*(=html|\s*(?:(\w+)\n))?(.*?)( *)````*", re.DOTALL
    )
    HTML_CODE_BLOCK_RE = re.compile(
        r'<pre[^>]*><code[^>]*>(.*?)</code></pre>', re.DOTALL | re.IGNORECASE
    )
code_blocks = []
marker_template = "§CODE_BLOCK_{}§"
def save_code_block(match):
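        """Swap a Djot fenced code block for a placeholder marker, counting its code words and lines."""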
leading_space = match.group(1)
raw_html_marker = match.group(2)
language = match.group(3)
code = match.group(4).rstrip()
trailing_space = match.group(5)
code_words = len(code.split())
code_lines = len(code.splitlines())
page.words["code"]["lines"] += code_lines
page.words["code"]["words"] += code_words
# Check if this is a raw HTML block
if raw_html_marker == "=html":
return f"{leading_space}```=html\n{code}\n{trailing_space}```"
# For all other cases, including 'html' language, highlight the code
highlighted = highlight_code(code, language)
marker = marker_template.format(len(code_blocks))
code_blocks.append(highlighted)
return f"{leading_space}```=html\n{marker}\n{trailing_space}```"
def save_html_code_block(match):
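        """Swap an already-rendered HTML code block for a placeholder marker, counting its code words and lines."""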
code = match.group(1)
code_words = len(code.split())
code_lines = len(code.splitlines())
page.words["code"]["lines"] += code_lines
page.words["code"]["words"] += code_words
# HTML code blocks are already highlighted, so keep the whole block
marker = marker_template.format(len(code_blocks))
code_blocks.append(match.group(0))
return marker
# First, replace all Djot code blocks with markers
processed_text = CODE_BLOCK_RE.sub(save_code_block, plaintext)
# Then, replace HTML code blocks with markers
processed_text = HTML_CODE_BLOCK_RE.sub(save_html_code_block, processed_text)
# Count prose words from the processed text
# Remove the markers before counting
text_without_markers = processed_text
for i in range(len(code_blocks)):
marker = marker_template.format(i)
text_without_markers = text_without_markers.replace(marker, "")
# Also remove raw HTML blocks for word counting
RAW_HTML_BLOCK_RE = re.compile(r"```=html\n.*?\n```", re.DOTALL)
text_without_markers = RAW_HTML_BLOCK_RE.sub("", text_without_markers)
# Count words from the cleaned markdown prose
prose_wordcount = len(text_without_markers.split())
if page.category == "references":
page.words["references"] += prose_wordcount
else:
page.words["self"] += prose_wordcount
# Run through jotdown
html = run("jotdown", input=processed_text, text=True, capture_output=True).stdout
# Replace markers with actual highlighted code
for i, code in enumerate(code_blocks):
marker = marker_template.format(i)
html = html.replace(marker, code)
return html
def build_backlinks(documents, site):
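    """Scan each page's plain content for internal links and record reciprocal backlinks on the linked documents."""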
logger.info("Building backlinks")
INLINE_LINK_RE = re.compile(
r"\[[^\]]*(?:\[[^\]]*\][^\]]*)*\]\(\/([^)#]*)\)", re.DOTALL
)
FOOTNOTE_LINK_URL_RE = re.compile(r"\[.+?\]:\s\/(.*)", re.DOTALL)
interlink_count = 0
for key, page in documents.items():
if "nobacklinks" in page.options or page.status == "draft":
continue
logger.debug(page.filepath)
text = page.content.get("plain")
# Skip if no main content
if not text:
continue
interlinks = set(documents[key].links["internal"])
combined_refs = INLINE_LINK_RE.findall(text) + FOOTNOTE_LINK_URL_RE.findall(
text
)
for slug in combined_refs:
try:
link_uid = site.slug_to_uid_lookup[slug]
interlinks.add(link_uid)
interlink_count += 1
except KeyError:
if should_ignore_slug(slug):
continue
logger.warning(f"\nKeyError in {page.title} ({key}): {slug}")
documents[key].links["internal"] = sorted(interlinks)
for interlink_key in interlinks:
documents[interlink_key].links["backlinks"].add(key)
"""
TODO: REMOVE SITE.BACKLINKS in favour a 'stats' or 'count' (templates will need updating
"""
site.backlinks += interlink_count
def should_ignore_slug(slug):
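    """Return True for slugs that point at feeds, assets, or special files rather than documents."""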
return (
slug.startswith(("feeds/", "images/", "$"))
or slug.endswith((".jpg", ".webp", ".png", ".svg", ".pdf", ".gif", ".html"))
or slug in ["publickey", "humans.txt", "build.py", "links.txt"]
)
def build_collections(
documents: Dict[str, DocumentMetadata], site: SiteMetadata
) -> Tuple[Dict[str, List[Dict[str, Any]]], List[Dict[str, Any]]]:
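    """Group pages into collections keyed by category, secondary, tag, and the
    special 'everything'/'main' keys (drafts go to a dedicated collection), and
    build the list of pages for the sitemap."""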
logger.info("Building collections")
collections = {
category: []
for category in list(site.categories)
+ list(site.secondaries)
+ list(site.tags)
+ ["everything", "main", "cd68b918-ac5f-4d6c-abb5-a55a0318846b"]
}
sitemap = []
for key, page in sorted(
documents.items(), key=lambda k_v: k_v[1].available, reverse=True
):
if page.status == "draft":
collections["cd68b918-ac5f-4d6c-abb5-a55a0318846b"].append(page)
continue
elif page.status == "hidden":
continue
elif "nofeed" in page.options:
sitemap.append(page)
continue
else:
sitemap.append(page)
collections["everything"].append(page)
collections[page.category].append(page)
collections[page.secondary].append(page)
for tag in page.tags:
collections[tag].append(page)
if page.secondary in [
"essays",
"wandering",
"rambling",
"pearls",
]:
collections["main"].append(page)
return collections, sitemap
def output_html(
assets: Dict[str, AssetMetadata],
documents: Dict[str, DocumentMetadata],
collections: Dict[str, List[Dict[str, Any]]],
site: SiteMetadata,
env: Environment,
output_dir: Path,
) -> None:
"""HTML output with incremental builds."""
incremental_build.output_html_incremental(
assets=assets,
documents=documents,
collections=collections,
site=site,
env=env,
output_dir=output_dir,
)
def output_feeds(collections, site, env, output_dir):
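    """Render and write an XML feed for every category and secondary, plus 'everything' and 'main'."""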
logger.info("Generating Feeds")
feed_list = list(site.categories) + list(site.secondaries) + ["everything", "main"]
for entry in feed_list:
feed = render_feed(entry, collections, site, env)
write_feed(feed, output_dir)
logger.debug(f" {entry} >> {feed['path']}")
output_feed_stylesheet(site, env, output_dir)
def render_feed(feed_name, collections, site, env):
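    """Render a single feed from the feed template and return its name, content, and output path."""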
slug = f"feeds/{feed_name}"
feed_path = f"{slug}/index.xml"
template = env.get_template(TEMPLATE_FEED)
feed_content = template.render(
site=site,
slug=slug,
collection=feed_name,
feed=collections[feed_name],
)
return {"name": feed_name, "output": feed_content, "path": feed_path}
def write_feed(feed, output_dir):
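    """Write a rendered feed to its path under output_dir, creating parent directories as needed."""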
feed_path = output_dir / feed["path"]
feed_path.parent.mkdir(parents=True, exist_ok=True)
feed_path.write_text(feed["output"])
def output_link_report(site, link_report_path):
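    """Write every external link collected during the build to a plaintext report, one URL per line."""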
logger.info("Creating plaintext link files")
with open(link_report_path, "w") as file:
for link in sorted(site.links["external"]):
file.write(f"{link}\n")
logger.debug(f" {link_report_path}")
def output_feed_stylesheet(site, env, output_dir):
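    """Render the feed XSL stylesheet template to feed.xsl in the output directory."""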
logger.info("Creating XSL Stylesheet")
template = env.get_template(TEMPLATE_FEED_XSL)
output_path = output_dir / "feed.xsl"
output = template.render(site=site)
output_path.write_text(output)
logger.debug(f" {output_path}")
def output_sitemap(sitemap, site, env, output_dir):
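    """Render sitemap.xml from the collected sitemap entries."""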
logger.info("Generating Sitemap")
template = env.get_template(TEMPLATE_SITEMAP)
output = template.render(sitemap=sitemap, site=site)
output_path = output_dir / "sitemap.xml"
output_path.write_text(output)
logger.debug(f" {output_path}")
def process_pending_collections(
site: SiteMetadata,
documents: Dict[str, DocumentMetadata],
collections: Dict[str, List[Dict[str, Any]]],
env: Environment,
):
"""
Renders and inserts collections for all documents that used the @collection shortcode.
This runs iteratively to handle nested collections.
"""
logger.info("Processing pending collections")
if not site.docs_with_collections:
logger.info("No collections to process, skipping.")
return
max_passes = 10 # Safeguard against infinite loops
for i in range(max_passes):
replacements_this_pass = 0
# Process all documents that have pending collections
for uid in list(site.docs_with_collections):
page = documents[uid]
if not page.pending_collections:
continue
remaining_pending = []
for pending in page.pending_collections:
params = pending["params"]
placeholder = (
f"COLLECTION-PLACEHOLDER-{pending['placeholder_id']}
" ) if placeholder not in page.content.get("html", ""): continue # 1. Process collection to get final list of items # We pass the current page's UID to handle self-exclusion sliced_items = process_collection(params, documents, collections, uid) # 2. Check for unresolved dependencies. If style is 'body', # we must ensure all sub-collections are rendered first. can_render = True if params.style == "body": for item in sliced_items: if "COLLECTION-PLACEHOLDER" in item.content.get("html", ""): can_render = False break if not can_render: remaining_pending.append(pending) # Keep for the next pass continue # 3. Render collection_template = env.get_template("collection") rendered_html = collection_template.render( site=site, documents=documents, collection=sliced_items, style=params.style, include_title=params.include_title, show_date=params.show_date, numbered=params.numbered, ) # 4. Replace placeholder page.content["html"] = page.content["html"].replace( placeholder, rendered_html ) replacements_this_pass += 1 page.pending_collections = remaining_pending logger.info( f"Collection Pass {i + 1}: {replacements_this_pass} replacements made." ) if replacements_this_pass == 0: logger.info("Collection processing complete.") break else: logger.warning( "Max passes reached for collection processing. " "There might be unresolved collections or a circular dependency." ) # Final check for any unresolved collections unresolved_docs = [ documents[uid].title for uid in site.docs_with_collections if documents[uid].pending_collections ] if unresolved_docs: logger.warning( f"Could not resolve all collections. Unresolved documents: {', '.join(unresolved_docs)}" ) def validate_slug_uniqueness( documents: Dict[str, DocumentMetadata], assets: Dict[str, AssetMetadata] ) -> None: """Validate that all slugs are unique across documents and assets.""" slug_to_items = defaultdict(list) for doc in documents.values(): slug_to_items[doc.slug].append( ("document", doc.uid, doc.title, str(doc.filepath)) ) for asset in assets.values(): slug_to_items[asset.slug].append( ("asset", asset.uid, asset.title, str(asset.filepath)) ) collisions = { slug: items for slug, items in slug_to_items.items() if len(items) > 1 } if collisions: error_lines = ["\n\n ERROR: Duplicate slugs found:"] for slug, items in collisions.items(): for item_type, uid, title, filepath in items: error_lines.extend( [ f" {item_type.capitalize()}: {uid}", f" Title: {title}", f" File: {filepath}", f" Slug: {slug}", "", ] ) error_msg = "\n".join(error_lines) raise ValueError(error_msg) async def main(): # Initialize site and load assets import sys global site site = init_site() # Set up Jinja environment env = setup_jinja_environment() core_stylesheet_content = "" deferred_stylesheet_content = "" stylesheet_dir = Path(config["paths"]["stylesheet_dir"]) core_css_files = config.get("css", {}).get("core_files", []) if stylesheet_dir.is_dir(): css_files = sorted(stylesheet_dir.glob("*.css")) for css_file in css_files: if str(css_file) in core_css_files: core_stylesheet_content += css_file.read_text() + "\n" else: deferred_stylesheet_content += css_file.read_text() + "\n" # Write deferred stylesheet to output output_stylesheet_path = OUTPUT_DIR / "deferred.css" output_stylesheet_path.parent.mkdir(parents=True, exist_ok=True) output_stylesheet_path.write_text(deferred_stylesheet_content) # Assign content and hashes to site context site.data["core_stylesheet"] = core_stylesheet_content site.data["core_stylesheet_minified"] = compress(core_stylesheet_content) if 
deferred_stylesheet_content: site.data["deferred_stylesheet_hash"] = md5( deferred_stylesheet_content.encode("utf-8") ).hexdigest() # Check for full rebuild flag force_rebuild = "--full-rebuild" in sys.argv if force_rebuild: logger.info("Full rebuild requested, clearing cache") incremental_build.clear_build_cache(OUTPUT_DIR) # Try to load cached state cached_state = None changed_files = None cached_documents = None cached_assets = None file_hashes = {} if not force_rebuild: cached_state = incremental_build.load_state_cache(OUTPUT_DIR) # Get list of content files (needed for change detection and ingestion) file_list = get_files() if cached_state: cached_documents, cached_assets, cached_site, file_hashes = cached_state logger.info("Loaded state from cache") # Identify changed files changed_files, new_file_hashes = incremental_build.get_changed_files( file_list, file_hashes ) file_hashes = new_file_hashes if changed_files: logger.info(f"Detected {len(changed_files)} changed file(s)") else: logger.info("No content files changed") # Restore site metadata from cache where appropriate site.data = cached_site.data else: logger.info("No cache found, performing full build") # Compute file hashes for the cache we'll save at the end _, file_hashes = incremental_build.get_changed_files(file_list, {}) # Load assets (always reload to catch new/changed assets) assets = load_assets() # Process assets await asyncio.to_thread(process_assets, assets, ASSET_DIR, OUTPUT_DIR) # Ingest and process documents (using cache when available) documents = await ingest_documents(site, cached_documents, changed_files) # Validate slug uniqueness validate_slug_uniqueness(documents, assets) # Track which documents need processing changed_uids = set() if changed_files: # Map changed file paths to UIDs for filepath in changed_files: for uid, doc in documents.items(): if doc.filepath == filepath: changed_uids.add(uid) break # Process shortcodes # We need to pass ALL documents/assets to shortcodes for cross-references to work if changed_uids: logger.info(f"Processing shortcodes for {len(changed_uids)} changed document(s)") # Create a modified process_shortcodes call that only processes changed docs from shortcodes.processor import ShortcodeProcessor, ShortcodeError processor = ShortcodeProcessor() base_context = { "documents": documents, # ALL documents for references "assets": assets, # ALL assets for references "site": site, "processor": processor, "env": env, } errors = [] for uid in changed_uids: page = documents[uid] if page.content.get("plain"): page_context = {**base_context, "current_page_uid": uid} try: processed_content = processor.process( page.content["plain"], page_context ) page.content["plain"] = processed_content except Exception as e: errors.append( f"\nERROR: Shortcode error in document\n uid: {uid}\n path: {page.filepath}\n title: {page.title}\n ERROR: {e}\n" ) if errors: import sys for error in errors: print(error, file=sys.stderr) sys.exit(1) elif not cached_state: # No cache, process all logger.info("Processing shortcodes") process_shortcodes(documents, assets, site, env) # Generate HTML only for changed documents if changed_uids: generate_html(documents, changed_uids) elif not cached_state: generate_html(documents) else: # No changes, but still need to aggregate word counts from cached data site.words["self"] = sum( p.words["self"] for p in documents.values() if p.status != "draft" ) site.words["drafts"] = sum( p.words["self"] for p in documents.values() if p.status == "draft" ) site.words["references"] = 
sum(p.words["references"] for p in documents.values()) site.words["code"]["lines"] = sum( p.words["code"]["lines"] for p in documents.values() ) site.words["code"]["words"] = sum( p.words["code"]["words"] for p in documents.values() ) site.words["total"] = ( site.words["self"] + site.words["drafts"] + site.words["references"] ) # Build backlinks and collections (always rebuild to handle cross-document references) collections, sitemap = build_collections(documents, site) process_pending_collections(site, documents, collections, env) build_backlinks(documents, site) # Attempting to make final order of 'backlinks' deterministic for key, page in documents.items(): # Sort interlinks based on published dates documents[key].links["internal"] = sorted( documents[key].links["internal"], key=lambda x: documents[x].available, reverse=True, # Most recent first ) # Sort backlinks based on published dates documents[key].links["backlinks"] = sorted( documents[key].links["backlinks"], key=lambda x: documents[x].available, reverse=True, # Most recent first ) # Save state cache before output incremental_build.save_state_cache(documents, assets, site, file_hashes, OUTPUT_DIR) # Output HTML, feeds, and sitemap output_html(assets, documents, collections, site, env, OUTPUT_DIR) output_link_report(site, LINK_REPORT_PATH) output_feeds(collections, site, env, OUTPUT_DIR) output_sitemap(sitemap, site, env, OUTPUT_DIR) # Change back to the initial directory os.chdir(INIT_DIR) # Print summary logger.info("Build complete!") logger.info(f"Pages: {site.pagecount}") logger.info(f"Words: {site.words['total']}") logger.info(f"Internal links: {site.backlinks}") logger.info(f"External links: {len(site.links['external'])}") if __name__ == "__main__": try: asyncio.run(main()) except Exception as e: report_error(e, "Site Build Failed")