# Source code for pynenc_mongo.util.mongo_chunk_data

"""MongoDB utilities for storing large data exceeding BSON limits.

Compresses data with zlib and splits across multiple chunk documents when needed.
Chunk keys: "{invocation_id}:{key_prefix}[:{item}]"
"""

import json
import logging
import zlib
from enum import StrEnum, auto
from typing import TYPE_CHECKING, Any

from bson import BSON
from pymongo import ASCENDING

if TYPE_CHECKING:
    from pynenc_mongo.util.mongo_client import RetryableCollection

logger = logging.getLogger(__name__)


# ============================================================================
# Compression and chunking utilities
# ============================================================================


def _compress(data: str) -> bytes:
    """Encode text as UTF-8 and deflate it with zlib.

    Compression level 6 is used as a balance between speed and ratio.

    :param data: Text to compress.
    :return: The zlib-compressed UTF-8 bytes.
    """
    raw = data.encode("utf-8")
    return zlib.compress(raw, level=6)
def _decompress(data: bytes) -> str:
    """Inflate zlib-compressed bytes and decode the result as UTF-8 text.

    :param data: zlib-compressed bytes.
    :return: The decompressed payload decoded as UTF-8.
    """
    raw = zlib.decompress(data)
    return raw.decode("utf-8")
def _split_into_chunks(data: bytes, chunk_size: int) -> list[bytes]:
    """Split bytes into ordered chunks of at most ``chunk_size`` bytes.

    :param data: Bytes to split. Empty input yields an empty list.
    :param chunk_size: Maximum length of each chunk; must be positive.
    :return: Consecutive slices of ``data`` in their original order; only the
        last slice may be shorter than ``chunk_size``.
    :raises ValueError: If ``chunk_size`` is zero or negative.
    """
    if chunk_size <= 0:
        # A negative step makes range() empty, which would silently return []
        # and drop every byte of data; reject it the same way zero is rejected.
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")
    return [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]
def _reassemble_chunks(chunks: list[bytes]) -> bytes:
    """Concatenate chunks, in the order given, into a single bytes object.

    :param chunks: Chunks already sorted into their original sequence.
    :return: The joined bytes; empty when ``chunks`` is empty.
    """
    separator = b""
    return separator.join(chunks)
def _exceeds_bson_threshold(data: dict, threshold: int) -> bool:
    """Report whether ``data``, encoded as BSON, is larger than ``threshold``.

    :param data: Dict to measure; it is BSON-encoded to obtain its size.
    :param threshold: Size limit in bytes.
    :return: ``True`` when the encoded length is strictly greater than
        ``threshold``.
    """
    bson_size = len(BSON.encode(data))
    return bson_size > threshold
# ============================================================================
# Storage metadata keys
# ============================================================================
[docs] class StorageKey(StrEnum): """Keys used in storage metadata documents.""" CHUNKED = auto() # List of keys for individually chunked values INLINE = auto() # Dict of inline values (small enough to embed) ALL_CHUNKED = auto() # Boolean flag indicating entire dict was chunked
# ============================================================================
# Public API
# ============================================================================
def build_chunk_key(**parts: str) -> str:
    """Join the non-empty keyword values with ":" to form a base chunk key.

    Values are taken in the order the keyword arguments were passed. Falsy
    values are skipped and the remaining ones are converted with ``str()``.

    :param parts: Named key components; only the values are used.
    :return: The joined key, or an empty string when nothing remains.
    """
    segments = [str(value) for value in parts.values() if value]
    return ":".join(segments)
# ============================================================================
# Internal chunk storage helpers
# ============================================================================
def _chunk_key(base_key: str, item: str = "") -> str:
    """Return the ``chunk_key`` value used for MongoDB chunk documents.

    :param base_key: Base key shared by all chunks of one stored dict.
    :param item: Optional item name; when empty the base key is returned
        unchanged, otherwise it is appended after a ":" separator.
    :return: ``base_key`` or ``"{base_key}:{item}"``.
    """
    if not item:
        return base_key
    return f"{base_key}:{item}"
def _store_chunks(
    coll: "RetryableCollection", key: str, data: str, chunk_size: int
) -> int:
    """Compress, split, and store data as MongoDB chunk documents.

    Documents already stored under ``key`` are deleted before the new ones are
    inserted, so a call replaces whatever was stored for that key.

    :param coll: Collection that holds the chunk documents.
    :param key: Value written to the ``chunk_key`` field of every document.
    :param data: Text to compress and store.
    :param chunk_size: Maximum number of compressed bytes per chunk document.
    :return: Number of chunk documents written.
    """
    # Build the new chunks BEFORE touching the collection: if compression or
    # splitting raises (unencodable text, invalid chunk_size), the previously
    # stored chunks must not already have been deleted.
    compressed = _compress(data)
    chunks = _split_into_chunks(compressed, chunk_size)

    # NOTE(review): delete + insert are two separate operations; a failure
    # between them leaves no chunks for this key.
    coll.delete_many({"chunk_key": key})  # Remove old chunks
    if chunks:
        coll.insert_many(
            [{"chunk_key": key, "seq": i, "data": c} for i, c in enumerate(chunks)],
            ordered=True,
        )
    logger.debug("Stored %d bytes as %d chunks for %s", len(data), len(chunks), key)
    return len(chunks)
def _retrieve_chunks(coll: "RetryableCollection", key: str) -> str:
    """Load every chunk stored under ``key`` and rebuild the original text.

    :param coll: Collection that holds the chunk documents.
    :param key: ``chunk_key`` value identifying the chunks to load.
    :return: The decompressed text.
    :raises KeyError: If no chunk documents exist for ``key``.
    """
    query = {"chunk_key": key}
    projection = {"data": 1, "_id": 0}  # only the payload is needed
    # Hints the (chunk_key, seq) index — presumably created elsewhere; verify.
    index_hint = [("chunk_key", ASCENDING), ("seq", ASCENDING)]

    cursor = coll.find(query, projection).sort("seq", ASCENDING).hint(index_hint)
    payloads = [doc["data"] for doc in cursor]
    if not payloads:
        raise KeyError(f"No chunks for {key}")

    compressed = _reassemble_chunks(payloads)
    return _decompress(compressed)
# ============================================================================
# Main storage API
# ============================================================================
def prepare_chunk_storage(
    coll: "RetryableCollection",
    base_key: str,
    data: dict[str, str],
    threshold: int,
) -> dict[str, Any]:
    """Prepare data for storage, chunking if it exceeds threshold.

    Three outcomes are possible:

    * the whole dict fits within ``threshold``: it is returned inline and
      nothing is written to ``coll``;
    * some items are oversized: each is stored as chunks under
      ``"{base_key}:{item}"`` and the rest stay inline;
    * the resulting metadata is still too large: the whole dict is serialized
      as JSON and stored as chunks under ``base_key``.

    :param coll: Collection that receives chunk documents.
    :param base_key: Base chunk key for this dict.
    :param data: Mapping of item name to serialized value.
    :param threshold: BSON size limit in bytes; also used as the chunk size.
    :return: Storage metadata keyed by ``StorageKey`` values, to be passed to
        ``retrieve_chunk_storage`` later.
    """
    chunk_size = threshold  # Use threshold as chunk size

    # Fast path: all data fits inline
    if not _exceeds_bson_threshold(data, threshold):
        return {StorageKey.INLINE.value: data}

    # Decide which items are oversized, without writing anything yet
    chunked_keys: list[str] = []
    inline: dict[str, str] = {}
    for k, v in data.items():
        if _exceeds_bson_threshold({k: v}, threshold):
            chunked_keys.append(k)
        else:
            inline[k] = v
    storage: dict[str, Any] = {
        StorageKey.INLINE.value: inline,
        StorageKey.CHUNKED.value: chunked_keys,
    }

    # If metadata itself is too large, chunk entire dict. This is checked
    # before any per-item write so the fallback does not leave per-item chunks
    # behind that retrieval (which only reads base_key here) would never use.
    if _exceeds_bson_threshold(storage, threshold):
        _store_chunks(coll, base_key, json.dumps(data), chunk_size)
        logger.warning("Chunked entire dict for %s (%d items)", base_key, len(data))
        return {StorageKey.ALL_CHUNKED.value: True}

    # Metadata fits: now persist each oversized item under its own chunk key
    for k in chunked_keys:
        _store_chunks(coll, _chunk_key(base_key, k), data[k], chunk_size)

    logger.debug(
        "Chunked %s: %d inline, %d separate", base_key, len(inline), len(chunked_keys)
    )
    return storage
def retrieve_chunk_storage(
    coll: "RetryableCollection",
    base_key: str,
    storage_doc: dict[str, Any],
) -> dict[str, str]:
    """Rebuild the original dict from its storage metadata.

    :param coll: Collection that holds the chunk documents.
    :param base_key: Base chunk key the data was stored under.
    :param storage_doc: Metadata keyed by ``StorageKey`` values.
    :return: Mapping of item name to value, combining inline and chunked items.
    """
    # Whole dict was chunked as one JSON payload under the base key
    if storage_doc.get(StorageKey.ALL_CHUNKED.value):
        return json.loads(_retrieve_chunks(coll, base_key))

    # Start from a copy of the inline items, then add each chunked item
    restored = dict(storage_doc.get(StorageKey.INLINE.value, {}))
    chunked_names = storage_doc.get(StorageKey.CHUNKED.value, [])
    restored.update(
        (name, _retrieve_chunks(coll, _chunk_key(base_key, name)))
        for name in chunked_names
    )
    return restored