pynenc_mongo.state_backend.mongo_state_backend

Module Contents

Classes

ChunkPrefix

Key prefixes for different data types in chunk storage.

MongoStateBackend

A MongoDB-based implementation of the state backend for cross-process testing.

Data

API

pynenc_mongo.state_backend.mongo_state_backend.logger

‘getLogger(…)’

class pynenc_mongo.state_backend.mongo_state_backend.ChunkPrefix

Bases: enum.StrEnum

Key prefixes for different data types in chunk storage.

ARGS

‘auto(…)’

RESULT

‘auto(…)’

EXCEPTION

‘auto(…)’
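The member values are rendered only as `auto(…)`. As a minimal sketch of what that implies, assuming standard `enum.StrEnum` semantics (where `auto()` yields the lower-cased member name), the enum could look like the following. The `chunk_key` helper and its `prefix:id` key layout are hypothetical illustrations, not part of the module.

```python
from enum import Enum, auto

try:
    from enum import StrEnum  # available from Python 3.11
except ImportError:  # minimal stand-in for older interpreters
    class StrEnum(str, Enum):
        def _generate_next_value_(name, start, count, last_values):
            # Mirror StrEnum: auto() produces the lower-cased member name.
            return name.lower()

        def __str__(self) -> str:
            return str(self.value)


class ChunkPrefix(StrEnum):
    """Reconstruction of the documented members; values come from auto()."""

    ARGS = auto()
    RESULT = auto()
    EXCEPTION = auto()


def chunk_key(prefix: ChunkPrefix, invocation_id: str) -> str:
    # Hypothetical key layout: the real backend may compose keys differently.
    return f"{prefix.value}:{invocation_id}"
```

Because each member is also a `str`, it can be compared with or interpolated into plain strings without an explicit `.value` lookup.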

class pynenc_mongo.state_backend.mongo_state_backend.MongoStateBackend(app: pynenc.app.Pynenc)

Bases: pynenc.state_backend.base_state_backend.BaseStateBackend[pynenc.types.Params, pynenc.types.Result]

A MongoDB-based implementation of the state backend for cross-process testing.

Stores invocation data, history, results, and exceptions in MongoDB collections, allowing state sharing between processes. Suitable for testing process runners.

Warning

The MongoStateBackend class is designed for testing purposes only and should not be used in production systems. It uses MongoDB for state management.

conf() -> pynenc_mongo.conf.config_state_backend.ConfigStateBackendMongo

store_app_info(app_info: pynenc.app_info.AppInfo) -> None

Store app info.

get_app_info() -> pynenc.app_info.AppInfo

Retrieve app info for the current app.

static discover_app_infos() -> dict[str, pynenc.app_info.AppInfo]

Retrieve all app information by scanning all app-prefixed collections.

store_workflow_run(workflow_identity: pynenc.workflow.WorkflowIdentity) -> None

Store a workflow run for tracking and monitoring.

_upsert_invocations(entries: list[tuple[pynenc.invocation.dist_invocation.InvocationDTO, pynenc.models.call_dto.CallDTO]]) -> None

Store invocations with automatic argument chunking when needed.

_get_invocation(invocation_id: str) -> tuple[pynenc.invocation.dist_invocation.InvocationDTO, pynenc.models.call_dto.CallDTO] | None

Retrieve invocation, reconstructing arguments from chunks if needed.
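The chunk-and-reconstruct idea behind the two methods above can be sketched in plain Python. This is an illustration under stated assumptions, not the backend's code: the function names, the integer chunk index, and the character-based chunk size are all hypothetical.

```python
from __future__ import annotations


def split_into_chunks(data: str, chunk_size: int) -> list[str]:
    """Cut a serialized payload into pieces of at most chunk_size characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]


def join_chunks(chunks: dict[int, str]) -> str:
    """Rebuild the payload from index -> chunk pairs, whatever order they arrive in."""
    return "".join(chunks[index] for index in sorted(chunks))
```

Storing an explicit index with every chunk means reconstruction does not depend on the order in which the pieces are read back.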

_add_histories(invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], invocation_history: pynenc.state_backend.base_state_backend.InvocationHistory) -> None

Adds the same history record for a list of invocations.

_get_history(invocation_id: pynenc.identifiers.invocation_id.InvocationId) -> list[pynenc.state_backend.base_state_backend.InvocationHistory]

Retrieves the history of an invocation ordered by timestamp.

MongoDB (and mongomock) sort datetimes only to millisecond precision, so records that share the same millisecond may come back in the wrong order. The database sort is kept for efficiency, and the results are then re-sorted in Python to restore full microsecond ordering.
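A minimal sketch of the precision issue described above, assuming each record carries a Python `datetime` with microsecond resolution. `HistoryRecord` is a hypothetical stand-in for `InvocationHistory`, and `truncate_to_millis` only imitates what a millisecond-precision store would compare.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class HistoryRecord:
    """Hypothetical stand-in for InvocationHistory."""

    status: str
    timestamp: datetime


def truncate_to_millis(ts: datetime) -> datetime:
    # Drop the sub-millisecond part, as a millisecond-precision store would.
    return ts.replace(microsecond=(ts.microsecond // 1000) * 1000)


def resort_by_microseconds(records: list[HistoryRecord]) -> list[HistoryRecord]:
    # Second pass in Python: order by the full-precision timestamp.
    return sorted(records, key=lambda record: record.timestamp)


base = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
later = HistoryRecord("RUNNING", base.replace(microsecond=123_900))
earlier = HistoryRecord("PENDING", base.replace(microsecond=123_100))
```

Both sample timestamps truncate to the same millisecond, so a millisecond-level sort cannot tell them apart, while the Python pass can.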

iter_invocations_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, batch_size: int = 100) -> collections.abc.Iterator[list[pynenc.identifiers.invocation_id.InvocationId]]

Iterate over invocation IDs that have history within time range.

Uses MongoDB’s native cursor with batch_size for efficient pagination.

Parameters:
  • start_time – Start of time range (inclusive)

  • end_time – End of time range (inclusive)

  • batch_size – Number of invocation IDs per batch

Returns:

Iterator yielding batches of invocation IDs
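The batched return shape can be illustrated without a database. The sketch below groups any iterable into lists of at most `batch_size` items. It stands in for the cursor-driven loop and is not the backend's implementation; `iter_batches` is a hypothetical name.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator
from itertools import islice


def iter_batches(items: Iterable[str], batch_size: int = 100) -> Iterator[list[str]]:
    """Yield consecutive lists of at most batch_size items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return  # source exhausted; never yield an empty batch
        yield batch
```

A caller would typically write `for batch in backend.iter_invocations_in_timerange(start, end): ...` and process one list per iteration, where the final list may be shorter than `batch_size`.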

iter_history_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, batch_size: int = 100) -> collections.abc.Iterator[list[pynenc.state_backend.base_state_backend.InvocationHistory]]

Iterate over history entries within time range.

Uses MongoDB’s native cursor with batch_size for efficient iteration.

Parameters:
  • start_time – Start of time range (inclusive)

  • end_time – End of time range (inclusive)

  • batch_size – Number of history entries per batch

Returns:

Iterator yielding batches of InvocationHistory objects

_get_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId) -> str

Retrieve result, decompressing from chunks if needed.

_set_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId, serialized_result: str) -> None

Store result, chunking if it exceeds BSON limits.
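BSON documents are limited to 16 MiB, which is why large payloads need special handling. The sketch below shows one way a store-and-retrieve pair could work: small payloads are kept whole, larger ones are compressed and split. The use of `zlib`, the `(chunked, parts)` return shape, and the function names are assumptions made for illustration, since the source does not say how the backend compresses or where it sets its threshold.

```python
from __future__ import annotations

import zlib

BSON_MAX_DOCUMENT_BYTES = 16 * 1024 * 1024  # MongoDB's per-document limit


def to_parts(serialized: str, max_bytes: int) -> tuple[bool, list[bytes]]:
    """Return (chunked, parts); chunked payloads are zlib-compressed first."""
    raw = serialized.encode("utf-8")
    if len(raw) <= max_bytes:
        return False, [raw]  # fits in a single document, store as-is
    compressed = zlib.compress(raw)
    parts = [compressed[i:i + max_bytes] for i in range(0, len(compressed), max_bytes)]
    return True, parts


def from_parts(chunked: bool, parts: list[bytes]) -> str:
    """Inverse of to_parts: join the parts and decompress when needed."""
    data = b"".join(parts)
    if chunked:
        data = zlib.decompress(data)
    return data.decode("utf-8")
```

In practice `max_bytes` would be set comfortably below `BSON_MAX_DOCUMENT_BYTES` to leave room for the other fields of the document.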

_get_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId) -> str

Retrieve exception, decompressing from chunks if needed.

_set_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId, serialized_exception: str) -> None

Store exception, chunking if it exceeds BSON limits.

set_workflow_data(workflow_identity: pynenc.workflow.WorkflowIdentity, key: str, value: Any) -> None

Set workflow data.

get_workflow_data(workflow_identity: pynenc.workflow.WorkflowIdentity, key: str, default: Any = None) -> Any

Get workflow data.

get_all_workflow_types() -> collections.abc.Iterator[pynenc.identifiers.task_id.TaskId]

Retrieve all workflow types.

get_all_workflow_runs() -> collections.abc.Iterator[pynenc.workflow.WorkflowIdentity]

Retrieve all stored workflows.

get_workflow_runs(workflow_type: pynenc.identifiers.task_id.TaskId) -> collections.abc.Iterator[pynenc.workflow.WorkflowIdentity]

Retrieve workflow runs for a specific workflow type.

store_workflow_sub_invocation(parent_workflow_id: pynenc.identifiers.invocation_id.InvocationId, sub_invocation_id: pynenc.identifiers.invocation_id.InvocationId) -> None

Store workflow sub-invocation relationship.

get_workflow_sub_invocations(workflow_id: pynenc.identifiers.invocation_id.InvocationId) -> collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]

Get workflow sub-invocations.

get_child_invocations(parent_invocation_id: pynenc.identifiers.invocation_id.InvocationId) -> collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]

Return IDs of all invocations directly spawned by the given parent.

Queries the invocations collection using the indexed parent_invocation_id field for efficient family-tree traversal.

Parameters:

parent_invocation_id – The invocation ID to find children for.

Returns:

Iterator of child invocation IDs (may be empty).
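Since the method returns only direct children, walking a whole family tree means calling it repeatedly. The following is a sketch of a breadth-first traversal, assuming any callable with the same shape as `get_child_invocations`. `iter_descendants` and the in-memory `children` mapping are hypothetical and used here in place of a real backend.

```python
from __future__ import annotations

from collections import deque
from collections.abc import Callable, Iterable, Iterator


def iter_descendants(
    get_children: Callable[[str], Iterable[str]], root_id: str
) -> Iterator[str]:
    """Yield every descendant of root_id, nearest generations first."""
    queue = deque([root_id])
    seen = {root_id}  # guards against revisiting an ID
    while queue:
        current = queue.popleft()
        for child in get_children(current):
            if child not in seen:
                seen.add(child)
                queue.append(child)
                yield child


# In-memory stand-in for the backend's parent -> children lookup.
children = {"root": ["a", "b"], "a": ["c"], "b": [], "c": []}


def fake_get_children(parent_id: str) -> Iterator[str]:
    return iter(children.get(parent_id, []))
```

With a real backend, `backend.get_child_invocations` could be passed in place of `fake_get_children`.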

_store_runner_context(runner_context: pynenc.runner.runner_context.RunnerContext) -> None

Store a runner context in MongoDB.

Parameters:

runner_context (RunnerContext) – The context to store

_get_runner_context(runner_id: str) -> RunnerContext | None

Retrieve a runner context by runner_id from MongoDB.

Parameters:

runner_id (str) – The runner’s unique identifier

Returns:

The stored RunnerContext or None if not found

_get_runner_contexts(runner_ids: list[str]) -> list[pynenc.runner.runner_context.RunnerContext]

Retrieve multiple runner contexts by their IDs.

Parameters:

runner_ids (list[str]) – List of runner unique identifiers

Returns:

List of the stored RunnerContext objects

get_matching_runner_contexts(partial_id: str) -> collections.abc.Iterator[pynenc.runner.runner_context.RunnerContext]

Search runner contexts by partial ID match.

Performs a regex search on runner_id for flexible matching.

Parameters:

partial_id – Partial ID string to match (case-sensitive)

Returns:

Iterator of matching RunnerContext objects
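The matching behaviour can be approximated in plain Python with the `re` module. In this sketch the partial ID is escaped so that characters such as `.` match literally. That escaping is an assumption, since the source only states that a case-sensitive regex search is performed. `match_runner_ids` is a hypothetical helper operating on bare ID strings.

```python
from __future__ import annotations

import re


def match_runner_ids(runner_ids: list[str], partial_id: str) -> list[str]:
    """Return the IDs containing partial_id anywhere, case-sensitively."""
    pattern = re.compile(re.escape(partial_id))  # escape: treat input literally
    return [runner_id for runner_id in runner_ids if pattern.search(runner_id)]


runner_ids = ["worker-1.alpha", "worker-2.beta", "Worker-3.alpha"]
```

Without the escaping step, a partial ID such as `1.a` would also match `1xa`, because `.` is a regex wildcard.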

get_invocation_ids_by_workflow(workflow_id: str | None = None, workflow_type_key: str | None = None) -> collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]

Retrieve invocation IDs filtered by workflow criteria.

Returns invocations matching the specified workflow filters. At least one filter should be provided.

Parameters:
  • workflow_id – Optional workflow ID to filter by

  • workflow_type_key – Optional workflow type key to filter by

Returns:

Iterator of matching InvocationId objects
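One way to turn the two optional filters into a query document is sketched below. The field names mirror the parameter names purely as an assumption, and raising when no filter is given is this sketch's own choice. The source only says a filter "should" be provided and does not describe what happens otherwise.

```python
from __future__ import annotations


def build_workflow_filter(
    workflow_id: str | None = None, workflow_type_key: str | None = None
) -> dict[str, str]:
    """Build a MongoDB-style filter from whichever criteria were supplied."""
    query: dict[str, str] = {}
    if workflow_id is not None:
        query["workflow_id"] = workflow_id  # hypothetical field name
    if workflow_type_key is not None:
        query["workflow_type_key"] = workflow_type_key  # hypothetical field name
    if not query:
        # Assumption: reject an empty filter instead of matching everything.
        raise ValueError("provide workflow_id and/or workflow_type_key")
    return query
```

When both criteria are supplied, the resulting document requires both to match, which is how MongoDB treats multiple top-level fields in a filter.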

purge() -> None

Clear all state backend data including chunked storage.