Source code for pynenc_mongo.broker.mongo_broker
from datetime import UTC, datetime
from functools import cached_property
from typing import TYPE_CHECKING
from pymongo import ASCENDING, IndexModel
from pynenc.broker.base_broker import BaseBroker
from pynenc.identifiers.invocation_id import InvocationId
from pynenc_mongo.conf.config_broker import ConfigBrokerMongo
from pynenc_mongo.util.mongo_collections import CollectionSpec, MongoCollections
if TYPE_CHECKING:
from pynenc.app import Pynenc
from pynenc_mongo.conf.config_mongo import ConfigMongo
from pynenc_mongo.util.mongo_client import RetryableCollection
[docs]
class BrokerCollections(MongoCollections):
"""MongoDB collections for the broker message queue."""
def __init__(self, conf: "ConfigMongo", app_id: str):
super().__init__(conf, prefix="broker", app_id=app_id)
@cached_property
def broker_message_queue(self) -> "RetryableCollection":
spec = CollectionSpec(
name="broker_message_queue",
indexes=[IndexModel([("created_at", ASCENDING)])],
)
return self.instantiate_retriable_coll(spec)
[docs]
class MongoBroker(BaseBroker):
"""
A MongoDB-based implementation of the broker for cross-process coordination.
Uses MongoDB for cross-process message queue coordination and implements
all required abstract methods from BaseBroker. Routes invocation IDs through
a persistent FIFO queue stored in MongoDB.
"""
def __init__(self, app: "Pynenc") -> None:
super().__init__(app)
self.cols = BrokerCollections(self.conf, app_id=self.app.app_id)
@cached_property
def conf(self) -> ConfigBrokerMongo:
return ConfigBrokerMongo(
config_values=self.app.config_values,
config_filepath=self.app.config_filepath,
)
[docs]
def route_invocation(self, invocation_id: str) -> None:
"""
Route a single invocation ID to the message queue.
:param invocation_id: The invocation ID to route
"""
self.cols.broker_message_queue.insert_one(
{
"invocation_id": invocation_id,
"created_at": datetime.now(UTC),
}
)
[docs]
def route_invocations(self, invocation_ids: list["InvocationId"]) -> None:
"""
Route multiple invocation IDs to the message queue.
:param invocation_ids: List of invocation IDs to route
"""
if not invocation_ids:
return
documents = [
{
"invocation_id": inv_id,
"created_at": datetime.now(UTC),
}
for inv_id in invocation_ids
]
self.cols.broker_message_queue.insert_many(documents)
[docs]
def retrieve_invocation(self) -> "InvocationId | None":
"""
Atomically retrieve and remove a single invocation ID from the queue.
Ensures that no two processes can retrieve the same invocation.
:return: The next invocation ID in the queue, or None if empty
"""
# Atomically find and delete the oldest message
document = self.cols.broker_message_queue.find_one_and_delete(
{}, sort=[("created_at", 1)]
)
if document:
return InvocationId(document["invocation_id"])
return None
[docs]
def count_invocations(self) -> int:
"""
Count the number of invocations in the queue.
:return: Number of pending invocations
"""
return self.cols.broker_message_queue.count_documents({})
[docs]
def purge(self) -> None:
"""Clear all messages from the queue."""
self.cols.purge_all()