# coding=utf-8
from AccessControl import Unauthorized
from Acquisition import Explicit
from BTrees import LLBTree
from BTrees import LOBTree
from BTrees import OOBTree
from interfaces import IMicroblogContext
from interfaces import IStatusContainer
from interfaces import IStatusUpdate
from persistent import Persistent
from plone import api
from plone.memoize import ram
from plone.protect.utils import safeWrite
from plone.uuid.interfaces import IUUID
from Products.CMFCore.utils import getToolByName
from Testing.makerequest import makerequest
from utils import longkeysortreverse
from zope.component.hooks import getSite
from zope.component.hooks import setSite
from zope.container.contained import ObjectAddedEvent
from zope.event import notify
from zope.globalrequest import getRequest
from zope.interface import implements
import logging
import math
import Queue
import sys
import threading
import time
import transaction
import Zope2
# Module-level logger for the microblog subsystem.
logger = logging.getLogger("ploneintranet.microblog")
# Re-entrant lock guarding shared mutable state across threads:
# the _mtime/_ctime counters and the volatile flush timer below.
LOCK = threading.RLock()
# Process-wide in-memory buffer of StatusUpdates awaiting a disk write,
# ordered by status id (smallest/oldest first).
STATUSQUEUE = Queue.PriorityQueue()
def cache_key(method, self):
    """
    Used as ramcache key for the expensive and frequently used
    allowed_status_keys() results.
    - cache per user
    - until a new update is inserted
    - for maximally 1 minute
    The short time interval is needed in case the user's workspace
    memberships change - this should invalidate the cache but we're
    not listening to that event directly.
    One minute on the other hand is enough to cache the results for
    multiple calls during a single page rendering request - which
    should take seconds rather than a minute, but real life can be slow
    (especially if the cache expires, which has dramatic effects...)
    memoize.ram automatically garbage collects the cache after 24 hours.
    """
    try:
        current_member = api.user.get_current()
    except api.exc.CannotGetPortalError:
        # getSite() fails in integration tests, disable caching
        raise ram.DontCache
    # one-minute time bucket completes the invalidation triad
    minute_bucket = time.time() // 60
    return (
        current_member.id,
        self._mtime,  # last add in milliseconds (self = statuscontainer)
        self._ctime,  # last delete in microseconds
        minute_bucket,
    )
def getZope2App(*args, **kwargs):
    """Get (and if necessary bootstrap) the Zope2 application root.

    Copied almost verbatim from collective.celery.

    Zope2.app() is sensitive to the process argv, so we temporarily
    present an empty one. The restore is done in a ``finally`` clause:
    the original code only restored ``sys.argv`` on the success path,
    leaving it clobbered for the rest of the process if startup raised.
    """
    if Zope2.bobo_application is None:
        orig_argv = sys.argv
        sys.argv = [""]
        try:
            res = Zope2.app(*args, **kwargs)
        finally:
            # always restore, even when Zope2.app() raises
            sys.argv = orig_argv
        return res
        # should set bobo_application
        # man, freaking zope2 is weird
    return Zope2.bobo_application(*args, **kwargs)
class BaseStatusContainer(Persistent, Explicit):
    """This implements IStatusUpdate storage, indexing and query logic.

    This is just a base class, the actual IStorageContainer used
    in the implementation is the QueuedStatusContainer defined below.

    StatusUpdates are stored in the private _status_mapping BTree.
    A subset of BTree accessors are exposed, see interfaces.py.
    StatusUpdates are keyed by longint microsecond ids.

    Additionally, StatusUpdates are indexed by users and tags.
    These indexes use the same longint microsecond IStatusUpdate.id.

    Special user_* prefixed accessors take an extra argument 'users',
    an interable of userids, and return IStatusUpdate keys, instances or items
    filtered by userids, in addition to the normal min/max statusid filters.

    For backward compatibility sake, microblog_context is indexed as
    _uuid_mapping and accessors are called .context_*.
    This microblog_context is the security context for statusupdates.

    The newer content_context is indexed as
    _content_mapping and accessors are called .content_*.
    This has no security impact and is only a convenience for per-document
    accessors.
    """

    implements(IStatusContainer)

    dontcache = False  # used to disable cache during deletion

    def __init__(self, context=None):
        # last add in milliseconds - used in both async and cache key
        self._mtime = 0
        # cache buster in MICROseconds - used in cache key only
        self._ctime = 0
        # primary storage: (long statusid) -> (object IStatusUpdate)
        self._status_mapping = LOBTree.LOBTree()
        # archive deleted: (long statusid) -> (object IStatusUpdate)
        self._status_archive = LOBTree.LOBTree()
        # index by user: (string userid) -> (object TreeSet(long statusid))
        self._user_mapping = OOBTree.OOBTree()
        # index by tag: (string tag) -> (object TreeSet(long statusid))
        self._tag_mapping = OOBTree.OOBTree()
        # index by microblog_context:
        # (string UUID) -> (object TreeSet(long statusid))
        self._uuid_mapping = OOBTree.OOBTree()  # keep old name for backcompat
        # index by content_context:
        # (string UUID) -> (object TreeSet(long statusid))
        self._content_uuid_mapping = OOBTree.OOBTree()
        # index by thread (long threadid) -> (object TreeSet(long statusid))
        self._threadid_mapping = OOBTree.OOBTree()
        # index by mentions (string UUID) -> (object TreeSet(long statusid))
        self._mentions_mapping = OOBTree.OOBTree()
        # index all content updates: (object TreeSet(long statusid))
        self._is_content_mapping = LLBTree.LLTreeSet()
        # index all human updates: (object TreeSet(long statusid))
        self._is_human_mapping = LLBTree.LLTreeSet()

    def add(self, status):
        """Validate *status*, check add permission, then store and index it."""
        self._check_status(status)
        self._check_add_permission(status)
        self._store(status)

    def _store(self, status):
        """Insert *status* into primary storage and all indexes.

        No security checks here - callers (add / flush_queue) are
        responsible for permission checking.
        """
        # see ZODB/Btree/Interfaces.py
        # If the key was already in the collection, there is no change
        # -> bump the microsecond id until a free slot is found
        while not self._status_mapping.insert(status.id, status):
            status.id += 1
        self._idx_user(status)
        self._idx_tag(status)
        self._idx_microblog_context(status)
        self._idx_content_context(status)
        self._idx_threadid(status)
        self._idx_mentions(status)
        self._idx_is_content(status)
        self._idx_is_human(status)
        self._notify_add(status)
        # the _store() method is shared between Base and Async
        # putting counter updates here ensures maximal consistency
        self._update_mtime()  # millisec for async scheduler
        self._update_ctime()  # microsecond granularity cache busting

    def _check_status(self, status):
        """Raise ValueError unless *status* provides IStatusUpdate."""
        if not IStatusUpdate.providedBy(status):
            raise ValueError("IStatusUpdate interface not provided.")

    def _notify_add(self, status):
        """Fire an ObjectAddedEvent so subscribers can react to the insert."""
        event = ObjectAddedEvent(status, newParent=self, newName=status.id)
        notify(event)

    def delete(self, id, restricted=True):
        """Archive statusupdate *id* and cascade-delete its reply thread.

        Updates are moved from _status_mapping into _status_archive and
        removed from all secondary indexes. With restricted=True (default)
        a delete permission check is applied to the original update only,
        not to the replies that are cascade-deleted with it.
        """
        status = self._get(id)  # bypass view permission check
        # thread expansion and batch delete can run into eachother
        if not status:
            return
        # delete permission check only original, not thread cascade
        if restricted:  # content_removed handler runs unrestricted
            self._check_delete_permission(status)
        thread_mapping = self._threadid_mapping.get(id)
        if thread_mapping:
            # list() avoids RuntimeError
            # mapping includes current parent id automatically
            to_delete = list(thread_mapping)
        else:
            to_delete = [id]
        for xid in to_delete:
            # thread expansion and batch delete can run into eachother
            if not self._get(xid):
                continue
            # cannot reproduce #379 but better safe than sorry
            try:
                self._unidx(xid)
                deleted = self._status_mapping.pop(xid)
                self._status_archive.insert(xid, deleted)
                # this would be the right place to notify deletion
                logger.info(
                    "%s archived statusupdate %s", api.user.get_current().id, xid
                )
            except AttributeError:
                logger.error(
                    "%s failed to archive statusupdate %s",
                    api.user.get_current().id,
                    xid,
                )
        self._update_ctime()  # purge cache

    def _unidx(self, id):
        """Remove statusupdate *id* from all secondary indexes."""
        status = self._get(id)  # bypass view permission check
        self._unidx_user(status)
        self._unidx_tag(status)
        self._unidx_microblog_context(status)
        self._unidx_content_context(status)
        self._unidx_threadid(status)
        self._unidx_mentions(status)
        self._unidx_is_content(status)
        self._unidx_is_human(status)

    # --- INDEXES ---

    def _idx_user(self, status):
        """Add status.id to the per-user index treeset."""
        userid = unicode(status.userid)
        # If the key was already in the collection, there is no change
        # create user treeset if not already present
        self._user_mapping.insert(userid, LLBTree.LLTreeSet())
        # add status id to user treeset
        self._user_mapping[userid].insert(status.id)

    def _unidx_user(self, status):
        """Remove status.id from the per-user index."""
        # NOTE(review): _idx_user keys on unicode(status.userid) but this
        # removes under the raw status.userid - relies on str/unicode
        # comparing equal; confirm for non-ascii userids.
        self._user_mapping[status.userid].remove(status.id)

    def _idx_tag(self, status):
        """
        Update the `StatusContainer` tag index with any new tags
        :param status: a `StatusUpdate` object
        """
        if not status.tags:
            return
        for tag in [tag.decode("utf-8") for tag in status.tags]:
            # If the key was already in the collection, there is no change
            # create tag treeset if not already present
            self._tag_mapping.insert(tag, LLBTree.LLTreeSet())
            # add status id to tag treeset
            self._tag_mapping[tag].insert(status.id)

    def _unidx_tag(self, status):
        """Remove status.id from the index treeset of each of its tags."""
        if not status.tags:
            return
        for tag in [tag.decode("utf-8") for tag in status.tags]:
            self._tag_mapping[tag].remove(status.id)

    def _idx_microblog_context(self, status):
        """Index status.id under its microblog (security) context UUID."""
        uuid = status._microblog_context_uuid
        if not uuid:
            return
        # If the key was already in the collection, there is no change
        # create tag treeset if not already present
        self._uuid_mapping.insert(uuid, LLBTree.LLTreeSet())
        self._uuid_mapping[uuid].insert(status.id)

    def _unidx_microblog_context(self, status):
        """Remove status.id from the microblog context index."""
        uuid = status._microblog_context_uuid
        if not uuid:
            return
        self._uuid_mapping[uuid].remove(status.id)

    def _idx_content_context(self, status):
        """Index status.id under its content (document) context UUID."""
        uuid = status._content_context_uuid
        if not uuid:
            return
        # If the key was already in the collection, there is no change
        # create tag treeset if not already present
        self._content_uuid_mapping.insert(uuid, LLBTree.LLTreeSet())
        self._content_uuid_mapping[uuid].insert(status.id)

    def _unidx_content_context(self, status):
        """Remove status.id from the content context index."""
        uuid = status._content_context_uuid
        if not uuid:
            return
        self._content_uuid_mapping[uuid].remove(status.id)

    def _idx_threadid(self, status):
        """Index a reply's status.id (and the thread parent) by thread_id."""
        if not getattr(status, "thread_id", False):
            return
        thread_id = status.thread_id
        if thread_id:
            # If the key was already in the collection, there is no change
            # create tag treeset if not already present
            self._threadid_mapping.insert(thread_id, LLBTree.LLTreeSet())
            self._threadid_mapping[thread_id].insert(status.id)
            # Make sure thread_id is also in the mapping
            self._threadid_mapping[thread_id].insert(thread_id)

    def _unidx_threadid(self, status):
        """Remove a reply's status.id from the thread index."""
        if not getattr(status, "thread_id", False):
            return
        thread_id = status.thread_id
        if thread_id:
            self._threadid_mapping[thread_id].remove(status.id)

    def _idx_mentions(self, status):
        """Index status.id under each mentioned user."""
        if not getattr(status, "mentions", False):
            return
        mentions = status.mentions.keys()
        for mention in mentions:
            # If the key was already in the collection, there is no change
            # create tag treeset if not already present
            self._mentions_mapping.insert(mention, LLBTree.LLTreeSet())
            self._mentions_mapping[mention].insert(status.id)

    def _unidx_mentions(self, status):
        """Remove status.id from the index of each mentioned user."""
        if not getattr(status, "mentions", False):
            return
        mentions = status.mentions.keys()
        for mention in mentions:
            self._mentions_mapping[mention].remove(status.id)

    def _idx_is_content(self, status):
        """Add status.id to the content-updates set when applicable."""
        if status.is_content_update:
            self._is_content_mapping.insert(status.id)

    def _unidx_is_content(self, status):
        """Remove status.id from the content-updates set, if present."""
        # try/except is more robust than status.is_content_update attr check
        try:
            self._is_content_mapping.remove(status.id)
        except KeyError:
            pass

    def _idx_is_human(self, status):
        """Add status.id to the human-updates set when applicable."""
        if status.is_human_update:
            self._is_human_mapping.insert(status.id)

    def _unidx_is_human(self, status):
        """Remove status.id from the human-updates set, if present."""
        # try/except is more robust than status.is_human_update attr check
        try:
            self._is_human_mapping.remove(status.id)
        except KeyError:
            pass

    def clear(self):
        """Empty the secondary indexes and the primary storage.

        NOTE(review): _is_content_mapping, _is_human_mapping and
        _status_archive are not cleared here - confirm that is intentional.
        """
        self._user_mapping.clear()
        self._tag_mapping.clear()
        self._uuid_mapping.clear()
        self._content_uuid_mapping.clear()
        self._threadid_mapping.clear()
        self._mentions_mapping.clear()
        return self._status_mapping.clear()

    # --- WRITE SECURITY ---

    def _check_add_permission(self, status):
        """Raise Unauthorized unless the current user may add in context."""
        permission = "Plone Social: Add Microblog Status Update"
        # can also throw statusupdate._uuid2context() uuid lookup Unauthorized
        if not api.user.has_permission(
            permission, obj=status.microblog_context
        ):  # None=SiteRoot handled by api
            raise Unauthorized("You do not have permission <%s>" % permission)

    def _check_delete_permission(self, status):
        """
        StatusUpdates have no local 'owner' role. Instead we check against
        permissions on the microblog context and compare with the creator.
        """
        if not status.can_delete:
            raise Unauthorized("Not allowed to delete this statusupdate")

    # --- READ SECURITY ---

    def _has_global_view_statusupdate_permission(self):
        """Return True if the current user may view siteroot statusupdates."""
        # hardcode non-anon protection at siteroot for now
        permission = "Plone Social: View Microblog Status Update"
        try:
            if not api.user.has_permission(permission):
                return False
        except api.exc.CannotGetPortalError:  # happens in tests
            pass
        return True

    def secure(self, keyset):
        """Filter keyset to return only keys the current user may see.

        NB this may return statusupdates with a microblog_context (workspace)
        accessible to the user, but referencing a content_context (document)
        which the user may not access yet because of content workflow.
        Filtering that is quite costly and not done here - instead there's a
        postprocessing filter in activitystream just before rendering.
        """
        return LLBTree.intersection(
            LLBTree.LLTreeSet(keyset), LLBTree.LLTreeSet(self.allowed_status_keys())
        )

    @ram.cache(cache_key)
    def allowed_status_keys(self):
        """Return the subset of IStatusUpdate keys
        that are related to UUIDs of accessible contexts.
        I.e. blacklist all IStatusUpdate that has a context
        which we don't have permission to access.

        This is the key security protection used by all getters.
        Because it's called a lot we're caching results per user request.
        Because we use ram caching, we must make sure the return value
        is *not* a ZODB BTree accessor, so we cast to tuple.
        """
        # NOTE(review): allowed_status_keys_extranet is not defined in this
        # class here - presumably provided elsewhere; verify.
        if self._has_global_view_statusupdate_permission():
            return self.allowed_status_keys_intranet()
        else:
            return self.allowed_status_keys_extranet()

    def allowed_status_keys_intranet(self):
        """All status ids minus those in blacklisted microblog contexts."""
        # intranet users may see global updates
        all_statusids = LLBTree.LLSet(self._status_mapping.keys())
        uuid_blacklist = self._blacklist_microblogcontext_uuids()
        if not uuid_blacklist:
            return tuple(all_statusids)
        else:
            # for each uid, expand uid into set of statusids
            blacklisted_treesets = (
                self._uuid_mapping.get(uuid)
                for uuid in uuid_blacklist
                if uuid in self._uuid_mapping.keys()
            )
            # merge sets of blacklisted statusids into single blacklist
            blacklisted_statusids = reduce(
                LLBTree.union, blacklisted_treesets, LLBTree.TreeSet()
            )
            # subtract blacklisted statusids from all statusids
            return tuple(LLBTree.difference(all_statusids, blacklisted_statusids))

    def _blacklist_microblogcontext_uuids(self):
        """Returns the uuids for all IMicroblogContext that the current
        user has no access to.
        All the read accessors rely on this method for security checks.
        """
        whitelist = self._whitelist_microblogcontext_uuids()
        # SiteRoot context is not UUID indexed, so not blacklisted
        return set(self._uuid_mapping) - whitelist

    def _whitelist_microblogcontext_uuids(self):
        """
        Current catalog query implicitly filters on View permission for
        the current user.
        We should not rely on View adequately representing ViewStatusUpdate.
        The current implementation takes a conservative approach by
        applying an extra explicit security check for ViewStatusUpdate.
        It is theoretically possible that the result excludes workspaces for
        which the user does have ViewStatusUpdate but does not have View.
        A possible performance optimization that would also fix the overly
        conservative bias would be to add a special index for
        ViewStatusUpdate and use that directly in the catalog query.
        See http://stackoverflow.com/questions/23950860/how-to-list-users-having-review-permission-on-content-object
        However, the number of IMicroblogContext objects in a site is
        normally quite limited and the outcome of this check is cached
        per request, which should hopefully limit the performance cost.
        EDIT: this *is* costly, see workaround and comments below.
        """  # noqa
        catalog = getToolByName(self, "portal_catalog")
        marker = IMicroblogContext.__identifier__
        results = catalog.searchResults(object_provides=marker)
        # performance workaround replaces code further down
        whitelist = {brain.UID for brain in results}
        return whitelist
        # BELOW IS ONLY NEEDED ONCE #1101 IS FIXED
        # TODO: REFACTOR FOR PERFORMANCE (USING DEDICATED INDEX)
        # SHOULD AVOID THE COSTLY getObject() + api.user CALLS
        # permission = "Plone Social: View Microblog Status Update"
        # # SiteRoot context is NOT whitelisted
        # whitelist = []
        # for brain in results:
        #     try:
        #         obj = brain.getObject()
        #     except Unauthorized:
        #         # can View but not Access workspace - skip
        #         continue
        #     # and double check for ViewStatusUpdate
        #     if api.user.has_permission(permission, obj=obj):
        #         whitelist.append(brain.UID)

    # --- DISABLED ACCESSORS ---
    # blocked IBTree methods to protect index consistency
    # (also not sensible for our use case)

    def insert(self, key, value):
        raise NotImplementedError("Can't allow that to happen.")

    def pop(self, k, d=None):
        raise NotImplementedError("Can't allow that to happen.")

    def setdefault(self, k, d):
        raise NotImplementedError("Can't allow that to happen.")

    def update(self, collection):
        raise NotImplementedError("Can't allow that to happen.")

    # --- PRIMARY ACCESSORS ---

    def get(self, key):
        """Security-checked single statusupdate lookup.

        Raises KeyError for an unknown key and Unauthorized for a key
        the current user may not see.
        """
        key = int(key)
        # secure
        if key in self.allowed_status_keys():
            return self._get(key)
        # don't mask lookup error with Unauthorized
        elif key not in self._status_mapping.keys():
            raise KeyError(key)
        else:
            raise (Unauthorized("You're not allowed to get status %s'" % key))

    # performance helper to avoid multiple security checks
    def _get(self, key):
        """Unchecked lookup - callers must have secured *key* already."""
        return self._status_mapping.get(key)

    def items(self, min=None, max=None, limit=100, tags=None, users=None):
        """Generator of (id, statusupdate) pairs, newest first."""
        # secured in keys()
        return (
            (key, self._get(key)) for key in self.keys(min, max, limit, tags, users)
        )

    def values(self, min=None, max=None, limit=100, tags=None, users=None):
        """Generator of statusupdates, newest first."""
        # secured in keys()
        return (self._get(key) for key in self.keys(min, max, limit, tags, users))

    def keys(self, min=None, max=None, limit=100, tags=None, users=None):
        """Security-filtered status ids, optionally filtered by tags/users,
        sorted newest first and capped at *limit*."""
        # secure
        if tags is None and users is None:
            matches = self.allowed_status_keys()
        else:
            matches = self.secure(
                LLBTree.union(
                    self._query_mapping(self._tag_mapping, tags),
                    self._query_mapping(self._user_mapping, users),
                )
            )
        return longkeysortreverse(matches, min, max, limit)

    # py2-style iterator aliases; the accessors already return generators
    iteritems = items
    iterkeys = keys
    itervalues = values

    # --- THREAD CONVERSATION ACCESSORS ---

    def is_most_recent_in_thread(self, statusupdate, tags=None):
        """True if *statusupdate* is the newest (optionally tag-matching)
        update in its conversation thread."""
        # thread_id is None for a toplevel
        thread_id = statusupdate.thread_id or statusupdate.id
        if thread_id not in self._threadid_mapping:
            # toplevel without replies has no index record (yet)
            return True
        if tags is None:
            # straightforward: take the reply with the highest id
            most_recent_id = max(self._threadid_mapping[thread_id])
        else:
            # the highest reply id that matches one of the tags
            tagged_in_thread = LLBTree.intersection(
                self._threadid_mapping[thread_id],
                self._query_mapping(self._tag_mapping, tags),
            )
            most_recent_id = max(tagged_in_thread)
        return statusupdate.id == most_recent_id

    def thread_items(self, thread_id, min=None, max=None, limit=100):
        # secured by thread_keys
        return (
            (key, self._get(key))
            for key in self.thread_keys(thread_id, min, max, limit)
        )

    def thread_values(self, thread_id, min=None, max=None, limit=100):
        # secured by thread_keys
        return (self._get(key) for key in self.thread_keys(thread_id, min, max, limit))

    def thread_keys(self, thread_id, min=None, max=None, limit=100):
        """Security-filtered ids of all updates in a thread, newest first."""
        if not thread_id:
            return ()
        mapping = self._threadid_mapping.get(thread_id) or [thread_id]
        mapping = self.secure(mapping)
        return longkeysortreverse(mapping, min, max, limit)

    # --- USER ACCESSORS ---

    def user_items(self, users, min=None, max=None, limit=100):
        # secured by user_keys
        return ((key, self._get(key)) for key in self.user_keys(users, min, max, limit))

    def user_values(self, users, min=None, max=None, limit=100):
        # secured by user_keys
        return (self._get(key) for key in self.user_keys(users, min, max, limit))

    def user_keys(self, users, min=None, max=None, limit=100):
        """Security-filtered ids of updates by any of *users*."""
        if not users:
            return ()
        return self.keys(min, max, limit, users=users)

    # --- CONTEXT ACCESSORS = microblog_context security context ---

    def context_items(
        self, microblog_context, min=None, max=None, limit=100, nested=True
    ):
        # secured by microblog_context_keys
        return (
            (key, self._get(key))
            for key in self.context_keys(microblog_context, min, max, limit, nested)
        )

    def context_values(
        self, microblog_context, min=None, max=None, limit=100, nested=True
    ):
        # secured by microblog_context_keys
        return (
            self._get(key)
            for key in self.context_keys(microblog_context, min, max, limit, nested)
        )

    def context_keys(
        self, microblog_context, min=None, max=None, limit=100, nested=True
    ):
        """Security-filtered ids of updates in *microblog_context*,
        including (with nested=True) contained IMicroblogContext objects."""
        if nested:
            # hits portal_catalog
            nested_uuids = {
                uuid
                for uuid in self.nested_uuids(microblog_context)
                if uuid in self._uuid_mapping
            }
            if not nested_uuids:
                return ()
        else:
            # used in test_statuscontainer_microblog_context
            uuid = self._context2uuid(microblog_context)
            if uuid not in self._uuid_mapping:
                return ()
            nested_uuids = {uuid}
        matches = self._query_mapping(self._uuid_mapping, nested_uuids)
        matches = self.secure(matches)
        return longkeysortreverse(matches, min, max, limit)

    # enable unittest override of plone.app.uuid lookup
    def _context2uuid(self, context):
        return IUUID(context)

    # --- CONTENT ACCESSORS = content_context

    def content_items(self, content_context):
        return ((key, self._get(key)) for key in self.content_keys(content_context))

    def content_values(self, content_context):
        return (self._get(key) for key in self.content_keys(content_context))

    def content_keys(self, content_context):
        """Security-filtered ids of updates attached to *content_context*,
        kept in chronological (not reversed) order."""
        uuid = self._context2uuid(content_context)
        mapping = self._content_uuid_mapping.get(uuid)
        if not mapping:
            return ()
        # security is derived from microblog_context NOT from content_context
        mapping = self.secure(mapping)
        # not reversing, keep document discussion in chronological order
        return mapping

    # --- HUMAN/CONTENT STREAM FILTERS ACCESSORS ---

    def is_human_items(self, min=None, max=None, limit=100):
        # secured in is_human_keys()
        return ((key, self._get(key)) for key in self.is_human_keys(min, max, limit))

    def is_human_values(self, min=None, max=None, limit=100):
        # secured in is_human_keys()
        return (self._get(key) for key in self.is_human_keys(min, max, limit))

    def is_human_keys(self, min=None, max=None, limit=100):
        """Security-filtered ids of human (non-content) updates."""
        # secure
        keys = self.secure(self._is_human_mapping)
        return longkeysortreverse(keys, min, max, limit)

    def is_content_items(self, min=None, max=None, limit=100):
        # secured in is_content_keys()
        return ((key, self._get(key)) for key in self.is_content_keys(min, max, limit))

    def is_content_values(self, min=None, max=None, limit=100):
        # secured in is_content_keys()
        return (self._get(key) for key in self.is_content_keys(min, max, limit))

    def is_content_keys(self, min=None, max=None, limit=100):
        """Security-filtered ids of content updates."""
        # secure
        keys = self.secure(self._is_content_mapping)
        return longkeysortreverse(keys, min, max, limit)

    # --- MENTION ACCESSORS ---

    def mention_items(self, mentions, min=None, max=None, limit=100):
        # secured by mention_keys
        return (
            (key, self._get(key))
            for key in self.mention_keys(mentions, min, max, limit)
        )

    def mention_values(self, mentions, min=None, max=None, limit=100):
        # secured by mention_keys
        return (self._get(key) for key in self.mention_keys(mentions, min, max, limit))

    def mention_keys(self, mentions, min=None, max=None, limit=100):
        """Security-filtered ids of updates mentioning any of *mentions*."""
        if not mentions:
            return ()
        matches = self._query_mapping(self._mentions_mapping, mentions)
        matches = self.secure(matches)
        return longkeysortreverse(matches, min, max, limit)

    # --- HELPERS ---

    def nested_uuids(self, context):
        """Catalog-query the UUIDs of all IMicroblogContext objects
        contained in (or equal to) *context*."""
        catalog = getToolByName(context, "portal_catalog")
        results = catalog(
            path={"query": "/".join(context.getPhysicalPath()), "depth": -1},
            object_implements=IMicroblogContext,
        )
        # does not need to be deterministically sorted - will be used to
        # retrieve statusupdates which then themselves will be sorted
        return {item.UID for item in results}

    def _query_mapping(self, mapping, keys):
        """
        Calculate the union of all statusids indexed in <mapping>
        on any of the <keys>.
        Always returns an LLTreeSet ready for further processing.
        """
        if not keys:
            return LLBTree.LLTreeSet()
        elif isinstance(keys, (str, unicode)):
            # convert single key to set
            keys = {keys}
        # calculate the union set of matching ids across all tags
        # silently discards all non-existing key ids
        treesets = [mapping.get(id) for id in keys if id in mapping.keys()]
        return LLBTree.multiunion(treesets)

    def _update_mtime(self):
        """Update _mtime on statusupdate add.
        Uses milliseconds. This flag is used for the cache key
        and for the async time window.
        """
        with LOCK:
            self._mtime = int(time.time() * 1000)

    def _update_ctime(self):
        """Update _ctime for cache busting.
        Requires *micro*seconds granularity to avoid RuntimeError
        by cache interference. Only used for cache invalidation.
        """
        with LOCK:
            self._ctime = int(time.time() * 1000000)
class QueuedStatusContainer(BaseStatusContainer):
    """A write performance optimized IStatusContainer.

    This separates the queuing logic from the base class to make
    the code more readable (and testable).

    For performance reasons, an in-memory STATUSQUEUE is used.
    StatusContainer.add() puts StatusUpdates into the queue.

    MAX_QUEUE_AGE is the commit window in milliseconds.
    To disable batch queuing, set MAX_QUEUE_AGE = 0

    .add() calls .autoflush(), which flushes the queue when
    ._mtime is longer than MAX_QUEUE_AGE ago.

    So each .add() checks the queue. In a low-traffic site this will
    result in immediate disk writes (msg frequency < timeout).
    In a high-traffic site this will result on one write per timeout,
    which makes it possible to attain > 100 status update inserts
    per second.

    Note that the algorithm is structured in such a way, that the
    system automatically adapts to low/high traffic conditions.

    Additionally, a non-interactive queue flush is set up via
    _schedule_flush() which uses a volatile thread timer _v_timer
    to set up a non-interactive queue flush. This ensures that
    the "last Tweet of the day" also gets committed to disk.

    An attempt is made to make self._mtime and self._v_timer
    thread-safe. These function as a kind of ad-hoc locking
    mechanism so that only one thread at a time is flushing the
    memory queue into persistent storage.
    """

    implements(IStatusContainer)

    # set ASYNC=False to force sync mode and ignore MAX_QUEUE_AGE
    if api.env.test_mode():
        ASYNC = False
    else:
        ASYNC = False  # disabled because of CSRF issues

    # max in-memory time in millisec before disk sync, if in ASYNC mode
    MAX_QUEUE_AGE = 1000

    def add(self, status):
        """Validate, permission-check, then either queue (ASYNC) or store.

        Returns 1 when a disk write happened, 0 when the update was
        only queued in memory.
        """
        self._check_status(status)
        self._check_add_permission(status)
        if self.ASYNC and self.MAX_QUEUE_AGE > 0:
            self._queue(status)
            # fallback sync in case of NO traffic (kernel timer)
            self._schedule_flush()
            # immediate sync on low traffic (old ._mtime)
            # postpones sync on high traffic (next .add())
            return self._autoflush()  # updates _mtime on write
        else:
            self._store(status)
            return 1  # immediate write

    def _queue(self, status):
        """Push *status* onto the shared in-memory priority queue."""
        STATUSQUEUE.put((status.id, status))

    def _schedule_flush(self):
        """A fallback queue flusher that runs without user interactions"""
        if not self.MAX_QUEUE_AGE > 0:
            return
        try:
            # non-persisted, absent on first request
            self._v_timer
        except AttributeError:
            # initialize on first request
            self._v_timer = None
        if self._v_timer is not None:
            # timer already running
            return
        # only a one-second granularity, round upwards
        timeout = int(math.ceil(float(self.MAX_QUEUE_AGE) / 1000))
        # Get the global request,
        # we just need to copy over some environment stuff.
        request = getRequest()
        request_environ = {}
        site = getSite()
        # We do not use plone.api here because we cannot assume
        # that site == plone portal.
        # It could also be a directory under it
        if site:
            site_path = "/".join(site.getPhysicalPath())
            if request is None:
                # If we fail getting a request,
                # we might still have it in portal
                request = getattr(site, "REQUEST", None)
        else:
            # This situation can happen in tests.
            logger.warning("Could not get the site")
            site_path = None
        if request is not None:
            # minimal environ needed by makerequest in the timer thread
            request_environ = {
                "SERVER_NAME": request.SERVER_NAME,
                "SERVER_PORT": request.SERVER_PORT,
                "REQUEST_METHOD": request.REQUEST_METHOD,
            }
        with LOCK:
            # logger.info("Setting timer")
            self._v_timer = threading.Timer(
                timeout,
                self._scheduled_autoflush,
                kwargs={"site_path": site_path, "environ": request_environ},
            )
            self._v_timer.start()

    def _scheduled_autoflush(self, site_path=None, environ=None):
        """This method is run from the timer, outside a normal request scope.
        This requires an explicit commit on db write"""
        if site_path is not None:
            # re-establish a request and site context in this thread
            app = makerequest(getZope2App(), environ=environ)
            site = app.restrictedTraverse(site_path)
            setSite(site)
        if self._autoflush():  # returns 1 on actual write
            transaction.commit()

    def _autoflush(self):
        """Flush the queue if the last write is older than MAX_QUEUE_AGE."""
        # logger.info("autoflush")
        # compares millisecond _mtime with millisecond MAX_QUEUE_AGE
        if int(time.time() * 1000) - self._mtime > self.MAX_QUEUE_AGE:
            return self.flush_queue()  # 1 on write, 0 on noop
        return 0  # no write

    def flush_queue(self):
        """Drain STATUSQUEUE into persistent storage.

        Returns 1 when at least the drain loop ran, 0 when the queue
        was empty. Also cancels any pending fallback timer.
        """
        # update marker - block autoflush
        self._update_mtime()
        with LOCK:
            try:
                # cancel scheduled flush
                if self.MAX_QUEUE_AGE > 0 and self._v_timer is not None:
                    # logger.info("Cancelling timer")
                    self._v_timer.cancel()
                    self._v_timer = None
            except AttributeError:
                self._v_timer = None
        if STATUSQUEUE.empty():
            return 0  # no write
        while True:
            try:
                (id, status) = STATUSQUEUE.get(block=False)
                safeWrite(status)
                self._store(status)
            except Queue.Empty:
                break
        return 1  # confirmed write