pynenc_mongo.orchestrator.mongo_orchestrator¶
Module Contents¶
Classes¶
Blocking control for MongoOrchestrator using MongoDB for cross-process invocation dependencies. |
|
A MongoDB-based implementation of the orchestrator for cross-process coordination. |
API¶
- class pynenc_mongo.orchestrator.mongo_orchestrator.MongoBlockingControl(app: pynenc.app.Pynenc, cols: pynenc_mongo.orchestrator.mongo_orchestrator_collections.OrchestratorCollections)[source]¶
Bases:
pynenc.orchestrator.base_orchestrator.BaseBlockingControlBlocking control for MongoOrchestrator using MongoDB for cross-process invocation dependencies.
Initialization
- waiting_for_results(caller_invocation_id: InvocationId | None, result_invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId]) None[source]¶
Notifies the system that an invocation is waiting for the results of other invocations.
- release_waiters(waited_invocation_id: pynenc.identifiers.invocation_id.InvocationId) None[source]¶
Removes an invocation from the graph, along with any dependencies related to it.
- get_blocking_invocations(max_num_invocations: int) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]¶
Retrieves invocations that are blocking others but are not themselves waiting for any results.
Ensures each invocation is yielded only once.
- class pynenc_mongo.orchestrator.mongo_orchestrator.MongoOrchestrator(app: pynenc.app.Pynenc)[source]¶
Bases:
pynenc.orchestrator.base_orchestrator.BaseOrchestratorA MongoDB-based implementation of the orchestrator for cross-process coordination.
This orchestrator uses MongoDB for persistent storage, suitable for testing process runners. It mirrors the functionality of SQLiteOrchestrator.
Warning
The
MongoOrchestratorclass is designed for testing purposes only and should not be used in production systems. It uses MongoDB for state management.Initialization
- property blocking_control: pynenc.orchestrator.base_orchestrator.BaseBlockingControl¶
Return blocking control.
- _register_new_invocations(invocations: list[DistributedInvocation[Params, Result]], runner_id: str | None = None) pynenc.invocation.status.InvocationStatusRecord[source]¶
Register new invocations with status Registered if they don’t exist yet.
- get_existing_invocations(task: Task[Params, Result], key_serialized_arguments: dict[str, str] | None = None, statuses: list[pynenc.invocation.status.InvocationStatus] | None = None) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]¶
Get existing invocation IDs for a task, optionally filtered by arguments and statuses.
- get_task_invocation_ids(task_id: pynenc.task.TaskId) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]¶
Retrieves all invocation IDs for a given task ID.
- get_invocation_ids_paginated(task_id: TaskId | None = None, statuses: list[pynenc.invocation.status.InvocationStatus] | None = None, limit: int = 100, offset: int = 0) list[pynenc.identifiers.invocation_id.InvocationId][source]¶
Retrieve invocation IDs with pagination support.
Results are ordered by registration time (newest first).
- Parameters:
task_id – Optional task ID to filter by
statuses – Optional statuses to filter by
limit – Maximum number of results to return
offset – Number of results to skip
- Returns:
List of matching invocation IDs
- count_invocations(task_id: TaskId | None = None, statuses: list[pynenc.invocation.status.InvocationStatus] | None = None) int[source]¶
Count invocations matching the given filters.
- Parameters:
task_id – Optional task ID to filter by
statuses – Optional statuses to filter by
- Returns:
The total count of matching invocations
- get_call_invocation_ids(call_id: pynenc.identifiers.call_id.CallId) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]¶
Retrieves all invocation IDs for a given call ID.
- any_non_final_invocations(call_id: str) bool[source]¶
Checks if there are any non-final invocations for a specific call ID.
- _acquire_transition_lock(invocation_id: str) str | None[source]¶
Acquire an exclusive lock on the invocation document for a status transition.
Pushes a unique claim into the transition_lock array. If our claim is first in the array, we hold the lock and may proceed to update the status. Two concurrent pushes are both applied, but only one can be at index 0.
If the existing lock holder’s claim is older than stale_lock_threshold_seconds, it is treated as stale (owner crashed) and cleared before retrying.
- Parameters:
invocation_id – ID of the invocation to lock
- Returns:
The claim ID if lock acquired, None otherwise
- _release_transition_lock(invocation_id: str) None[source]¶
Clear the transition_lock array after a status change.
- _atomic_status_transition(invocation_id: str, status: pynenc.invocation.status.InvocationStatus, runner_id: str | None = None) pynenc.invocation.status.InvocationStatusRecord[source]¶
Validate and transition invocation status with array-push locking.
Uses a transition_lock array to guarantee mutual exclusion:
Push a unique claim ID into the transition_lock array
If our claim is first → we hold the lock
Read current status, validate transition, write new status
Clear the lock array
Retries lock acquisition with exponential backoff to handle transient contention (e.g. a concurrent release wiping our claim). Only raises InvocationStatusRaceConditionError after all retries are exhausted.
- Parameters:
invocation_id – The ID of the invocation to update
status – The target status
runner_id – The owner ID for ownership validation
- Returns:
The new status record after successful transition
- Raises:
InvocationStatusRaceConditionError – If lock not acquired after retries
KeyError – If invocation does not exist
- index_arguments_for_concurrency_control(invocation: DistributedInvocation[Params, Result]) None[source]¶
Index invocation arguments for concurrency control.
- set_up_invocation_auto_purge(invocation_id: pynenc.identifiers.invocation_id.InvocationId) None[source]¶
Set up invocation for auto-purging by setting the auto_purge_timestamp.
- get_invocation_status_record(invocation_id: pynenc.identifiers.invocation_id.InvocationId) pynenc.invocation.status.InvocationStatusRecord[source]¶
Get the current status of an invocation by ID.
- increment_invocation_retries(invocation_id: pynenc.identifiers.invocation_id.InvocationId) None[source]¶
Increment the retry count for an invocation by ID.
- get_invocation_retries(invocation_id: pynenc.identifiers.invocation_id.InvocationId) int[source]¶
Get the number of retries for an invocation by ID.
- filter_by_status(invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], status_filter: frozenset[pynenc.invocation.status.InvocationStatus]) list[pynenc.identifiers.invocation_id.InvocationId][source]¶
Filter invocations by status by ID.
- register_runner_heartbeats(runner_ids: list[str], can_run_atomic_service: bool = False) None[source]¶
Register or update runners’ heartbeat timestamp.
- record_atomic_service_execution(runner_id: str, start_time: datetime.datetime, end_time: datetime.datetime) None[source]¶
Record the latest atomic service execution window for a runner.
- _get_active_runners(timeout_seconds: float, can_run_atomic_service: bool | None) list[pynenc.orchestrator.atomic_service.ActiveRunnerInfo][source]¶
Retrieve runners that are considered active based on heartbeat activity.
A runner is considered “active” if it has sent a heartbeat within the timeout period. This is used for atomic service scheduling to determine which runners are eligible to participate in time slot distribution.
- Parameters:
- Returns:
List of active runners ordered by creation time (oldest first)
- Return type:
list[“ActiveRunnerInfo”]
- get_pending_invocations_for_recovery() collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]¶
Retrieve invocation IDs stuck in PENDING status beyond the allowed time.
- Returns:
Iterator of invocation IDs that have been pending longer than max_pending_seconds
- _get_running_invocations_for_recovery(timeout_seconds: float) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]¶
Retrieve invocation IDs in RUNNING status owned by inactive runners.
An inactive runner is one that hasn’t sent a heartbeat within the configured timeout period. Invocations owned by such runners are considered stuck and need recovery.