# coding=utf-8
from Acquisition import aq_inner
from Acquisition import Implicit
from borg.localrole.default_adapter import DefaultLocalRoleAdapter
from borg.localrole.interfaces import ILocalRoleProvider
from BTrees.OOBTree import OOBTree
from BTrees.OOBTree import OOTreeSet
from collections import OrderedDict
from collective.workspace.workspace import Workspace
from datetime import datetime
from OFS.owner import Owned
from plone import api
from plone.app.contenttypes.interfaces import IEvent
from plone.folder.ordered import OrderedBTreeFolderBase
from plone.indexer.wrapper import IndexableObjectWrapper
from plone.uuid.interfaces import IUUID
from ploneintranet.core import ploneintranetCoreMessageFactory as _
from ploneintranet.layout.memoize.view import memoize
from ploneintranet.layout.memoize.view import memoize_contextless
from ploneintranet.workspace.behaviors.group import IMembraneGroupMarker
from ploneintranet.workspace.config import SecretWorkspaceNotAllowed
from ploneintranet.workspace.interfaces import IBaseWorkspaceFolder
from ploneintranet.workspace.interfaces import IMetroMap
from ploneintranet.workspace.membership import PloneIntranetTeamMembership
from ploneintranet.workspace.utils import parent_workspace
from Products.CMFCore.Expression import getExprContext
from Products.CMFCore.interfaces import IContentish
from Products.CMFCore.interfaces import IFolderish
from Products.PortalTransforms.utils import safe_nativestring
from zope.component import adapter
from zope.component import getMultiAdapter
from zope.globalrequest import getRequest
from zope.interface import implementer
import logging
import persistent
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Groups to which workspace team members can be assigned, mapped to the
# tuple of roles granted by each group. Exposed on the workspace behaviour
# as ``PloneIntranetWorkspace.available_groups``.
AVAILABLE_GROUPS = {
u"Admins": ("Contributor", "Editor", "Reviewer", "Reader", "TeamManager"),
u"Members": ("TeamMember",),
u"Guests": ("TeamGuest",),
# These are the 'participation policy' groups; the title-cased value of a
# workspace's participant policy is used as the group name.
u"Consumers": (),
u"Producers": ("Contributor",),
u"Publishers": ("Contributor", "SelfPublisher"),
u"Moderators": ("Reader", "Contributor", "Reviewer", "Editor"),
}
class PloneIntranetWorkspace(Workspace):
    """
    A custom workspace behaviour, based on collective.workspace.

    Here we define our own available groups, and the roles
    they are given on the workspace.
    """

    membership_factory = PloneIntranetTeamMembership

    # A list of groups to which team members can be assigned.
    # Maps group name -> roles
    available_groups = AVAILABLE_GROUPS

    def add_to_team(self, userid, **kw):
        """
        Add a user/group to this workspace.

        We override this method from collective.workspace to add our
        additional participation policy groups, as detailed in
        ``available_groups``. Also used to update an existing member's
        groups.

        :param userid: id of the user (or group) to add
        :param kw: membership data; ``groups`` may hold the workspace
            groups to assign. The automatic "Members" group is ignored and,
            when no other group remains, the group of the workspace's
            current participant policy is used.
        :raises SecretWorkspaceNotAllowed: if ``userid`` is the group of a
            workspace that is not visible to all intranet users
        :return: whatever the collective.workspace implementation returns
        """
        data = kw.copy()
        # ``kw.copy()`` is shallow: build a private set instead of removing
        # "Members" in place, which would silently modify the collection
        # owned by the caller (and fail for immutable ones such as tuples).
        groups = set(data.get("groups") or ())
        # Members is an automatic group - ignore
        groups.discard("Members")
        if not groups:
            # Put user in the default policy group if none provided
            groups = set([self.context.participant_policy.title()])
        data["groups"] = groups

        # check: are we adding a workspace-group?
        # And if so, is it a group we cannot see?
        group = api.group.get(groupname=userid)
        if group:
            ws = api.portal.get().workspaces.get(group.getId())
            if (
                ws
                and IMembraneGroupMarker.providedBy(ws)
                and not ws.visible_to_intranet_users
            ):
                raise SecretWorkspaceNotAllowed(
                    u"Forbidden: A workspace cannot be added as member "
                    u"if it is not visible to all the intranet users. "
                )
        return super(PloneIntranetWorkspace, self).add_to_team(userid, **data)

    def group_for_policy(self, policy=None):
        """
        Lookup the collective.workspace usergroup corresponding to the
        given policy.

        :param policy: The value of the policy to lookup, defaults to the
                       current policy
        :type policy: str
        :return: group id of the form ``"<Policy title>:<workspace UID>"``
        """
        if policy is None:
            policy = self.context.participant_policy
        return "%s:%s" % (policy.title(), self.context.UID())

    def update_participant_policy_groups(self, old_policy, new_policy):
        """Move relevant members to a new default policy.

        We only move members who were previously part of the *old* policy.
        This allows for 'exception' users who have been promoted/demoted
        manually to retain their existing roles.

        :param old_policy: value of the policy being replaced
        :param new_policy: value of the policy now in effect
        """
        members = self.members
        old_group = old_policy.title()
        new_group = new_policy.title()
        for userid in members:
            groups = self.get(userid).groups
            if old_group not in groups:
                # This user was an exception to the default policy
                # so we ignore them
                continue
            groups.remove(old_group)
            groups.add(new_group)
            self.add_to_team(userid, groups=groups)
        user = api.user.get_current()
        # NOTE: the count logged is the total number of members, not only
        # the ones that were actually moved.
        logger.info(
            "%s changed policy on %s from %s to %s for %s members",
            user.getId(),
            repr(self.context),
            old_policy.title(),
            new_policy.title(),
            len(members),
        )
@implementer(ILocalRoleProvider)
@adapter(IBaseWorkspaceFolder)
class RemoteAdminLocalRoleAdapter(object):
    """
    Local role provider that treats the workspace's ``remote_admin`` user
    as a local admin by granting it the TeamManager role.
    """

    def __init__(self, context):
        self.context = context

    @memoize
    def getRoles(self, principal_id):
        """Return ``["TeamManager"]`` if ``principal_id`` is the context's
        ``remote_admin``, otherwise an empty list.
        """
        admin_id = self.context.remote_admin
        if not admin_id or principal_id != admin_id:
            return []
        return ["TeamManager"]

    def getAllRoles(self):
        """Return the ``(principal, roles)`` pairs granted by this adapter:
        one entry for ``remote_admin`` when it is set, nothing otherwise.
        """
        admin_id = self.context.remote_admin
        if admin_id:
            return [(safe_nativestring(admin_id), ["TeamManager"])]
        return []
@implementer(ILocalRoleProvider)
@adapter(IContentish)
class WorkspaceLocalRoleAdapter(DefaultLocalRoleAdapter):
    """
    If the user has the local role of Owner on the context
    and the acquired role of SelfPublisher; they should also be given Reviewer.
    """

    def __init__(self, context):
        self.context = context
        self.request = getRequest()

    @memoize_contextless
    def get_workspace_roles(self, workspace):
        """Return the roles reported by ``api.user.get_roles`` for the
        given workspace.
        """
        return api.user.get_roles(obj=workspace)

    @memoize
    def is_good_context(self):
        """Tell whether the context lives inside a workspace.

        False when no parent workspace is found or when the context is the
        workspace itself, True otherwise.
        """
        enclosing = parent_workspace(self.context)
        if enclosing is None:
            return False
        return not (enclosing == self.context)

    @memoize
    def _getRoles(self, principal_id):
        """
        Give an Owner who is also a 'selfpublisher', the reviewer role.

        This works on every content inside a workspace
        but not on the workspace itself.
        """
        if not self.is_good_context():
            return []

        # Check inside __ac_local_roles__
        local_roles = DefaultLocalRoleAdapter.getRoles(self, principal_id)
        if "Owner" not in local_roles:
            # only owners are eligible
            return []

        workspace = parent_workspace(self.context)
        if "SelfPublisher" in self.get_workspace_roles(workspace):
            return ["Reviewer"]
        return []

    def getRoles(self, principal_id):
        """Return the extra roles for ``principal_id``; nothing is granted
        when there is no current request.
        """
        if self.request:
            return self._getRoles(principal_id)
        return []
@implementer(ILocalRoleProvider)
@adapter(IEvent)
class EventReaderGroupLocalRoles(object):
    """Grant Reader on an event to the ``reader_group`` of the calendar
    referenced by the event's ``shared_calendar`` relation.
    """

    _granted_roles = ["Reader"]

    def __init__(self, context):
        self.context = context
        relation = getattr(context, "shared_calendar", None)
        # No (or empty) relation means there is no calendar to consult
        self.calendar = relation.to_object if relation else None

    def getRoles(self, principalid):
        """ Return special roles if principalid is the reader_group
        """
        return [] if principalid != self.reader_group else self._granted_roles

    def getAllRoles(self):
        """ Grant special roles to the reader group
        """
        group_id = self.reader_group
        return [(group_id, self._granted_roles)] if group_id else []

    @property
    def reader_group(self):
        """The calendar's ``reader_group``, or None when unavailable."""
        return getattr(self.calendar, "reader_group", None)
@implementer(ILocalRoleProvider)
@adapter(IEvent)
class EventMaintainerGroupLocalRoles(object):
    """Grant Reader on an event to the ``maintainer_group`` of the calendar
    referenced by the event's ``shared_calendar`` relation.
    """

    _granted_roles = ["Reader"]

    def __init__(self, context):
        self.context = context
        relation = getattr(context, "shared_calendar", None)
        # No (or empty) relation means there is no calendar to consult
        self.calendar = relation.to_object if relation else None

    def getRoles(self, principalid):
        """ Return special roles if principalid is the maintainer_group
        """
        if principalid != self.maintainer_group:
            return []
        return self._granted_roles

    def getAllRoles(self):
        """ Grant special roles to the maintainer group
        """
        group_id = self.maintainer_group
        return [(group_id, self._granted_roles)] if group_id else []

    @property
    def maintainer_group(self):
        """The calendar's ``maintainer_group``, or None when unavailable."""
        return getattr(self.calendar, "maintainer_group", None)
@implementer(ILocalRoleProvider)
@adapter(IEvent)
class EventCalendarOwnerLocalRoles(object):
    """Grant Reader on an event to the owner of the calendar referenced by
    the event's ``shared_calendar`` relation.
    """

    _granted_roles = ["Reader"]

    def __init__(self, context):
        self.context = context
        relation = getattr(context, "shared_calendar", None)
        # No (or empty) relation means there is no calendar to consult
        self.calendar = relation.to_object if relation else None

    def getRoles(self, principalid):
        """ Return special roles if principalid is the calendar owner
        """
        if principalid != self.owner:
            return []
        return self._granted_roles

    def getAllRoles(self):
        """ Grant special roles to the calendar owner
        """
        owner_id = self.owner
        return [(owner_id, self._granted_roles)] if owner_id else []

    @property
    def owner(self):
        """Id of the calendar's owner, or None when there is no calendar
        or ``getOwner()`` yields nothing.
        """
        calendar_owner = self.calendar.getOwner() if self.calendar else None
        return calendar_owner.getId() if calendar_owner else None
@implementer(IMetroMap)
@adapter(IFolderish)
class MetroMap(object):
    """Adapter that derives the metromap (milestone sequence) of a folderish
    context from the ``metromap_transitions`` variable of its workflow.
    """

    def __init__(self, context):
        self.context = context

    @property
    def _metromap_workflow(self):
        """All Case Workspaces should have a placeful workflow. In order to
        render the metromap, this workflow needs to have a metromap_transitions
        variable which determines the order of the milestones (states) and the
        transitions between them.

        Return the workflow required to render the metromap, or None if no
        workflow of the context defines that variable.
        """
        pw = api.portal.get_tool("portal_workflow")
        for workflow in pw.getWorkflowsFor(self.context):
            if workflow.variables.get("metromap_transitions", False):
                return workflow
        return None

    @property
    def _metromap_transitions(self):
        """A data structure is stored as a TAL expression on a workflow which
        determines the sequence of workflow states/milestones used to render
        the metromap.

        We need to evaluate the expression and return the data structure.
        It consists of a list of dicts each with the workflow state, the
        transition to the next milestone in the metromap, and the
        transition required to return to the milestone:

        [{
            'state': 'new',
            'next_transition': 'finalise',
            'reopen_transition': 'reset'
        }, {
            'state': 'complete',
            'next_transition': 'archive',
            'reopen_transition': 'finalise'
        }, {
            'state': 'archived'}
        ]

        An empty list is returned when there is no metromap workflow.
        """
        metromap_workflow = self._metromap_workflow
        if metromap_workflow is None:
            return []
        wfstep = metromap_workflow.variables["metromap_transitions"]
        tal_expr = wfstep.default_expr
        expr_context = getExprContext(self.context)
        return tal_expr(expr_context)

    @property
    def metromap_sequence(self):
        """Return the data structure required for displaying the metromap,
        derived from the configuration in the metromap_transitions variable of
        the associated workflow.

        An OrderedDict is used to provide details such as whether a milestone
        has already been finished, the transition required to close the current
        milestone, and the transition required to reopen the previous
        milestone. An empty dict is returned when no metromap is configured.

        In the 'complete' workflow state / milestone it returns the following
        (for a user who may manage the workspace, with no open tasks):

        OrderedDict([(
            'new', {
                'transition_title': u'Transfer To Department',
                'title': u'New',
                'finished': True,  # This milestone has been finished
                'is_current': False,  # Not the current milestone
                'reopen_transition': 'reset',  # For [Reopen milestone]
                'transition_enabled': False,
                'transition_id': 'transfer_to_department'
            }), (
            'complete', {
                'transition_title': u'Submit',
                'title': u'Content Complete',
                'finished': False,  # This milestone isn't finished yet
                'is_current': True,  # Current milestone: Show [Close milestone]
                'reopen_transition': None,
                'transition_enabled': True,
                'transition_id': 'submit'
            }), (
            'archived', {
                'transition_title': '',
                'title': u'Archived',
                'is_current': False,
                'finished': False,
                'reopen_transition': None,
                'transition_enabled': False,
                'transition_id': None
            })
        ])

        :raises api.exc.UserNotFoundError: if the current user is unknown
        """
        cwf = self._metromap_workflow
        wft = api.portal.get_tool("portal_workflow")
        metromap_list = self._metromap_transitions
        if not metromap_list:
            return {}
        try:
            can_manage = api.user.has_permission(
                "ploneintranet.workspace: Manage workspace",
                user=api.user.get_current(),
                obj=self.context,
            )
        except api.exc.UserNotFoundError:
            raise api.exc.UserNotFoundError(
                "Unknown user. Do not use Zope rescue user."
            )

        request = self.context.REQUEST
        # If the case is frozen, render the MetroMap for the pre-frozen state
        freeze_view = getMultiAdapter((self.context, request), name="freeze-view")
        if freeze_view.is_frozen():
            unfreeze_view = getMultiAdapter(
                (self.context, request), name="unfreeze-view"
            )
            current_state = unfreeze_view.pre_frozen_state
        else:
            current_state = wft.getInfoFor(self.context, "review_state")

        tasks = api.content.get_view("view", self.context, request).tasks()
        last_index = len(metromap_list) - 1
        # Milestones are finished until the current one is reached
        finished = True
        sequence = OrderedDict()
        for index, wfstep in enumerate(metromap_list):
            state = wfstep["state"]
            is_current = state == current_state
            # open tasks only matter for the current milestone
            open_tasks = []
            if is_current:
                finished = False  # keep this for the rest of the loop
                open_tasks = [x for x in tasks[state] if not x["checked"]]
                # last workflow step: consider done if no open tasks left.
                # (The former check ``index > len(metromap_list)`` could
                # never be true, so this rule was never applied.)
                if index == last_index and not open_tasks:
                    finished = True

            # get the id and title of the next transition, for display on the
            # metromap
            next_transition_id = wfstep.get("next_transition")
            if next_transition_id:
                transition_title = _(cwf.transitions.get(next_transition_id).title)
            else:
                transition_title = ""

            # only current state can be closed, archived state cannot be closed
            next_transition_enabled = bool(
                is_current and can_manage and not open_tasks and next_transition_id
            )

            # reopen only the before-current step, only for admins;
            # the last step has no successor and can never be reopened here
            reopen_transition = None
            if index < last_index:
                next_state = metromap_list[index + 1]["state"]
                if next_state == current_state and can_manage:
                    reopen_transition = wfstep.get("reopen_transition", None)

            sequence[state] = {
                "title": _(cwf.states.get(state).title),
                "transition_enabled": next_transition_enabled,
                "transition_id": next_transition_id,
                "transition_title": transition_title,
                "reopen_transition": reopen_transition,
                "is_current": is_current,
                "finished": finished,
            }
        return sequence
class GroupingStorageValues(Implicit, Owned, persistent.Persistent):
    """ A datastructure to store the UIDs of objects appearing under a specific
    grouping.

    It conforms to the requirements imposed by OrderedBTreeFolderBase on
    its sub-objects (acquisition aware, ownable, persistent).
    """

    def __init__(self, uids):
        # Whether this grouping value has been archived
        self.archived = False
        self.uids = OOTreeSet()
        self.uids.update(uids)

    def __iter__(self):
        return iter(self.uids)

    def __contains__(self, item):
        return item in self.uids

    def __len__(self):
        return len(self.uids)

    def add(self, item):
        """Add ``item`` to the stored uids."""
        self.uids.insert(item)

    def discard(self, item):
        """Remove ``item`` if it is stored; do nothing otherwise.

        Unlike :meth:`remove` this never raises for a missing item, in line
        with the ``discard`` contract of Python sets.
        """
        if item in self.uids:
            self.uids.remove(item)

    def remove(self, item):
        """Remove ``item`` from the stored uids (delegates to
        ``OOTreeSet.remove``, which raises ``KeyError`` if it is missing).
        """
        self.uids.remove(item)

    def pop(self):
        """Remove and return one of the stored uids."""
        return self.uids.pop()
class GroupingStorage(object):
    """ Adapter that stores the sidebar's groupings for quick lookup.

    The groupings dict is arranged in the following way:

    OOBTree({
        'label': {
            'Important': set([uid, uid, uid]),
            'Frivolous': set([uid, uid]),
        }
        'author': {
            'max-mustermann': set([uid]),
            'john-doe': set([uid, uid]),
            'jane-doe': set([uid]),
        }
        'type': {
            'foo': set([uid]),
            'bar': set([uid, uid, uid]),
            'baz': set([uid]),
        }
        'first_letter': {
            'a': set([uid]),
            'b': set([uid, uid, uid]),
            'c': set([uid]),
        }
    })

    The top-level keys are the groupings.
    For each grouping we store another dict.
    Each key in this dict is a unique value for that grouping. These values
    are retrieved from the objects stored in the workspace.

    For each key we have a list of uids. These are the uids of the objects
    that have that key as a field value (corresponding to the grouping).
    Remember: each grouping is a field on an object, so each value of that
    grouping is a value of that field on an object inside the workspace.

    We need to keep track of the uids, so that we know when to remove a
    grouping-value. When an object is modified, we don't know if anything
    was removed (for example from the 'Subject' field, which corresponds to
    'label' grouping).
    So we have to check every time if that object's uid is in any grouping
    values that that object doesn't have anymore. In that way, we know to
    remove the uid from that grouping-value. If that grouping-value doesn't
    have any uids anymore, we can remove it.
    """

    def __init__(self, context):
        self.context = context
        # Create the storage on first use
        if not hasattr(aq_inner(context), "_groupings"):
            self.init_groupings()

    def clear_groupings(self):
        """Throw away all stored groupings and start with empty ones."""
        self.init_groupings()

    def init_groupings(self):
        """(Re)create the ``_groupings`` storage on the context with one
        empty ordered folder per default grouping.
        """
        context = aq_inner(self.context)
        context._groupings = OOBTree(
            {
                "label": OrderedBTreeFolderBase(),
                "author": OrderedBTreeFolderBase(),
                "type": OrderedBTreeFolderBase(),
                "first_letter": OrderedBTreeFolderBase(),
            }
        )

    def _add_grouping_values(self, grouping, values, obj):
        """ Add $uid to the list of uids stored under the grouping values
        (provided by $values) in the groupings datastructure.

        If the list doesn't exist yet, add it.
        """
        uid = IUUID(obj)
        groupings = self.get_groupings()
        if grouping not in groupings:
            logger.info("Adding grouping %s to object %r", grouping, obj)
            groupings[grouping] = OrderedBTreeFolderBase()
        stored = groupings[grouping]
        for value in values:
            if value in stored:
                stored[value].add(uid)
            else:
                stored[value] = GroupingStorageValues([uid])

    def _remove_grouping_values(self, grouping, values, obj):
        """ Remove $uid from the list of uids stored under every grouping
        value that is NOT in $values.

        If $uid is the only one in the list, then remove the
        list (and its key) entirely.
        """
        uid = IUUID(obj)
        groupings = self.get_groupings()
        if grouping not in groupings:
            return
        stored = groupings[grouping]
        # Iterate over a snapshot: entries are deleted inside the loop and
        # keys() may be a live view on the underlying BTree, which must not
        # be modified while it is being iterated.
        for value in list(stored.keys()):
            if value in values or uid not in stored[value]:
                continue
            if len(stored[value]) == 1:
                del stored[value]
            else:
                stored[value].remove(uid)

    def _remove_grouping_value(self, grouping, value):
        """
        Remove entry $value under a given $grouping.
        Can be used for bulk-changes to a grouping (e.g. changing a tag)
        """
        groupings = self.get_groupings()
        if value in groupings[grouping]:
            del groupings[grouping][value]

    def get_groupings(self):
        """Return the groupings storage kept on the context."""
        context = aq_inner(self.context)
        return context._groupings

    def update_groupings(self, obj):
        """
        Update the groupings dict with the values stored on obj.

        For each grouping the uid of obj is dropped from the values obj no
        longer has and added to the values it has now. Nothing happens when
        obj is the workspace itself.
        """
        context = aq_inner(self.context)
        if parent_workspace(obj) == obj:
            # obj is the workspace, abort
            return
        catalog = api.portal.get_tool("portal_catalog")
        groupings = context._groupings

        # label
        subjects = obj.Subject()
        self._remove_grouping_values("label", subjects, obj)
        self._add_grouping_values("label", subjects, obj)

        # mimetype
        wrapper = IndexableObjectWrapper(obj, catalog)
        if hasattr(wrapper, "mimetype"):
            mimetypes = [wrapper.mimetype]
            self._remove_grouping_values("type", mimetypes, obj)
            self._add_grouping_values("type", mimetypes, obj)

        # author
        authors = [obj.Creator()]
        self._remove_grouping_values("author", authors, obj)
        self._add_grouping_values("author", authors, obj)

        # Title / first letter
        title_or_id = obj.Title() or obj.id
        first_letters = [title_or_id[0].lower()]
        self._remove_grouping_values("first_letter", first_letters, obj)
        self._add_grouping_values("first_letter", first_letters, obj)

        # Re-assign the storage and flag the context as changed
        context._groupings = groupings
        context._groupings_modified = datetime.now()
        context._p_changed = 1

    def reset_order(self):
        """
        Reset the order for all groupings to default, i.e. the sorted
        order of each grouping's keys
        """
        groupings = self.get_groupings()
        for grouping in groupings.keys():
            self.set_order_for(grouping, sorted(groupings[grouping].keys()))

    def get_order_for(self, grouping, include_archived=False, alphabetical=False):
        """
        Return the values of the given grouping in order.

        :param grouping: name of the grouping (e.g. 'label')
        :param include_archived: if true, archived values are included and
            flagged as such; otherwise they are left out
        :param alphabetical: if true, sort the values instead of using the
            order stored on the grouping
        :return: list of dicts with the keys ``title``, ``description``,
            ``id`` and ``archived``; empty if the grouping is unknown or
            has no values
        """
        groupings = self.get_groupings()
        grouping_obj = groupings.get(grouping)
        if not grouping_obj:
            return []
        if alphabetical:
            order = sorted(grouping_obj.keys())
        else:
            order = grouping_obj.getOrdering().idsInOrder()
        if include_archived:
            return [
                dict(
                    title=g,
                    description="",
                    id=g,
                    archived=grouping_obj.get(g).archived,
                )
                for g in order
            ]
        return [
            dict(title=g, description="", id=g, archived=False)
            for g in order
            if not grouping_obj.get(g).archived
        ]

    def set_order_for(self, grouping, order):
        """ Set order for a given grouping

        :param grouping: name of the grouping
        :param order: sequence of the grouping's keys in the desired order
        """
        folder = self.get_groupings()[grouping]
        for position, key in enumerate(order):
            folder.moveObjectToPosition(key, position, suppress_events=True)

    def remove_from_groupings(self, obj):
        """ Remove obj's grouping relevant information from its workspace.
        """
        # An empty list of values to keep removes obj's uid everywhere
        self._remove_grouping_values("type", [], obj)
        self._remove_grouping_values("label", [], obj)
        self._remove_grouping_values("author", [], obj)
        self._remove_grouping_values("first_letter", [], obj)
        context = aq_inner(self.context)
        groupings = context._groupings
        context._groupings = groupings
        context._groupings_modified = datetime.now()

    def modified(self):
        """ Return the last time this grouping storage was modified, or
        ``datetime.min`` if no valid timestamp has been recorded.
        """
        context = aq_inner(self.context)
        stamp = getattr(context, "_groupings_modified", None)
        if isinstance(stamp, datetime):
            return stamp
        return datetime.min