#!/usr/bin/env python
# creator: Silas Jelley
# created: 2020-08-11 09:52:32
# updated: 2024-09-22 15:38:57
# version: 3.0
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "Pillow ",
# "filelock",
# "jinja2",
# "urllib3",
# "pygments",
# "pillow_heif",
# "pillow_avif-plugin",
# ]
# ///
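# Usage: run with a PEP 723-aware tool such as `uv run <this script>`; it expects
# config.toml in the current working directory.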
# Imports
from collections import Counter
from dataclasses import dataclass, field, asdict
from hashlib import md5
from pathlib import Path
from shutil import copyfile
from subprocess import run
from typing import Any, Dict, List, Set, Tuple, TypedDict
import asyncio
import datetime
import logging
import multiprocessing
import os
import random
import re
import tomllib
from PIL import Image, ImageOps
from filelock import FileLock
from jinja2 import Environment, FileSystemLoader
from pillow_heif import register_heif_opener
import pillow_avif
from pygments import highlight
from pygments.formatters import HtmlFormatter
from pygments.lexer import RegexLexer, bygroups
from pygments.lexers import get_lexer_by_name, guess_lexer
from pygments.token import *
from pygments.util import ClassNotFound
from urllib.parse import urlparse
register_heif_opener()
# Load configuration
with open("config.toml", "rb") as config_file:
config = tomllib.load(config_file)
# Constants
ASSET_DIR = Path(config["paths"]["asset_dir"])
TEMPLATE_DIR = Path(config["paths"]["template_dir"])
STYLESHEET = Path(config["paths"]["stylesheet"])
OUTPUT_DIR = Path(config["paths"]["output_dir"])
NOTES_DIR = Path(config["paths"]["notes_dir"])
TEMPLATE_FEED = config["templates"]["feed"]
TEMPLATE_FEED_XSL = config["templates"]["feed_xsl"]
TEMPLATE_SITEMAP = config["templates"]["sitemap"]
TEMPLATE_DEFAULT = config["templates"]["default"]
INIT_DIR = os.getcwd()
# Set up logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Dataclasses
@dataclass
class SiteMetadata:
name: str
created: str
url: str
baseurl: str
uid: str
description: str
creator: Dict[str, str]
backlinks: int = 0
words: Dict[str, Any] = field(
default_factory=lambda: {
"self": 0,
"drafts": 0,
"code": {
"lines": 0,
"words": 0,
},
"references": 0,
}
)
links: Dict[str, Any] = field(
default_factory=lambda: {
"internal": list(),
"backlinks": list(),
"external": set(),
}
)
pagecount: int = 0
references: int = 0
categories: Set = field(default_factory=set)
secondaries: Set = field(default_factory=set)
tags: Set = field(default_factory=set)
data: Dict[str, Any] = field(default_factory=dict)
stylesheet_hash: str = ""
slug_to_uid_lookup: Dict[str, str] = field(default_factory=dict)
slug_to_title_lookup: Dict[str, str] = field(default_factory=dict)
class LinksDict(TypedDict):
internal: list[str]
external: list[str]
backlinks: list[str]
@dataclass
class DocumentMetadata:
filepath: Path
uid: str
slug: str
title: str
category: str
secondary: str
available: datetime.datetime
created: datetime.datetime
updated: datetime.datetime
creator: str = ""
note: str = ""
favourite: bool = False
parent: str = ""
description: str = ""
layout: str = TEMPLATE_DEFAULT
source: Dict = field(default_factory=dict)
via: Dict = field(default_factory=dict)
location: Dict[str, Any] = field(
default_factory=lambda: {
"continent": "",
"country": "",
"region": "",
"city": "",
"note": "",
"lat": int,
"lng": int,
}
)
collection: Dict[str, Any] = field(
default_factory=lambda: {
"style": "title",
"order": "chronological",
"include": [],
}
)
attribution: Dict[str, str] = field(
default_factory=lambda: {
"plain": "",
"djot": "",
"html": "",
}
)
media: str = "application/toml"
words: Dict[str, Any] = field(
default_factory=lambda: {
"self": 0,
"code": {
"lines": 0,
"words": 0,
},
"references": 0,
}
)
status: str = ""
links: LinksDict = field(
default_factory=lambda: {
"internal": list(),
"external": list(),
"backlinks": list(),
}
)
options: List[str] = field(default_factory=list)
tags: List[str] = field(default_factory=list)
styles: str = ""
content: Dict[str, str] = field(default_factory=dict)
def __post_init__(self):
# Validate links dictionary structure
required_link_types = {"internal", "external", "backlinks"}
if (
not isinstance(self.links, dict)
or set(self.links.keys()) != required_link_types
):
raise ValueError(
f"links must be a dictionary with exactly these keys: {required_link_types}"
)
for key in self.links:
if not isinstance(self.links[key], set):
self.links[key] = set(self.links[key])
@dataclass
class AssetMetadata:
filepath: Path
media: str
uid: str
slug: str
title: str
    available: datetime.datetime
created: datetime.datetime
updated: datetime.datetime
creator: str = ""
note: str = ""
favourite: bool = False
source: Dict = field(default_factory=dict)
via: Dict = field(default_factory=dict)
hash: str = ""
output_width: int = 0
output_height: int = 0
location: Dict[str, Any] = field(
default_factory=lambda: {
"continent": "",
"country": "",
"region": "",
"city": "",
"note": "",
"lat": int,
"lng": int,
}
)
attribution: Dict[str, str] = field(
default_factory=lambda: {
"plain": "",
"djot": "",
"html": "",
}
)
words: Dict[str, Any] = field(
default_factory=lambda: {
"self": 0,
"code": {
"lines": 0,
"words": 0,
},
"references": 0,
}
)
links: LinksDict = field(
default_factory=lambda: {
"internal": list(),
"external": list(),
"backlinks": list(),
}
)
tags: List[str] = field(default_factory=list)
content: Dict[str, str] = field(default_factory=dict)
def __post_init__(self):
# Validate links dictionary structure
required_link_types = {"internal", "external", "backlinks"}
if (
not isinstance(self.links, dict)
or set(self.links.keys()) != required_link_types
):
raise ValueError(
f"links must be a dictionary with exactly these keys: {required_link_types}"
)
for key in self.links:
if not isinstance(self.links[key], set):
self.links[key] = set(self.links[key])
def init_site():
site_config = config["site"]
return SiteMetadata(
name=site_config["name"],
created=site_config["created"],
url=site_config["url"],
baseurl=site_config["baseurl"],
uid=site_config["uid"],
description=site_config["description"],
creator=site_config["creator"],
stylesheet_hash=md5(STYLESHEET.read_bytes()).hexdigest(),
)
def preprocess_asset_metadata(
uid: str, asset_data: Dict[str, Any], manifest_path: Path
) -> Dict[str, Any]:
"""Preprocess asset metadata to ensure it meets AssetMetadata requirements."""
processed = asset_data.copy()
# Handle dates
for date_field in ["created", "updated", "available"]:
if isinstance(processed.get(date_field), str):
processed[date_field] = _parse_date(processed[date_field])
elif isinstance(processed.get(date_field), datetime.datetime):
processed[date_field] = processed[date_field].replace(tzinfo=None)
else:
processed[date_field] = datetime.datetime.now()
# Set required fields with defaults if not present
processed.setdefault("uid", uid)
return processed
def load_assets() -> Dict[str, AssetMetadata]:
"""Load asset manifests and convert them to AssetMetadata instances."""
assets = {}
asset_manifests = list(ASSET_DIR.glob("manifests/*.toml"))
for manifest in asset_manifests:
with open(manifest, "rb") as f:
manifest_data = tomllib.load(f)
for uid, asset_data in manifest_data.items():
try:
processed_data = preprocess_asset_metadata(uid, asset_data, manifest)
processed_data["filepath"] = ASSET_DIR / processed_data["filepath"]
assets[uid] = AssetMetadata(**processed_data)
except Exception as e:
logger.error(
f"Error processing asset {uid}\n{' ' * 26}{manifest}\n{' ' * 26}{str(e)}"
)
continue
return assets
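# Jinja environment: templates get a set of date-formatting filters, a `shuffle`
# filter, and the `highlight_code` filter used for rendering code samples.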
def setup_jinja_environment():
file_loader = FileSystemLoader(TEMPLATE_DIR)
env = Environment(loader=file_loader)
# Add custom filters
env.filters["shuffle"] = lambda seq: random.sample(seq, len(seq))
env.filters["time_local"] = lambda value, format="%-I:%M%p": value.strftime(
format
).lower()
env.filters["year"] = lambda value, format="%Y": value.strftime(format)
env.filters["month"] = lambda value, format="%m": value.strftime(format)
env.filters["day"] = lambda value, format="%d": value.strftime(format)
env.filters["year_month"] = lambda value, format="%Y/%m": value.strftime(format)
env.filters["year_month_day"] = lambda value, format="%Y/%m/%d": value.strftime(
format
)
env.filters["date_long_short_month"] = (
lambda value, format="%b %e, %Y": value.strftime(format)
)
env.filters["datetime_w3c"] = (
lambda value, format="%Y-%m-%dT%H:%M:%S": value.strftime(format)
)
env.filters["date_long_full_month"] = (
lambda value, format="%B %e, %Y": value.strftime(format)
)
env.filters["timedate_long"] = (
lambda value, format="%-I:%M%p %B %e, %Y": value.strftime(format)
)
env.filters["highlight_code"] = highlight_code
return env
def get_files() -> List[Path]:
return [f for f in NOTES_DIR.glob("**/*.md") if "available = " in f.read_text()]
def extract_external_links(text: str, site) -> List:
url_pattern = r"(https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+(?:/[^)\s]*)?)"
matches = re.findall(url_pattern, text)
# Convert to set immediately
external_links = set()
for url in matches:
parsed_url = urlparse(url)
if parsed_url.netloc.lower() != "silasjelley.com":
external_links.add(url)
site.links["external"].add(url)
return sorted(external_links)
async def process_document(
filepath: Path, site: SiteMetadata
) -> Tuple[str, DocumentMetadata]:
"""Process a document file and return its UID and metadata."""
with open(filepath, "rb") as f:
        try:
            parsed_toml = tomllib.load(f)
        except tomllib.TOMLDecodeError as e:
            logger.error(f"Error while processing document: {filepath}\n{e}")
            raise SystemExit(1)
    # Every document carries its unique identifier as a top-level `uid` key
uid = parsed_toml["uid"]
# Process metadata into DocumentMetadata instance
document = preprocess_metadata(filepath, parsed_toml)
# Extract external links from the plain text content
try:
plain_text = (
document.content.get("plain", "")
+ " "
+ document.source.get("url", "")
+ " "
+ document.via.get("url", "")
)
external_links = extract_external_links(plain_text, site)
document.links["external"] = external_links
except KeyError:
        logger.warning(
            f"KeyError while compiling external links from {document.filepath}"
        )
return uid, document
async def ingest_documents(site: SiteMetadata) -> Dict[str, Any]:
logger.info("Ingesting files")
file_list = get_files()
documents = {}
slug_to_title_lookup = {}
slug_to_uid_lookup = {}
uuid_collision_lookup = []
tasks = [process_document(filepath, site) for filepath in file_list]
results = await asyncio.gather(*tasks)
for uid, doc in results:
documents[uid] = doc
slug_to_title_lookup[doc.slug] = doc.title
slug_to_uid_lookup[doc.slug] = uid
site.categories.add(doc.category)
site.secondaries.add(doc.secondary)
site.tags.update(doc.tags)
uuid_collision_lookup.append(uid)
site.slug_to_uid_lookup = slug_to_uid_lookup
site.slug_to_title_lookup = slug_to_title_lookup
check_uuid_collisions(uuid_collision_lookup)
site.pagecount = len(documents)
logger.info(f"Ingested {site.pagecount} files")
return documents
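# Worker run in a multiprocessing pool: resizes one source image, writes the resized
# raster output plus an AVIF variant when the encoder is available, and guards the
# output path with a file lock so concurrent workers don't collide.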
def process_image_parallel(input_data: Tuple[Path, Path, int, AssetMetadata]) -> None:
    # Touching the plugin module keeps AVIF support registered inside worker processes.
    workaround_import = pillow_avif.AvifImagePlugin
input_image, output_path, output_width, asset_metadata = input_data
lock_path = output_path.with_suffix(".lock")
lock = FileLock(str(lock_path))
# Define AVIF output path
avif_output_path = output_path.with_suffix(".avif")
# Check if AVIF support is available
avif_available = "AVIF" in Image.SAVE
if output_path.exists() and avif_output_path.exists():
return
try:
with lock:
os.makedirs(output_path.parent, exist_ok=True)
with Image.open(input_image) as im:
original_format = im.format
im = ImageOps.exif_transpose(im)
                output_height = int(im.size[1] * (output_width / im.size[0]))
                asset_metadata.output_width = output_width
                asset_metadata.output_height = output_height
                logger.debug(f"Output width parameter: {output_width}")
                logger.debug(f"Image size before resize calculation: {im.size}")
                logger.debug(f"Calculated output height: {output_height}")
with im.resize(
(output_width, output_height), Image.Resampling.LANCZOS
) as output_image:
# Save JPEG version
if (
original_format != "JPEG"
and str(output_path).endswith("jpg")
and output_image.mode in ("RGBA", "P")
):
output_image = output_image.convert("RGB")
output_image.save(output_path, quality=85, optimize=True)
# Save AVIF version only if support is available
if avif_available:
try:
if output_image.mode in ("RGBA", "P"):
avif_image = output_image.convert("RGB")
else:
avif_image = output_image.copy()
avif_image.save(
avif_output_path,
format="AVIF",
quality=60, # Lower quality for better compression, still maintains good visual quality
                                speed=5,  # Encoding speed/size trade-off (0 = slowest/best compression, 10 = fastest)
bits=10, # Use 10-bit color depth for better quality-to-size ratio
compress_level=8, # Highest compression level (range 0-8)
color_space="bt709", # Use YUV BT.709 color space
chroma=0, # 4:4:4 chroma sampling (0=4:4:4, 1=4:2:0, 2=4:2:2)
num_threads=0, # Use all available CPU threads for encoding
)
logger.debug(
f"Processed image: {input_image} -> {output_path} and {avif_output_path}"
)
except Exception as e:
logger.error(
f"Error saving AVIF version of {input_image}: {e}"
)
else:
logger.error(
"AVIF support not available. Skipping AVIF conversion."
)
logger.debug(f"Processed image: {input_image} -> {output_path}")
except OSError as e:
logger.error(f"OS error processing {input_image}: {e}")
except Exception as e:
logger.error(f"Error processing {input_image}: {e}")
finally:
if lock_path.exists():
try:
lock_path.unlink()
except OSError:
pass
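# Asset pipeline: hash and copy .gpx traces, queue raster images for parallel
# resizing/AVIF conversion, and copy everything else (plus loose files in the asset
# root) straight through to the output directory.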
def process_assets(
assets: Dict[str, AssetMetadata], asset_dir: Path, output_dir: Path
) -> None:
logger.info("Processing assets")
manifest_images = []
for asset_identifier, asset_metadata in assets.items():
source_path = Path(asset_metadata.filepath)
output_path = output_dir / asset_metadata.slug
os.makedirs(output_path.parent, exist_ok=True)
if not source_path.exists():
raise FileNotFoundError(
f"Missing asset: {asset_identifier} at {source_path}"
)
if source_path.suffix == ".gpx":
with open(source_path, "rb") as file_to_hash:
asset_metadata.hash = md5(file_to_hash.read()).hexdigest()
copyfile(source_path, output_path)
elif output_path.exists():
continue
elif source_path.suffix in (".jpg", ".png", ".heic", ".webp"):
width = 3000 if "PANO" in str(output_path) else 1600
manifest_images.append((source_path, output_path, width, asset_metadata))
else:
copyfile(source_path, output_path)
for asset in list(asset_dir.glob("*")):
if asset.is_file():
output_path = output_dir / asset.relative_to(asset_dir)
os.makedirs(output_path.parent, exist_ok=True)
copyfile(asset, output_path)
with multiprocessing.Pool() as pool:
pool.map(process_image_parallel, manifest_images)
logger.info("Finished processing assets")
def _parse_date(date_str: str) -> datetime.datetime:
"""Parses a date string into a datetime object, handling both date and datetime inputs."""
try:
return datetime.datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S%z").replace(
tzinfo=None
)
except ValueError:
return datetime.datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=None)
def preprocess_metadata(filepath: Path, metadata: Dict[str, Any]) -> DocumentMetadata:
"""Preprocesses metadata for a document and converts it to a DocumentMetadata instance."""
# Create a working copy to avoid modifying the input
processed = metadata.copy()
# Parse date fields
for date_field in ["available", "created", "updated"]:
if isinstance(processed.get(date_field), str):
processed[date_field] = _parse_date(processed[date_field])
elif isinstance(processed.get(date_field), datetime.datetime):
processed[date_field] = processed[date_field].replace(tzinfo=None)
# Set default updated time if not provided
processed.setdefault("updated", processed.get("available"))
# Process source information if present
if "source" in processed:
processed["attribution"] = process_source_information(
processed["source"], processed.get("via", {})
)
else:
processed["attribution"] = {}
processed["source"] = {}
if "via" not in processed:
processed["via"] = {}
# Handle draft status
if processed.get("status") == "draft":
processed["slug"] = f"drafts/{processed['uid']}"
# Add filepath as it's required but comes from function parameter
processed["filepath"] = filepath
# Determine title
processed["title"] = (
processed.get("title")
or processed.get("attribution", {}).get("plain")
or processed["available"].strftime("%B %e, %Y %-I.%M%p")
)
# Create and return DocumentMetadata instance
return DocumentMetadata(**processed)
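# Builds the three attribution strings (plain text, Djot, and rendered HTML) from a
# document's `source`/`via` tables: creator, title, publication details, and an
# optional "(via)" link are joined in a fixed order.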
def process_source_information(source: Dict[str, Any], via) -> Dict[str, str]:
creator = source.get("creator") or source.get("director")
title = source.get("title") or (
" ♫ " + str(source.get("track"))
if source.get("track")
else source.get("description")
)
date = source.get("published") or source.get("year") or source.get("created")
volume = source.get("volume")
chapter = source.get("chapter")
pages = source.get("pages")
url = source.get("url", "")
speaker = source.get("speaker") or source.get("character")
edition = source.get("edition")
publisher = source.get("publisher")
partsplain = []
partsdjot = []
partshtml = []
partsshared = []
partsvia = ""
if speaker:
speaker = f"{speaker} in "
else:
speaker = ""
if edition:
edition = f"{edition} edition"
else:
edition = ""
if creator:
if title:
partsplain.append(f"{creator}, {title}")
partsdjot.append(f"{creator}, {{_{title}_}}")
if url:
partshtml.append(f"{creator}, [{{_{title}_}}]({escape_url(url)})")
else:
partshtml.append(f"{creator}, {{_{title}_}}")
else:
partsplain.append(creator)
partsdjot.append(creator)
if url:
partshtml.append(f"[{creator}]({escape_url(url)})")
else:
partshtml.append(f"{creator}")
elif title:
partsplain.append(title)
partsdjot.append(f"{{_{title}_}}")
if url:
partshtml.append(f"[{{_{title}_}}]({escape_url(url)})")
else:
partshtml.append(f"{{_{title}_}}")
else:
logger.error(f"No creator or title {source}")
if "album" in source:
partsshared.append(source["album"])
if "show" in source:
partsshared.append(source["show"])
if "season" in source:
partsshared.append(f"season {source['season']}")
if "episode" in source:
partsshared.append(f"episode {source['episode']}")
if "publication" in source:
partsshared.append(source["publication"])
if publisher and edition:
partsshared.append(f"{publisher} ({edition})")
elif publisher:
partsshared.append(publisher)
elif edition:
partsshared.append(edition)
if chapter:
partsshared.append(f"Ch. {chapter}")
if volume:
partsshared.append(f"Vol. {volume}")
if pages:
partsshared.append(f"p. {pages}")
if date:
partsshared.append(str(date.year if isinstance(date, datetime.date) else date))
    if via:
        via_url = via.get("url", "")
        if via_url:
            partsvia = f" ([via]({escape_url(via_url)}))"
return {
"plain": f"{speaker}{', '.join(partsplain + partsshared)}",
"djot": f"{speaker}{', '.join(partsdjot + partsshared)}",
"html": format_rich_attribution(
" — " + f"{speaker}{', '.join(partshtml + partsshared) + partsvia}"
),
}
def escape_url(url: str) -> str:
return url.replace(")", "%29")
def format_rich_attribution(attribution: str) -> str:
return run(
"jotdown", input=attribution, text=True, capture_output=True
).stdout.strip()
def check_uuid_collisions(uuid_list):
prefixes = [uuid[:8] for uuid in uuid_list]
if len(set(prefixes)) != len(prefixes):
collisions = [
prefix for prefix, count in Counter(prefixes).items() if count > 1
]
raise ValueError(
f"CRITICAL ERROR: UUID prefix collision for: {', '.join(collisions)}"
)
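# Substitution pass: documents refer to each other (and to assets) with short
# `type::uid-prefix` references; the helpers below expand those references into
# slugs, titles, citations, imported content, and media embeds.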
def insert_substitutions(
documents: Dict[str, DocumentMetadata],
assets: Dict[str, AssetMetadata],
site: SiteMetadata,
) -> None:
logger.info("Performing substitutions")
REF_LINK_RE = re.compile(r"!?\[([^\]]*?)\](\((.*?::)([^)]+)\))")
    # Sketch: the exact pattern below is an assumption; it should match bare
    # `type::short-uid` references in the document body.
    REF_SLUG_RE = re.compile(r"(\w+::)([0-9a-fA-F][\w-]*)")


# Name and signature assumed; rewrites bare references to `/slug` paths.
def replace_slug_references(
    text: str, regex: re.Pattern, merged_data: Dict[str, Any]
) -> str:
for match in regex.finditer(text):
ref_type, ref_short_id = match.groups()
full_match = match.group(0)
ref_id = next((k for k in merged_data if k.startswith(ref_short_id)), None)
if ref_id:
            replacement = f"/{merged_data[ref_id].slug}"
            text = text.replace(full_match, replacement)
return text
def replace_title_references(
text: str, regex: re.Pattern, merged_data: Dict[str, DocumentMetadata]
) -> str:
for match in regex.finditer(text):
opening, ref_type, ref_short_id, comment, closing = match.groups()
full_match = match.group(0)
ref_id = next((k for k in merged_data if k.startswith(ref_short_id)), None)
if ref_id:
replacement = merged_data[ref_id].title
text = text.replace(full_match, replacement)
return text
def replace_cite_references(
text: str, regex: re.Pattern, merged_data: Dict[str, DocumentMetadata]
) -> str:
for match in regex.finditer(text):
opening, ref_type, ref_short_id, comment, closing = match.groups()
full_match = match.group(0)
ref_id = next((k for k in merged_data if k.startswith(ref_short_id)), None)
if ref_id:
replacement = f"[{merged_data[ref_id].attribution['djot']}](/{merged_data[ref_id].slug})"
text = text.replace(full_match, replacement)
return text
def replace_import_references(
text: str,
regex: re.Pattern,
merged_data: Dict[str, DocumentMetadata],
key: str,
page: DocumentMetadata,
) -> str:
for match in regex.finditer(text):
opening, ref_type, ref_short_id, comment, closing = match.groups()
full_match = match.group(0)
ref_id = next((k for k in merged_data if k.startswith(ref_short_id)), None)
if ref_id:
ref_text = merged_data[ref_id].content["plain"]
if ref_type == "import::":
replacement = ref_text
elif ref_type == "aside::":
ref_title = merged_data[ref_id].title
ref_slug = merged_data[ref_id].slug
ref_location = merged_data[ref_id].location
                location_string = (
                    f" ⚕ {ref_location['city']}, {ref_location['country']}"
                    if ref_location.get("city")
                    else ""
                )
                # The embedded anchor markup is a best-guess sketch of the aside footer.
                replacement = (
                    f"{{.aside}}\n{':' * 78}\n"
                    f"{ref_text}\n"
                    f'```=html\n<a href="/{ref_slug}">{ref_title}{location_string}</a>\n```\n'
                    f"{':' * 78}"
                )
else:
raise ValueError(f"Unrecognised reference type: {ref_type}")
            if page.status != "draft":
merged_data[ref_id].links["backlinks"].add(key)
text = text.replace(full_match, replacement)
return text
def process_reference_links(
text: str, regex: re.Pattern, merged_data: Dict[str, Any], key: str
) -> str:
for ref_text_match, _, ref_type, ref_short_id in regex.findall(text):
match = f"[{ref_text_match}]({ref_type}{ref_short_id})"
ref_id = next(
(k for k in merged_data.keys() if k.startswith(ref_short_id)), None
)
        if ref_id is None:
            logger.error(f"No match found for {ref_short_id}")
            raise ValueError(
                f"Unmatched UUID reference in document {key}: {ref_short_id}"
            )
if ref_type not in ["link::", "img::", "video::", "quote::"]:
raise ValueError(
f"Unexpected Internal Reference type '{ref_type}' in document {key}: {match}"
)
ref_text = get_reference_text(ref_text_match, merged_data[ref_id])
ref_slug = f"/{merged_data[ref_id].slug}"
if ref_type == "link::":
try:
# Double quotes within a page title are escaped so that they don't break the HTML 'title' element
ref_title = merged_data[ref_id].title.replace('"', '\\"')
if merged_data[ref_id].category != "references":
ref_title += f" | {merged_data[ref_id].available.strftime('%B %Y')}"
except AttributeError:
ref_title = merged_data[ref_id].title.replace('"', '\\"')
replacement = f'[{ref_text}]({ref_slug}){{title="{ref_title}"}}'
elif ref_type == "img::":
match = f""
if ref_slug.endswith("svg"):
replacement = f'```=html\n\n```'
if ref_slug.endswith("png"):
replacement = (
f"```=html\n"
f"
\n'
f"
\n'
f"
{code}
"
def run_jotdown(plaintext: str, page) -> str:
"""
Modified to handle code blocks with syntax highlighting.
Fixed to properly handle both raw HTML and HTML code blocks.
"""
CODE_BLOCK_RE = re.compile(
r"( *)````*(=html|\s*(?:(\w+)\n))?(.*?)( *)````*", re.DOTALL
)
code_blocks = []
marker_template = "§CODE_BLOCK_{}§"
def save_code_block(match):
leading_space = match.group(1)
raw_html_marker = match.group(2)
language = match.group(3)
code = match.group(4).rstrip()
trailing_space = match.group(5)
code_words = len(code.split())
code_lines = len(code.splitlines())
page.words["code"]["lines"] += code_lines
page.words["code"]["words"] += code_words
site.words["code"]["lines"] += code_lines
site.words["code"]["words"] += code_words
# Remove the wordcount of codeblocks from the prose wordcounts
page.words["self"] -= code_words
site.words["self"] -= code_words
# Check if this is a raw HTML block
if raw_html_marker == "=html":
return f"{leading_space}```=html\n{code}\n{trailing_space}```"
# For all other cases, including 'html' language, highlight the code
highlighted = highlight_code(code, language)
marker = marker_template.format(len(code_blocks))
code_blocks.append(highlighted)
return f"{leading_space}```=html\n{marker}\n{trailing_space}```"
# First, replace all code blocks with markers
processed_text = CODE_BLOCK_RE.sub(save_code_block, plaintext)
"""
TODO: Exclude codeblocks from wordcounts!
"""
# Run through jotdown
html = run("jotdown", input=processed_text, text=True, capture_output=True).stdout
prose_wordcount = len(html.split())
# Replace markers with actual highlighted code
for i, code in enumerate(code_blocks):
marker = marker_template.format(i)
html = html.replace(marker, code)
return html
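# `generate_html` is called from main() between the substitution and backlink passes;
# the body below is an assumed sketch that renders each document's plain Djot content
# to HTML via run_jotdown() and records rough prose word counts for the page and site.
def generate_html(documents: Dict[str, DocumentMetadata]) -> None:
    logger.info("Rendering document content")
    for page in documents.values():
        plain = page.content.get("plain", "")
        page.words["self"] = len(plain.split())
        site.words["self"] += page.words["self"]
        page.content["html"] = run_jotdown(plain, page)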
def build_backlinks(documents, site):
logger.info("Building backlinks")
INLINE_LINK_RE = re.compile(
r"\[[^\]]*(?:\[[^\]]*\][^\]]*)*\]\(\/([^)#]*)\)", re.DOTALL
)
FOOTNOTE_LINK_URL_RE = re.compile(r"\[.+?\]:\s\/(.*)", re.DOTALL)
interlink_count = 0
for key, page in documents.items():
if "nobacklinks" in page.options or page.status == "draft":
continue
logger.debug(page.filepath)
text = page.content.get("plain")
# Skip if no main content
if not text:
continue
interlinks = set(documents[key].links["internal"])
combined_refs = INLINE_LINK_RE.findall(text) + FOOTNOTE_LINK_URL_RE.findall(
text
)
for slug in combined_refs:
try:
link_uid = site.slug_to_uid_lookup[slug]
interlinks.add(link_uid)
interlink_count += 1
except KeyError:
if should_ignore_slug(slug):
continue
logger.warning(f"\nKeyError in {page.title} ({key}): {slug}")
documents[key].links["internal"] = sorted(interlinks)
for interlink_key in interlinks:
documents[interlink_key].links["backlinks"].add(key)
"""
TODO: REMOVE SITE.BACKLINKS in favour a 'stats' or 'count' (templates will need updating
"""
site.backlinks += interlink_count
def should_ignore_slug(slug):
return (
slug.startswith(("feeds/", "images/", "$"))
or slug.endswith((".jpg", ".webp", ".png", ".svg", ".pdf", ".gif", ".html"))
or slug in ["publickey", "humans.txt", "build.py"]
)
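# Group published pages into collections keyed by category, secondary, and tag
# (plus "everything", "main", and a drafts bucket), and build the sitemap list.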
def build_collections(
documents: Dict[str, DocumentMetadata], site: SiteMetadata
) -> Tuple[Dict[str, List[Dict[str, Any]]], List[Dict[str, Any]]]:
collections = {
category: []
for category in list(site.categories)
+ list(site.secondaries)
+ list(site.tags)
+ ["everything", "main", "cd68b918-ac5f-4d6c-abb5-a55a0318846b"]
}
sitemap = []
for key, page in sorted(
documents.items(), key=lambda k_v: k_v[1].available, reverse=True
):
if page.status == "draft":
collections["cd68b918-ac5f-4d6c-abb5-a55a0318846b"].append(page)
continue
elif page.status == "hidden":
continue
elif "nofeed" in page.options:
sitemap.append(page)
continue
else:
sitemap.append(page)
collections["everything"].append(page)
collections[page.category].append(page)
collections[page.secondary].append(page)
for tag in page.tags:
collections[tag].append(page)
if page.secondary in [
"essays",
"wandering",
"rambling",
"dialog",
"pearls",
]:
collections["main"].append(page)
return collections, sitemap
def output_html(
assets: Dict[str, AssetMetadata],
documents: Dict[str, DocumentMetadata],
collections: Dict[str, List[Dict[str, Any]]],
site: SiteMetadata,
env: Environment,
output_dir: Path,
) -> None:
logger.info("Generating Hypertext")
for key, page in documents.items():
template_file = page.layout
template = env.get_template(template_file)
collection = build_page_collection(page, collections)
output = template.render(
documents=documents,
assets=assets,
collections=collections,
collection=collection,
page=asdict(page),
site=site,
)
output_path = output_dir / page.slug / "index.html"
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w") as f:
f.write(output)
logger.debug(f" {page.filepath} >> {output_path}")
def build_page_collection(page, collections):
try:
collection = [
item
for include in page.collection["include"]
for item in collections[include]
]
return sorted(collection, key=lambda x: x.available, reverse=True)
except KeyError:
logger.error(f"Failed collection for {page.filepath}")
return []
def output_feeds(collections, site, env, output_dir):
logger.info("Generating Feeds")
feed_list = list(site.categories) + list(site.secondaries) + ["everything", "main"]
for entry in feed_list:
feed = render_feed(entry, collections, site, env)
write_feed(feed, output_dir)
logger.debug(f" {entry} >> {feed['path']}")
output_feed_stylesheet(site, env, output_dir)
def render_feed(feed_name, collections, site, env):
slug = f"feeds/{feed_name}"
feed_path = f"{slug}/index.xml"
template = env.get_template(TEMPLATE_FEED)
feed_content = template.render(
site=site,
slug=slug,
collection=feed_name,
feed=collections[feed_name],
)
return {"name": feed_name, "output": feed_content, "path": feed_path}
def write_feed(feed, output_dir):
feed_path = output_dir / feed["path"]
feed_path.parent.mkdir(parents=True, exist_ok=True)
feed_path.write_text(feed["output"])
def output_link_report(site, output_dir):
logger.info("Creating plaintext link files")
output_path = output_dir / "links.txt"
with open(output_path, "w") as file:
for link in sorted(site.links["external"]):
file.write(f"{link}\n")
logger.debug(f" {output_path}")
def output_feed_stylesheet(site, env, output_dir):
logger.info("Creating XSL Stylesheet")
template = env.get_template(TEMPLATE_FEED_XSL)
output_path = output_dir / "feed.xsl"
output = template.render(site=site)
output_path.write_text(output)
logger.debug(f" {output_path}")
def output_sitemap(sitemap, site, env, output_dir):
logger.info("Generating Sitemap")
template = env.get_template(TEMPLATE_SITEMAP)
output = template.render(sitemap=sitemap, site=site)
output_path = output_dir / "sitemap.xml"
output_path.write_text(output)
logger.debug(f" {output_path}")
async def main():
# Initialize site and load assets
global site
site = init_site()
assets = load_assets()
# Set up Jinja environment
env = setup_jinja_environment()
# Process assets
await asyncio.to_thread(process_assets, assets, ASSET_DIR, OUTPUT_DIR)
# Ingest and process documents
documents = await ingest_documents(site)
insert_substitutions(documents, assets, site)
generate_html(documents)
# Build backlinks and collections
build_backlinks(documents, site)
collections, sitemap = build_collections(documents, site)
# Attempting to make final order of 'backlinks' deterministic
for key, page in documents.items():
# Sort interlinks based on published dates
documents[key].links["internal"] = sorted(
documents[key].links["internal"],
key=lambda x: documents[x].available,
reverse=True, # Most recent first
)
# Sort backlinks based on published dates
documents[key].links["backlinks"] = sorted(
documents[key].links["backlinks"],
key=lambda x: documents[x].available,
reverse=True, # Most recent first
)
# Output HTML, feeds, and sitemap
output_html(assets, documents, collections, site, env, OUTPUT_DIR)
output_link_report(site, OUTPUT_DIR)
output_feeds(collections, site, env, OUTPUT_DIR)
output_sitemap(sitemap, site, env, OUTPUT_DIR)
# Change back to the initial directory
os.chdir(INIT_DIR)
# Print summary
logger.info("Build complete!")
logger.info(f"Pages: {site.pagecount}")
logger.info(f"Words: {site.words['total']}")
logger.info(f"Internal links: {site.backlinks}")
logger.info(f"External links: {len(site.links['external'])}")
if __name__ == "__main__":
asyncio.run(main())