Source code for ploneintranet.workspace.browser.views

# -*- coding: utf-8 -*-
from AccessControl import Unauthorized
from Acquisition import aq_inner
from collective.workspace.interfaces import IWorkspace
from plone import api
from plone.app.content.browser.file import FileUploadView as BaseFileUploadView
from plone.app.dexterity.factories import upload_lock
from plone.app.layout.viewlets.content import (
    ContentHistoryView as BaseContentHistoryView,
)  # noqa
from plone.app.workflow.browser.sharing import SharingView as BaseSharingView
from plone.dexterity.interfaces import IDexterityFTI
from plone.dexterity.utils import createContentInContainer
from ploneintranet.layout.memoize.view import memoize
from plone.namedfile.file import NamedBlobFile
from plone.namedfile.file import NamedBlobImage
from plone.protect.authenticator import createToken
from plone.rfc822.interfaces import IPrimaryFieldInfo
from plone.uuid.interfaces import IUUID
from ploneintranet import api as pi_api
from ploneintranet.core import ploneintranetCoreMessageFactory as _
from ploneintranet.docconv.client.handlers import generate_pdf
from ploneintranet.workspace.config import INTRANET_USERS_GROUP_ID
from ploneintranet.workspace.subscribers import _update_workspace_groupings
from Products.CMFCore.utils import _checkPermission
from Products.CMFEditions.Permissions import AccessPreviousVersions
from Products.CMFPlone import utils as ploneutils
from Products.Five.browser import BrowserView
from urllib import urlencode
from zope.component.interfaces import IObjectEvent
from zope.component.interfaces import ObjectEvent
from zope.container.interfaces import INameChooser
from zope.event import notify
from zope.interface import implementer

import json
import mimetypes


class IObjectUploadedEvent(IObjectEvent):
    """Object event interface: an object has been uploaded."""
@implementer(IObjectUploadedEvent)
class ObjectUploadedEvent(ObjectEvent):
    """Concrete event providing ``IObjectUploadedEvent``.

    Signals that an object has been uploaded.
    """
class JoinView(BrowserView):
    """Add the current user to the team of a self-join workspace."""

    def __call__(self):
        """Join the workspace when the join button was POSTed, then redirect.

        The user is only added when the request method is POST and the
        ``button.join`` field is present in the request form.  In every
        non-raising case the response is redirected to the ``HTTP_REFERER``
        or, when that is empty, to the context URL.

        Raises:
            Unauthorized: if the context ``join_policy`` is not ``"self"``.
        """
        if self.context.join_policy != "self":
            msg = _(u"Workspace join policy doesn't allow self join")
            raise Unauthorized(msg)

        field = "button.join"
        req_method = self.request.method.lower()
        if req_method == "post" and field in self.request.form:
            user = api.user.get_current()
            workspace = IWorkspace(self.context)
            workspace.add_to_team(user.getId())
            # The message factory is applied exactly once; the result is
            # passed on as-is instead of being wrapped a second time.
            msg = _(u"You are a member of this workspace now")
            api.portal.show_message(message=msg, request=self.request)

        referer = self.request.get("HTTP_REFERER", "").strip()
        if not referer:
            referer = self.context.absolute_url()
        return self.request.response.redirect(referer)
class SharingView(BaseSharingView):
    """Override the sharing tab."""

    def can_edit_inherit(self):
        """Disable the "inherit permissions" checkbox by returning False."""
        return False

    def role_settings(self):
        """Return the base role settings without the unwanted groups.

        Entries are dropped when their id ends with the UID of the current
        context, or when the id is ``AuthenticatedUsers`` or the intranet
        users group id.
        """
        results = super(SharingView, self).role_settings()
        uid = self.context.UID()
        # We do not want to share to the current context,
        # to authenticated users
        # and to all intranet users
        hidden_ids = ("AuthenticatedUsers", INTRANET_USERS_GROUP_ID)
        return [
            result
            for result in results
            if not (result["id"].endswith(uid) or result["id"] in hidden_ids)
        ]

    def user_search_results(self):
        """Decorate the base search results for workspace members.

        For every result whose id is a workspace member, the roles mapped to
        the current participant policy are flagged as ``"acquired"`` and the
        title gets a ``[member]`` or ``[administrator]`` suffix.  Members of
        the ``Admins`` group additionally get ``TeamManager`` flagged as
        ``"acquired"``.
        """
        results = super(SharingView, self).user_search_results()
        ws = IWorkspace(self.context)
        roles_mapping = ws.available_groups
        # ``get`` returns None for a policy missing from the mapping; fall
        # back to an empty tuple so the loop below cannot raise TypeError.
        roles = roles_mapping.get(self.context.participant_policy.title()) or ()

        for result in results:
            if result["id"] not in ws.members:
                continue
            groups = ws.get(result["id"]).groups
            for role in roles:
                result["roles"][role] = "acquired"
            if "Admins" in groups:
                title = "administrator"
                result["roles"]["TeamManager"] = "acquired"
            else:
                title = "member"
            result["title"] = "%s [%s]" % (result["title"], title)

        return results
class FileUploadView(BaseFileUploadView):
    """Redirect to the workspace view so we can inject."""

    @property
    @memoize
    def groupname(self):
        """Return the ``groupname`` request value.

        None is returned when the value is missing, empty or ``"Untagged"``.
        """
        groupname = self.request.get("groupname")
        if groupname and groupname != "Untagged":
            return groupname
        return None

    def __call__(self):
        """Process the upload and answer with JSON or a redirect.

        When the ``HTTP_ACCEPT`` header is exactly ``application/json`` the
        list built by ``process_request`` is returned JSON-encoded.
        Otherwise the response is redirected to ``@@sidebar.documents`` on
        the context, carrying the groupname in the query string if one is set.
        """
        # The uploaded objects are created while indexing is disabled.
        with pi_api.env.indexing_disabled(self.request):
            result = self.process_request()

        if self.request.get_header("HTTP_ACCEPT") == "application/json":
            self.request.response.setHeader("Content-type", "application/json")
            return json.dumps(result)

        target = self.context.absolute_url() + "/@@sidebar.documents"
        groupname = self.groupname
        if groupname:
            target = "{url}?{qs}".format(
                url=target, qs=urlencode({"groupname": groupname})
            )
        self.request.response.redirect(target)

    def process_request(self):
        """Create one object per ``file*`` form field of a POST request.

        Returns a list with the truthy results of
        ``create_file_from_request``; the list is empty for non-POST requests.
        """
        # XXX: We don't support the TUS resumable file upload protocol.
        # The pat-upload pattern supports it (due to mockup) and
        # plone.app.content.browser.file.py also supports it, but at the cost
        # of not being able to upload multiple files at once. We decided that
        # that's more important at the moment.
        if self.request.REQUEST_METHOD != "POST":
            return []
        result = []
        form = self.request.form
        # Snapshot the matching keys before any object gets created.
        for name in [k for k in form.keys() if k.startswith("file")]:
            output = self.create_file_from_request(name)
            if output:
                result.append(output)
        return result

    def post_factory(self, obj):
        """Things to do after the object has been created in this form.

        When a groupname is set it is stored as the subject of ``obj`` and
        the workspace groupings are updated.
        """
        groupname = self.groupname
        if groupname:
            obj.setSubject(groupname)
            _update_workspace_groupings(obj, None)

    def set_filedata(self, obj, filedata, filename, content_type):
        """Store the data in what looks like the primary field of ``obj``.

        The field name comes from ``IPrimaryFieldInfo`` and falls back to
        ``"file"``.  A field name containing ``"image"`` gets a
        ``NamedBlobImage``, anything else a ``NamedBlobFile``.
        """
        try:
            info = IPrimaryFieldInfo(obj, None)
        except TypeError:
            info = None
        # Fall back to "file" when there is no info or the name is empty.
        fieldname = (info.fieldname if info else None) or "file"
        # This should take care of File and images
        if "image" in fieldname:
            blob_factory = NamedBlobImage
        else:
            blob_factory = NamedBlobFile
        filename = ploneutils.safe_unicode(filename)
        value = blob_factory(data=filedata, filename=filename, contentType=content_type)
        return setattr(obj, fieldname, value)

    def create_dx_file(self, filename, content_type, filedata, portal_type):
        """Inspired by plone.app.dexterity.factories.IDXFileFactory.

        Creates an object of ``portal_type`` in the context, stores the file
        data on it and schedules the PDF generation with a 60 second
        countdown.  Returns the created object.
        """
        # Accept both byte strings and unicode file names; calling
        # ``decode`` on a non-ASCII unicode value would fail on Python 2.
        name = ploneutils.safe_unicode(filename)
        chooser = INameChooser(self.context)

        # otherwise I get ZPublisher.Conflict ConflictErrors
        # when uploading multiple files
        upload_lock.acquire()
        try:
            # Choosing the name happens inside the ``try`` so that the lock
            # is released even if the name chooser raises.
            newid = chooser.chooseName(name, self.context.aq_parent)
            try:
                # Temporarily disable previews because we want to generate
                # them with a longer delay
                pi_api.events.disable_previews()
                obj = createContentInContainer(
                    self.context, portal_type, id=newid, title=name
                )
                self.set_filedata(obj, filedata, name, content_type)
                generate_pdf(obj, countdown=60)
            finally:
                pi_api.events.enable_previews()
        finally:
            upload_lock.release()
        return obj

    def create_file_from_request(self, name):
        """Create a File or Image from the form field called ``name``.

        Returns a dict with the keys ``type``, ``size``, ``url``, ``name``,
        ``UID`` and ``filename``.  None is returned when the form field is
        empty, or when a created dexterity object has neither a ``file`` nor
        an ``image`` attribute.
        """
        context = self.context
        filedata = self.request.form.get(name, None)
        if not filedata:
            return
        filename = filedata.filename
        content_type = mimetypes.guess_type(filename)[0] or ""
        # Determine if the default file/image types are DX or AT based
        ctr = api.portal.get_tool("content_type_registry")
        type_ = ctr.findTypeName(filename.lower(), content_type, "")
        if type_ != "Image":
            type_ = "File"

        pt = api.portal.get_tool("portal_types")

        if IDexterityFTI.providedBy(getattr(pt, type_)):
            obj = self.create_dx_file(filename, content_type, filedata, type_)
            self.post_factory(obj)
            notify(ObjectUploadedEvent(obj))
            if hasattr(obj, "file"):
                size = obj.file.getSize()
                content_type = obj.file.contentType
            elif hasattr(obj, "image"):
                size = obj.image.getSize()
                content_type = obj.image.contentType
            else:
                return
            result = {"type": content_type, "size": size}
        else:
            from Products.ATContentTypes.interfaces import IATCTFileFactory

            obj = IATCTFileFactory(context)(filename, content_type, filedata)
            self.post_factory(obj)
            try:
                size = obj.getSize()
            except AttributeError:
                size = obj.getObjSize()
            result = {"type": obj.getContentType(), "size": size}

        result.update(
            {
                "url": obj.absolute_url(),
                "name": obj.getId(),
                "UID": IUUID(obj),
                "filename": filename,
            }
        )
        return result
class ContentHistoryView(BaseContentHistoryView):
    """Customised so that we can provide more info about the revisions in
    our history.
    """

    def revisionHistory(self):
        """Return the version history of the context, most recent first.

        An empty list is returned when the user lacks the
        ``AccessPreviousVersions`` permission or when the context is not
        versionable.  When the history metadata is empty it is returned
        unchanged.  Otherwise one info dict per stored version is returned.
        """
        context = aq_inner(self.context)
        if not _checkPermission(AccessPreviousVersions, context):
            return []

        repo_tool = api.portal.get_tool("portal_repository")
        if repo_tool is None or not repo_tool.isVersionable(context):
            return []

        context_url = context.absolute_url()
        history = repo_tool.getHistoryMetadata(context)
        # XXX Hardcoded: diff is not supported
        can_diff = False
        can_revert = _checkPermission(
            "CMFEditions: Revert to previous versions", context
        )

        def morphVersionDataToHistoryFormat(vdict, version_id):
            """Turn the metadata of one version into a history info dict."""
            sys_meta = vdict["metadata"]["sys_metadata"]
            userid = sys_meta["principal"]
            token = createToken()
            # XXX For Files, link to the download view
            if context.portal_type in ("File",):
                preview_url = "{0}/download_file_version?version_id={1}&_authenticator={2}".format(  # noqa: E501
                    context_url, version_id, token
                )
            else:
                preview_url = (
                    "%s/versions_history_form?version_id=%s&_authenticator=%s#version_preview"  # noqa: E501
                    % (context_url, version_id, token)
                )
            info = {
                "type": "versioning",
                "action": _(u"Edited"),
                "transition_title": _(u"Edited"),
                "actorid": userid,
                "time": sys_meta["timestamp"],
                "comments": sys_meta["comment"],
                "version_id": version_id,
                "preview_url": preview_url,
            }
            up_to_date = repo_tool.isUpToDate(context, version_id)
            if can_diff:
                if version_id > 0:
                    info["diff_previous_url"] = (
                        "%s/@@history?one=%s&two=%s&_authenticator=%s"
                        % (context_url, version_id, version_id - 1, token)
                    )
                if not up_to_date:
                    info["diff_current_url"] = (
                        "%s/@@history?one=current&two=%s&_authenticator=%s"
                        % (context_url, version_id, token)
                    )
            # Only versions that are not up to date can get a revert link.
            info["revert_url"] = (
                "%s/revertversion" % context_url
                if can_revert and not up_to_date
                else None
            )
            info.update(self.getUserInfo(userid))
            return info

        # History may be an empty list
        if not history:
            return history

        # Count backwards from most recent to least recent
        newest = history.getLength(countPurged=False) - 1
        return [
            morphVersionDataToHistoryFormat(
                history.retrieve(index, countPurged=False),
                history.getVersionId(index, countPurged=False),
            )
            for index in xrange(newest, -1, -1)
        ]