from functools import cached_property
from typing import TYPE_CHECKING
from pymongo import ASCENDING, IndexModel
from pynenc_mongo.util.mongo_collections import CollectionSpec, MongoCollections
if TYPE_CHECKING:
from pynenc_mongo.conf.config_mongo import ConfigMongo
from pynenc_mongo.util.mongo_client import RetryableCollection
# Name suffix shared by the app_info collections. According to the original
# note it is used by discover_app_infos (defined outside this file) to scan
# all apps. Within this module it is passed as the CollectionSpec name of
# StateBackendCollections.state_backend_app_info.
APP_INFO_COLLECTION_SUFFIX = "state_backend_app_info"
class StateBackendCollections(MongoCollections):
    """Collections specific to MongoStateBackend with prefix state_backend_.

    Each collection is exposed as a ``cached_property``: the first access
    builds a ``CollectionSpec`` (collection name plus index definitions) and
    passes it to ``instantiate_retriable_coll``; later accesses on the same
    instance reuse the object that call returned.
    """

    def __init__(self, conf: "ConfigMongo", app_id: str):
        """Initialise the base class with the ``state_backend_`` prefix.

        :param conf: Mongo configuration, forwarded to ``MongoCollections``.
        :param app_id: Application identifier, forwarded to ``MongoCollections``.
        """
        super().__init__(conf, prefix="state_backend_", app_id=app_id)

    @cached_property
    def state_backend_results(self) -> "RetryableCollection":
        """Collection ``state_backend_results``, unique on ``invocation_id``."""
        by_invocation = IndexModel([("invocation_id", ASCENDING)], unique=True)
        return self.instantiate_retriable_coll(
            CollectionSpec(name="state_backend_results", indexes=[by_invocation])
        )

    @cached_property
    def state_backend_exceptions(self) -> "RetryableCollection":
        """Collection ``state_backend_exceptions``, unique on ``invocation_id``."""
        by_invocation = IndexModel([("invocation_id", ASCENDING)], unique=True)
        return self.instantiate_retriable_coll(
            CollectionSpec(name="state_backend_exceptions", indexes=[by_invocation])
        )

    @cached_property
    def state_backend_invocations(self) -> "RetryableCollection":
        """Collection ``state_backend_invocations``.

        Indexes: a unique index on ``invocation_id`` followed by one
        non-unique ascending index per lookup field
        (``parent_invocation_id``, ``workflow_id``, ``workflow_type_key``).
        """
        lookup_fields = ("parent_invocation_id", "workflow_id", "workflow_type_key")
        index_models = [IndexModel([("invocation_id", ASCENDING)], unique=True)]
        index_models += [IndexModel([(field, ASCENDING)]) for field in lookup_fields]
        return self.instantiate_retriable_coll(
            CollectionSpec(name="state_backend_invocations", indexes=index_models)
        )

    @cached_property
    def state_backend_history(self) -> "RetryableCollection":
        """Collection ``state_backend_history``.

        Indexes: a unique compound index on
        (``invocation_id``, ``history_timestamp``, ``history_status``) and a
        separate non-unique index on ``history_timestamp``.
        """
        history_entry_key = [
            ("invocation_id", ASCENDING),
            ("history_timestamp", ASCENDING),
            ("history_status", ASCENDING),
        ]
        index_models = [
            IndexModel(history_entry_key, unique=True),
            # Standalone timestamp index supporting time-range queries.
            IndexModel([("history_timestamp", ASCENDING)]),
        ]
        return self.instantiate_retriable_coll(
            CollectionSpec(name="state_backend_history", indexes=index_models)
        )

    @cached_property
    def state_backend_workflows(self) -> "RetryableCollection":
        """Collection ``state_backend_workflows``.

        Indexes: unique on ``workflow_id``; non-unique on ``workflow_type``.
        """
        by_workflow = IndexModel([("workflow_id", ASCENDING)], unique=True)
        by_workflow_type = IndexModel([("workflow_type", ASCENDING)])
        return self.instantiate_retriable_coll(
            CollectionSpec(
                name="state_backend_workflows",
                indexes=[by_workflow, by_workflow_type],
            )
        )

    @cached_property
    def state_backend_app_info(self) -> "RetryableCollection":
        """Collection named by ``APP_INFO_COLLECTION_SUFFIX``, unique on ``app_id``."""
        by_app = IndexModel([("app_id", ASCENDING)], unique=True)
        return self.instantiate_retriable_coll(
            CollectionSpec(name=APP_INFO_COLLECTION_SUFFIX, indexes=[by_app])
        )

    @cached_property
    def state_backend_workflow_data(self) -> "RetryableCollection":
        """Collection ``state_backend_workflow_data``.

        Indexes: a unique compound index on (``workflow_id``, ``data_key``).
        """
        workflow_data_key = [("workflow_id", ASCENDING), ("data_key", ASCENDING)]
        return self.instantiate_retriable_coll(
            CollectionSpec(
                name="state_backend_workflow_data",
                indexes=[IndexModel(workflow_data_key, unique=True)],
            )
        )

    @cached_property
    def state_backend_workflow_sub_invocations(self) -> "RetryableCollection":
        """Collection ``state_backend_workflow_sub_invocations``.

        Indexes: a unique compound index on
        (``parent_workflow_id``, ``sub_invocation_id``).
        """
        sub_invocation_key = [
            ("parent_workflow_id", ASCENDING),
            ("sub_invocation_id", ASCENDING),
        ]
        return self.instantiate_retriable_coll(
            CollectionSpec(
                name="state_backend_workflow_sub_invocations",
                indexes=[IndexModel(sub_invocation_key, unique=True)],
            )
        )

    @cached_property
    def state_backend_runner_contexts(self) -> "RetryableCollection":
        """Collection ``state_backend_runner_contexts``, unique on ``runner_id``."""
        by_runner = IndexModel([("runner_id", ASCENDING)], unique=True)
        return self.instantiate_retriable_coll(
            CollectionSpec(name="state_backend_runner_contexts", indexes=[by_runner])
        )

    @cached_property
    def state_backend_chunks(self) -> "RetryableCollection":
        """Collection for storing chunked data exceeding BSON limits.

        Chunk documents structure:
        - chunk_key: "{invocation_id}:{data_type}:{item_key}"
        - seq: 0-based sequence number for chunk ordering
        - data: binary compressed chunk payload

        Indexes: a unique compound index on (``chunk_key``, ``seq``) and a
        non-unique index on ``chunk_key`` alone.
        """
        # Retrieval index: chunks of one key come back ordered by seq, which
        # is the order needed for reassembly.
        ordered_chunks = IndexModel(
            [("chunk_key", ASCENDING), ("seq", ASCENDING)], unique=True
        )
        # Deletion index: used when removing all chunks of a key.
        # NOTE(review): chunk_key is also the leading field of the compound
        # index above, so MongoDB may already be able to serve chunk_key-only
        # filters from it -- confirm whether this extra index is still needed.
        chunks_of_key = IndexModel([("chunk_key", ASCENDING)])
        return self.instantiate_retriable_coll(
            CollectionSpec(
                name="state_backend_chunks",
                indexes=[ordered_chunks, chunks_of_key],
            )
        )