pynenc_mongo.orchestrator.mongo_orchestrator

Module Contents

Classes

MongoBlockingControl

Blocking control for MongoOrchestrator using MongoDB for cross-process invocation dependencies.

MongoOrchestrator

A MongoDB-based implementation of the orchestrator for cross-process coordination.

API

class pynenc_mongo.orchestrator.mongo_orchestrator.MongoBlockingControl(app: pynenc.app.Pynenc, cols: pynenc_mongo.orchestrator.mongo_orchestrator_collections.OrchestratorCollections)

Bases: pynenc.orchestrator.base_orchestrator.BaseBlockingControl

Blocking control for MongoOrchestrator using MongoDB for cross-process invocation dependencies.

Initialization

waiting_for_results(caller_invocation_id: InvocationId | None, result_invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId]) → None

Notifies the system that an invocation is waiting for the results of other invocations.

release_waiters(waited_invocation_id: pynenc.identifiers.invocation_id.InvocationId) → None

Removes an invocation from the dependency graph, along with any dependencies related to it.

get_blocking_invocations(max_num_invocations: int) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]

Retrieves invocations that are blocking others but are not themselves waiting for any results.

Ensures each invocation is yielded only once.
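The three methods above maintain a dependency graph between invocations. The following is a minimal in-memory sketch of that idea, not the MongoDB implementation: the class name `InMemoryBlockingControl`, the use of plain strings as invocation IDs, and the two dictionaries are assumptions made for illustration.

```python
from collections.abc import Iterator


class InMemoryBlockingControl:
    """Hypothetical in-memory stand-in for a blocking-control graph."""

    def __init__(self) -> None:
        # waiter -> invocations whose results it is waiting for
        self.waiting_for: dict[str, set[str]] = {}
        # waited invocation -> invocations waiting on it
        self.waited_by: dict[str, set[str]] = {}

    def waiting_for_results(self, caller_id: str, result_ids: list[str]) -> None:
        for result_id in result_ids:
            self.waiting_for.setdefault(caller_id, set()).add(result_id)
            self.waited_by.setdefault(result_id, set()).add(caller_id)

    def release_waiters(self, waited_id: str) -> None:
        # Drop edges from every waiter that was blocked on waited_id.
        for waiter in self.waited_by.pop(waited_id, set()):
            pending = self.waiting_for.get(waiter, set())
            pending.discard(waited_id)
            if not pending:
                self.waiting_for.pop(waiter, None)
        # Drop edges where waited_id itself was waiting on something.
        for target in self.waiting_for.pop(waited_id, set()):
            waiters = self.waited_by.get(target, set())
            waiters.discard(waited_id)
            if not waiters:
                self.waited_by.pop(target, None)

    def get_blocking_invocations(self, max_num: int) -> Iterator[str]:
        yielded = 0
        # Dictionary keys are unique, so each invocation is yielded once.
        for invocation_id in list(self.waited_by):
            if yielded >= max_num:
                return
            if invocation_id not in self.waiting_for:
                yield invocation_id
                yielded += 1


control = InMemoryBlockingControl()
control.waiting_for_results("a", ["b"])  # a waits for b
control.waiting_for_results("b", ["c"])  # b waits for c
first = list(control.get_blocking_invocations(10))
control.release_waiters("c")
second = list(control.get_blocking_invocations(10))
```

In this sketch `c` is the only invocation that blocks others without waiting itself; once it is released, `b` takes that role.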

class pynenc_mongo.orchestrator.mongo_orchestrator.MongoOrchestrator(app: pynenc.app.Pynenc)

Bases: pynenc.orchestrator.base_orchestrator.BaseOrchestrator

A MongoDB-based implementation of the orchestrator for cross-process coordination.

This orchestrator uses MongoDB for persistent storage, which makes it suitable for testing process runners. It mirrors the functionality of SQLiteOrchestrator.

Warning

The MongoOrchestrator class is designed for testing purposes only and should not be used in production systems. It uses MongoDB for state management.

Initialization

conf() → pynenc_mongo.conf.config_orchestrator.ConfigOrchestratorMongo
property blocking_control: pynenc.orchestrator.base_orchestrator.BaseBlockingControl

Return the blocking control instance.

_register_new_invocations(invocations: list[DistributedInvocation[Params, Result]], runner_id: str | None = None) → pynenc.invocation.status.InvocationStatusRecord

Register new invocations with status Registered if they don’t exist yet.

get_existing_invocations(task: Task[Params, Result], key_serialized_arguments: dict[str, str] | None = None, statuses: list[pynenc.invocation.status.InvocationStatus] | None = None) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]

Get existing invocation IDs for a task, optionally filtered by arguments and statuses.

get_task_invocation_ids(task_id: pynenc.task.TaskId) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]

Retrieves all invocation IDs for a given task ID.

get_invocation_ids_paginated(task_id: TaskId | None = None, statuses: list[pynenc.invocation.status.InvocationStatus] | None = None, limit: int = 100, offset: int = 0) → list[pynenc.identifiers.invocation_id.InvocationId]

Retrieve invocation IDs with pagination support.

Results are ordered by registration time (newest first).

Parameters:
  • task_id – Optional task ID to filter by

  • statuses – Optional statuses to filter by

  • limit – Maximum number of results to return

  • offset – Number of results to skip

Returns:

List of matching invocation IDs
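A caller that needs every matching ID can walk the pages by advancing offset in steps of limit. The sketch below shows that loop against a stand-in page function; `iter_all_pages`, `fake_fetch`, and the sample IDs are hypothetical and exist only for illustration.

```python
from collections.abc import Callable, Iterator


def iter_all_pages(
    fetch_page: Callable[[int, int], list[str]], page_size: int = 100
) -> Iterator[str]:
    """Yield every item by calling fetch_page(limit, offset) until a short page."""
    offset = 0
    while True:
        page = fetch_page(page_size, offset)
        yield from page
        if len(page) < page_size:
            return  # a short (or empty) page means there is nothing left
        offset += page_size


# Hypothetical stand-in data, ordered newest first like the real method.
_FAKE_IDS = [f"inv-{n}" for n in range(250, 0, -1)]


def fake_fetch(limit: int, offset: int) -> list[str]:
    return _FAKE_IDS[offset : offset + limit]


all_ids = list(iter_all_pages(fake_fetch, page_size=100))
```

With a real orchestrator the page function could be something like `lambda limit, offset: orchestrator.get_invocation_ids_paginated(limit=limit, offset=offset)`. Because results are ordered newest first, invocations registered between two page requests can shift later pages, so a full walk should be treated as a best-effort snapshot.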

count_invocations(task_id: TaskId | None = None, statuses: list[pynenc.invocation.status.InvocationStatus] | None = None) → int

Count invocations matching the given filters.

Parameters:
  • task_id – Optional task ID to filter by

  • statuses – Optional statuses to filter by

Returns:

The total count of matching invocations

get_call_invocation_ids(call_id: pynenc.identifiers.call_id.CallId) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]

Retrieves all invocation IDs for a given call ID.

any_non_final_invocations(call_id: str) → bool

Checks if there are any non-final invocations for a specific call ID.

_make_claim() → str

Create a lock claim string encoding a unique ID and timestamp.

_parse_claim_timestamp(claim: str) → float

Extract the timestamp from a lock claim string.
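One way to encode a unique ID and a timestamp in a single string is to join them with a separator and split on it again when parsing. The `<uuid>:<timestamp>` layout below is an assumption for illustration; the actual claim format used by the orchestrator is not documented here.

```python
import time
import uuid


def make_claim() -> str:
    """Return '<random hex id>:<unix timestamp>' (hypothetical format)."""
    return f"{uuid.uuid4().hex}:{time.time()}"


def parse_claim_timestamp(claim: str) -> float:
    """Read the timestamp back from the part after the last colon."""
    return float(claim.rsplit(":", 1)[1])


before = time.time()
claim = make_claim()
after = time.time()
claim_time = parse_claim_timestamp(claim)
```

Splitting from the right keeps parsing correct even if the ID portion were ever to contain the separator.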

_acquire_transition_lock(invocation_id: str) → str | None

Acquire an exclusive lock on the invocation document for a status transition.

Pushes a unique claim into the transition_lock array. If our claim is first in the array, we hold the lock and may proceed to update the status. Two concurrent pushes are both applied, but only one can be at index 0.

If the existing lock holder’s claim is older than stale_lock_threshold_seconds, it is treated as stale (owner crashed) and cleared before retrying.

Parameters:

invocation_id – ID of the invocation to lock

Returns:

The claim ID if lock acquired, None otherwise

_release_transition_lock(invocation_id: str) → None

Clear the transition_lock array after a status change.

_atomic_status_transition(invocation_id: str, status: pynenc.invocation.status.InvocationStatus, runner_id: str | None = None) → pynenc.invocation.status.InvocationStatusRecord

Validate and transition invocation status with array-push locking.

Uses a transition_lock array to guarantee mutual exclusion:

  1. Push a unique claim ID into the transition_lock array

  2. If our claim is first → we hold the lock

  3. Read current status, validate transition, write new status

  4. Clear the lock array

Retries lock acquisition with exponential backoff to handle transient contention (e.g. a concurrent release wiping our claim). Only raises InvocationStatusRaceConditionError after all retries are exhausted.

Parameters:
  • invocation_id – The ID of the invocation to update

  • status – The target status

  • runner_id – The owner ID for ownership validation

Returns:

The new status record after successful transition

Raises:
  • InvocationStatusRaceConditionError – If lock not acquired after retries

  • KeyError – If invocation does not exist

index_arguments_for_concurrency_control(invocation: DistributedInvocation[Params, Result]) → None

Index invocation arguments for concurrency control.

set_up_invocation_auto_purge(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → None

Set up invocation for auto-purging by setting the auto_purge_timestamp.

auto_purge() → None

Auto-purge old invocations based on auto_purge_timestamp.
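Timestamp-based purging usually amounts to deleting every record whose timestamp is older than a cutoff. The sketch below shows this on a dictionary of records; the function name `purge_expired`, the record layout, and the `max_age_seconds` parameter are assumptions, since the retention setting used by the orchestrator is not documented here.

```python
def purge_expired(
    records: dict[str, dict[str, float]], max_age_seconds: float, now: float
) -> list[str]:
    """Delete records whose auto_purge_timestamp is at or before the cutoff."""
    cutoff = now - max_age_seconds
    expired = [
        invocation_id
        for invocation_id, doc in records.items()
        # Records never marked for auto-purge have no timestamp and are kept.
        if "auto_purge_timestamp" in doc and doc["auto_purge_timestamp"] <= cutoff
    ]
    for invocation_id in expired:
        del records[invocation_id]
    return expired


records = {
    "old": {"auto_purge_timestamp": 100.0},
    "recent": {"auto_purge_timestamp": 950.0},
    "unmarked": {},
}
purged = purge_expired(records, max_age_seconds=600.0, now=1000.0)
```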

get_invocation_status_record(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → pynenc.invocation.status.InvocationStatusRecord

Get the current status of an invocation by ID.

increment_invocation_retries(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → None

Increment the retry count for an invocation by ID.

get_invocation_retries(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → int

Get the number of retries for an invocation by ID.

filter_by_status(invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], status_filter: frozenset[pynenc.invocation.status.InvocationStatus]) → list[pynenc.identifiers.invocation_id.InvocationId]

Return the subset of the given invocation IDs whose status is in status_filter.

register_runner_heartbeats(runner_ids: list[str], can_run_atomic_service: bool = False) → None

Register runners, or update the heartbeat timestamp of runners that are already registered.

Parameters:
  • runner_ids (list[str]) – The list of runner IDs to register.

  • can_run_atomic_service (bool) – Whether these runners are eligible to run atomic services.

record_atomic_service_execution(runner_id: str, start_time: datetime.datetime, end_time: datetime.datetime) → None

Record the latest atomic service execution window for a runner.

_get_active_runners(timeout_seconds: float, can_run_atomic_service: bool | None) → list[pynenc.orchestrator.atomic_service.ActiveRunnerInfo]

Retrieve runners that are considered active based on heartbeat activity.

A runner is considered “active” if it has sent a heartbeat within the timeout period. This is used for atomic service scheduling to determine which runners are eligible to participate in time slot distribution.

Parameters:
  • timeout_seconds (float) – Heartbeat timeout in seconds (typically from atomic_service_runner_considered_dead_after_minutes config)

  • can_run_atomic_service (bool | None) – If specified, filters runners based on their eligibility to run atomic services

Returns:

List of active runners ordered by creation time (oldest first)

Return type:

list[ActiveRunnerInfo]

get_pending_invocations_for_recovery() → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]

Retrieve invocation IDs stuck in PENDING status beyond the allowed time.

Returns:

Iterator of invocation IDs that have been pending longer than max_pending_seconds

_get_running_invocations_for_recovery(timeout_seconds: float) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]

Retrieve invocation IDs in RUNNING status owned by inactive runners.

An inactive runner is one that hasn’t sent a heartbeat within the configured timeout period. Invocations owned by such runners are considered stuck and need recovery.

Parameters:

timeout_seconds (float) – Heartbeat timeout in seconds

Returns:

Iterator of invocation IDs that need recovery.

Return type:

Iterator[str]
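Recovery of running invocations combines two pieces of state: which runner owns each running invocation, and when each runner last sent a heartbeat. The sketch below uses two dictionaries for these; the function name, the data layout, and treating a runner with no recorded heartbeat as inactive are assumptions made for illustration.

```python
from collections.abc import Iterator


def running_invocations_for_recovery(
    running_owner: dict[str, str],
    last_heartbeat: dict[str, float],
    timeout_seconds: float,
    now: float,
) -> Iterator[str]:
    """Yield running invocations whose owning runner has gone quiet."""
    cutoff = now - timeout_seconds
    for invocation_id, runner_id in running_owner.items():
        heartbeat = last_heartbeat.get(runner_id)
        # Assumption: a runner with no heartbeat on record counts as inactive.
        if heartbeat is None or heartbeat < cutoff:
            yield invocation_id


running_owner = {"inv-1": "runner-a", "inv-2": "runner-b", "inv-3": "runner-c"}
last_heartbeat = {"runner-a": 995.0, "runner-b": 100.0}
stuck = list(
    running_invocations_for_recovery(
        running_owner, last_heartbeat, timeout_seconds=60.0, now=1000.0
    )
)
```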

purge() → None

Clear all orchestrator state.