Source code for ploneintranet.workspace.utils

# -*- coding: utf-8 -*-
from AccessControl.SecurityManagement import newSecurityManager
from Acquisition import aq_chain
from BTrees.OOBTree import OOBTree
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from plone import api
from plone.dexterity.utils import iterSchemata
from plone.rfc822.interfaces import IPrimaryField
from ploneintranet.layout.app import IApp
from ploneintranet.workspace.interfaces import IWorkspaceFolder
from Products.CMFCore.interfaces import ISiteRoot
from zope.annotation import IAnnotations
from zope.component import getUtility
from zope.i18nmessageid import MessageFactory
from zope.schema import getFieldsInOrder

import config
import logging
import mimetypes


pl_message = MessageFactory("plonelocales")
log = logging.getLogger(__name__)

ANNOTATION_KEY = "ploneintranet.workspace.invitation_storage"

# The type map is used to deduct clear text names for classes and labels
# from portal types
TYPE_MAP = {
    "Event": "event",
    "Folder": "folder",
    "Document": "rich document",
    "todo": "task",
    "ploneintranet.workspace.mail": "email",
    "ploneintranet.workspace.workspacefolder": "workspace",
    "Image": "image",
}


[docs]def get_storage(clear=False): """helper function to get annotation storage on the portal :param clear: If true is passed in, annotations will be cleared :returns: portal annotations :rtype: IAnnotations """ portal = getUtility(ISiteRoot) annotations = IAnnotations(portal) if ANNOTATION_KEY not in annotations or clear: annotations[ANNOTATION_KEY] = OOBTree() return annotations[ANNOTATION_KEY]
[docs]def send_email(recipient, subject, message, sender="ploneintranet@netsight.co.uk"): """helper function to send an email with the sender preset """ try: api.portal.send_email( recipient=recipient, sender=sender, subject=subject, body=message ) except ValueError as e: log.error("MailHost error: {0}".format(e))
[docs]def render_and_send_mail(obj, view_name, recipients): """ Renders view_name on obj and sends the result to recipients. The mail subject is read from the mail view attribute subject. """ message = MIMEMultipart() mailview = api.content.get_view(name=view_name, context=obj, request=obj.REQUEST) body = mailview() message.attach(MIMEText(body.encode("utf8"), "html", _charset="utf-8")) try: api.portal.send_email( recipient=recipients, subject=mailview.subject, body=message, immediate=False, ) except Exception as e: api.portal.show_message(str(e), obj.REQUEST, "error") log.exception("Error sending the email")
[docs]def parent_workspace(context): """ Return containing workspace Returns None if not found. """ for parent in aq_chain(context): if IWorkspaceFolder.providedBy(parent): return parent
[docs]def in_workspace(context): return IWorkspaceFolder.providedBy(parent_workspace(context))
[docs]def parent_app(context): """ Return containing workspace Returns None if not found. """ if IApp.providedBy(context): return context for parent in aq_chain(context): if IApp.providedBy(parent): return parent
[docs]def in_app(context): return IApp.providedBy(parent_workspace(context))
[docs]def guess_mimetype(file_name): content_type = mimetypes.guess_type(file_name)[0] # sometimes plone mimetypes registry could be more powerful if not content_type: mtr = api.portal.get_tool("mimetypes_registry") oct = mtr.globFilename(file_name) if oct is not None: content_type = str(oct) return content_type
[docs]def map_content_type(mimetype, portal_type=""): """ takes a mimetype and returns a content type string as used in the prototype """ if portal_type in TYPE_MAP: return TYPE_MAP[portal_type] if not mimetype: return "" if mimetype in config.PDF: return "pdf" if mimetype in config.DOC: return "word" if mimetype in config.PPT: return "powerpoint" if mimetype in config.ZIP: return "zip" if mimetype in config.XLS: return "excel" if mimetype in config.URI: return "link" if mimetype in config.NEWS: return "news" major = (mimetype or "").partition("/")[0] if major == "text": return "rich document" if major == "audio": return "sound" if major == "video": return "video" if major == "image": return "image" return ""
[docs]def purge_and_refresh_security_manager(): """ This is necessary in case you have a cache on your acl_users folder. This method purges the configured cache on the acl_users folder and reinitialises the security manager for the current user. This is necessary as example when we are creating a workspace and right afterwards transition it into the private state. The transition is guarded by the TeamManager role which the current user just got when the workspace was created. This is however not yet reflected in the cached user object. This would not be an issue in the next upcoming request as the security context will be rebuilt then, but in the current request, this is a problem. """ # purge the acl_users cache acl_users = api.portal.get_tool("acl_users") if acl_users.ZCacheable_enabled(): acl_users.ZCacheable_invalidate() # reinitialise the security manager current_user_name = api.user.get_current().getUserName() current_user = acl_users.getUser(current_user_name) newSecurityManager(None, current_user)
[docs]def get_primary_field(obj): primary = None for i in iterSchemata(obj): fields = getFieldsInOrder(i) for name, field in fields: if IPrimaryField.providedBy(field): primary = (name, field) break return primary
[docs]class OpenEndedGraph(object): def __init__(self, nodes, edges): """ A graph that has nodes and edges, edges start and end might be or not nodes """ self.nodes = nodes self.edges = edges
[docs] def get_meaningful_children(self, start_node, edges=None): """ Given a start node, return all the edges endpoints that lead to (at least) another node. We work on copy of the edges to avoid circular relations: once an edge is evaluated it is discarded """ if edges is None: edges = self.edges[:] children = set() for edge in edges[:]: start, end = edge if start == start_node: # protect from circular references edges.remove(edge) if end in self.nodes or self.get_meaningful_children(end, edges): children.add(end) return children
[docs] def get_descendants(self, start_node): """ Return all the descendants of a node """ children = set() for edge in self.edges: start, end = edge if start == start_node: children.add(end) children.update(self.get_descendants(end)) return children