Source code for ploneintranet.workspace.subscribers

# -*- coding: utf-8 -*-
from AccessControl.SecurityManagement import newSecurityManager
from Acquisition import aq_base
from Acquisition import aq_chain
from Acquisition import aq_parent
from collective.workspace.interfaces import IWorkspace
from datetime import datetime
from OFS.CopySupport import cookie_path
from OFS.interfaces import IObjectWillBeRemovedEvent
from plone import api
from plone.api.exc import PloneApiError
from plone.app.content.interfaces import INameFromTitle
from plone.i18n.normalizer.interfaces import IUserPreferredURLNormalizer
from ploneintranet import api as pi_api
from ploneintranet.core import ploneintranetCoreMessageFactory as _
from ploneintranet.microblog.interfaces import IStatusUpdate
from ploneintranet.workspace import workspacefolder
from ploneintranet.workspace.behaviors.group import IMembraneGroup
from ploneintranet.workspace.case import ICase
from ploneintranet.workspace.interfaces import IBaseWorkspaceFolder
from ploneintranet.workspace.interfaces import IGroupingStoragable
from ploneintranet.workspace.interfaces import IGroupingStorage
from ploneintranet.workspace.unrestricted import execute_as_manager
from ploneintranet.workspace.utils import get_storage
from ploneintranet.workspace.utils import in_workspace
from ploneintranet.workspace.utils import parent_workspace
from ploneintranet.workspace.utils import render_and_send_mail
from Products.CMFPlacefulWorkflow.PlacefulWorkflowTool import (
    WorkflowPolicyConfig_id,
)  # noqa
from zExceptions import BadRequest
from zope.annotation.interfaces import IAnnotations
from zope.component import getAdapter
from zope.component import getMultiAdapter
from zope.container.interfaces import IContainerModifiedEvent
from zope.container.interfaces import INameChooser
from zope.globalrequest import getRequest
from zope.lifecycleevent.interfaces import IObjectAddedEvent
from zope.lifecycleevent.interfaces import IObjectCopiedEvent
from zope.lifecycleevent.interfaces import IObjectRemovedEvent

import logging


log = logging.getLogger(__name__)

WORKSPACE_INTERFACE = "collective.workspace.interfaces.IHasWorkspace"
MAX_ID_FROM_TITLE_LENGTH = 70


[docs]def _reset_security_context(userid, request, invalidate_cache=False): IAnnotations(request)[("workspaces", userid)] = None acl_users = api.portal.get_tool("acl_users") if invalidate_cache and acl_users.ZCacheable_enabled(): acl_users.ZCacheable_invalidate() user = acl_users.getUserById(userid) if user is not None: # NB when copying a template with execute_as_manager # this is 'finally' replaced again newSecurityManager(None, user)
[docs]def workspace_added(ob, event): """ when a workspace is created, we add the creator to the admin group. We then setup our placeful workflow """ is_case = ICase.providedBy(ob) preserve_template_ownership = api.portal.get_registry_record( "ploneintranet.workspace.preserve_template_ownership", default=False ) userid = api.user.get_current().id # If not stated otherwise through the registry record: # ploneintranet.workspace.preserve_template_ownership # whoever creates the workspace should be added as an Admin # When copying a template, that is the current user # (not the one who created the original template) if (not is_case) or (not preserve_template_ownership): ob.setCreators([userid]) IWorkspace(ob).add_to_team(userid, groups=set(["Admins"])) # During workspace creation, various functions # are called (renaming / workflow transitions) which do # low-level AccessControl checks. # Unfortunately these checks never re-ask PAS for a user's roles # or groups during a request, so we have to manually re-initialise # the security context for the current user. # ref: https://github.com/ploneintranet/ploneintranet/pull/438 _reset_security_context(userid, ob.REQUEST) if is_case: """Case Workspaces have their own custom workflows """ return # Configure our placeful workflow cmfpw = "CMFPlacefulWorkflow" try: ob.manage_addProduct[cmfpw].manage_addWorkflowPolicyConfig() except BadRequest: # 'The id ".wf_policy_config" is invalid - it is already in use.' # copying a template workspace which already has a policy defined return # Set the policy for the config pc = getattr(ob, WorkflowPolicyConfig_id) pc.setPolicyIn("", update_security=True) pc.setPolicyBelow("ploneintranet_policy")
[docs]def invitation_accepted(event): """ When an invitation is accepted, add the user to the team. This apparently is NOT a ploneintranet.invitations invitation, but by ploneintranet.workspace.browser.forms.InviteForm. """ request = getRequest() storage = get_storage() if event.token_id not in storage: return ws_uid, userid = storage[event.token_id] # in a email_as_login setting (see #1043) # .userid == .username but .username != .getUserName() ... :-( username = api.user.get(userid=userid).getUserName() acl_users = api.portal.get_tool("acl_users") acl_users.updateCredentials( request, request.response, username, None # the login, not the userid ) catalog = api.portal.get_tool(name="portal_catalog") brain = catalog.unrestrictedSearchResults(UID=ws_uid)[0] with api.env.adopt_roles(["Manager"]): ws = IWorkspace(brain.getObject()) # collective.workspace .members returns userid iterator for member_id in ws.members: # don't clobber userid variable member = api.user.get(userid=member_id) if member is not None: if member.getId() == userid: api.portal.show_message( _("Oh boy, oh boy, you are already a member"), request ) break else: ws.add_to_team(userid) api.portal.show_message(_("Welcome to our family, Stranger"), request)
[docs]def user_deleted_from_site_event(event): """ Remove deleted user from all the workspaces where he is a member """ userid = event.principal catalog = api.portal.get_tool("portal_catalog") query = {"object_provides": WORKSPACE_INTERFACE} query["workspace_members"] = userid workspaces = [ IWorkspace(b._unrestrictedGetObject()) for b in catalog.unrestrictedSearchResults(query) ] for workspace in workspaces: workspace.remove_from_team(userid)
[docs]def _update_workspace_groupings(obj, event): """ If the relevant object is inside a workspace, the workspace grouping parameters (for the sidebar) need to be updated. """ parent = parent_workspace(obj) if parent is None or not IGroupingStoragable.providedBy(parent): return storage = getAdapter(parent, IGroupingStorage) if IObjectRemovedEvent.providedBy(event) or IObjectWillBeRemovedEvent.providedBy( event ): storage.remove_from_groupings(obj) else: storage.update_groupings(obj)
[docs]def folder_added_to_workspace(obj, event): # Iterare over parent of the object. Only remove the owner role # in the context of a workspace for item in aq_chain(obj)[1:]: if IBaseWorkspaceFolder.providedBy(item): user = api.user.get_current() pmt = api.portal.get_tool("portal_membership") wrapped_user = pmt.getMemberById(user.getId()) if not wrapped_user: return api.user.revoke_roles(user=user, obj=obj, roles=["Owner"]) obj.reindexObjectSecurity() break
[docs]def content_object_added_to_workspace(obj, event): _update_workspace_groupings(obj, event)
[docs]def content_object_edited_in_workspace(obj, event): _update_workspace_groupings(obj, event) create_title_from_id(obj, event)
[docs]def content_object_removed_from_workspace(obj, event): _update_workspace_groupings(obj, event)
[docs]def content_object_moved(obj, event): oldWorkspace = parent_workspace(event.oldParent) newWorkspace = parent_workspace(event.newParent) # ignore if oldParent or newParent is None or if obj has just # been created or removed if oldWorkspace is None or newWorkspace is None: return if aq_base(oldWorkspace) is aq_base(newWorkspace): return if IGroupingStoragable.providedBy(oldWorkspace): old_storage = getAdapter(oldWorkspace, IGroupingStorage) old_storage.remove_from_groupings(obj) if IGroupingStoragable.providedBy(newWorkspace): new_storage = getAdapter(newWorkspace, IGroupingStorage) new_storage.update_groupings(obj) # Since OFS.CopySupport.manage_pasteObjects is called without a REQUEST # parameter, cb_dataValid() will still be true, because __cp will not # be reset. We do that manually here, so that the "paste" action will # disappear from the action list. request = getattr(obj, "REQUEST", None) if request: request["RESPONSE"].setCookie( "__cp", "deleted", path="%s" % cookie_path(request), expires="Wed, 31-Dec-97 23:59:59 GMT", ) request["__cp"] = None
[docs]def update_todo_state(obj, event): """ After editing a Todo item, set the workflow state to either Open or Planned depending on the state of the Case. Also update access permissions on the Case, which might have changed due to a change in assignment. """ # Do nothing on copy if IObjectCopiedEvent.providedBy(event): return if IObjectAddedEvent.providedBy(event): # This attribute is set when copying from a template # handle_case_workflow_state_changed will take care of everything if getattr(event.newParent, "_v_skip_update_todo_state", None): return obj.set_appropriate_state() obj.reindexObject() parent = parent_workspace(obj) if ICase.providedBy(parent): parent.update_case_access()
[docs]def handle_case_workflow_state_changed(obj, event): """ When the workflow state of a Case changes, perform the following actions: * Update the contained Todo items ans set adjust their workflow state * Grant assignees on tasks of the current milestone guest access """ execute_as_manager(_update_todos_state, obj) obj.update_case_access()
[docs]def _update_todos_state(obj): """ Update the workflow state of Todo items in a Case, when the workflow state of the Case is changed. """ for child in obj.list_todos(): if getattr(child.aq_base, "portal_type", "") == "todo": _update_todo_state(child)
[docs]def _update_todo_state(todo): todo.set_appropriate_state()
[docs]def workspace_groupbehavior_toggled(obj, event): # If the IMembraneGroup behavior gets set or deactivated for workspaces, # the membrane tool needs to be updated, and all workspaces need to be # reindexed if obj.id != workspacefolder.__name__: return relevant_change = False for description in event.descriptions: if getattr(description, "attribute", None) == "behaviors": relevant_change = True break if not relevant_change: return try: membrane_tool = api.portal.get_tool("membrane_tool") # In case the membrane_tool cannot be found, just return. # This can happen in test scenarios that do not set up the full stack of # PloneIntranet. except PloneApiError as exc: log.error(exc) return if IMembraneGroup.__identifier__ in obj.behaviors: # The behavior was activated # Add the type to the membrane types and reindex all workspaces types = set(membrane_tool.membrane_types) types.add(workspacefolder.__name__) log.info("Enabling IMembraneGroup on %s", workspacefolder.__name__) membrane_tool.membrane_types = list(types) else: # The behavior was deactivated # Remove the type from the membrane types and reindex all workspaces types = [ typ for typ in membrane_tool.membrane_types if typ != workspacefolder.__name__ ] log.info("Disabling IMembraneGroup on %s", workspacefolder.__name__) membrane_tool.membrane_types = types catalog = api.portal.get_tool("portal_catalog") membrane_catalog = membrane_tool._catalog for result in catalog(portal_type=workspacefolder.__name__): workspace = result.getObject() workspace.reindexObject() membrane_catalog.reindexObject(workspace)
[docs]def _is_id_good_enough(objid, normalizedid, container): """ Check if the actual objid is good enough with respect to the normalizedid. """ if objid == normalizedid: return True # If the normalizedid is avaliable, we should use it if normalizedid not in container: return False parts = objid.rpartition("-") if parts[0] == normalizedid and parts[-1].isdigit(): return True return False
[docs]def create_title_from_id(obj, event): if not api.portal.get_registry_record( "ploneintranet.workspace.rename_after_title_changed", default=True ): return autosave_portal_types = api.portal.get_registry_record( "ploneintranet.workspace.autosave_portal_types", default=[] ) if obj.portal_type in autosave_portal_types: # autosave does not work well with autorename because the form action # will point to the old URL and the redirection tool will not work # with the ajax calls return if IContainerModifiedEvent.providedBy(event): # The container modified event gets triggered during the creation # of a folder. We must not change the id before an item as been # properly created. return if IBaseWorkspaceFolder.providedBy(obj): # Don't change the ID of a workspace return if not in_workspace(obj): # Don't handle content outside of a workspace return orig_id = obj.getId() title_adapter = INameFromTitle(obj, None) name = title_adapter and title_adapter.title if not name: # No title present, no point in changing the id return if len(name) > MAX_ID_FROM_TITLE_LENGTH: plone_view = getMultiAdapter((obj, obj.REQUEST), name="plone") name = plone_view.cropText(name, MAX_ID_FROM_TITLE_LENGTH, ellipsis="") normalized_name = IUserPreferredURLNormalizer(obj.REQUEST).normalize(name) container = aq_parent(obj) # Check if the id is already looking good if _is_id_good_enough(orig_id, normalized_name, container): return chooser = INameChooser(container) new_id = chooser and chooser.chooseName(normalized_name, container) if new_id and new_id != orig_id: unlock_view = obj.restrictedTraverse("@@toggle-lock", None) if unlock_view and unlock_view.can_unlock(): unlock_view.unlock() api.content.rename(obj, new_id)
[docs]def update_workspace_modification_date(obj, event): if IStatusUpdate.providedBy(obj): workspace = obj.microblog_context else: workspace = parent_workspace(obj) if workspace: workspace.setModificationDate(datetime.now()) workspace.reindexObject(idxs=["modified"])
[docs]def _notify_submitted_maintainers(obj): recipients = [ user.email for user in obj.shared_calendar.to_object.get_maintainers() ] render_and_send_mail(obj, "mail-event-submitted-maintainers", recipients)
[docs]def _notify_submitted_user(obj): recipients = [pi_api.userprofile.get_current().email] render_and_send_mail(obj, "mail-event-submitted-user", recipients)
[docs]def update_event_workflow(obj, event): if any(["shared_calendar" in desc.attributes for desc in event.descriptions]): if api.content.get_state(obj=obj) != "published": api.content.transition(obj=obj, to_state="published") if obj.shared_calendar: api.content.transition( obj=obj, # can't use to_state='pending' because of the automatic # transition transition="submit", ) _notify_submitted_maintainers(obj) _notify_submitted_user(obj)