pynenc_mongo.state_backend.mongo_state_backend
Module Contents
Classes
ChunkPrefix | Key prefixes for different data types in chunk storage.
MongoStateBackend | A MongoDB-based implementation of the state backend for cross-process testing.
Data
API
- pynenc_mongo.state_backend.mongo_state_backend.logger
‘getLogger(…)’
- class pynenc_mongo.state_backend.mongo_state_backend.ChunkPrefix
Bases: enum.StrEnum
Key prefixes for different data types in chunk storage.
- ARGS
‘auto(…)’
- RESULT
‘auto(…)’
- EXCEPTION
‘auto(…)’
- class pynenc_mongo.state_backend.mongo_state_backend.MongoStateBackend(app: pynenc.app.Pynenc)
Bases: pynenc.state_backend.base_state_backend.BaseStateBackend[pynenc.types.Params, pynenc.types.Result]
A MongoDB-based implementation of the state backend for cross-process testing.
Stores invocation data, history, results, and exceptions in MongoDB collections, allowing state sharing between processes. Suitable for testing process runners.
Warning
The MongoStateBackend class is designed for testing purposes only and should not be used in production systems. It uses MongoDB for state management.
- store_app_info(app_info: pynenc.app_info.AppInfo) → None
Store app info.
- get_app_info() → pynenc.app_info.AppInfo
Retrieve app info for the current app.
- static discover_app_infos() → dict[str, pynenc.app_info.AppInfo]
Retrieve all app information by scanning all app-prefixed collections.
- store_workflow_run(workflow_identity: pynenc.workflow.WorkflowIdentity) → None
Store a workflow run for tracking and monitoring.
- _upsert_invocations(entries: list[tuple[pynenc.invocation.dist_invocation.InvocationDTO, pynenc.models.call_dto.CallDTO]]) → None
Store invocations with automatic argument chunking when needed.
- _get_invocation(invocation_id: str) → tuple[pynenc.invocation.dist_invocation.InvocationDTO, pynenc.models.call_dto.CallDTO] | None
Retrieve invocation, reconstructing arguments from chunks if needed.
- _add_histories(invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], invocation_history: pynenc.state_backend.base_state_backend.InvocationHistory) → None
Add the same history record to each invocation in a list.
- _get_history(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → list[pynenc.state_backend.base_state_backend.InvocationHistory]
Retrieve the history of an invocation, ordered by timestamp.
MongoDB (and mongomock) only sort datetimes to millisecond precision, so if multiple records share the same millisecond, their order may be wrong. We re-sort in Python for full microsecond precision, but keep the DB sort for efficiency.
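The re-sort described above can be illustrated with a minimal sketch. It assumes each history record still carries its full-precision timestamp after being read back; `HistoryRecord`, `truncate_to_ms`, `resort_history`, and the status strings are hypothetical names used only for illustration, not part of pynenc.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class HistoryRecord:  # hypothetical stand-in for InvocationHistory
    status: str
    timestamp: datetime


def truncate_to_ms(ts: datetime) -> datetime:
    """Mimic a millisecond-precision datetime by dropping the microsecond remainder."""
    return ts.replace(microsecond=(ts.microsecond // 1000) * 1000)


def resort_history(records: list[HistoryRecord]) -> list[HistoryRecord]:
    """Re-sort on the full-precision timestamp (sorted() is stable)."""
    return sorted(records, key=lambda r: r.timestamp)


base = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
# Both records fall in the same millisecond but differ in microseconds.
later = HistoryRecord("RUNNING", base.replace(microsecond=123_900))
earlier = HistoryRecord("PENDING", base.replace(microsecond=123_100))

# A millisecond-precision sort sees equal keys, so the input order survives.
db_order = sorted([later, earlier], key=lambda r: truncate_to_ms(r.timestamp))
# The in-Python re-sort restores the true chronological order.
fixed = resort_history(db_order)
```

Because the database has already sorted the records to the millisecond, the Python re-sort only has to reorder records within the same millisecond, so it is cheap on nearly sorted input.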
- iter_invocations_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, batch_size: int = 100) → collections.abc.Iterator[list[pynenc.identifiers.invocation_id.InvocationId]]
Iterate over invocation IDs that have history within time range.
Uses MongoDB’s native cursor with batch_size for efficient pagination.
- Parameters:
start_time – Start of time range (inclusive)
end_time – End of time range (inclusive)
batch_size – Number of invocation IDs per batch
- Returns:
Iterator yielding batches of invocation IDs
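The batching contract (inclusive bounds, lists of at most batch_size items) can be sketched in plain Python. This is an in-memory illustration only, not the MongoDB cursor logic; `iter_ids_in_timerange` and the sample data are hypothetical, and whether the real method de-duplicates IDs is not stated here.

```python
from collections.abc import Iterable, Iterator
from datetime import datetime, timedelta, timezone
from itertools import islice


def iter_ids_in_timerange(
    history: Iterable[tuple[str, datetime]],
    start_time: datetime,
    end_time: datetime,
    batch_size: int = 100,
) -> Iterator[list[str]]:
    """Yield batches of IDs whose timestamp lies in [start_time, end_time]."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    # Lazily filter; both ends of the range are inclusive.
    matching = (inv_id for inv_id, ts in history if start_time <= ts <= end_time)
    # islice pulls at most batch_size items per iteration from the generator.
    while batch := list(islice(matching, batch_size)):
        yield batch


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
history = [(name, t0 + timedelta(minutes=i)) for i, name in enumerate("abcde")]
batches = list(
    iter_ids_in_timerange(
        history, t0 + timedelta(minutes=1), t0 + timedelta(minutes=3), batch_size=2
    )
)
```

A caller can process each yielded list as a unit, which keeps memory bounded by batch_size rather than by the size of the time range.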
- iter_history_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, batch_size: int = 100) → collections.abc.Iterator[list[pynenc.state_backend.base_state_backend.InvocationHistory]]
Iterate over history entries within time range.
Uses MongoDB’s native cursor with batch_size for efficient iteration.
- Parameters:
start_time – Start of time range (inclusive)
end_time – End of time range (inclusive)
batch_size – Number of history entries per batch
- Returns:
Iterator yielding batches of InvocationHistory objects
- _get_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → str
Retrieve result, decompressing from chunks if needed.
- _set_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId, serialized_result: str) → None
Store result, chunking if it exceeds BSON limits.
- _get_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId) → str
Retrieve exception, decompressing from chunks if needed.
- _set_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId, serialized_exception: str) → None
Store exception, chunking if it exceeds BSON limits.
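MongoDB caps a single BSON document at 16 MiB, which is why large serialized values need to be split. The general idea of chunking and reassembly can be sketched as follows; `split_into_chunks`, `join_chunks`, and the 1024-byte chunk size are assumptions chosen for illustration and say nothing about how this backend actually sizes, compresses, or keys its chunks.

```python
def split_into_chunks(serialized: str, max_chunk_bytes: int) -> list[bytes]:
    """Split the UTF-8 encoding of a string into pieces of bounded size."""
    if max_chunk_bytes < 1:
        raise ValueError("max_chunk_bytes must be at least 1")
    data = serialized.encode("utf-8")
    return [data[i : i + max_chunk_bytes] for i in range(0, len(data), max_chunk_bytes)]


def join_chunks(chunks: list[bytes]) -> str:
    """Reassemble the chunks in order and decode once at the end."""
    # Joining before decoding is safe even if a split landed inside a
    # multi-byte character.
    return b"".join(chunks).decode("utf-8")


payload = "résultat-" * 1000  # 10 bytes per repetition in UTF-8
chunks = split_into_chunks(payload, max_chunk_bytes=1024)
restored = join_chunks(chunks)
```

Splitting on bytes rather than characters keeps every chunk under a hard size bound, which is what a storage limit expressed in bytes requires.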
- set_workflow_data(workflow_identity: pynenc.workflow.WorkflowIdentity, key: str, value: Any) → None
Set workflow data.
- get_workflow_data(workflow_identity: pynenc.workflow.WorkflowIdentity, key: str, default: Any = None) → Any
Get workflow data.
- get_all_workflow_types() → collections.abc.Iterator[pynenc.identifiers.task_id.TaskId]
Retrieve all workflow types.
- get_all_workflow_runs() → collections.abc.Iterator[pynenc.workflow.WorkflowIdentity]
Retrieve all stored workflows.
- get_workflow_runs(workflow_type: pynenc.identifiers.task_id.TaskId) → collections.abc.Iterator[pynenc.workflow.WorkflowIdentity]
Retrieve workflow runs for a specific workflow type.
- store_workflow_sub_invocation(parent_workflow_id: pynenc.identifiers.invocation_id.InvocationId, sub_invocation_id: pynenc.identifiers.invocation_id.InvocationId) → None
Store workflow sub-invocation relationship.
- get_workflow_sub_invocations(workflow_id: pynenc.identifiers.invocation_id.InvocationId) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Get workflow sub-invocations.
- get_child_invocations(parent_invocation_id: pynenc.identifiers.invocation_id.InvocationId) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Return IDs of all invocations directly spawned by the given parent.
Queries the invocations collection using the indexed parent_invocation_id field for efficient family-tree traversal.
- Parameters:
parent_invocation_id – The invocation ID to find children for.
- Returns:
Iterator of child invocation IDs (may be empty).
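Since the method returns only direct children, walking a whole family tree means calling it repeatedly. A minimal breadth-first sketch follows, where `iter_descendants` is a hypothetical helper and `get_children` stands in for a bound call such as the backend's get_child_invocations; the dictionary is made-up sample data.

```python
from collections import deque
from collections.abc import Callable, Iterable, Iterator


def iter_descendants(
    root_id: str, get_children: Callable[[str], Iterable[str]]
) -> Iterator[str]:
    """Yield every descendant of root_id in breadth-first order."""
    seen = {root_id}  # guards against revisiting an ID
    queue = deque([root_id])
    while queue:
        current = queue.popleft()
        for child in get_children(current):
            if child not in seen:
                seen.add(child)
                queue.append(child)
                yield child


# Hypothetical parent -> children mapping used in place of a database.
tree = {"root": ["a", "b"], "a": ["c"], "b": [], "c": []}
descendants = list(iter_descendants("root", lambda inv_id: tree.get(inv_id, [])))
```

Each level costs one lookup per parent, so an index on the parent field matters most for deep or wide trees.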
- _store_runner_context(runner_context: pynenc.runner.runner_context.RunnerContext) → None
Store a runner context in MongoDB.
- Parameters:
runner_context (RunnerContext) – The context to store
- _get_runner_context(runner_id: str) → RunnerContext | None
Retrieve a runner context by runner_id from MongoDB.
- Parameters:
runner_id (str) – The runner’s unique identifier
- Returns:
The stored RunnerContext or None if not found
- _get_runner_contexts(runner_ids: list[str]) → list[pynenc.runner.runner_context.RunnerContext]
Retrieve multiple runner contexts by their IDs.
- get_matching_runner_contexts(partial_id: str) → collections.abc.Iterator[pynenc.runner.runner_context.RunnerContext]
Search runner contexts by partial ID match.
Performs a regex search on runner_id for flexible matching.
- Parameters:
partial_id – Partial ID string to match (case-sensitive)
- Returns:
Iterator of matching RunnerContext objects
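The effect of a case-sensitive partial match can be sketched with Python's re module. Escaping the input with re.escape is an assumption made here so that characters such as "." or "@" match literally; the page does not say whether the backend escapes partial_id. `build_runner_filter`, `match_runner_ids`, and the sample IDs are hypothetical.

```python
import re


def build_runner_filter(partial_id: str) -> dict[str, dict[str, str]]:
    """Build a MongoDB-style filter using the $regex query operator."""
    return {"runner_id": {"$regex": re.escape(partial_id)}}


def match_runner_ids(runner_ids: list[str], partial_id: str) -> list[str]:
    """In-memory equivalent: keep IDs containing partial_id, case-sensitively."""
    pattern = re.compile(re.escape(partial_id))
    return [runner_id for runner_id in runner_ids if pattern.search(runner_id)]


ids = ["ThreadRunner@host-1", "ProcessRunner@host-1", "threadrunner@host-2"]
matches = match_runner_ids(ids, "Runner@host")
query = build_runner_filter("Runner@host")
```

The lowercase ID is excluded because no case-insensitive flag is set, mirroring the case-sensitive behavior noted for partial_id.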
- get_invocation_ids_by_workflow(workflow_id: str | None = None, workflow_type_key: str | None = None) → collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId]
Retrieve invocation IDs filtered by workflow criteria.
Returns invocations matching the specified workflow filters. At least one filter should be provided.
- Parameters:
workflow_id – Optional workflow ID to filter by
workflow_type_key – Optional workflow type key to filter by
- Returns:
Iterator of matching InvocationId objects