# coding=utf-8
from AccessControl import Unauthorized
from copy import copy
from itertools import imap
from plone import api
from plone.app.blocks.interfaces import IBlocksTransformEnabled
from ploneintranet.layout.memoize.view import memoize
from ploneintranet.layout.memoize.view import memoize_contextless
from plone.protect.utils import safeWrite
from ploneintranet import api as pi_api
from ploneintranet.core import ploneintranetCoreMessageFactory as _
from ploneintranet.layout.interfaces import IDiazoAppTemplate
from ploneintranet.network.interfaces import INetworkTool
from ploneintranet.search.interfaces import ISiteSearch
from ploneintranet.userprofile.browser.forms import get_fields_for_template
from ploneintranet.userprofile.browser.forms import UserProfileViewForm
from ploneintranet.workspace.adapters import AVAILABLE_GROUPS
from Products.CMFCore.permissions import ModifyPortalContent
from Products.CMFPlone.browser.author import AuthorView as BaseAuthorView
from Products.Five import BrowserView
from webdav.common import rfc1123_date
from zExceptions import NotFound
from zope.component import getUtility
from zope.interface import implementer
from zope.publisher.interfaces import IPublishTraverse
import os
AVATAR_SIZES = {"profile": 200, "stream": 50}
[docs]def default_avatar(response):
"""Return the contents of a default profile image"""
path = os.path.join(os.path.dirname(__file__), "defaultUser-168.png")
img_data = open(path, "r").read()
response.setHeader("content-type", "image/png")
response.setHeader("content-disposition", 'inline; filename="DefaultAvatar.png"')
response.setHeader("content-length", len(img_data))
return img_data
[docs]@implementer(IDiazoAppTemplate, IBlocksTransformEnabled)
class UserProfileView(UserProfileViewForm):
"""View for user profile."""
_default_tabs = (
u"userprofile-view",
u"userprofile-info",
u"userprofile-followers",
u"userprofile-following",
u"userprofile-documents",
u"userprofile-workspaces",
)
@property
@memoize
def allowed_tabs(self):
""" Filter out some tabs according to the registry configuration
"""
try:
banned_tabs = api.portal.get_registry_record(
"ploneintranet.userprofile.userprofile_hidden_info"
)
except api.exc.InvalidParameterError:
banned_tabs = ()
if "userprofile-follow*" in banned_tabs:
banned_tabs += (u"userprofile-followers", u"userprofile-following")
return [tab for tab in self._default_tabs if tab not in banned_tabs]
@property
@memoize
def display_tabs(self):
""" Check if the navigation should be displayed
"""
return len(self.allowed_tabs) > 1
@property
@memoize
def default_tab(self):
""" Check if the navigation should be displayed
"""
allowed_tabs = self.allowed_tabs
if not allowed_tabs:
return u""
tab = self.request.form.get("tab")
if tab in allowed_tabs:
return tab
return allowed_tabs[0]
@property
@memoize
def display_followers(self):
""" Check if we should display the followers informations
"""
return u"userprofile-followers" in self.allowed_tabs
@property
@memoize
def display_following(self):
""" Check if we should display the following informations
"""
return u"userprofile-following" in self.allowed_tabs
@property
@memoize
def display_more_info_link(self):
""" The more information link does not make sense if the only
tab available is the userprofile-info or if userprofile-info is not
between the allowed tabs
"""
return self.display_tabs and "userprofile-info" in self.allowed_tabs
@property
@memoize
def display_change_personal_image(self):
""" if portrait is not listed in read_only_fields
we do not display the form to change it
"""
ro_fields = api.portal.get_registry_record(
"ploneintranet.userprofile.read_only_fields", default=()
)
return "portrait" not in ro_fields
[docs] @memoize
def is_ajax(self):
""" Check if we have an ajax call
"""
requested_with = self.request.environ.get("HTTP_X_REQUESTED_WITH")
return requested_with == "XMLHttpRequest"
[docs] def disable_diazo(self):
""" Disable diazo if this is an ajax call
"""
self.request.response.setHeader("X-Theme-Disabled", "1")
@property
@memoize_contextless
def pas_view(self):
""" The graph containing the relations between the principals
managed by PAS
"""
return api.content.get_view("pas_view", self.context, self.request)
@property
@memoize
def ancestorspasuids(self):
""" Return the PAS graph uids of the ancestors this user has
"""
pasuid = "userprofile:{}".format(self.context.UID())
return self.pas_view.ancestors(pasuid)
@property
@memoize
def grouppasuids(self):
""" Return the PAS graph uids of the groups this user is member of
"""
return {
uid for uid in self.ancestorspasuids if uid.partition(":")[0] == "workgroup"
}
@property
@memoize
def workspacepasuids(self):
""" Return the PAS graph uids of the workspaces this user is member of
"""
result = set()
workspaces = self.portal.get("workspaces")
if not workspaces:
return result
# use this to filter out templates
allowed_plonepath = "/".join(workspaces.getPhysicalPath())
uid2path = self.pas_view.uid2path
# mark as seen the uids of the userspaces (we do not count them)
brains = api.content.find(portal_type="ploneintranet.workspace.superspace")
seen_ploneuids = {brain.UID for brain in brains}
for pasuid in self.ancestorspasuids:
ploneuid = pasuid.rpartition(":")[-1]
if ploneuid not in seen_ploneuids:
# Given we have multiple groups for every workspaces
# we consider just the first one
seen_ploneuids.add(ploneuid)
plonepath = uid2path[pasuid].rpartition(":")[-1]
if plonepath.startswith(allowed_plonepath):
result.add(pasuid)
sitesearch = getUtility(ISiteSearch)
solr_result = sitesearch.query(
filters={
"UID": [res.rpartition(":")[-1] for res in result],
"is_archived": True,
},
step=len(result),
)
archived_uids = [r.UID for r in solr_result]
result = set(
[res for res in result if res.rpartition(":")[-1] not in archived_uids]
)
return result
@property
@memoize
def my_groups(self):
""" Return the attribute _my_groups,
if needed invoke the function _get_my_groups_and_workspaces to set it
"""
try:
return sorted(
imap(self.pas_view.resolvepasuid, self.grouppasuids),
key=lambda obj: obj.Title(),
)
except KeyError:
pass
try:
return self._my_groups
except AttributeError:
self._get_my_groups_and_workspaces()
return self._my_groups
@property
@memoize
def my_workspaces(self):
""" Return the attribute _my_groups,
if needed invoke the function _get_my_groups_and_workspaces to set it
"""
try:
return sorted(
imap(self.pas_view.resolvepasuid, self.workspacepasuids),
key=lambda obj: obj.Title(),
)
except KeyError:
pass
try:
return self._my_workspaces
except AttributeError:
self._get_my_groups_and_workspaces()
return self._my_workspaces
@property
@memoize
def num_groups(self):
""" Return the number of groups this user is member of
"""
try:
return len(self.grouppasuids)
except KeyError:
return len(self.my_groups)
@property
@memoize
def num_workspaces(self):
""" Return the number of workspaces this user is member of
"""
try:
return len(self.workspacepasuids)
except KeyError:
return len(self.my_workspaces)
[docs] def update(self):
if self.__name__ not in ("userprofile-info", "userprofile-view"):
return
self._update_recent_contacts()
[docs] def is_me(self):
"""Does this user profile belong to the current user"""
# .username is actually the userid see #1043
return self.context.username == api.user.get_current().getId()
[docs] def can_edit(self):
""" Check current user permissions to edit this object
"""
if self.request.form.get("skip_edit"):
return False
if self.is_me():
return True
return api.user.has_permission(ModifyPortalContent, obj=self.context)
[docs] def following(self):
"""Users this profile is following"""
graph = getUtility(INetworkTool)
return self._user_details(graph.get_following("user", self.context.username))
[docs] def followers(self):
"""Users who are following this profile"""
graph = getUtility(INetworkTool)
return self._user_details(graph.get_followers("user", self.context.username))
@property
@memoize_contextless
def portal(self):
""" Retuns the portal object
"""
return api.portal.get()
@property
@memoize_contextless
def group_container(self):
""" Retuns the group_container or an empty dict
"""
return self.portal.get("groups", {})
@property
@memoize_contextless
def workspace_container(self):
""" The main workspace container
"""
return self.portal.get("workspaces", {})
@property
@memoize_contextless
def workspace_container_view(self):
""" The main workspace container view
"""
return api.content.get_view(
"workspaces.html", self.workspace_container, self.request
)
@property
@memoize
def group_ids(self):
pg = api.portal.get_tool("portal_groups")
user = api.user.get(username=self.context.username)
# Filter out some trivial groups
my_groupids = {
groupid
for groupid in pg.getGroupsForPrincipal(user)
if groupid not in {"Members", "AuthenticatedUsers"}
}
return my_groupids
[docs] def _get_my_groups_and_workspaces(self):
"""
Find all the groups and all the workspaces the user is a member of.
Since workspaces can also act as groups, only count those items
as groups which are not also a workspace.
"""
my_groupids = self.group_ids
workspace_uids = set([])
groups = []
group_container = self.group_container
# Remove the collective.workspace groups
for groupid in list(my_groupids):
group, uid = groupid.partition(":")[::2]
if group in AVAILABLE_GROUPS and len(uid) >= 32:
workspace_uids.add(uid)
my_groupids.remove(groupid)
mt = api.portal.get_tool("membrane_tool")
workspaces_path = "/".join(self.workspace_container.getPhysicalPath())
groups_path = "/".join(group_container.getPhysicalPath())
brains = mt(
exact_getGroupId=list(my_groupids), path=[workspaces_path, groups_path]
)
for brain in brains:
brain_path = brain.getPath()
if brain_path.startswith(workspaces_path):
workspace_uids.add(brain.UID)
my_groupids.remove(brain.getId)
else:
group = brain.getObject()
groups.append(group)
my_groupids.remove(group.getGroupId())
self._my_groups = groups
# Keep only the workspace that are in the right place
self._my_workspaces = [
brain
for brain in self.portal.portal_catalog(
path="/".join(self.workspace_container.getPhysicalPath()),
UID=list(workspace_uids),
is_archived=False,
)
if "superspace" not in brain.portal_type
]
[docs] def _user_details(self, userids):
"""Basic user details for the given userids"""
details = []
for userid in userids:
profile = pi_api.userprofile.get(userid)
if profile is None:
continue
details.append(
{
"title": profile.fullname,
"url": profile.absolute_url(),
"userid": userid,
}
)
return details
[docs] def fields_for_display(self):
return get_fields_for_template(self)
[docs] @memoize
def user_search_placeholder(self):
msg = _(
u"user_search_placeholder",
default=u"Search ${user_name}'s documents",
mapping={"user_name": self.context.fullname},
)
return msg
[docs]class UserProfileTabView(UserProfileView):
""" Personalize the userprofile tab view class to not be transformed
by diazo if we have an ajax call
"""
def __call__(self):
""" Set diazo.off if this is an ajax request
"""
if self.is_ajax():
self.disable_diazo()
return super(UserProfileTabView, self).__call__()
[docs]class AuthorView(BaseAuthorView):
"""Overrides default author view to link to PI profiles"""
def __call__(self):
profile = pi_api.userprofile.get(self.username)
if profile is not None:
return self.request.response.redirect(profile.absolute_url())
raise NotFound
[docs]class MyProfileView(BrowserView):
"""Helper view to redirect to current user's profile page"""
def __call__(self):
profile = pi_api.userprofile.get_current()
if profile is not None:
return self.request.response.redirect(profile.absolute_url())
raise Unauthorized
[docs]def stream_avatar_data(profile, size, request):
"""Generate avatar at the specified size and stream it
This is a utility method used by the browser views below.
"""
response = request.response
if not profile:
return default_avatar(response)
imaging = api.content.get_view(request=request, context=profile, name="images")
if size not in AVATAR_SIZES:
return default_avatar(response)
width = height = AVATAR_SIZES.get(size)
try:
scale = imaging.scale(
fieldname="portrait", width=width, height=height, direction="down"
)
except TypeError:
# No image found
return default_avatar(response)
if scale is not None:
data = scale.data
mtime = rfc1123_date(profile._p_mtime)
response.setHeader("Last-Modified", mtime)
from plone.namedfile.utils import set_headers, stream_data
set_headers(data, response)
return stream_data(data)
else:
return default_avatar(response)
[docs]@implementer(IPublishTraverse)
class AvatarsView(BrowserView):
"""Helper view to render a user's avatar image
This view is designed to mimic Plone's default portrait setup.
Where portraits are accessed via:
/plone/portal_memberdata/portraits/userid
this can be replaced with:
/plone/@@avatars/userid
This allows you to easily link to an avatar without first
looking up the user profile object.
"""
[docs] def publishTraverse(self, request, name):
# @@avatars/userid/size
self.userid = name
stack = request.get("TraversalRequestNameStack", [])
if stack:
self.size = stack.pop()
else:
self.size = "stream"
request["TraversalRequestNameStack"] = []
return self
def __call__(self):
profile = pi_api.userprofile.get(self.userid)
return stream_avatar_data(profile, self.size, self.request)
[docs]class MyAvatar(BrowserView):
"""Helper view to render a user's avatar image
This view is designed to be used on the end of a user profile URL,
e.g. in search results or listings
/path/to/profile/avatar.jpg
"""
def __call__(self):
return stream_avatar_data(self.context, "stream", self.request)
[docs] def avatar_profile(self):
return stream_avatar_data(self.context, "profile", self.request)