# coding=utf-8
from Acquisition import aq_inner
from collections import defaultdict
from collective.workspace.interfaces import IWorkspace
from datetime import date
from json import dumps
from plone import api
from plone.app.blocks.interfaces import IBlocksTransformEnabled
from ploneintranet.layout.memoize.view import memoize
from ploneintranet.layout.memoize.view import memoize_contextless
from ploneintranet import api as pi_api
from ploneintranet.core import ploneintranetCoreMessageFactory as _
from ploneintranet.layout.interfaces import IDiazoAppTemplate
from ploneintranet.search.interfaces import ISiteSearch
from ploneintranet.todo.behaviors import ITodo
from ploneintranet.workspace.behaviors.group import IMembraneGroupMarker
from ploneintranet.workspace.behaviors.group import MembraneWorkspaceGroup
from ploneintranet.workspace.interfaces import IBaseWorkspaceFolder
from ploneintranet.workspace.interfaces import IGroupingStorage
from ploneintranet.workspace.interfaces import IWorkspaceFolder
from ploneintranet.workspace.policies import PARTICIPANT_POLICY
from ploneintranet.workspace.utils import OpenEndedGraph
from ploneintranet.workspace.utils import parent_workspace
from Products.PortalTransforms.utils import safe_nativestring
from Products.CMFPlone.utils import safe_unicode
from Products.Five import BrowserView
from Products.membrane.interfaces import group as group_ifaces
from Products.PlonePAS.interfaces.group import IGroupData
from six.moves.urllib.parse import urlencode
from zope.component import getAdapter
from zope.component import getUtility
from zope.component.interfaces import ComponentLookupError
from zope.interface import implementer
import six
[docs]@implementer(IDiazoAppTemplate)
class BaseWorkspaceView(BrowserView):
"""
Base view class for workspace related view
"""
[docs] def get_workspace_security_icon(self):
""" We have an icon that display the security status of the workspace
"""
if self.context.accessible_to_intranet_users:
return "icon-lock-open"
if self.context.visible_to_intranet_users:
return "icon-lock"
return "icon-user-secret"
@property
@memoize_contextless
def is_ajax(self):
""" Check if we have an ajax call
"""
requested_with = self.request.environ.get("HTTP_X_REQUESTED_WITH")
return requested_with == "XMLHttpRequest"
@property
@memoize_contextless
def include_clicktracker(self):
"""Is inclusion of slcclicktracker element enabled in the registry?"""
if self.request.form.get("disable_clicktracker"):
return False
include_slcclicktracker = api.portal.get_registry_record(
"ploneintranet.workspace.include_slcclicktracker", default=False
)
return include_slcclicktracker
@property
@memoize_contextless
def show_sidebar(self):
""" Should we show the sidebar?
"""
form = self.request.form
if "show_sidebar" in form:
return True
if "hide_sidebar" in form:
return False
return True
[docs] @memoize
def workspace(self):
"""Acquire the root workspace of the current context"""
return parent_workspace(self.context)
[docs] @memoize
def can_manage_workspace(self):
"""
does this user have permission to manage the workspace
"""
return api.user.has_permission(
"ploneintranet.workspace: Manage workspace", obj=self.context
)
[docs] @memoize
def can_manage_roster(self):
"""
does this user have permission to manage the workspace's roster
"""
return self.can_manage_workspace()
[docs] @memoize
def can_add_users(self):
"""
Does this user have permission to add other members to this workspace?
1. A user who can edit the roster will always have this permission.
This is usually the workspace admin.
2. On a team-managed workspace, any member of the team will have this
permission.
"""
if self.can_manage_roster():
return True
return self.context.join_policy in {"team"}
[docs] @memoize
def can_add_status_updates(self):
"""
Does this user have the permission to add status updates
"""
return api.user.has_permission(
"Plone Social: Add Microblog Status Update", obj=self.context
)
@property
@memoize
def groupids_key_mapping(self):
""" Return the set of the group ids knows by portal_groups
"""
if not self.groups_container:
return {}
return {
safe_nativestring(group.getGroupId()): key
for key, group in self.groups_container.objectItems()
}
@property
@memoize
def workspaces_container_keys(self):
"""
Return the set of workspace ids known by the
toplevel workspaces container
"""
return self.workspaces_container.keys()
[docs] @memoize
def get_principal_title(self, principal):
""" Get the title for this principal
"""
if isinstance(principal, six.string_types):
principalid = principal
principal = self.resolve_principalid(principal)
if principal is None:
return principalid
if IGroupData.providedBy(principal) or isinstance(
principal, MembraneWorkspaceGroup
):
return principal.getGroupName()
if hasattr(principal, "getGroupId"):
return principal.Title() or principal.getGroupId()
if IWorkspaceFolder.providedBy(principal):
return principal.Title()
return (
getattr(principal, "fullname", "")
or principal.getProperty("fullname")
or principal.getId()
)
[docs] def get_principal_url(self, principal):
""" Get a suitable URL for viewing this principal
"""
if isinstance(principal, six.string_types):
principal = self.resolve_principalid(principal)
if IGroupData.providedBy(principal) or isinstance(
principal, MembraneWorkspaceGroup
):
portal = api.portal.get()
p_url = portal.absolute_url()
return "{0}/workspace-group-view?id={1}".format(p_url, principal.getId())
return principal.absolute_url()
[docs] @memoize
def get_principal_description(self, principal):
""" Get the description for this principal
"""
if isinstance(principal, six.string_types):
principal = self.resolve_principalid(principal)
if not hasattr(principal, "getGroupId"):
return ""
group_memberids = set(principal.getGroupMembers())
group_groupids = group_memberids.intersection(self.groupids_key_mapping)
group_spaceids = group_memberids.intersection(self.workspaces_container_keys)
no_groups = len(group_groupids) + len(group_spaceids)
no_users = len(group_memberids) - no_groups
return _(
u"number_of_members",
default=u"${no_users} Users | ${no_groups} Groups",
mapping={u"no_users": no_users, u"no_groups": no_groups},
)
[docs] @memoize
def get_principal_roles(self, principal, omit_default_policy_role=True):
""" Get the human readable policy role title for this principal
as a list of one element (e.g. [u'Consume']).
If `omit_default_policy_role` is True (the default value),
the default role for the policy is skipped and
an empty list is returned.
"""
adapter = IWorkspace(self.context)
if hasattr(principal, "getGroupId"):
groups = adapter.get(safe_nativestring(principal.getGroupId())).groups
else:
groups = adapter.get(principal.getId()).groups
if "Admins" in groups:
return ["Admin"]
# The policies are ordered from the less permissive to the most
# permissive. We reverse them
# Additionally we might want to exclude the default one
good_policies = (
policy
for policy in reversed(PARTICIPANT_POLICY)
if not (
omit_default_policy_role and policy == self.context.participant_policy
)
)
for policy in good_policies:
if policy.title() in groups:
# According to the design there is at most one extra role
# per user, so we go with the first one we find. This may
# not be enforced in the backend though.
return [PARTICIPANT_POLICY[policy]["title"]]
if groups == {"Guests"}:
return ["Guest"]
return []
[docs] def principal_sorting_key(self, principal):
""" First we want the groups, the we want alphabetical sorting
"""
is_group = hasattr(principal, "getGroupId")
return (not is_group, self.get_principal_title(principal))
@property
@memoize
def groups_container(self):
""" Returns the group container (if found) or an empty dictionary
"""
portal = api.portal.get()
return portal.get("groups", {})
@property
@memoize
def workspaces_container(self):
""" Returns the workspaces container (if found) or an empty dictionary
"""
portal = api.portal.get()
return portal.get("workspaces", {})
@property
@memoize
def users_container(self):
""" Returns the group container (if found) or an empty dictionary
"""
portal = api.portal.get()
return portal.get("profiles", {})
[docs] @memoize
def resolve_principalid(self, principalid):
""" Given a principal id, tries to get him for profile or groups folder
and then look for him with pas.
"""
principal = (
self.users_container.get(principalid)
or (
self.workspaces_container.get(principalid)
if IMembraneGroupMarker.providedBy(
self.workspaces_container.get(principalid)
)
else None
)
or self.groups_container.get(self.groupids_key_mapping.get(principalid))
or api.user.get(principalid)
or api.group.get(principalid)
)
if IMembraneGroupMarker.providedBy(principal):
principal = MembraneWorkspaceGroup(principal)
return principal
@property
@memoize
def guest_ids(self):
""" Get the valid member ids through IWorkspace
"""
adapter = IWorkspace(self.context)
return [
principalid
for principalid in self.member_ids
if adapter.get(principalid).groups == {"Guests"}
]
@property
@memoize
def member_ids(self):
""" Get the valid member ids through IWorkspace
"""
principalids = IWorkspace(self.context).members
return filter(self.resolve_principalid, principalids)
@property
@memoize
def principals(self):
""" Return the list of principals which are assigned to this context
"""
objs = map(self.resolve_principalid, self.member_ids)
return objs
[docs] def get_sorted_principals(self, sort_by="last_name"):
objs = self.principals
return sorted(objs, key=lambda user: getattr(user, sort_by, None))
[docs] @memoize
def guests(self):
""" Return the list of principals which are guests in this context
By design they are assigned through the sharing view
"""
objs = map(self.resolve_principalid, self.guest_ids)
return objs
@property
@memoize
def task_brains(self):
return api.content.find(
context=self.context,
portal_type="todo",
sort_on=["due", "getObjPositionInParent"],
)
@property
@memoize
def task_objs(self):
return list(filter(None, (b.getObject() for b in self.task_brains)))
[docs] @memoize
def tasks(self):
""" Get the context tasks
"""
is_case = self.context.is_case
items = defaultdict(list) if is_case else []
wft = api.portal.get_tool("portal_workflow")
today = date.today()
for obj in self.task_objs:
todo = ITodo(obj)
done = wft.getInfoFor(todo, "review_state") == u"done"
overdue = False
if not done and todo.due:
overdue = todo.due < today
data = {
"id": obj.UID(),
"title": obj.Title(),
"description": obj.Description(),
"url": obj.absolute_url(),
"checked": done,
"due": todo.due,
"overdue": overdue,
"obj": obj,
"can_edit": api.user.has_permission("Modify portal content", obj=obj),
}
if is_case:
milestone = "unassigned"
if obj.milestone not in ["", None]:
milestone = obj.milestone
items[milestone].append(data)
else:
items.append(data)
if is_case:
for milestone in items.keys():
# Show the checked tasks before the unchecked tasks
items[milestone].sort(key=lambda x: x["checked"] is False)
return items
[docs] @memoize
def percent_complete(self):
""" Return the percentage of completed tasks as text, e.g. 33%
"""
objs = self.task_objs
if not objs:
return ""
total = 0.0
complete = 0.0
for obj in objs:
total += 1
if api.content.get_state(obj) == u"done":
complete += 1
return "{0}%".format(int((complete / total) * 100))
[docs] def subspaces_qs(self):
""" Prepare the right query string to show subspaces
"""
form = self.request.form
form["in_superspace"] = self.context.UID()
form["superspace"] = self.context.getId()
for attr in ("title", "description"):
form[attr] = safe_unicode(form.get(attr, "")).encode("utf-8")
return urlencode(form)
@property
@memoize
def division_classes(self):
""" if workspace has a division, return `workspace-division-<id>`
if workspace is a division, additionally return
`workspace-division-name-<id>`
"""
if getattr(self.workspace(), "is_division", False):
return "workspace-division-{0} workspace-division-name-{0}".format(
self.workspace().id
)
if getattr(self.workspace(), "division", ""):
brains = api.content.find(UID=self.workspace().division)
if brains:
return "workspace-division-" + brains[0].id
return ""
@property
@memoize
def superspace(self):
""" Return the associated superspace object
"""
superspace = getattr(self.context, "superspace", None)
if superspace:
return superspace.to_object
@property
def show_activity_stream(self):
return getattr(self.context, "show_activity_stream", True)
[docs]@implementer(IBlocksTransformEnabled)
class WorkspaceView(BaseWorkspaceView):
"""
Default View of the workspace
"""
[docs]@implementer(IBlocksTransformEnabled)
class SuperspaceView(BaseWorkspaceView):
"""
Default View of the superspace
"""
@property
@memoize
def container_view(self):
return api.content.get_view(
"workspaces.html", self.context.aq_parent, self.request
)
@property
@memoize
def subspaces(self):
search_util = getUtility(ISiteSearch)
filters = {
"object_provides": IWorkspaceFolder.__identifier__,
"superspace": self.context.UID(),
}
return search_util.query(filters=filters)
@property
@memoize
def allowed_parent_superspaces(self):
""" Return the list of superspaces that can be selected
as a parent superspace of the current superspace.
Discard the current superspace itself and all its children.
"""
superspaces = self.container_view.superspaces
edges = []
superspaces_by_uid = {}
for superspace in superspaces:
end = superspace.UID
start = superspace.context.get("superspace") or None
superspaces_by_uid[end] = superspace
edges.append([start, end])
graph = OpenEndedGraph(set(superspaces_by_uid), edges)
uid = self.context.UID()
blacklisted_uids = graph.get_descendants(uid)
blacklisted_uids.add(uid)
return [
superspace
for superspace in superspaces
if superspace.UID not in blacklisted_uids
]
[docs] @memoize
def has_subspaces(self):
"""
Check if this object has subspace, even hidden ones
"""
return bool(self.subspaces)
[docs] def do_post(self):
""" Do something when the user is posting
"""
sidebar = api.content.get_view(
"sidebar.settings.general", self.context, self.request
)
sidebar._basic_save()
def __call__(self):
""" Superspaces can be modified directly from their view
"""
if self.request.method == "POST":
self.do_post()
return super(SuperspaceView, self).__call__()
[docs]class WorkspaceMembersJSONView(BrowserView):
"""
Return workspace members in JSON for use with pat-autosuggest.
Only members of the current workspace are found.
"""
def __call__(self):
q = safe_unicode(self.request.get("q", "").strip())
if not q:
return format_users_json([])
query = {"SearchableText": u"{0}*".format(q)}
users = pi_api.userprofile.get_users(
context=self.context, full_objects=False, **query
)
return format_users_json(users)
[docs]class AllUsersJSONView(BrowserView):
"""
Return a filtered list of users for pat-autosuggest.
Any user can be found, not only members of the current workspace
"""
def __call__(self):
q = safe_unicode(self.request.get("q", "").strip())
if not q:
return format_users_json([])
query = {"SearchableText": u"{0}*".format(q)}
users = pi_api.userprofile.get_user_suggestions(
context=self.context, full_objects=False, **query
)
return format_users_json(users)
[docs]class BrainToGroupAdapter(object):
""" Make a brain behave like a group
"""
def __init__(self, context):
self.context = context
[docs] def getId(self):
""" Return this group id
"""
return self.context.getGroupId
[docs] def getProperty(self, key, fallback=""):
""" Mimic getProperty, remapping some keys
"""
if key == "state":
key = "review_state"
elif key == "title":
key = "Title"
return getattr(self.context, key, fallback)
[docs]class AllGroupsJSONView(BrowserView):
"""
Return all groups in JSON for use in picker
TODO: consolidate AllGroupsJSONView with AllUsersJSONView
"""
@property
@memoize_contextless
def only_membrane_groups(self):
return api.portal.get_registry_record(
"ploneintranet.userprofile.only_membrane_groups", default=False
)
[docs] def get_groups(self):
""" Return all the groups
"""
if self.only_membrane_groups:
mt = api.portal.get_tool(name="membrane_tool")
purl = api.portal.get_tool(name="portal_url")
groups = map(
BrainToGroupAdapter,
mt.unrestrictedSearchResults(
object_implements=(group_ifaces.IGroup.__identifier__),
path=purl.getPortalPath(),
),
)
else:
groups = api.group.get_groups()
return groups
def __call__(self):
q = safe_unicode(self.request.get("q", "").lower())
groups = self.get_groups()
group_details = []
ws = IWorkspace(self.context)
workspaces = api.portal.get().workspaces
for group in groups:
groupid = group.getId()
# XXX Filter out groups representing workspace roles. Review
# whether we need/want this and/or can do it more efficiently.
if groupid.partition(":")[0] in ws.available_groups:
continue
# Don't add the group that represents the current workspace to
# itself. This can only happen if the IMembraneGroup behaviour is
# active on current context.
if groupid == self.context.id and IMembraneGroupMarker.providedBy(
self.context
):
if self.only_membrane_groups:
# We are limited to membrane-only groups. The group
# found _could_ be this workspace. We need to compare
# the paths, which we can get cheaply from the catalog
# brain.
group_path = group.context.getPath()
if "/".join(self.context.getPhysicalPath()) == group_path:
continue
else:
# We used the generic groups search. We have no chance to
# distinguish where a group is located. Therefore, we need
# to ignore any group that has the same id as this
# workspace, to be on the safe side.
continue
if group.getProperty("state") == "secret":
wsobj = workspaces.get(group.getId())
if not (wsobj and wsobj.visible_to_intranet_users):
continue
title = safe_unicode(group.getProperty("title") or groupid)
email = safe_unicode(group.getProperty("email"))
if email:
title = u"%s <%s>" % (title, email)
description = safe_unicode(group.getProperty("description") or "")
if q in title.lower() or q in description.lower():
group_details.append({"text": title, "id": groupid})
return dumps(group_details)
[docs]class AllUsersAndGroupsJSONView(BrowserView):
def __call__(self):
q = self.request.get("q", "").strip()
if not q:
return ""
acl_users = api.portal.get_tool("acl_users")
results = []
groups = {x["id"]: x["title"] for x in acl_users.searchGroups(id=q)}
groups.update({x["id"]: x["title"] for x in acl_users.searchGroups(name=q)})
if groups:
for id, title in groups.items():
text = title or id
results.append({"id": id, "text": text})
query = {"SearchableText": u"{0}*".format(safe_unicode(q))}
users = pi_api.userprofile.get_users(full_objects=False, **query)
for user in users:
fullname = user.Title
email = user.email
results.append(
{
"id": user.getId,
"text": u"{0} <{1}>".format(safe_unicode(fullname), email),
}
)
return dumps(results)
[docs]class WorkspacesJSONView(BrowserView):
"""
Return a filtered list of workspaces for pat-autosuggest.
Any workspace can be found.
"""
def __call__(self):
q = safe_unicode(self.request.get("q", "").strip())
if not q:
return ""
query = {"phrase": u"{0}*".format(q), "filters": self.get_filters(), "step": 50}
search_util = getUtility(ISiteSearch)
workspaces = search_util.query(**query)
workspaces = sorted(workspaces, key=lambda ws: ws["Title"])
skip = self.get_skip()
return format_workspaces_json(workspaces, skip)
[docs] def get_filters(self):
return {"object_provides": "collective.workspace.interfaces.IHasWorkspace"}
[docs] def get_skip(self):
return []
[docs]@implementer(IBlocksTransformEnabled)
class WorkspaceCalendarView(BaseWorkspaceView):
"""
Wrapper to include the fullcalendar tile on workspaces
"""