pynenc_mongo.trigger.mongo_trigger¶
Module Contents¶
Classes¶
MongoDB-based implementation of the Pynenc trigger system. |
API¶
- class pynenc_mongo.trigger.mongo_trigger.MongoTrigger(app: pynenc.app.Pynenc)[source]¶
Bases:
pynenc.trigger.base_trigger.BaseTriggerMongoDB-based implementation of the Pynenc trigger system.
Stores all trigger, condition, and claim data in MongoDB for cross-process safety.
Initialization
Initialize the trigger component with the parent application.
- Parameters:
app – The Pynenc application instance
- register_trigger(trigger: pynenc.models.trigger_definition_dto.TriggerDefinitionDTO) None[source]¶
- _get_trigger(trigger_id: str) Optional[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO][source]¶
- _parse_trigger_dto(trigger_dict: dict[str, Any]) pynenc.models.trigger_definition_dto.TriggerDefinitionDTO[source]¶
- get_triggers_for_condition(condition_id: str) list[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO][source]¶
- record_valid_conditions(valid_conditions: list[pynenc.trigger.conditions.ValidCondition]) None[source]¶
- clear_valid_conditions(conditions: collections.abc.Iterable[pynenc.trigger.conditions.ValidCondition]) None[source]¶
- get_last_cron_execution(condition_id: str) datetime.datetime | None[source]¶
Get the last execution time for a cron condition.
- Parameters:
condition_id – ID of the condition to check
- Returns:
Last execution time in UTC, or None if never executed
- store_last_cron_execution(condition_id: str, execution_time: datetime.datetime, expected_last_execution: datetime.datetime | None = None) bool[source]¶
Store the last execution time for a cron condition with optimistic locking.
- Parameters:
condition_id – ID of the condition
execution_time – Time of execution in UTC
expected_last_execution – Expected current value for optimistic locking
- Returns:
True if update succeeded, False if another process won the race
- _register_source_task_condition(task_id: pynenc.identifiers.task_id.TaskId, condition_id: str) None[source]¶
- get_conditions_sourced_from_task(task_id: pynenc.identifiers.task_id.TaskId, context_type: type[pynenc.trigger.conditions.ConditionContext] | None = None) list[pynenc.trigger.conditions.TriggerCondition][source]¶
- claim_trigger_execution(trigger_id: str, valid_condition_id: str, expiration_seconds: int = 60) bool[source]¶
- clean_task_trigger_definitions(task_id: pynenc.identifiers.task_id.TaskId) None[source]¶