from collections.abc import Iterable
from datetime import UTC, datetime, timedelta
from functools import cached_property
from typing import TYPE_CHECKING, Any, Optional
from pymongo.errors import DuplicateKeyError
from pymongo.operations import ReplaceOne
from pynenc.identifiers.task_id import TaskId
from pynenc.models.trigger_definition_dto import TriggerDefinitionDTO
from pynenc.trigger.base_trigger import BaseTrigger
from pynenc.trigger.conditions import (
CompositeLogic,
ConditionContext,
TriggerCondition,
ValidCondition,
)
from pynenc_mongo.conf.config_trigger import ConfigTriggerMongo
from pynenc_mongo.trigger.mongo_trigger_collections import TriggerCollections
if TYPE_CHECKING:
from pynenc.app import Pynenc
[docs]
class MongoTrigger(BaseTrigger):
"""
MongoDB-based implementation of the Pynenc trigger system.
Stores all trigger, condition, and claim data in MongoDB for cross-process safety.
"""
def __init__(self, app: "Pynenc") -> None:
super().__init__(app)
self.cols = TriggerCollections(self.conf, app_id=self.app.app_id)
@cached_property
def conf(self) -> ConfigTriggerMongo:
return ConfigTriggerMongo(
config_values=self.app.config_values,
config_filepath=self.app.config_filepath,
)
[docs]
def _register_condition(self, condition: TriggerCondition) -> None:
self.cols.trg_conditions.replace_one(
{"condition_id": condition.condition_id},
{
"condition_id": condition.condition_id,
"condition_json": condition.to_json(self.app),
"last_cron_execution": None,
},
upsert=True,
)
[docs]
def get_condition(self, condition_id: str) -> TriggerCondition | None:
doc = self.cols.trg_conditions.find_one({"condition_id": condition_id})
if doc:
return TriggerCondition.from_json(doc["condition_json"], self.app)
return None
[docs]
def register_trigger(self, trigger: "TriggerDefinitionDTO") -> None:
self.cols.trg_triggers.insert_or_ignore(
{
"trigger_id": trigger.trigger_id,
"task_id_key": trigger.task_id.key,
"condition_ids": trigger.condition_ids,
"logic_value": trigger.logic.value,
"argument_provider_json": trigger.argument_provider_json,
}
)
for condition_id in trigger.condition_ids:
self.cols.trg_condition_triggers.insert_or_ignore(
{"condition_id": condition_id, "trigger_id": trigger.trigger_id}
)
[docs]
def _get_trigger(self, trigger_id: str) -> Optional["TriggerDefinitionDTO"]:
doc = self.cols.trg_triggers.find_one({"trigger_id": trigger_id})
if doc:
return self._parse_trigger_dto(doc)
return None
[docs]
def _parse_trigger_dto(
self, trigger_dict: dict[str, Any]
) -> "TriggerDefinitionDTO":
return TriggerDefinitionDTO(
trigger_id=trigger_dict["trigger_id"],
task_id=TaskId.from_key(trigger_dict["task_id_key"]),
condition_ids=trigger_dict["condition_ids"],
logic=CompositeLogic(trigger_dict["logic_value"]),
argument_provider_json=trigger_dict.get("argument_provider_json"),
)
[docs]
def get_triggers_for_condition(
self, condition_id: str
) -> list["TriggerDefinitionDTO"]:
trigger_docs = list(
self.cols.trg_triggers.find({"condition_ids": condition_id})
)
return [self._parse_trigger_dto(doc) for doc in trigger_docs]
[docs]
def record_valid_condition(self, valid_condition: ValidCondition) -> None:
self.cols.trg_valid_conditions.insert_or_ignore(
{
"valid_condition_id": valid_condition.valid_condition_id,
"valid_condition_json": valid_condition.to_json(self.app),
}
)
[docs]
def record_valid_conditions(self, valid_conditions: list[ValidCondition]) -> None:
if not valid_conditions:
return
bulk_ops = [
ReplaceOne(
{"valid_condition_id": vc.valid_condition_id},
{
"valid_condition_id": vc.valid_condition_id,
"valid_condition_json": vc.to_json(self.app),
},
upsert=True,
)
for vc in valid_conditions
]
self.cols.trg_valid_conditions.bulk_write(bulk_ops)
[docs]
def get_valid_conditions(self) -> dict[str, ValidCondition]:
conditions = {}
for doc in self.cols.trg_valid_conditions.find():
vc = ValidCondition.from_json(doc["valid_condition_json"], self.app)
conditions[doc["valid_condition_id"]] = vc
return conditions
[docs]
def clear_valid_conditions(self, conditions: Iterable[ValidCondition]) -> None:
ids_to_delete = [c.valid_condition_id for c in conditions]
if ids_to_delete:
self.cols.trg_valid_conditions.delete_many(
{"valid_condition_id": {"$in": ids_to_delete}}
)
[docs]
def _get_all_conditions(self) -> list[TriggerCondition]:
conditions = []
for doc in self.cols.trg_conditions.find():
conditions.append(
TriggerCondition.from_json(doc["condition_json"], self.app)
)
return conditions
[docs]
def get_last_cron_execution(self, condition_id: str) -> datetime | None:
"""
Get the last execution time for a cron condition.
:param condition_id: ID of the condition to check
:return: Last execution time in UTC, or None if never executed
"""
doc = self.cols.trg_conditions.find_one({"condition_id": condition_id})
if doc and doc.get("last_cron_execution"):
dt = doc["last_cron_execution"]
# Ensure datetime is UTC-aware
if dt.tzinfo is None:
# Naive datetime - assume it's UTC and make it aware
return dt.replace(tzinfo=UTC)
else:
# Already aware - convert to UTC
return dt.astimezone(UTC)
return None
[docs]
def store_last_cron_execution(
self,
condition_id: str,
execution_time: datetime,
expected_last_execution: datetime | None = None,
) -> bool:
"""
Store the last execution time for a cron condition with optimistic locking.
:param condition_id: ID of the condition
:param execution_time: Time of execution in UTC
:param expected_last_execution: Expected current value for optimistic locking
:return: True if update succeeded, False if another process won the race
"""
filter_doc: dict = {"condition_id": condition_id}
if expected_last_execution is not None:
# Ensure expected_last_execution is UTC-aware for comparison
if expected_last_execution.tzinfo is None:
expected_last_execution = expected_last_execution.replace(tzinfo=UTC)
else:
expected_last_execution = expected_last_execution.astimezone(UTC)
filter_doc["last_cron_execution"] = expected_last_execution
else:
filter_doc["$or"] = [
{"last_cron_execution": None},
{"last_cron_execution": {"$exists": False}},
]
# Ensure execution_time is UTC-aware
if execution_time.tzinfo is None:
execution_time = execution_time.replace(tzinfo=UTC)
else:
execution_time = execution_time.astimezone(UTC)
result = self.cols.trg_conditions.update_one(
filter_doc, {"$set": {"last_cron_execution": execution_time}}
)
return result.modified_count > 0
[docs]
def _register_source_task_condition(
self, task_id: "TaskId", condition_id: str
) -> None:
self.cols.trg_source_task_conditions.insert_or_ignore(
{"task_id_key": task_id.key, "condition_id": condition_id}
)
[docs]
def get_conditions_sourced_from_task(
self, task_id: "TaskId", context_type: type[ConditionContext] | None = None
) -> list[TriggerCondition]:
condition_ids = [
doc["condition_id"]
for doc in self.cols.trg_source_task_conditions.find(
{"task_id_key": task_id.key}
)
]
conditions = [self.get_condition(cid) for cid in condition_ids]
conditions = [c for c in conditions if c]
if context_type is not None:
conditions = [c for c in conditions if c.context_type == context_type]
return conditions
[docs]
def claim_trigger_execution(
self, trigger_id: str, valid_condition_id: str, expiration_seconds: int = 60
) -> bool:
claim_key = f"{trigger_id}:{valid_condition_id}"
now = datetime.now(UTC)
expiration = now + timedelta(seconds=expiration_seconds)
try:
self.cols.trg_execution_claims._collection.find_one_and_update(
{
"claim_key": claim_key,
"$or": [
{"expiration": {"$lte": now}},
{"expiration": {"$exists": False}},
],
},
{"$set": {"expiration": expiration, "claimed_at": now}},
upsert=True,
)
return True
except DuplicateKeyError:
# Another worker claimed it concurrently
return False
except Exception as e:
# Log other errors but treat as claim failure
self.app.logger.error(f"Claim failed for {claim_key}: {e}")
return False
[docs]
def claim_trigger_run(
self, trigger_run_id: str, expiration_seconds: int = 60
) -> bool:
now = datetime.now(UTC)
expiration = now + timedelta(seconds=expiration_seconds)
try:
self.cols.trg_trigger_run_claims._collection.find_one_and_update(
{
"trigger_run_id": trigger_run_id,
"$or": [
{"expiration": {"$lte": now}},
{"expiration": {"$exists": False}},
],
},
{"$set": {"expiration": expiration, "claimed_at": now}},
upsert=True,
)
return True
except DuplicateKeyError:
return False
except Exception as e:
self.app.logger.error(f"Claim failed for {trigger_run_id}: {e}")
return False
[docs]
def clean_task_trigger_definitions(self, task_id: "TaskId") -> None:
trigger_docs = self.cols.trg_triggers.find(
{"trigger_json": {"$regex": f'"task_id_key": "{task_id.key}"'}}
)
trigger_ids = [doc["trigger_id"] for doc in trigger_docs]
if trigger_ids:
self.cols.trg_triggers.delete_many({"trigger_id": {"$in": trigger_ids}})
self.cols.trg_condition_triggers.delete_many(
{"trigger_id": {"$in": trigger_ids}}
)
[docs]
def _purge(self) -> None:
self.cols.purge_all()