Source code for ploneintranet.workspace.browser.workspace

# coding=utf-8
from Acquisition import aq_inner
from collections import defaultdict
from collective.workspace.interfaces import IWorkspace
from datetime import date
from json import dumps
from plone import api
from plone.app.blocks.interfaces import IBlocksTransformEnabled
from ploneintranet.layout.memoize.view import memoize
from ploneintranet.layout.memoize.view import memoize_contextless
from ploneintranet import api as pi_api
from ploneintranet.core import ploneintranetCoreMessageFactory as _
from ploneintranet.layout.interfaces import IDiazoAppTemplate
from ploneintranet.search.interfaces import ISiteSearch
from ploneintranet.todo.behaviors import ITodo
from ploneintranet.workspace.behaviors.group import IMembraneGroupMarker
from ploneintranet.workspace.behaviors.group import MembraneWorkspaceGroup
from ploneintranet.workspace.interfaces import IBaseWorkspaceFolder
from ploneintranet.workspace.interfaces import IGroupingStorage
from ploneintranet.workspace.interfaces import IWorkspaceFolder
from ploneintranet.workspace.policies import PARTICIPANT_POLICY
from ploneintranet.workspace.utils import OpenEndedGraph
from ploneintranet.workspace.utils import parent_workspace
from Products.PortalTransforms.utils import safe_nativestring
from Products.CMFPlone.utils import safe_unicode
from Products.Five import BrowserView
from Products.membrane.interfaces import group as group_ifaces
from Products.PlonePAS.interfaces.group import IGroupData
from six.moves.urllib.parse import urlencode
from zope.component import getAdapter
from zope.component import getUtility
from zope.component.interfaces import ComponentLookupError
from zope.interface import implementer

import six


[docs]@implementer(IDiazoAppTemplate) class BaseWorkspaceView(BrowserView): """ Base view class for workspace related view """
[docs] def get_workspace_security_icon(self): """ We have an icon that display the security status of the workspace """ if self.context.accessible_to_intranet_users: return "icon-lock-open" if self.context.visible_to_intranet_users: return "icon-lock" return "icon-user-secret"
@property @memoize_contextless def is_ajax(self): """ Check if we have an ajax call """ requested_with = self.request.environ.get("HTTP_X_REQUESTED_WITH") return requested_with == "XMLHttpRequest" @property @memoize_contextless def include_clicktracker(self): """Is inclusion of slcclicktracker element enabled in the registry?""" if self.request.form.get("disable_clicktracker"): return False include_slcclicktracker = api.portal.get_registry_record( "ploneintranet.workspace.include_slcclicktracker", default=False ) return include_slcclicktracker @property @memoize_contextless def show_sidebar(self): """ Should we show the sidebar? """ form = self.request.form if "show_sidebar" in form: return True if "hide_sidebar" in form: return False return True
[docs] @memoize def workspace(self): """Acquire the root workspace of the current context""" return parent_workspace(self.context)
[docs] @memoize def can_manage_workspace(self): """ does this user have permission to manage the workspace """ return api.user.has_permission( "ploneintranet.workspace: Manage workspace", obj=self.context )
[docs] @memoize def can_manage_roster(self): """ does this user have permission to manage the workspace's roster """ return self.can_manage_workspace()
[docs] @memoize def can_add_users(self): """ Does this user have permission to add other members to this workspace? 1. A user who can edit the roster will always have this permission. This is usually the workspace admin. 2. On a team-managed workspace, any member of the team will have this permission. """ if self.can_manage_roster(): return True return self.context.join_policy in {"team"}
[docs] @memoize def can_add_status_updates(self): """ Does this user have the permission to add status updates """ return api.user.has_permission( "Plone Social: Add Microblog Status Update", obj=self.context )
@property @memoize def groupids_key_mapping(self): """ Return the set of the group ids knows by portal_groups """ if not self.groups_container: return {} return { safe_nativestring(group.getGroupId()): key for key, group in self.groups_container.objectItems() } @property @memoize def workspaces_container_keys(self): """ Return the set of workspace ids known by the toplevel workspaces container """ return self.workspaces_container.keys()
[docs] @memoize def get_principal_title(self, principal): """ Get the title for this principal """ if isinstance(principal, six.string_types): principalid = principal principal = self.resolve_principalid(principal) if principal is None: return principalid if IGroupData.providedBy(principal) or isinstance( principal, MembraneWorkspaceGroup ): return principal.getGroupName() if hasattr(principal, "getGroupId"): return principal.Title() or principal.getGroupId() if IWorkspaceFolder.providedBy(principal): return principal.Title() return ( getattr(principal, "fullname", "") or principal.getProperty("fullname") or principal.getId() )
[docs] def get_principal_url(self, principal): """ Get a suitable URL for viewing this principal """ if isinstance(principal, six.string_types): principal = self.resolve_principalid(principal) if IGroupData.providedBy(principal) or isinstance( principal, MembraneWorkspaceGroup ): portal = api.portal.get() p_url = portal.absolute_url() return "{0}/workspace-group-view?id={1}".format(p_url, principal.getId()) return principal.absolute_url()
[docs] @memoize def get_principal_description(self, principal): """ Get the description for this principal """ if isinstance(principal, six.string_types): principal = self.resolve_principalid(principal) if not hasattr(principal, "getGroupId"): return "" group_memberids = set(principal.getGroupMembers()) group_groupids = group_memberids.intersection(self.groupids_key_mapping) group_spaceids = group_memberids.intersection(self.workspaces_container_keys) no_groups = len(group_groupids) + len(group_spaceids) no_users = len(group_memberids) - no_groups return _( u"number_of_members", default=u"${no_users} Users | ${no_groups} Groups", mapping={u"no_users": no_users, u"no_groups": no_groups}, )
[docs] @memoize def get_principal_roles(self, principal, omit_default_policy_role=True): """ Get the human readable policy role title for this principal as a list of one element (e.g. [u'Consume']). If `omit_default_policy_role` is True (the default value), the default role for the policy is skipped and an empty list is returned. """ adapter = IWorkspace(self.context) if hasattr(principal, "getGroupId"): groups = adapter.get(safe_nativestring(principal.getGroupId())).groups else: groups = adapter.get(principal.getId()).groups if "Admins" in groups: return ["Admin"] # The policies are ordered from the less permissive to the most # permissive. We reverse them # Additionally we might want to exclude the default one good_policies = ( policy for policy in reversed(PARTICIPANT_POLICY) if not ( omit_default_policy_role and policy == self.context.participant_policy ) ) for policy in good_policies: if policy.title() in groups: # According to the design there is at most one extra role # per user, so we go with the first one we find. This may # not be enforced in the backend though. return [PARTICIPANT_POLICY[policy]["title"]] if groups == {"Guests"}: return ["Guest"] return []
[docs] def principal_sorting_key(self, principal): """ First we want the groups, the we want alphabetical sorting """ is_group = hasattr(principal, "getGroupId") return (not is_group, self.get_principal_title(principal))
@property @memoize def groups_container(self): """ Returns the group container (if found) or an empty dictionary """ portal = api.portal.get() return portal.get("groups", {}) @property @memoize def workspaces_container(self): """ Returns the workspaces container (if found) or an empty dictionary """ portal = api.portal.get() return portal.get("workspaces", {}) @property @memoize def users_container(self): """ Returns the group container (if found) or an empty dictionary """ portal = api.portal.get() return portal.get("profiles", {})
[docs] @memoize def resolve_principalid(self, principalid): """ Given a principal id, tries to get him for profile or groups folder and then look for him with pas. """ principal = ( self.users_container.get(principalid) or ( self.workspaces_container.get(principalid) if IMembraneGroupMarker.providedBy( self.workspaces_container.get(principalid) ) else None ) or self.groups_container.get(self.groupids_key_mapping.get(principalid)) or api.user.get(principalid) or api.group.get(principalid) ) if IMembraneGroupMarker.providedBy(principal): principal = MembraneWorkspaceGroup(principal) return principal
@property @memoize def guest_ids(self): """ Get the valid member ids through IWorkspace """ adapter = IWorkspace(self.context) return [ principalid for principalid in self.member_ids if adapter.get(principalid).groups == {"Guests"} ] @property @memoize def member_ids(self): """ Get the valid member ids through IWorkspace """ principalids = IWorkspace(self.context).members return filter(self.resolve_principalid, principalids) @property @memoize def principals(self): """ Return the list of principals which are assigned to this context """ objs = map(self.resolve_principalid, self.member_ids) return objs
[docs] def get_sorted_principals(self, sort_by="last_name"): objs = self.principals return sorted(objs, key=lambda user: getattr(user, sort_by, None))
[docs] @memoize def guests(self): """ Return the list of principals which are guests in this context By design they are assigned through the sharing view """ objs = map(self.resolve_principalid, self.guest_ids) return objs
@property @memoize def task_brains(self): return api.content.find( context=self.context, portal_type="todo", sort_on=["due", "getObjPositionInParent"], ) @property @memoize def task_objs(self): return list(filter(None, (b.getObject() for b in self.task_brains)))
[docs] @memoize def tasks(self): """ Get the context tasks """ is_case = self.context.is_case items = defaultdict(list) if is_case else [] wft = api.portal.get_tool("portal_workflow") today = date.today() for obj in self.task_objs: todo = ITodo(obj) done = wft.getInfoFor(todo, "review_state") == u"done" overdue = False if not done and todo.due: overdue = todo.due < today data = { "id": obj.UID(), "title": obj.Title(), "description": obj.Description(), "url": obj.absolute_url(), "checked": done, "due": todo.due, "overdue": overdue, "obj": obj, "can_edit": api.user.has_permission("Modify portal content", obj=obj), } if is_case: milestone = "unassigned" if obj.milestone not in ["", None]: milestone = obj.milestone items[milestone].append(data) else: items.append(data) if is_case: for milestone in items.keys(): # Show the checked tasks before the unchecked tasks items[milestone].sort(key=lambda x: x["checked"] is False) return items
[docs] @memoize def percent_complete(self): """ Return the percentage of completed tasks as text, e.g. 33% """ objs = self.task_objs if not objs: return "" total = 0.0 complete = 0.0 for obj in objs: total += 1 if api.content.get_state(obj) == u"done": complete += 1 return "{0}%".format(int((complete / total) * 100))
[docs] def subspaces_qs(self): """ Prepare the right query string to show subspaces """ form = self.request.form form["in_superspace"] = self.context.UID() form["superspace"] = self.context.getId() for attr in ("title", "description"): form[attr] = safe_unicode(form.get(attr, "")).encode("utf-8") return urlencode(form)
@property @memoize def division_classes(self): """ if workspace has a division, return `workspace-division-<id>` if workspace is a division, additionally return `workspace-division-name-<id>` """ if getattr(self.workspace(), "is_division", False): return "workspace-division-{0} workspace-division-name-{0}".format( self.workspace().id ) if getattr(self.workspace(), "division", ""): brains = api.content.find(UID=self.workspace().division) if brains: return "workspace-division-" + brains[0].id return "" @property @memoize def superspace(self): """ Return the associated superspace object """ superspace = getattr(self.context, "superspace", None) if superspace: return superspace.to_object @property def show_activity_stream(self): return getattr(self.context, "show_activity_stream", True)
[docs]@implementer(IBlocksTransformEnabled) class WorkspaceView(BaseWorkspaceView): """ Default View of the workspace """
[docs]@implementer(IBlocksTransformEnabled) class SuperspaceView(BaseWorkspaceView): """ Default View of the superspace """ @property @memoize def container_view(self): return api.content.get_view( "workspaces.html", self.context.aq_parent, self.request ) @property @memoize def subspaces(self): search_util = getUtility(ISiteSearch) filters = { "object_provides": IWorkspaceFolder.__identifier__, "superspace": self.context.UID(), } return search_util.query(filters=filters) @property @memoize def allowed_parent_superspaces(self): """ Return the list of superspaces that can be selected as a parent superspace of the current superspace. Discard the current superspace itself and all its children. """ superspaces = self.container_view.superspaces edges = [] superspaces_by_uid = {} for superspace in superspaces: end = superspace.UID start = superspace.context.get("superspace") or None superspaces_by_uid[end] = superspace edges.append([start, end]) graph = OpenEndedGraph(set(superspaces_by_uid), edges) uid = self.context.UID() blacklisted_uids = graph.get_descendants(uid) blacklisted_uids.add(uid) return [ superspace for superspace in superspaces if superspace.UID not in blacklisted_uids ]
[docs] @memoize def has_subspaces(self): """ Check if this object has subspace, even hidden ones """ return bool(self.subspaces)
[docs] def do_post(self): """ Do something when the user is posting """ sidebar = api.content.get_view( "sidebar.settings.general", self.context, self.request ) sidebar._basic_save()
def __call__(self): """ Superspaces can be modified directly from their view """ if self.request.method == "POST": self.do_post() return super(SuperspaceView, self).__call__()
[docs]def format_users_json(users): """ Format a list of users as JSON for use with pat-autosuggest :param list users: A list of user brains :rtype string: JSON {"user_id1": "user_title1", ...} """ formatted_users = [] for user in users: fullname = safe_unicode(user.Title) email = safe_unicode(user.email) uid = user.getId formatted_users.append( {"id": uid, "text": u"{0} <{1}>".format(fullname, email)} ) return dumps(formatted_users)
[docs]class WorkspaceMembersJSONView(BrowserView): """ Return workspace members in JSON for use with pat-autosuggest. Only members of the current workspace are found. """ def __call__(self): q = safe_unicode(self.request.get("q", "").strip()) if not q: return format_users_json([]) query = {"SearchableText": u"{0}*".format(q)} users = pi_api.userprofile.get_users( context=self.context, full_objects=False, **query ) return format_users_json(users)
[docs]class AllUsersJSONView(BrowserView): """ Return a filtered list of users for pat-autosuggest. Any user can be found, not only members of the current workspace """ def __call__(self): q = safe_unicode(self.request.get("q", "").strip()) if not q: return format_users_json([]) query = {"SearchableText": u"{0}*".format(q)} users = pi_api.userprofile.get_user_suggestions( context=self.context, full_objects=False, **query ) return format_users_json(users)
[docs]class BrainToGroupAdapter(object): """ Make a brain behave like a group """ def __init__(self, context): self.context = context
[docs] def getId(self): """ Return this group id """ return self.context.getGroupId
[docs] def getProperty(self, key, fallback=""): """ Mimic getProperty, remapping some keys """ if key == "state": key = "review_state" elif key == "title": key = "Title" return getattr(self.context, key, fallback)
[docs]class AllGroupsJSONView(BrowserView): """ Return all groups in JSON for use in picker TODO: consolidate AllGroupsJSONView with AllUsersJSONView """ @property @memoize_contextless def only_membrane_groups(self): return api.portal.get_registry_record( "ploneintranet.userprofile.only_membrane_groups", default=False )
[docs] def get_groups(self): """ Return all the groups """ if self.only_membrane_groups: mt = api.portal.get_tool(name="membrane_tool") purl = api.portal.get_tool(name="portal_url") groups = map( BrainToGroupAdapter, mt.unrestrictedSearchResults( object_implements=(group_ifaces.IGroup.__identifier__), path=purl.getPortalPath(), ), ) else: groups = api.group.get_groups() return groups
def __call__(self): q = safe_unicode(self.request.get("q", "").lower()) groups = self.get_groups() group_details = [] ws = IWorkspace(self.context) workspaces = api.portal.get().workspaces for group in groups: groupid = group.getId() # XXX Filter out groups representing workspace roles. Review # whether we need/want this and/or can do it more efficiently. if groupid.partition(":")[0] in ws.available_groups: continue # Don't add the group that represents the current workspace to # itself. This can only happen if the IMembraneGroup behaviour is # active on current context. if groupid == self.context.id and IMembraneGroupMarker.providedBy( self.context ): if self.only_membrane_groups: # We are limited to membrane-only groups. The group # found _could_ be this workspace. We need to compare # the paths, which we can get cheaply from the catalog # brain. group_path = group.context.getPath() if "/".join(self.context.getPhysicalPath()) == group_path: continue else: # We used the generic groups search. We have no chance to # distinguish where a group is located. Therefore, we need # to ignore any group that has the same id as this # workspace, to be on the safe side. continue if group.getProperty("state") == "secret": wsobj = workspaces.get(group.getId()) if not (wsobj and wsobj.visible_to_intranet_users): continue title = safe_unicode(group.getProperty("title") or groupid) email = safe_unicode(group.getProperty("email")) if email: title = u"%s <%s>" % (title, email) description = safe_unicode(group.getProperty("description") or "") if q in title.lower() or q in description.lower(): group_details.append({"text": title, "id": groupid}) return dumps(group_details)
[docs]class AllUsersAndGroupsJSONView(BrowserView): def __call__(self): q = self.request.get("q", "").strip() if not q: return "" acl_users = api.portal.get_tool("acl_users") results = [] groups = {x["id"]: x["title"] for x in acl_users.searchGroups(id=q)} groups.update({x["id"]: x["title"] for x in acl_users.searchGroups(name=q)}) if groups: for id, title in groups.items(): text = title or id results.append({"id": id, "text": text}) query = {"SearchableText": u"{0}*".format(safe_unicode(q))} users = pi_api.userprofile.get_users(full_objects=False, **query) for user in users: fullname = user.Title email = user.email results.append( { "id": user.getId, "text": u"{0} <{1}>".format(safe_unicode(fullname), email), } ) return dumps(results)
[docs]class ReorderTags(BrowserView): """ Lets the workspace manager re-order the tags inside a workspace """ def __call__(self): context = aq_inner(self.context) try: gs = getAdapter(context, IGroupingStorage) except ComponentLookupError: return u"Could not get adapter for context: %s" % context.absolute_url() self.tags = [tag for tag in gs.get_order_for("label")] if self.request.get("batch-function") == "save": myorder = self.request.get("tags_order") or [] if "Untagged" in myorder: myorder.remove("Untagged") gs.set_order_for("label", myorder) return self.request.response.redirect( self.context.absolute_url() + "/@@sidebar.documents" ) else: return self.index()
[docs]def format_workspaces_json(workspaces, skip=[]): """ Format a list of workspaces as JSON for use with pat-autosuggest :param list workspaces: A list of brains :param lis skip: A list of UIDs to skip :rtype string: JSON {"ws_id1": "ws_title1", ...} """ formatted_ws = [] for ws in workspaces: uid = ws.UID if uid in skip: continue title = safe_unicode(ws["Title"]) formatted_ws.append({"id": uid, "text": title}) return dumps(formatted_ws)
[docs]class WorkspacesJSONView(BrowserView): """ Return a filtered list of workspaces for pat-autosuggest. Any workspace can be found. """ def __call__(self): q = safe_unicode(self.request.get("q", "").strip()) if not q: return "" query = {"phrase": u"{0}*".format(q), "filters": self.get_filters(), "step": 50} search_util = getUtility(ISiteSearch) workspaces = search_util.query(**query) workspaces = sorted(workspaces, key=lambda ws: ws["Title"]) skip = self.get_skip() return format_workspaces_json(workspaces, skip)
[docs] def get_filters(self): return {"object_provides": "collective.workspace.interfaces.IHasWorkspace"}
[docs] def get_skip(self): return []
[docs]class RelatedWorkspacesJSONView(WorkspacesJSONView): """ Return a filtered list of workspaces for pat-autosuggest. Current workspace and its related workspaces are excluded. """
[docs] def get_skip(self): skip = [] if IBaseWorkspaceFolder.providedBy(self.context): skip = getattr(self.context, "related_workspaces", []) or [] skip.append(self.context.UID()) return skip
[docs]@implementer(IBlocksTransformEnabled) class WorkspaceCalendarView(BaseWorkspaceView): """ Wrapper to include the fullcalendar tile on workspaces """