Source code for pynenc_mongo.orchestrator.mongo_orchestrator_collections

from functools import cached_property
from typing import TYPE_CHECKING

from pymongo import ASCENDING, IndexModel

from pynenc_mongo.util.mongo_collections import CollectionSpec, MongoCollections

if TYPE_CHECKING:
    from pynenc_mongo.conf.config_mongo import ConfigMongo
    from pynenc_mongo.util.mongo_client import RetryableCollection


[docs] class OrchestratorCollections(MongoCollections): """Collections specific to MongoOrchestrator with prefix orchestrator_.""" def __init__(self, conf: "ConfigMongo", app_id: str): super().__init__(conf, prefix="orchestrator_", app_id=app_id) @cached_property def orchestrator_invocations(self) -> "RetryableCollection": spec = CollectionSpec( name="orchestrator_invocations", indexes=[ IndexModel([("invocation_id", ASCENDING)], unique=True), IndexModel([("task_id", ASCENDING)]), IndexModel([("call_id", ASCENDING)]), IndexModel([("status", ASCENDING)]), IndexModel([("auto_purge_timestamp", ASCENDING)]), IndexModel([("status_runner_id", ASCENDING)]), ], ) return self.instantiate_retriable_coll(spec) @cached_property def orchestrator_invocation_args(self) -> "RetryableCollection": spec = CollectionSpec( name="orchestrator_invocation_args", indexes=[ IndexModel( [("invocation_id", ASCENDING), ("arg_key", ASCENDING)], unique=True ), IndexModel([("arg_key", ASCENDING), ("arg_value", ASCENDING)]), ], ) return self.instantiate_retriable_coll(spec) @cached_property def orchestrator_blocking_edges(self) -> "RetryableCollection": spec = CollectionSpec( name="orchestrator_blocking_edges", indexes=[ IndexModel( [("waiter_id", ASCENDING), ("waited_id", ASCENDING)], unique=True ), IndexModel([("waited_id", ASCENDING)]), IndexModel([("waiter_id", ASCENDING)]), ], ) return self.instantiate_retriable_coll(spec) @cached_property def orchestrator_runner_heartbeats(self) -> "RetryableCollection": spec = CollectionSpec( name="orchestrator_runner_heartbeats", indexes=[ IndexModel([("runner_id", ASCENDING)], unique=True), IndexModel([("last_heartbeat", ASCENDING)]), IndexModel([("creation_timestamp", ASCENDING)]), ], ) return self.instantiate_retriable_coll(spec)