Source code for ploneintranet.notifications.adapters.message_handler

# -*- coding: utf-8 -*-
from plone import api


class Base(object):
    """A base handler for notifications.

    The handler fans a message out to the notification queue of every
    user returned by ``api.user.get_users()`` and can prune messages
    flagged as read from those queues.  Queue storage is delegated to the
    ``ploneintranet_notifications`` portal tool.
    """

    # Predicate a message must carry for this handler to accept it in add().
    msg_class = "GLOBAL_NOTICE"

    def __init__(self, context):
        """Store *context* and look up the notifications tool.

        ``self.tool`` is set to ``None`` when the tool lookup raises
        ``InvalidParameterError`` (most likely the add-on is not
        installed); ``add`` and ``cleanup`` then become no-ops.
        """
        self.context = context
        try:
            self.tool = api.portal.get_tool("ploneintranet_notifications")
        except api.exc.InvalidParameterError:
            self.tool = None

    def _for_each_user(self, func):
        """Lazily yield ``func(user)`` for every user of the site.

        This is a generator: nothing is executed until it is consumed.
        """
        for user in api.user.get_users():
            yield func(user)

    def g_add(self, message):
        """Return a generator that queues a clone of *message* per user.

        The work only happens while the returned generator is consumed.
        No check for a missing tool is done here; ``add`` is the guarded
        entry point.
        """
        # Bind the tool now so the lazy generator keeps using the same one.
        tool = self.tool

        def append_clone(user):
            # Each user gets an own clone of the message.
            tool.append_to_user_queue(user.getUserId(), message.clone())

        return self._for_each_user(append_clone)

    def add(self, message):
        """Queue *message* for every user.

        Does nothing when the notifications tool is unavailable.

        Raises ``AssertionError`` when ``message.predicate`` differs from
        ``msg_class``.
        """
        if not self.tool:
            # we are most likely not installed
            return
        if message.predicate != self.msg_class:
            # Raised explicitly (instead of a bare ``assert``) so the check
            # is still enforced when Python runs with -O.
            raise AssertionError(
                "This handler is not responsible for this message, "
                "wrong predicate"
            )
        # Drain the generator; the side effects are what matters here.
        for _ in self.g_add(message):
            pass

    def g_cleanup(self):
        """Return a generator that drops read messages from each queue.

        The work only happens while the returned generator is consumed.
        No check for a missing tool is done here; ``cleanup`` is the
        guarded entry point.
        """
        tool = self.tool

        def drop_read_messages(user):
            user_id = user.getUserId()
            if not tool.get_user_queue(user_id):
                return
            # NOTE(review): the stored mapping is re-read from ``_users``
            # rather than reusing the get_user_queue() result; presumably
            # the two can differ -- confirm against the tool.
            queue = tool._users[user_id]
            # Snapshot the keys first: popping while iterating a live key
            # view raises RuntimeError on a dict under Python 3.
            for key in reversed(list(queue.keys())):
                if queue[key].obj["read"]:
                    queue.pop(key)

        return self._for_each_user(drop_read_messages)

    def cleanup(self):
        """Remove every message flagged as read from all user queues.

        Does nothing when the notifications tool is unavailable, mirroring
        ``add``.
        """
        if not self.tool:
            # we are most likely not installed
            return
        # Drain the generator; the side effects are what matters here.
        for _ in self.g_cleanup():
            pass