Source code for discord_dumper.dumper

"""PArse websocket."""

import base64
import logging
import pathlib
import re
import shutil
import zlib
from concurrent.futures import ProcessPoolExecutor
from typing import Any, Dict, Optional, Union
from urllib.error import HTTPError
from urllib.parse import parse_qs, unquote, urlencode, urlparse, urlunparse
from urllib.request import Request, urlopen

import orjson

AUTHORIZED_EXTENSIONS = (
    ".webp",
    ".png",
    ".jpeg",
    ".jpeg",
    ".svg",
    ".ico",
    ".gif",
)
logger = logging.getLogger(__name__)
ALPHANUM = re.compile(r"[^a-z0-9]+")
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/126.0.0.0 Safari/537.36"
)
OK = 200


class SecurityError(Exception):
    pass


def get_image_path(url: str) -> pathlib.Path:
    """Get image paths."""
    image_root = pathlib.Path("images").resolve()
    parsed_url = urlparse(url)
    url_path = unquote(parsed_url.path).lstrip("/")
    path = image_root / url_path
    if parsed_url.query:
        path = path.parent / unquote(parsed_url.query) / path.name
    path = path.resolve()
    if image_root not in path.parents:
        error_message = (
            f"Attempted path traversal with {url!r} " f"resulting at {path!r}"
        )
        raise SecurityError(error_message)
    if path.suffix not in AUTHORIZED_EXTENSIONS:
        error_message = (
            f"Invalid extension for {url!r} resulting as {path.suffix}"
        )
        raise SecurityError(error_message)
    return pathlib.Path("images") / path.relative_to(image_root)


def fetch_and_save_image(url: str) -> int:
    """FEtch and save url."""
    req = Request(url)  # noqa: S310
    req.add_header("User-Agent", USER_AGENT)
    try:
        with urlopen(req) as res:  # noqa: S310
            path = pathlib.Path("discord") / get_image_path(url)
            path.parent.mkdir(parents=True, exist_ok=True)
            with path.open("wb") as output:
                shutil.copyfileobj(res, output)
                return res.status  # type: ignore[no-any-return]
    except HTTPError as e:
        return getattr(e, "status", -1)


[docs]def dump( # noqa: C901, PLR0912, PLR0915 path: Union[str, pathlib.Path], *, images: Optional[bool] = None, pretty: Optional[bool] = None, fetch: Optional[bool] = None, ) -> None: """Dump file.""" if images is None: images = True if pretty is None: pretty = True if fetch is None: fetch = False options: Dict[str, Any] = {} if pretty: options["option"] = orjson.OPT_INDENT_2 root = pathlib.Path("discord") shutil.rmtree(root, ignore_errors=True) content = pathlib.Path(path).read_text("utf-8").strip() obj = orjson.loads(content) inflator = zlib.decompressobj() events = [] urls = [] for entry in obj["log"]["entries"]: url: str = entry["request"]["url"] if ( url == "wss://gateway.discord.gg/?encoding=json&v=9&compress=zlib-stream" ): pathlib.Path("discord.json") for message in entry["_webSocketMessages"]: message_data = message["data"] if message["type"] != "send": data = base64.b64decode(message_data) message_data = orjson.loads(inflator.decompress(data)) else: message_data = orjson.loads(message_data) events.append(message_data) elif entry["response"]["content"]["mimeType"].startswith("image/"): if not url.startswith("https://cdn.discordapp.com/"): continue image_path = root / get_image_path(url) text = entry["response"]["content"].get("text", None) if text is None: logger.warning("No data for %s", url) continue if entry["response"]["content"].get("encoding") == "base64": data = base64.b64decode(text) else: data = text.encode("utf-8") image_path.parent.mkdir(parents=True, exist_ok=True) image_path.write_bytes(data) if fetch: parsed_url = urlparse(url) if not parsed_url.path.startswith("/badge-icons/"): query = parse_qs(parsed_url.query) query["size"] = ["4096"] for key, value in query.items(): if len(value) == 1: query[key] = value[0] # type: ignore[assignment] parts = list(parsed_url) parts[4] = urlencode(query) size_url = urlunparse(parts) urls.append(size_url) if fetch: with ProcessPoolExecutor() as pool: for url, status in zip(urls, pool.map(fetch_and_save_image, urls)): if status == OK: logger.debug("Fetched %s", url) else: logger.warning("Error %d on %s", status, url) events_path = root / pathlib.Path("events.json") events_path.write_bytes(orjson.dumps(events, **options)) for event in events: if event.get("t") == "READY": for key in ( "guilds", "users", "private_channels", ): for obj in event["d"][key]: obj_path = root / key / f'{obj["id"]}.json' obj_path.parent.mkdir(exist_ok=True, parents=True) obj_path.write_bytes(orjson.dumps(obj, **options)) for obj in event["d"]["connected_accounts"]: name = f'{obj["type"]}/{obj["id"]}.json' obj_path = root / "connected_accounts" / name obj_path.parent.mkdir(exist_ok=True, parents=True) obj_path.write_bytes(orjson.dumps(obj, **options)) logger.warning( "Don't send your files without verification, " "they can contains you credentials.", )