Source code for ploneintranet.workspace.utils
# -*- coding: utf-8 -*-
from AccessControl.SecurityManagement import newSecurityManager
from Acquisition import aq_chain
from BTrees.OOBTree import OOBTree
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from plone import api
from plone.dexterity.utils import iterSchemata
from plone.rfc822.interfaces import IPrimaryField
from ploneintranet.layout.app import IApp
from ploneintranet.workspace.interfaces import IWorkspaceFolder
from Products.CMFCore.interfaces import ISiteRoot
from zope.annotation import IAnnotations
from zope.component import getUtility
from zope.i18nmessageid import MessageFactory
from zope.schema import getFieldsInOrder
import config
import logging
import mimetypes
# Message factory for the "plonelocales" i18n domain
pl_message = MessageFactory("plonelocales")
# Module-level logger, named after this module
log = logging.getLogger(__name__)
# Key under which get_storage() keeps its OOBTree in the portal annotations
ANNOTATION_KEY = "ploneintranet.workspace.invitation_storage"
# The type map is used to deduce clear text names for classes and labels
# from portal types. map_content_type() consults it before looking at the
# mimetype.
TYPE_MAP = {
    "Event": "event",
    "Folder": "folder",
    "Document": "rich document",
    "todo": "task",
    "ploneintranet.workspace.mail": "email",
    "ploneintranet.workspace.workspacefolder": "workspace",
    "Image": "image",
}
def get_storage(clear=False):
    """Return the storage tree this package keeps in the portal annotations.

    The tree lives under ``ANNOTATION_KEY`` in the annotations of the site
    root and is created the first time it is requested.

    :param clear: when true, any existing tree is replaced by an empty one
    :returns: the object stored under ``ANNOTATION_KEY``
    :rtype: OOBTree (when created by this function)
    """
    site_annotations = IAnnotations(getUtility(ISiteRoot))
    missing = ANNOTATION_KEY not in site_annotations
    if missing or clear:
        # start over with an empty tree
        site_annotations[ANNOTATION_KEY] = OOBTree()
    return site_annotations[ANNOTATION_KEY]
def send_email(recipient, subject, message, sender="ploneintranet@netsight.co.uk"):
    """Send an email through ``api.portal.send_email`` with a preset sender.

    :param recipient: passed on as ``recipient``
    :param subject: passed on as ``subject``
    :param message: passed on as ``body``
    :param sender: passed on as ``sender``

    A ``ValueError`` raised while sending is logged and not propagated.
    """
    mail_args = dict(
        recipient=recipient,
        sender=sender,
        subject=subject,
        body=message,
    )
    try:
        api.portal.send_email(**mail_args)
    except ValueError as err:
        log.error("MailHost error: {0}".format(err))
def render_and_send_mail(obj, view_name, recipients):
    """Render the view ``view_name`` on ``obj`` and mail the result.

    The rendered view becomes the HTML part of a multipart message that is
    sent to ``recipients``. The subject is taken from the ``subject``
    attribute of the view.

    Any exception raised while sending is shown as an error message on the
    request of ``obj`` and logged; it is not propagated.
    """
    request = obj.REQUEST
    mailview = api.content.get_view(name=view_name, context=obj, request=request)
    html = mailview()

    message = MIMEMultipart()
    html_part = MIMEText(html.encode("utf8"), "html", _charset="utf-8")
    message.attach(html_part)

    try:
        # the subject is looked up inside the try block so that a view
        # without one is reported like any other sending problem
        api.portal.send_email(
            recipient=recipients,
            subject=mailview.subject,
            body=message,
            immediate=False,
        )
    except Exception as err:
        api.portal.show_message(str(err), request, "error")
        log.exception("Error sending the email")
def parent_workspace(context):
    """Return the first object in ``aq_chain(context)`` that provides
    ``IWorkspaceFolder``.

    Returns None if the chain holds no such object.
    """
    workspaces = (
        item for item in aq_chain(context) if IWorkspaceFolder.providedBy(item)
    )
    return next(workspaces, None)
def in_workspace(context):
    """Tell whether ``parent_workspace`` finds a workspace for ``context``."""
    workspace = parent_workspace(context)
    return IWorkspaceFolder.providedBy(workspace)
def parent_app(context):
    """Return the app that ``context`` is, or is contained in.

    ``context`` itself is returned when it provides ``IApp``; otherwise the
    first object in ``aq_chain(context)`` providing ``IApp`` is returned.
    Returns None if not found.
    """
    if not IApp.providedBy(context):
        for ancestor in aq_chain(context):
            if IApp.providedBy(ancestor):
                return ancestor
        return None
    return context
def in_app(context):
    """Tell whether ``context`` is, or is contained in, an app.

    The lookup goes through ``parent_app``. Looking up the parent
    *workspace* instead would only ever report apps that are themselves
    workspaces.

    :returns: True when ``parent_app`` finds an object providing ``IApp``
    :rtype: bool
    """
    return IApp.providedBy(parent_app(context))
def guess_mimetype(file_name):
    """Guess the mimetype of ``file_name`` from its name.

    The standard library ``mimetypes`` module is asked first. Only when it
    has no answer is the portal ``mimetypes_registry`` tool consulted.

    :returns: the mimetype as a string, or the (empty) standard library
        answer when the registry does not know the file name either
    """
    guessed = mimetypes.guess_type(file_name)[0]
    if guessed:
        return guessed
    # sometimes plone mimetypes registry could be more powerful
    registry = api.portal.get_tool("mimetypes_registry")
    registry_hit = registry.globFilename(file_name)
    if registry_hit is None:
        return guessed
    return str(registry_hit)
def map_content_type(mimetype, portal_type=""):
    """Map a portal type or mimetype to a content type string as used in
    the prototype.

    A ``portal_type`` listed in ``TYPE_MAP`` wins. Otherwise the mimetype is
    matched against the groups defined in ``config`` and finally against its
    major type (the part before the slash).

    :returns: the content type string, or "" when nothing matches
    """
    if portal_type in TYPE_MAP:
        return TYPE_MAP[portal_type]
    if not mimetype:
        return ""

    # (name of the group in config, resulting label), checked in this order;
    # the group is only looked up when its turn comes
    mimetype_groups = (
        ("PDF", "pdf"),
        ("DOC", "word"),
        ("PPT", "powerpoint"),
        ("ZIP", "zip"),
        ("XLS", "excel"),
        ("URI", "link"),
        ("NEWS", "news"),
    )
    for group_name, label in mimetype_groups:
        if mimetype in getattr(config, group_name):
            return label

    labels_by_major = {
        "text": "rich document",
        "audio": "sound",
        "video": "video",
        "image": "image",
    }
    major = mimetype.partition("/")[0]
    return labels_by_major.get(major, "")
def purge_and_refresh_security_manager():
    """ Purge the acl_users cache and reinitialise the security manager
    for the current user.

    This is necessary in case you have a cache on your acl_users folder.

    Example: a workspace is created and transitioned into the private state
    right afterwards. The transition is guarded by the TeamManager role,
    which the current user only just got when the workspace was created, so
    the cached user object does not reflect it yet. The next request would
    rebuild the security context anyway, but within the current request the
    stale user is a problem.
    """
    user_folder = api.portal.get_tool("acl_users")

    # purge the acl_users cache
    if user_folder.ZCacheable_enabled():
        user_folder.ZCacheable_invalidate()

    # reinitialise the security manager with a freshly fetched user
    login = api.user.get_current().getUserName()
    newSecurityManager(None, user_folder.getUser(login))
def get_primary_field(obj):
    """Return the primary field of ``obj`` as a ``(name, field)`` tuple.

    Every schema yielded by ``iterSchemata(obj)`` is inspected. Within a
    schema the first field providing ``IPrimaryField`` counts; when several
    schemata have one, the schema visited last wins.

    :returns: ``(name, field)`` or None when no schema has a primary field
    """
    found = None
    for schema in iterSchemata(obj):
        first_in_schema = next(
            (
                (name, field)
                for name, field in getFieldsInOrder(schema)
                if IPrimaryField.providedBy(field)
            ),
            None,
        )
        if first_in_schema is not None:
            # a later schema overrides the result of an earlier one
            found = first_in_schema
    return found
class OpenEndedGraph(object):
    """ A directed graph whose edges may start or end outside its nodes.

    ``nodes`` is a container supporting ``in``; ``edges`` is a list of
    ``(start, end)`` pairs. Edge end points must be hashable.
    """

    def __init__(self, nodes, edges):
        """ A graph that has nodes and edges,
        edges start and end might be or not nodes
        """
        self.nodes = nodes
        self.edges = edges

    def get_meaningful_children(self, start_node, edges=None):
        """ Given a start node, return all the edges endpoints
        that lead to (at least) another node.

        An end point qualifies when it is a node itself or when it has,
        in turn, at least one meaningful child.

        We work on copy of the edges to avoid circular relations:
        once an edge is evaluated it is discarded

        :param start_node: the node whose outgoing edges are inspected
        :param edges: working list of not yet evaluated edges, shared with
            the nested calls and modified in place; callers normally omit
            it, in which case a copy of ``self.edges`` is used
        :returns: set of end points
        """
        if edges is None:
            edges = self.edges[:]
        children = set()
        # iterate over a snapshot: the working list shrinks as we go
        for edge in edges[:]:
            start, end = edge
            if start != start_node:
                continue
            # protect from circular references; a nested call that came
            # back to start_node may have discarded this edge already, in
            # which case there is nothing left to remove
            if edge in edges:
                edges.remove(edge)
            if end in self.nodes or self.get_meaningful_children(end, edges):
                children.add(end)
        return children

    def get_descendants(self, start_node):
        """ Return all the descendants of a node

        A descendant is any edge end point reachable from ``start_node``
        by following one or more edges. The traversal is iterative and
        expands every end point only once, so circular relations terminate
        (the start node is then among its own descendants).

        :returns: set of end points
        """
        descendants = set()
        pending = [start_node]
        while pending:
            current = pending.pop()
            for start, end in self.edges:
                if start == current and end not in descendants:
                    descendants.add(end)
                    pending.append(end)
        return descendants