Scraping your posts from BlueSky with Python

Inspired by my writing in public every day for a month, which often draws from things I’ve read or watched, Kyle wants to get into the habit of writing a little something about the things he reads/watches/etc.

BlueSky came up as an option for a character limited, low-barrier, write-once platform, and while I’m broadly averse to centralised platforms¹ I recognise the appeal of simple UX and broad reach.

POSSE (Publish (on your) Own Site, Syndicate Elsewhere) is one avenue for satisfying the tension between sovereignty and syndication, but if the goal is low-friction then perhaps the reverse — publishing elsewhere and then preserving on your own site — is a less complicated flow.

I toyed around with a simple Python program for pulling posts into my own document system, using Anthropic’s Claude 3.5 Sonnet to knock together the rough shape of the scraper, and then tweaking and expanding it to my liking.

The code is included at the end, but it may not be useful to you as is. So, in no particular order, and with no promises of completeness, here are some of the assumptions I’ve made,

Still todo:

The complete script,

#!/usr/bin/env python3

import argparse
import getpass
import hashlib
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import requests
import tomlkit

class BlueSkyPostScraper:
    """Archive a BlueSky account's posts (and image attachments) as TOML files.

    Post URIs already archived are tracked in ``downloaded_posts.json`` inside
    the output directory, so repeated runs only fetch new posts.
    """

    def __init__(self, identifier: str, password: str, output_dir: str = "posts"):
        """
        Initialize the BlueSky scraper with your credentials.

        Args:
            identifier: BlueSky handle or email address.
            password: Account (or app-specific) password.
            output_dir: Directory for TOML files; media goes in a subdirectory.

        Raises:
            Exception: If authentication with BlueSky fails.
        """
        self.base_url = "https://bsky.social/xrpc"
        self.session = requests.Session()
        self.auth_token = None
        self.did = None
        self.output_dir = Path(output_dir)
        self.media_dir = self.output_dir / "media"
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.media_dir.mkdir(parents=True, exist_ok=True)
        # Record of post URIs already archived; makes reruns incremental.
        self.tracking_file = self.output_dir / "downloaded_posts.json"
        self.downloaded_posts = self._load_tracking()
        self._login(identifier, password)
        self.username = identifier

    def download_media(self, image_info: Dict) -> Optional[Tuple[str, str]]:
        """
        Download media from BlueSky and store it under the media directory.

        Args:
            image_info: One image entry from an ``app.bsky.embed.images`` embed.

        Returns:
            (filename, mime_type) tuple if successful, None if failed.
        """
        try:
            # Per the app.bsky.embed.images lexicon the blob lives under the
            # "image" key; also accept "blob" in case the record shape varies.
            blob = image_info.get("image") or image_info.get("blob", {})
            ref = blob.get("ref", {})
            link = (
                "https://bsky.social/xrpc/com.atproto.sync.getBlob"
                f"?did={self.did}&cid={ref.get('$link', '')}"
            )
            mime_type = blob.get("mimeType", "")

            # Only images are archived; skip any other blob type.
            if not mime_type.startswith("image/"):
                return None

            response = self.session.get(link, timeout=30)
            if response.status_code != 200:
                return None

            # Content-addressed filename: identical images dedupe naturally.
            content_hash = hashlib.sha256(response.content).hexdigest()[:16]
            ext = mime_type.split("/")[-1]
            filename = f"{content_hash}.{ext}"
            filepath = self.media_dir / filename

            with open(filepath, "wb") as f:
                f.write(response.content)

            return filename, mime_type
        except Exception as e:
            # Best effort: a failed attachment should not abort the whole run.
            print(f"Failed to download media: {e}")
            return None

    def create_toml_doc(
        self,
        post: Dict,
        creator_name: str,
        media_files: Optional[List[Tuple[str, str]]] = None,
    ) -> "tomlkit.TOMLDocument":
        """Create a TOML document for a post.

        Args:
            post: Dict with at least ``message`` and ``created_at`` keys.
            creator_name: Value stored in the document's ``creator`` field.
            media_files: Optional list of (filename, mime_type) attachments.

        Returns:
            A tomlkit document ready to be serialised to disk.
        """
        doc = tomlkit.document()

        # Convert the UTC timestamp string to a local-timezone datetime.
        utc_dt = datetime.fromisoformat(post["created_at"].replace("Z", "+00:00"))
        local_dt = utc_dt.astimezone()

        # Basic metadata
        doc["uid"] = str(uuid.uuid4())
        doc["creator"] = creator_name
        doc["primary"] = "journal"
        doc["secondary"] = "nonsense"
        doc["created"] = local_dt
        doc["slug"] = self.create_slug(local_dt)

        # Content section
        content = tomlkit.table()
        content["plain"] = "\n" + post["message"] + "\n"

        # Add media information if present
        if media_files:
            media = tomlkit.array()
            for filename, mime_type in media_files:
                media_item = tomlkit.inline_table()
                media_item["filename"] = filename
                media_item["type"] = mime_type
                media.append(media_item)
            content["media"] = media

        doc["content"] = content
        return doc

    def get_posts(self, limit: int = 100) -> List[Tuple[str, Dict]]:
        """Fetch your posts from BlueSky.

        Args:
            limit: Maximum number of new posts to return.

        Returns:
            List of (post_uri, post_data) tuples for posts not yet archived.
        """
        posts = []
        cursor = None

        while len(posts) < limit:
            # The API caps page size at 100; never request more than we need.
            params = {"actor": self.did, "limit": min(100, limit - len(posts))}
            if cursor:
                params["cursor"] = cursor

            response = self.session.get(
                f"{self.base_url}/app.bsky.feed.getAuthorFeed",
                params=params,
                timeout=30,
            )

            if response.status_code != 200:
                break

            data = response.json()
            feed_items = data.get("feed", [])

            if not feed_items:
                break

            for item in feed_items:
                post = item["post"]
                post_id = post["uri"]

                # Skip if we've already downloaded this post
                if post_id in self.downloaded_posts:
                    continue

                # Extract post data including embedded media
                post_data = {
                    "message": post["record"].get("text", ""),
                    "created_at": post["record"].get("createdAt", ""),
                    "embed": post["record"].get("embed", {}),
                }
                posts.append((post_id, post_data))

            cursor = data.get("cursor")
            if not cursor:
                break

        return posts

    def save_posts_as_toml(self, posts: List[tuple], creator_name: str) -> None:
        """Save posts as individual TOML files.

        Args:
            posts: List of (post_uri, post_data) tuples from ``get_posts``.
            creator_name: Creator recorded in each TOML document.
        """
        for post_id, post_data in posts:
            # Handle media if present
            media_files = []
            embed = post_data.get("embed", {})

            # Handle images
            if embed.get("$type") == "app.bsky.embed.images":
                for image in embed.get("images", []):
                    result = self.download_media(image)
                    if result:
                        media_files.append(result)

            # Create TOML document with media information
            doc = self.create_toml_doc(post_data, creator_name, media_files)

            # Filename derived from the post's creation timestamp.
            created_dt = datetime.fromisoformat(
                post_data["created_at"].replace("Z", "+00:00")
            )
            filename = f"{created_dt.strftime('%Y%m%d_%H%M%S')}.toml"
            filepath = self.output_dir / filename

            # Save TOML file
            with open(filepath, "w", encoding="utf-8") as f:
                tomlkit.dump(doc, f)

            # Mark post as downloaded
            self.downloaded_posts.add(post_id)

        # Persist the tracking set once after all posts are written.
        self._save_tracking()

    def _load_tracking(self) -> set:
        """Load the set of previously downloaded post IDs."""
        if self.tracking_file.exists():
            with open(self.tracking_file, "r") as f:
                return set(json.load(f))
        return set()

    def _save_tracking(self) -> None:
        """Save the set of downloaded post IDs."""
        with open(self.tracking_file, "w") as f:
            json.dump(list(self.downloaded_posts), f)

    def _login(self, identifier: str, password: str) -> None:
        """Authenticate with BlueSky and attach the bearer token to the session.

        Raises:
            Exception: If the server does not return HTTP 200.
        """
        auth_response = self.session.post(
            f"{self.base_url}/com.atproto.server.createSession",
            json={"identifier": identifier, "password": password},
            timeout=30,
        )
        if auth_response.status_code != 200:
            raise Exception("Authentication failed")

        auth_data = auth_response.json()
        self.auth_token = auth_data["accessJwt"]
        self.did = auth_data["did"]
        self.session.headers.update({"Authorization": f"Bearer {self.auth_token}"})

    def create_slug(self, dt: datetime) -> str:
        """Create a YYYY/MM/DD/HHMMSS slug from a datetime object."""
        return dt.strftime("%Y/%m/%d/%H%M%S")


def main():
    """Command-line entry point: authenticate, fetch new posts, archive as TOML."""
    parser = argparse.ArgumentParser(
        description="Scrape BlueSky posts and save as TOML files"
    )
    parser.add_argument("identifier", help="Your BlueSky handle or email")
    # Optional on the command line so the password need not land in shell
    # history or `ps` output; prompted for interactively when omitted.
    parser.add_argument(
        "password",
        nargs="?",
        default=None,
        help="Your BlueSky password (prompted for if omitted)",
    )
    parser.add_argument(
        "--creator", help="Creator name for TOML files (defaults to identifier)"
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=500,
        help="Maximum number of posts to retrieve (default: 500)",
    )
    parser.add_argument(
        "--output-dir",
        default="posts",
        help="Directory to store TOML files (default: posts)",
    )

    args = parser.parse_args()
    creator_name = args.creator if args.creator else args.identifier
    password = args.password if args.password else getpass.getpass("BlueSky password: ")

    try:
        scraper = BlueSkyPostScraper(args.identifier, password, args.output_dir)
        posts = scraper.get_posts(limit=args.limit)
        scraper.save_posts_as_toml(posts, creator_name)
        print(f"Successfully scraped {len(posts)} new posts")
    except Exception as e:
        # Top-level boundary: report the failure without a traceback.
        print(f"Error: {str(e)}")


if __name__ == "__main__":
    main()

  1. And before someone pipes up with “BlueSky/AT-protocol is decentralised!”, no, it isn’t. The corporation behind it claims it is, but so far it only aspires to be. If you’re curious, you can read more here.↩︎