Source code for ploneintranet.microblog.statuscontainer

# coding=utf-8
from AccessControl import Unauthorized
from Acquisition import Explicit
from BTrees import LLBTree
from BTrees import LOBTree
from BTrees import OOBTree
from interfaces import IMicroblogContext
from interfaces import IStatusContainer
from interfaces import IStatusUpdate
from persistent import Persistent
from plone import api
from plone.memoize import ram
from plone.protect.utils import safeWrite
from plone.uuid.interfaces import IUUID
from Products.CMFCore.utils import getToolByName
from Testing.makerequest import makerequest
from utils import longkeysortreverse
from zope.component.hooks import getSite
from zope.component.hooks import setSite
from zope.container.contained import ObjectAddedEvent
from zope.event import notify
from zope.globalrequest import getRequest
from zope.interface import implements

import logging
import math
import Queue
import sys
import threading
import time
import transaction
import Zope2


logger = logging.getLogger("ploneintranet.microblog")

LOCK = threading.RLock()
STATUSQUEUE = Queue.PriorityQueue()


def cache_key(method, self):
    """Used as ramcache key for the expensive and frequently used
    allowed_status_keys() results.

    - cache per user
    - until a new update is inserted
    - for at most 1 minute

    The short time interval is needed in case the user's workspace
    memberships change - this should invalidate the cache but we're
    not listening to that event directly. One minute on the other hand
    is enough to cache the results for multiple calls during a single
    page rendering request - which should take seconds rather than a
    minute, but real life can be slow (especially if the cache expires,
    which has dramatic effects...)

    memoize.ram automatically garbage collects the cache after 24 hours.
    """
    try:
        member = api.user.get_current()
    except api.exc.CannotGetPortalError:
        # getSite() fails in integration tests, disable caching
        raise ram.DontCache
    return (
        member.id,
        self._mtime,  # last add in milliseconds (self = statuscontainer)
        self._ctime,  # last delete in microseconds
        time.time() // 60,  # for one minute
    )

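
# Illustrative sketch, not part of the original module: how the tuple
# returned by cache_key() busts the ram cache. All values below are
# hypothetical. A new key - and thus a cache miss - is produced whenever
# an update is added (_mtime), anything is deleted (_ctime), or a minute
# boundary passes.
#
#   >>> cache_key(None, container)
#   ('jane', 1428571000000, 1428571000000123, 23809516.0)
#   >>> container.add(status)  # bumps _mtime and _ctime
#   >>> cache_key(None, container)
#   ('jane', 1428571042000, 1428571042000456, 23809516.0)
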

def getZope2App(*args, **kwargs):
    """Gets the Zope2 app.

    Copied almost verbatim from collective.celery
    """
    if Zope2.bobo_application is None:
        orig_argv = sys.argv
        sys.argv = [""]
        res = Zope2.app(*args, **kwargs)
        sys.argv = orig_argv
        return res
    # should set bobo_application
    # man, freaking zope2 is weird
    return Zope2.bobo_application(*args, **kwargs)


class BaseStatusContainer(Persistent, Explicit):
    """This implements IStatusUpdate storage, indexing and query logic.

    This is just a base class; the actual IStatusContainer used in the
    implementation is the QueuedStatusContainer defined below.

    StatusUpdates are stored in the private _status_mapping BTree.
    A subset of BTree accessors are exposed, see interfaces.py.
    StatusUpdates are keyed by longint microsecond ids.

    Additionally, StatusUpdates are indexed by users and tags.
    These indexes use the same longint microsecond IStatusUpdate.id.

    Special user_* prefixed accessors take an extra argument 'users',
    an iterable of userids, and return IStatusUpdate keys, instances or
    items filtered by userids, in addition to the normal min/max
    statusid filters.

    For backward compatibility's sake, microblog_context is indexed as
    _uuid_mapping and its accessors are called .context_*. This
    microblog_context is the security context for statusupdates.

    The newer content_context is indexed as _content_uuid_mapping and
    its accessors are called .content_*. This has no security impact
    and is only a convenience for per-document accessors.
    """

    implements(IStatusContainer)

    dontcache = False  # used to disable cache during deletion

    def __init__(self, context=None):
        # last add in milliseconds - used in both async and cache key
        self._mtime = 0
        # cache buster in MICROseconds - used in cache key only
        self._ctime = 0
        # primary storage: (long statusid) -> (object IStatusUpdate)
        self._status_mapping = LOBTree.LOBTree()
        # archive deleted: (long statusid) -> (object IStatusUpdate)
        self._status_archive = LOBTree.LOBTree()
        # index by user: (string userid) -> (object TreeSet(long statusid))
        self._user_mapping = OOBTree.OOBTree()
        # index by tag: (string tag) -> (object TreeSet(long statusid))
        self._tag_mapping = OOBTree.OOBTree()
        # index by microblog_context:
        # (string UUID) -> (object TreeSet(long statusid))
        self._uuid_mapping = OOBTree.OOBTree()  # keep old name for backcompat
        # index by content_context:
        # (string UUID) -> (object TreeSet(long statusid))
        self._content_uuid_mapping = OOBTree.OOBTree()
        # index by thread: (long threadid) -> (object TreeSet(long statusid))
        self._threadid_mapping = OOBTree.OOBTree()
        # index by mentions: (string UUID) -> (object TreeSet(long statusid))
        self._mentions_mapping = OOBTree.OOBTree()
        # index all content updates: (object TreeSet(long statusid))
        self._is_content_mapping = LLBTree.LLTreeSet()
        # index all human updates: (object TreeSet(long statusid))
        self._is_human_mapping = LLBTree.LLTreeSet()

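    # Illustrative sketch, not part of the original module: after storing
    # one update with a tag and a workspace context, every index maps back
    # to the same longint id. Values are hypothetical.
    #
    #   _status_mapping:  {1428571042000123: <StatusUpdate>}
    #   _user_mapping:    {u'jane': LLTreeSet([1428571042000123])}
    #   _tag_mapping:     {u'party': LLTreeSet([1428571042000123])}
    #   _uuid_mapping:    {'<workspace-uuid>': LLTreeSet([1428571042000123])}
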
    def add(self, status):
        self._check_status(status)
        self._check_add_permission(status)
        self._store(status)

    def _store(self, status):
        # see BTrees/Interfaces.py
        # If the key was already in the collection, there is no change
        while not self._status_mapping.insert(status.id, status):
            status.id += 1
        self._idx_user(status)
        self._idx_tag(status)
        self._idx_microblog_context(status)
        self._idx_content_context(status)
        self._idx_threadid(status)
        self._idx_mentions(status)
        self._idx_is_content(status)
        self._idx_is_human(status)
        self._notify_add(status)
        # the _store() method is shared between Base and Async
        # putting counter updates here ensures maximal consistency
        self._update_mtime()  # millisec for async scheduler
        self._update_ctime()  # microsecond granularity cache busting

    def _check_status(self, status):
        if not IStatusUpdate.providedBy(status):
            raise ValueError("IStatusUpdate interface not provided.")

    def _notify_add(self, status):
        event = ObjectAddedEvent(status, newParent=self, newName=status.id)
        notify(event)

    def delete(self, id, restricted=True):
        status = self._get(id)  # bypass view permission check
        # thread expansion and batch delete can run into each other
        if not status:
            return
        # delete permission check only on the original, not thread cascade
        if restricted:
            # content_removed handler runs unrestricted
            self._check_delete_permission(status)
        thread_mapping = self._threadid_mapping.get(id)
        if thread_mapping:
            # list() avoids RuntimeError
            # mapping includes current parent id automatically
            to_delete = list(thread_mapping)
        else:
            to_delete = [id]
        for xid in to_delete:
            # thread expansion and batch delete can run into each other
            if not self._get(xid):
                continue  # cannot reproduce #379 but better safe than sorry
            try:
                self._unidx(xid)
                deleted = self._status_mapping.pop(xid)
                self._status_archive.insert(xid, deleted)
                # this would be the right place to notify deletion
                logger.info(
                    "%s archived statusupdate %s",
                    api.user.get_current().id,
                    xid,
                )
            except AttributeError:
                logger.error(
                    "%s failed to archive statusupdate %s",
                    api.user.get_current().id,
                    xid,
                )
        self._update_ctime()  # purge cache

    def _unidx(self, id):
        status = self._get(id)  # bypass view permission check
        self._unidx_user(status)
        self._unidx_tag(status)
        self._unidx_microblog_context(status)
        self._unidx_content_context(status)
        self._unidx_threadid(status)
        self._unidx_mentions(status)
        self._unidx_is_content(status)
        self._unidx_is_human(status)

    # --- INDEXES ---

    def _idx_user(self, status):
        userid = unicode(status.userid)
        # If the key was already in the collection, there is no change
        # create user treeset if not already present
        self._user_mapping.insert(userid, LLBTree.LLTreeSet())
        # add status id to user treeset
        self._user_mapping[userid].insert(status.id)

    def _unidx_user(self, status):
        self._user_mapping[status.userid].remove(status.id)

    def _idx_tag(self, status):
        """Update the `StatusContainer` tag index with any new tags

        :param status: a `StatusUpdate` object
        """
        if not status.tags:
            return
        for tag in [tag.decode("utf-8") for tag in status.tags]:
            # If the key was already in the collection, there is no change
            # create tag treeset if not already present
            self._tag_mapping.insert(tag, LLBTree.LLTreeSet())
            # add status id to tag treeset
            self._tag_mapping[tag].insert(status.id)

    def _unidx_tag(self, status):
        if not status.tags:
            return
        for tag in [tag.decode("utf-8") for tag in status.tags]:
            self._tag_mapping[tag].remove(status.id)

    def _idx_microblog_context(self, status):
        uuid = status._microblog_context_uuid
        if not uuid:
            return
        # If the key was already in the collection, there is no change
        # create microblog_context treeset if not already present
        self._uuid_mapping.insert(uuid, LLBTree.LLTreeSet())
        self._uuid_mapping[uuid].insert(status.id)

    def _unidx_microblog_context(self, status):
        uuid = status._microblog_context_uuid
        if not uuid:
            return
        self._uuid_mapping[uuid].remove(status.id)

    def _idx_content_context(self, status):
        uuid = status._content_context_uuid
        if not uuid:
            return
        # If the key was already in the collection, there is no change
        # create content_context treeset if not already present
        self._content_uuid_mapping.insert(uuid, LLBTree.LLTreeSet())
        self._content_uuid_mapping[uuid].insert(status.id)

    def _unidx_content_context(self, status):
        uuid = status._content_context_uuid
        if not uuid:
            return
        self._content_uuid_mapping[uuid].remove(status.id)

    def _idx_threadid(self, status):
        if not getattr(status, "thread_id", False):
            return
        thread_id = status.thread_id
        if thread_id:
            # If the key was already in the collection, there is no change
            # create thread treeset if not already present
            self._threadid_mapping.insert(thread_id, LLBTree.LLTreeSet())
            self._threadid_mapping[thread_id].insert(status.id)
            # Make sure thread_id is also in the mapping
            self._threadid_mapping[thread_id].insert(thread_id)

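    # Illustrative sketch, not part of the original module: two replies to
    # a toplevel update 100 produce a thread treeset that also contains the
    # parent id itself, which lets delete(100) cascade over the whole
    # thread. Ids are hypothetical.
    #
    #   _threadid_mapping:  {100: LLTreeSet([100, 101, 102])}
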
    def _unidx_threadid(self, status):
        if not getattr(status, "thread_id", False):
            return
        thread_id = status.thread_id
        if thread_id:
            self._threadid_mapping[thread_id].remove(status.id)

    def _idx_mentions(self, status):
        if not getattr(status, "mentions", False):
            return
        mentions = status.mentions.keys()
        for mention in mentions:
            # If the key was already in the collection, there is no change
            # create mentions treeset if not already present
            self._mentions_mapping.insert(mention, LLBTree.LLTreeSet())
            self._mentions_mapping[mention].insert(status.id)

    def _unidx_mentions(self, status):
        if not getattr(status, "mentions", False):
            return
        mentions = status.mentions.keys()
        for mention in mentions:
            self._mentions_mapping[mention].remove(status.id)

    def _idx_is_content(self, status):
        if status.is_content_update:
            self._is_content_mapping.insert(status.id)

    def _unidx_is_content(self, status):
        # try/except is more robust than status.is_content_update attr check
        try:
            self._is_content_mapping.remove(status.id)
        except KeyError:
            pass

    def _idx_is_human(self, status):
        if status.is_human_update:
            self._is_human_mapping.insert(status.id)

    def _unidx_is_human(self, status):
        # try/except is more robust than status.is_human_update attr check
        try:
            self._is_human_mapping.remove(status.id)
        except KeyError:
            pass

    def clear(self):
        self._user_mapping.clear()
        self._tag_mapping.clear()
        self._uuid_mapping.clear()
        self._content_uuid_mapping.clear()
        self._threadid_mapping.clear()
        self._mentions_mapping.clear()
        # also reset the boolean stream indexes to avoid stale ids
        self._is_content_mapping.clear()
        self._is_human_mapping.clear()
        return self._status_mapping.clear()

    # --- WRITE SECURITY ---

    def _check_add_permission(self, status):
        permission = "Plone Social: Add Microblog Status Update"
        # can also throw statusupdate._uuid2context() uuid lookup Unauthorized
        if not api.user.has_permission(
            permission,
            obj=status.microblog_context,  # None=SiteRoot handled by api
        ):
            raise Unauthorized("You do not have permission <%s>" % permission)

    def _check_delete_permission(self, status):
        """StatusUpdates have no local 'owner' role. Instead we check
        against permissions on the microblog context and compare with
        the creator.
        """
        if not status.can_delete:
            raise Unauthorized("Not allowed to delete this statusupdate")

    # --- READ SECURITY ---

    def _has_global_view_statusupdate_permission(self):
        # hardcode non-anon protection at siteroot for now
        permission = "Plone Social: View Microblog Status Update"
        try:
            if not api.user.has_permission(permission):
                return False
        except api.exc.CannotGetPortalError:
            # happens in tests
            pass
        return True

    def secure(self, keyset):
        """Filter keyset to return only keys the current user may see.

        NB this may return statusupdates with a microblog_context
        (workspace) accessible to the user, but referencing a
        content_context (document) which the user may not access yet
        because of content workflow. Filtering that is quite costly and
        not done here - instead there's a postprocessing filter in
        activitystream just before rendering.
        """
        return LLBTree.intersection(
            LLBTree.LLTreeSet(keyset),
            LLBTree.LLTreeSet(self.allowed_status_keys()),
        )

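    # Illustrative sketch, not part of the original module: secure() is a
    # plain BTree set intersection, so any candidate keyset is cheaply
    # reduced to the ids the current user may see. Ids are hypothetical.
    #
    #   >>> container.allowed_status_keys()
    #   (100, 101, 102)
    #   >>> list(container.secure([101, 102, 999]))
    #   [101, 102]
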
    @ram.cache(cache_key)
    def allowed_status_keys(self):
        """Return the subset of IStatusUpdate keys that are related to
        UUIDs of accessible contexts, i.e. blacklist all IStatusUpdates
        that have a context which we don't have permission to access.

        This is the key security protection used by all getters.
        Because it's called a lot we're caching results per user request.

        Because we use ram caching, we must make sure the return value
        is *not* a ZODB BTree accessor, so we cast to tuple.
        """
        if self._has_global_view_statusupdate_permission():
            return self.allowed_status_keys_intranet()
        else:
            return self.allowed_status_keys_extranet()

    def allowed_status_keys_intranet(self):
        # intranet users may see global updates
        all_statusids = LLBTree.LLSet(self._status_mapping.keys())
        uuid_blacklist = self._blacklist_microblogcontext_uuids()
        if not uuid_blacklist:
            return tuple(all_statusids)
        else:
            # for each uuid, expand uuid into set of statusids
            blacklisted_treesets = (
                self._uuid_mapping.get(uuid)
                for uuid in uuid_blacklist
                if uuid in self._uuid_mapping.keys()
            )
            # merge sets of blacklisted statusids into single blacklist
            blacklisted_statusids = reduce(
                LLBTree.union, blacklisted_treesets, LLBTree.TreeSet()
            )
            # subtract blacklisted statusids from all statusids
            return tuple(
                LLBTree.difference(all_statusids, blacklisted_statusids)
            )

    def allowed_status_keys_extranet(self):
        # extranet users may NOT see global updates
        uuid_whitelist = self._whitelist_microblogcontext_uuids()
        if not uuid_whitelist:
            return tuple()
        else:
            # for each uuid, expand uuid into set of statusids
            whitelisted_treesets = (
                self._uuid_mapping.get(uuid)
                for uuid in uuid_whitelist
                if uuid in self._uuid_mapping.keys()
            )
            # merge sets of whitelisted statusids into single whitelist
            whitelisted_statusids = reduce(
                LLBTree.union, whitelisted_treesets, LLBTree.TreeSet()
            )
            # return only statusids for accessible microblog contexts
            return tuple(whitelisted_statusids)

    def _blacklist_microblogcontext_uuids(self):
        """Returns the uuids for all IMicroblogContext that the current
        user has no access to.

        All the read accessors rely on this method for security checks.
        """
        whitelist = self._whitelist_microblogcontext_uuids()
        # SiteRoot context is not UUID indexed, so not blacklisted
        return set(self._uuid_mapping) - whitelist

    def _whitelist_microblogcontext_uuids(self):
        """Current catalog query implicitly filters on View permission
        for the current user. We should not rely on View adequately
        representing ViewStatusUpdate. The current implementation takes
        a conservative approach by applying an extra explicit security
        check for ViewStatusUpdate.

        It is theoretically possible that the result excludes workspaces
        for which the user does have ViewStatusUpdate but does not have
        View.

        A possible performance optimization that would also fix the
        overly conservative bias would be to add a special index for
        ViewStatusUpdate and use that directly in the catalog query. See
        http://stackoverflow.com/questions/23950860/how-to-list-users-having-review-permission-on-content-object

        However, the number of IMicroblogContext objects in a site is
        normally quite limited and the outcome of this check is cached
        per request, which should hopefully limit the performance cost.

        EDIT: this *is* costly, see workaround and comments below.
        """  # noqa
        catalog = getToolByName(self, "portal_catalog")
        marker = IMicroblogContext.__identifier__
        results = catalog.searchResults(object_provides=marker)
        # performance workaround replaces code further down
        whitelist = {brain.UID for brain in results}
        return whitelist

        # BELOW IS ONLY NEEDED ONCE #1101 IS FIXED
        # TODO: REFACTOR FOR PERFORMANCE (USING DEDICATED INDEX)
        # SHOULD AVOID THE COSTLY getObject() + api.user CALLS
        # permission = "Plone Social: View Microblog Status Update"
        # # SiteRoot context is NOT whitelisted
        # whitelist = []
        # for brain in results:
        #     try:
        #         obj = brain.getObject()
        #     except Unauthorized:
        #         # can View but not Access workspace - skip
        #         continue
        #     # and double check for ViewStatusUpdate
        #     if api.user.has_permission(permission, obj=obj):
        #         whitelist.append(brain.UID)

    # --- DISABLED ACCESSORS ---
    # blocked IBTree methods to protect index consistency
    # (also not sensible for our use case)

    def insert(self, key, value):
        raise NotImplementedError("Can't allow that to happen.")

    def pop(self, k, d=None):
        raise NotImplementedError("Can't allow that to happen.")

    def setdefault(self, k, d):
        raise NotImplementedError("Can't allow that to happen.")

    def update(self, collection):
        raise NotImplementedError("Can't allow that to happen.")

    # --- PRIMARY ACCESSORS ---

    def get(self, key):
        key = int(key)
        # secure
        if key in self.allowed_status_keys():
            return self._get(key)
        # don't mask lookup error with Unauthorized
        elif key not in self._status_mapping.keys():
            raise KeyError(key)
        else:
            raise Unauthorized("You're not allowed to get status %s" % key)

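    # Illustrative sketch, not part of the original module: get() keeps
    # KeyError and Unauthorized distinct, so callers can tell a missing
    # update from a forbidden one. Ids are hypothetical.
    #
    #   >>> container.get(999)   # never stored
    #   Traceback (most recent call last):
    #   KeyError: 999
    #   >>> container.get(100)   # stored, but in an inaccessible workspace
    #   Traceback (most recent call last):
    #   Unauthorized: You're not allowed to get status 100
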
    # performance helper to avoid multiple security checks

    def _get(self, key):
        return self._status_mapping.get(key)

    def items(self, min=None, max=None, limit=100, tags=None, users=None):
        # secured in keys()
        return (
            (key, self._get(key))
            for key in self.keys(min, max, limit, tags, users)
        )

    def values(self, min=None, max=None, limit=100, tags=None, users=None):
        # secured in keys()
        return (
            self._get(key)
            for key in self.keys(min, max, limit, tags, users)
        )

    def keys(self, min=None, max=None, limit=100, tags=None, users=None):
        # secure
        if tags is None and users is None:
            matches = self.allowed_status_keys()
        else:
            matches = self.secure(
                LLBTree.union(
                    self._query_mapping(self._tag_mapping, tags),
                    self._query_mapping(self._user_mapping, users),
                )
            )
        return longkeysortreverse(matches, min, max, limit)

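    # Illustrative sketch, not part of the original module: tag and user
    # filters are OR-ed together before the security filter is applied,
    # and keys come back newest first. Ids are hypothetical.
    #
    #   >>> list(container.keys(limit=2))
    #   [1428571042000456, 1428571042000123]
    #   >>> list(container.keys(tags=['party'], users=['jane'], limit=10))
    #   [1428571042000123]
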
    iteritems = items
    iterkeys = keys
    itervalues = values

    # --- THREAD CONVERSATION ACCESSORS ---

    def is_most_recent_in_thread(self, statusupdate, tags=None):
        # thread_id is None for a toplevel
        thread_id = statusupdate.thread_id or statusupdate.id
        if thread_id not in self._threadid_mapping:
            # toplevel without replies has no index record (yet)
            return True
        if tags is None:
            # straightforward: take the reply with the highest id
            most_recent_id = max(self._threadid_mapping[thread_id])
        else:
            # the highest reply id that matches one of the tags
            tagged_in_thread = LLBTree.intersection(
                self._threadid_mapping[thread_id],
                self._query_mapping(self._tag_mapping, tags),
            )
            most_recent_id = max(tagged_in_thread)
        return statusupdate.id == most_recent_id

    def thread_items(self, thread_id, min=None, max=None, limit=100):
        # secured by thread_keys
        return (
            (key, self._get(key))
            for key in self.thread_keys(thread_id, min, max, limit)
        )

    def thread_values(self, thread_id, min=None, max=None, limit=100):
        # secured by thread_keys
        return (
            self._get(key)
            for key in self.thread_keys(thread_id, min, max, limit)
        )

    def thread_keys(self, thread_id, min=None, max=None, limit=100):
        if not thread_id:
            return ()
        mapping = self._threadid_mapping.get(thread_id) or [thread_id]
        mapping = self.secure(mapping)
        return longkeysortreverse(mapping, min, max, limit)

    # --- USER ACCESSORS ---

    def user_items(self, users, min=None, max=None, limit=100):
        # secured by user_keys
        return (
            (key, self._get(key))
            for key in self.user_keys(users, min, max, limit)
        )

    def user_values(self, users, min=None, max=None, limit=100):
        # secured by user_keys
        return (
            self._get(key)
            for key in self.user_keys(users, min, max, limit)
        )

    def user_keys(self, users, min=None, max=None, limit=100):
        if not users:
            return ()
        return self.keys(min, max, limit, users=users)

    # --- CONTEXT ACCESSORS = microblog_context security context ---

    def context_items(
        self, microblog_context, min=None, max=None, limit=100, nested=True
    ):
        # secured by context_keys
        return (
            (key, self._get(key))
            for key in self.context_keys(
                microblog_context, min, max, limit, nested
            )
        )

    def context_values(
        self, microblog_context, min=None, max=None, limit=100, nested=True
    ):
        # secured by context_keys
        return (
            self._get(key)
            for key in self.context_keys(
                microblog_context, min, max, limit, nested
            )
        )

    def context_keys(
        self, microblog_context, min=None, max=None, limit=100, nested=True
    ):
        if nested:
            # hits portal_catalog
            nested_uuids = {
                uuid
                for uuid in self.nested_uuids(microblog_context)
                if uuid in self._uuid_mapping
            }
            if not nested_uuids:
                return ()
        else:
            # used in test_statuscontainer_microblog_context
            uuid = self._context2uuid(microblog_context)
            if uuid not in self._uuid_mapping:
                return ()
            nested_uuids = {uuid}
        matches = self._query_mapping(self._uuid_mapping, nested_uuids)
        matches = self.secure(matches)
        return longkeysortreverse(matches, min, max, limit)

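    # Illustrative sketch, not part of the original module: nested=True
    # expands the query via portal_catalog to every IMicroblogContext in
    # the subtree, so updates made in nested workspaces are included.
    # `workspace` is hypothetical.
    #
    #   >>> keys = container.context_keys(workspace)                # subtree
    #   >>> keys = container.context_keys(workspace, nested=False)  # exact
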
    # enable unittest override of plone.app.uuid lookup

    def _context2uuid(self, context):
        return IUUID(context)

    # --- CONTENT ACCESSORS = content_context ---

    def content_items(self, content_context):
        return (
            (key, self._get(key))
            for key in self.content_keys(content_context)
        )

    def content_values(self, content_context):
        return (
            self._get(key)
            for key in self.content_keys(content_context)
        )

    def content_keys(self, content_context):
        uuid = self._context2uuid(content_context)
        mapping = self._content_uuid_mapping.get(uuid)
        if not mapping:
            return ()
        # security is derived from microblog_context NOT from content_context
        mapping = self.secure(mapping)
        # not reversing, keep document discussion in chronological order
        return mapping

    # --- HUMAN/CONTENT STREAM FILTER ACCESSORS ---

    def is_human_items(self, min=None, max=None, limit=100):
        # secured in is_human_keys()
        return (
            (key, self._get(key))
            for key in self.is_human_keys(min, max, limit)
        )

    def is_human_values(self, min=None, max=None, limit=100):
        # secured in is_human_keys()
        return (self._get(key) for key in self.is_human_keys(min, max, limit))

    def is_human_keys(self, min=None, max=None, limit=100):
        # secure
        keys = self.secure(self._is_human_mapping)
        return longkeysortreverse(keys, min, max, limit)

    def is_content_items(self, min=None, max=None, limit=100):
        # secured in is_content_keys()
        return (
            (key, self._get(key))
            for key in self.is_content_keys(min, max, limit)
        )

    def is_content_values(self, min=None, max=None, limit=100):
        # secured in is_content_keys()
        return (
            self._get(key)
            for key in self.is_content_keys(min, max, limit)
        )

    def is_content_keys(self, min=None, max=None, limit=100):
        # secure
        keys = self.secure(self._is_content_mapping)
        return longkeysortreverse(keys, min, max, limit)

    # --- MENTION ACCESSORS ---

    def mention_items(self, mentions, min=None, max=None, limit=100):
        # secured by mention_keys
        return (
            (key, self._get(key))
            for key in self.mention_keys(mentions, min, max, limit)
        )

    def mention_values(self, mentions, min=None, max=None, limit=100):
        # secured by mention_keys
        return (
            self._get(key)
            for key in self.mention_keys(mentions, min, max, limit)
        )

    def mention_keys(self, mentions, min=None, max=None, limit=100):
        if not mentions:
            return ()
        matches = self._query_mapping(self._mentions_mapping, mentions)
        matches = self.secure(matches)
        return longkeysortreverse(matches, min, max, limit)

    # --- HELPERS ---

    def nested_uuids(self, context):
        catalog = getToolByName(context, "portal_catalog")
        # query the object_provides index, consistent with
        # _whitelist_microblogcontext_uuids() above
        results = catalog(
            path={"query": "/".join(context.getPhysicalPath()), "depth": -1},
            object_provides=IMicroblogContext.__identifier__,
        )
        # does not need to be deterministically sorted - will be used to
        # retrieve statusupdates which then themselves will be sorted
        return {item.UID for item in results}

    def _query_mapping(self, mapping, keys):
        """Calculate the union of all statusids indexed in <mapping>
        on any of the <keys>.

        Always returns an integer keyset ready for further processing.
        """
        if not keys:
            return LLBTree.LLTreeSet()
        elif isinstance(keys, (str, unicode)):
            # convert single key to set
            keys = {keys}
        # calculate the union set of matching ids across all keys
        # silently discards all non-existing key ids
        treesets = [mapping.get(id) for id in keys if id in mapping.keys()]
        return LLBTree.multiunion(treesets)

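    # Illustrative sketch, not part of the original module: multiunion
    # merges the per-key treesets in a single pass; unknown keys are
    # silently discarded. Ids are hypothetical.
    #
    #   >>> sorted(container._query_mapping(
    #   ...     container._tag_mapping, ['party', 'beer', 'no-such-tag']))
    #   [100, 101, 102]
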
    def _update_mtime(self):
        """Update _mtime on statusupdate add. Uses milliseconds.

        This flag is used for the cache key and for the async time window.
        """
        with LOCK:
            self._mtime = int(time.time() * 1000)

    def _update_ctime(self):
        """Update _ctime for cache busting. Requires *micro*second
        granularity to avoid RuntimeErrors caused by cache interference.

        Only used for cache invalidation.
        """
        with LOCK:
            self._ctime = int(time.time() * 1000000)


class QueuedStatusContainer(BaseStatusContainer):
    """A write performance optimized IStatusContainer.

    This separates the queuing logic from the base class to make the
    code more readable (and testable).

    For performance reasons, an in-memory STATUSQUEUE is used.
    StatusContainer.add() puts StatusUpdates into the queue.

    MAX_QUEUE_AGE is the commit window in milliseconds.
    To disable batch queuing, set MAX_QUEUE_AGE = 0

    .add() calls ._autoflush(), which flushes the queue when ._mtime
    is longer than MAX_QUEUE_AGE ago.

    So each .add() checks the queue. In a low-traffic site this will
    result in immediate disk writes (msg frequency < timeout). In a
    high-traffic site this will result in one write per timeout, which
    makes it possible to attain more than 100 status update inserts
    per second. Note that the algorithm is structured in such a way
    that the system automatically adapts to low/high traffic
    conditions.

    Additionally, _schedule_flush() sets up a non-interactive queue
    flush on a volatile thread timer _v_timer. This ensures that the
    "last Tweet of the day" also gets committed to disk.

    An attempt is made to make self._mtime and self._v_timer
    thread-safe. These function as a kind of ad-hoc locking mechanism
    so that only one thread at a time is flushing the memory queue
    into persistent storage.
    """

    implements(IStatusContainer)

    # set ASYNC=False to force sync mode and ignore MAX_QUEUE_AGE
    if api.env.test_mode():
        ASYNC = False
    else:
        ASYNC = False  # disabled because of CSRF issues

    # max in-memory time in millisec before disk sync, if in ASYNC mode
    MAX_QUEUE_AGE = 1000

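    # Illustrative sketch, not part of the original module: with ASYNC
    # enabled and MAX_QUEUE_AGE = 1000, the add()/_autoflush() interplay
    # would behave roughly like this (timings hypothetical):
    #
    #   t=0ms     add(a) -> queued; _mtime stale     -> flush (writes a)
    #   t=200ms   add(b) -> queued; _mtime fresh     -> no write yet
    #   t=450ms   add(c) -> queued; _mtime fresh     -> no write yet
    #   t=1200ms  add(d) -> queued; _mtime > 1000ms  -> flush (writes b,c,d)
    #   no more adds     -> _v_timer fires           -> flushes leftovers
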
    def add(self, status):
        self._check_status(status)
        self._check_add_permission(status)
        if self.ASYNC and self.MAX_QUEUE_AGE > 0:
            self._queue(status)
            # fallback sync in case of NO traffic (kernel timer)
            self._schedule_flush()
            # immediate sync on low traffic (old ._mtime)
            # postpones sync on high traffic (next .add())
            return self._autoflush()  # updates _mtime on write
        else:
            self._store(status)
            return 1  # immediate write

    def _queue(self, status):
        STATUSQUEUE.put((status.id, status))

    def _schedule_flush(self):
        """A fallback queue flusher that runs without user interactions"""
        if not self.MAX_QUEUE_AGE > 0:
            return

        try:
            # non-persisted, absent on first request
            self._v_timer
        except AttributeError:
            # initialize on first request
            self._v_timer = None

        if self._v_timer is not None:
            # timer already running
            return

        # only a one-second granularity, round upwards
        timeout = int(math.ceil(float(self.MAX_QUEUE_AGE) / 1000))

        # Get the global request,
        # we just need to copy over some environment stuff.
        request = getRequest()
        request_environ = {}
        site = getSite()
        # We do not use plone.api here because we cannot assume that
        # site == plone portal. It could also be a directory under it.
        if site:
            site_path = "/".join(site.getPhysicalPath())
            if request is None:
                # If we fail getting a request,
                # we might still have it on the portal
                request = getattr(site, "REQUEST", None)
        else:
            # This situation can happen in tests.
            logger.warning("Could not get the site")
            site_path = None
        if request is not None:
            request_environ = {
                "SERVER_NAME": request.SERVER_NAME,
                "SERVER_PORT": request.SERVER_PORT,
                "REQUEST_METHOD": request.REQUEST_METHOD,
            }

        with LOCK:
            # logger.info("Setting timer")
            self._v_timer = threading.Timer(
                timeout,
                self._scheduled_autoflush,
                kwargs={"site_path": site_path, "environ": request_environ},
            )
            self._v_timer.start()

    def _scheduled_autoflush(self, site_path=None, environ=None):
        """This method is run from the timer, outside a normal request
        scope. This requires an explicit commit on db write.
        """
        if site_path is not None:
            app = makerequest(getZope2App(), environ=environ)
            site = app.restrictedTraverse(site_path)
            setSite(site)
        if self._autoflush():  # returns 1 on actual write
            transaction.commit()

    def _autoflush(self):
        # logger.info("autoflush")
        # compares millisecond _mtime with millisecond MAX_QUEUE_AGE
        if int(time.time() * 1000) - self._mtime > self.MAX_QUEUE_AGE:
            return self.flush_queue()  # 1 on write, 0 on noop
        return 0  # no write

    def flush_queue(self):
        # update marker - block autoflush
        self._update_mtime()
        with LOCK:
            try:
                # cancel scheduled flush
                if self.MAX_QUEUE_AGE > 0 and self._v_timer is not None:
                    # logger.info("Cancelling timer")
                    self._v_timer.cancel()
                    self._v_timer = None
            except AttributeError:
                self._v_timer = None

        if STATUSQUEUE.empty():
            return 0  # no write

        while True:
            try:
                (id, status) = STATUSQUEUE.get(block=False)
                safeWrite(status)
                self._store(status)
            except Queue.Empty:
                break
        return 1  # confirmed write

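
# Illustrative sketch, not part of the original module: the synchronous
# code path used while ASYNC is disabled. The StatusUpdate constructor
# arguments and the `workspace` object are assumptions for illustration.
#
#   >>> from ploneintranet.microblog.statusupdate import StatusUpdate
#   >>> container = QueuedStatusContainer()
#   >>> update = StatusUpdate('Hello world', microblog_context=workspace)
#   >>> container.add(update)  # sync mode: _store() writes immediately
#   1
#   >>> [s.text for s in container.values(limit=1)]
#   ['Hello world']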