pynenc_mongo.trigger.mongo_trigger

Module Contents

Classes

MongoTrigger

MongoDB-based implementation of the Pynenc trigger system.

API

class pynenc_mongo.trigger.mongo_trigger.MongoTrigger(app: pynenc.app.Pynenc)

Bases: pynenc.trigger.base_trigger.BaseTrigger

MongoDB-based implementation of the Pynenc trigger system.

Stores all trigger, condition, and claim data in MongoDB for cross-process safety.

Initialization

Initialize the trigger component with the parent application.

Parameters:

app – The Pynenc application instance

conf() → pynenc_mongo.conf.config_trigger.ConfigTriggerMongo
_register_condition(condition: pynenc.trigger.conditions.TriggerCondition) → None
get_condition(condition_id: str) → pynenc.trigger.conditions.TriggerCondition | None
register_trigger(trigger: pynenc.models.trigger_definition_dto.TriggerDefinitionDTO) → None
_get_trigger(trigger_id: str) → Optional[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO]
_parse_trigger_dto(trigger_dict: dict[str, Any]) → pynenc.models.trigger_definition_dto.TriggerDefinitionDTO
get_triggers_for_condition(condition_id: str) → list[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO]
record_valid_condition(valid_condition: pynenc.trigger.conditions.ValidCondition) → None
record_valid_conditions(valid_conditions: list[pynenc.trigger.conditions.ValidCondition]) → None
get_valid_conditions() → dict[str, pynenc.trigger.conditions.ValidCondition]
clear_valid_conditions(conditions: collections.abc.Iterable[pynenc.trigger.conditions.ValidCondition]) → None
_get_all_conditions() → list[pynenc.trigger.conditions.TriggerCondition]
get_last_cron_execution(condition_id: str) → datetime.datetime | None

Get the last execution time for a cron condition.

Parameters:

condition_id – ID of the condition to check

Returns:

Last execution time in UTC, or None if never executed

store_last_cron_execution(condition_id: str, execution_time: datetime.datetime, expected_last_execution: datetime.datetime | None = None) → bool

Store the last execution time for a cron condition with optimistic locking.

Parameters:
  • condition_id – ID of the condition

  • execution_time – Time of execution in UTC

  • expected_last_execution – Value the caller expects to be stored currently; used for optimistic locking

Returns:

True if update succeeded, False if another process won the race
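
The optimistic-locking contract described above can be illustrated with a small in-memory sketch. This is not the pynenc_mongo implementation: `InMemoryCronStore` is a hypothetical stand-in, a `threading.Lock` plays the role of the database's atomic conditional update, and treating `expected_last_execution=None` as "no value stored yet" is an assumption made for the example.

```python
import threading
from datetime import datetime, timedelta, timezone
from typing import Optional


class InMemoryCronStore:
    """Hypothetical stand-in for the MongoDB-backed store (illustration only)."""

    def __init__(self) -> None:
        self._last: dict[str, datetime] = {}
        # The lock stands in for the atomicity a database update would provide.
        self._lock = threading.Lock()

    def get_last_cron_execution(self, condition_id: str) -> Optional[datetime]:
        with self._lock:
            return self._last.get(condition_id)

    def store_last_cron_execution(
        self,
        condition_id: str,
        execution_time: datetime,
        expected_last_execution: Optional[datetime] = None,
    ) -> bool:
        with self._lock:
            current = self._last.get(condition_id)
            # Assumption: None means "expect that nothing is stored yet".
            if current != expected_last_execution:
                return False  # another writer got there first
            self._last[condition_id] = execution_time
            return True


store = InMemoryCronStore()
first = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
second = first + timedelta(minutes=1)

# Two workers read the same (empty) state, then both try to write.
seen = store.get_last_cron_execution("cron-1")
won = store.store_last_cron_execution("cron-1", first, expected_last_execution=seen)
lost = store.store_last_cron_execution("cron-1", second, expected_last_execution=seen)
```

Here the first write succeeds and the second is rejected because its expected value is stale. A caller that gets `False` back would typically skip the cron run, since another process has already recorded it.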

_register_source_task_condition(task_id: pynenc.identifiers.task_id.TaskId, condition_id: str) → None
get_conditions_sourced_from_task(task_id: pynenc.identifiers.task_id.TaskId, context_type: type[pynenc.trigger.conditions.ConditionContext] | None = None) → list[pynenc.trigger.conditions.TriggerCondition]
claim_trigger_execution(trigger_id: str, valid_condition_id: str, expiration_seconds: int = 60) → bool
claim_trigger_run(trigger_run_id: str, expiration_seconds: int = 60) → bool
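
The two claim methods are not described beyond their signatures. One plausible reading, sketched here purely as an assumption, is that a claim succeeds for the first caller and is refused for later callers until `expiration_seconds` have passed. `InMemoryClaims` and its injectable `clock` are hypothetical names for the illustration and do not exist in pynenc_mongo.

```python
import time
from typing import Callable


class InMemoryClaims:
    """Hypothetical expiring-claim table (illustration only)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock  # injectable so the example does not need to sleep
        self._expires_at: dict[str, float] = {}

    def claim(self, key: str, expiration_seconds: int = 60) -> bool:
        now = self._clock()
        expires = self._expires_at.get(key)
        if expires is not None and expires > now:
            return False  # an unexpired claim already exists
        self._expires_at[key] = now + expiration_seconds
        return True


now = [0.0]  # fake clock value, advanced manually below
claims = InMemoryClaims(clock=lambda: now[0])

first = claims.claim("run-1", expiration_seconds=60)
second = claims.claim("run-1", expiration_seconds=60)
now[0] = 61.0  # move past the expiration window
third = claims.claim("run-1", expiration_seconds=60)
```

In this sketch the first claim is granted, the second is refused while the first is still live, and the third is granted once the window has elapsed. A real multi-process backend would need the check and the write to happen as one atomic operation in the database.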
clean_task_trigger_definitions(task_id: pynenc.identifiers.task_id.TaskId) → None
_purge() → None