Source code for ploneintranet.microblog.statusupdate
# -*- coding=utf-8 -*-
from AccessControl import getSecurityManager
from AccessControl import Unauthorized
from ploneintranet.docconv.client.interfaces import IPreviewSettings
from DateTime import DateTime
from interfaces import IStatusUpdate
from persistent import Persistent
from plone import api
from plone.app.uuid.utils import uuidToObject
from plone.uuid.interfaces import IUUID
from ploneintranet import api as pi_api
from ploneintranet.attachments.attachments import IAttachmentStoragable
from ploneintranet.attachments.attachments import IAttachmentStorage
from ploneintranet.attachments.utils import add_attachments
from ploneintranet.attachments.utils import create_attachment
from ploneintranet.microblog.interfaces import IMicroblogTool
from Products.CMFCore.utils import getToolByName
from zope.annotation.interfaces import IAttributeAnnotatable
from zope.component import queryUtility
from zope.component.hooks import getSite
from zope.interface import implements
import logging
import time
logger = logging.getLogger("ploneintranet.microblog")
[docs]class StatusUpdate(Persistent):
implements(IAttachmentStoragable, IAttributeAnnotatable, IStatusUpdate)
def __init__(
self,
text=u"",
microblog_context=None,
thread_id=None,
mention_ids=None,
tags=None,
content_context=None,
action_verb=None,
):
self.__parent__ = self.__name__ = None
self.id = long(time.time() * 1e6) # modified by IStatusContainer
self.thread_id = thread_id
self.text = text
self.date = DateTime()
self._init_mentions(mention_ids)
self._init_userid()
self._init_creator()
self._init_microblog_context(thread_id, microblog_context, content_context)
self._init_content_context(thread_id, content_context)
self.tags = tags
self._verb = action_verb
# Initialize the attachment storage (see #1230)
IAttachmentStorage(self, None)
[docs] def edit(self, text):
"""keeps original text across multiple edits"""
if not self.can_edit:
raise Unauthorized("You are not allowed to edit this statusupdate")
if not self.original_text:
self._original_text = self.text
self.text = text
self._edited_date = DateTime()
# this would be the right place to notify modification
logger.info(
"%s modified text on statusupdate %s", api.user.get_current().id, self.id
)
[docs] def delete(self):
if not self.can_delete:
raise Unauthorized("You are not allowed to edit this statusupdate")
container = queryUtility(IMicroblogTool)
container.delete(self.id)
@property
def can_edit(self):
"""
StatusUpdates have no local 'owner' role. Instead we check against
permissions on the microblog context and compare with the creator.
"""
edit_all = "Plone Social: Modify Microblog Status Update"
edit_own = "Plone Social: Modify Own Microblog Status Update"
return api.user.has_permission(edit_all, obj=self.microblog_context) or (
api.user.has_permission(edit_own, obj=self.microblog_context)
and self.userid == api.user.get_current().id
)
@property
def can_delete(self):
"""
StatusUpdates have no local 'owner' role. Instead we check against
permissions on the microblog context and compare with the creator.
"""
delete_all = "Plone Social: Delete Microblog Status Update"
delete_own = "Plone Social: Delete Own Microblog Status Update"
return api.user.has_permission(delete_all, obj=self.microblog_context) or (
api.user.has_permission(delete_own, obj=self.microblog_context)
and self.userid == api.user.get_current().id
)
@property
def original_text(self):
"""Return original text of a (multiply) edited update."""
try:
return self._original_text
except AttributeError:
return None
@property
def edited(self):
"""Return last edit date if modified, or None if never changed."""
try:
return self._edited_date
except AttributeError:
return None
# for unittest subclassing
[docs] def _init_userid(self):
self.userid = getSecurityManager().getUser().getId()
# for unittest subclassing
[docs] def _init_creator(self):
portal_membership = getToolByName(getSite(), "portal_membership")
member = portal_membership.getAuthenticatedMember()
self.creator = member.getId()
# for unittest subclassing
[docs] def _init_microblog_context(
self, thread_id, microblog_context=None, content_context=None
):
"""Set the right security context.
If thread_id is given, the context of the thread parent is used
and the given context arg is ignored.
E.g. a reply globally to a parent post done in a workspace
takes the security context of the parent post.
"""
from ploneintranet import api as piapi # FIXME circular dependency
# thread_id takes precedence over microblog_context arg!
if thread_id:
parent = piapi.microblog.statusupdate.get(thread_id)
self._microblog_context_uuid = parent._microblog_context_uuid
elif microblog_context is None and content_context is None:
self._microblog_context_uuid = None
else:
# derive microblog_context from content_context if necessary
m_context = piapi.microblog.get_microblog_context(
microblog_context or content_context
)
self._microblog_context_uuid = self._context2uuid(m_context)
# for unittest subclassing
[docs] def _init_content_context(self, thread_id, content_context):
""" We store the uuid as a reference of a content_context
related to this status update
"""
from ploneintranet import api as piapi # FIXME circular dependency
if thread_id:
parent = piapi.microblog.statusupdate.get(thread_id)
self._content_context_uuid = parent._content_context_uuid
else:
self._content_context_uuid = self._context2uuid(content_context)
[docs] def _init_mentions(self, mention_ids):
self.mentions = {}
if mention_ids is None:
return
for userid in mention_ids:
user = pi_api.userprofile.get(userid) or api.user.get(userid)
if user is not None:
self.mentions[userid] = getattr(
user, "fullname", ""
) or user.getProperty("fullname")
@property
def action_verb(self):
"""Backward compatible accessor"""
return self._verb or u"posted"
[docs] def replies(self):
from ploneintranet import api as piapi
container = piapi.microblog.get_microblog()
for reply in container.thread_values(self.id):
if reply.id != self.id:
yield reply
@property
def microblog_context(self):
uuid = self._microblog_context_uuid
return self._uuid2context(uuid)
@property
def content_context(self):
if not self._content_context_uuid:
return None
uuid = self._content_context_uuid
return self._uuid2context(uuid)
@property
def is_human_update(self):
"""
A 'human' update is either a toplevel post without content_context,
or a reply (even a reply on a content update). Or, possibly,
a stream post with attachment that got converted to a content update.
"""
if self.thread_id:
return True
if self.text.strip() != "":
return True
return not self.is_content_update
@property
def is_content_update(self):
"""
Show content updates, including replies on toplevel content updates.
"""
return self._content_context_uuid is not None
[docs] def _uuid2context(self, uuid=None):
if not uuid:
return None
context = self._uuid2object(uuid)
if context is None:
# typically happens when unauthorized to access context
# but may be caused by missing uuid index (e.g. on copy)
raise Unauthorized(
"Context with uuid {0} could not be " "retrieved".format(uuid)
)
return context
# unittest override point
[docs] def _context2uuid(self, context):
if context is None:
return None
return IUUID(context)
# unittest override point
[docs] def _uuid2object(self, uuid):
return uuidToObject(uuid)
# Act a bit more like a catalog brain:
[docs] def getURL(self):
return ""
[docs] def getObject(self):
try:
c_obj = self.context_object
except AttributeError:
# backward compatibility
c_obj = self.context_object = None
return c_obj or self
[docs] def Title(self):
return self.text
[docs] def getId(self):
return self.id
[docs] def getCharset(self):
""" Return the charset for this file
BBB: This is a bit weird.
This method is here because the _prepare_imagedata
method wants it.
See https://github.com/ploneintranet/ploneintranet/blob/251c8cf9f1e69c38030b6b6ac2f7c93c86ae1e60/src/ploneintranet/microblog/browser/attachments.py#L45 # noqa
"""
return "utf8"
@property
def attachments(self):
"""The attachment storage. Lists filenames via .keys()."""
return IAttachmentStorage(self)
[docs] def add_attachment(self, filename, data):
"""
Add a binary attachment.
Can be called multiple times to attach multiple files
:param filename: name of the file to attach
:type action_verb: string
:param filename: file data to attach
:type action_verb: binary
"""
attachment = create_attachment(filename, data)
# Init document viewer settings (prevents writes on read)
IPreviewSettings(attachment)
add_attachments([attachment], self.attachments)
[docs] def remove_attachment(self, filename):
"""Remove the attachment named <filename>
:param filename: name of the file to remove
:type action_verb: string
"""
self.attachments.remove(filename)
[docs] def absolute_url(self):
context = self.content_context
if context:
return "{}/view".format(context.absolute_url())
context = self.microblog_context
if not context:
context = api.portal.get()
return "{}/@@post".format(context.absolute_url())