Source code for ploneintranet.userprofile.browser.userprofile

# coding=utf-8
from AccessControl import Unauthorized
from copy import copy
from itertools import imap
from plone import api
from plone.app.blocks.interfaces import IBlocksTransformEnabled
from ploneintranet.layout.memoize.view import memoize
from ploneintranet.layout.memoize.view import memoize_contextless
from plone.protect.utils import safeWrite
from ploneintranet import api as pi_api
from ploneintranet.core import ploneintranetCoreMessageFactory as _
from ploneintranet.layout.interfaces import IDiazoAppTemplate
from ploneintranet.network.interfaces import INetworkTool
from ploneintranet.search.interfaces import ISiteSearch
from ploneintranet.userprofile.browser.forms import get_fields_for_template
from ploneintranet.userprofile.browser.forms import UserProfileViewForm
from ploneintranet.workspace.adapters import AVAILABLE_GROUPS
from Products.CMFCore.permissions import ModifyPortalContent
from Products.CMFPlone.browser.author import AuthorView as BaseAuthorView
from Products.Five import BrowserView
from webdav.common import rfc1123_date
from zExceptions import NotFound
from zope.component import getUtility
from zope.interface import implementer
from zope.publisher.interfaces import IPublishTraverse

import os


# Named avatar sizes accepted by stream_avatar_data. Each value is used as
# both the width and the height passed to the image scaling view
# (presumably pixels -- TODO confirm against the scaling view).
AVATAR_SIZES = {"profile": 200, "stream": 50}


def default_avatar(response):
    """Return the contents of the default profile image.

    The image is the ``defaultUser-168.png`` file shipped next to this
    module. The ``content-type``, ``content-disposition`` and
    ``content-length`` headers are set on ``response`` before the data is
    returned.

    :param response: object exposing ``setHeader(name, value)``
    :returns: the raw bytes of the PNG file
    """
    path = os.path.join(os.path.dirname(__file__), "defaultUser-168.png")
    # A PNG is binary data: read it in binary mode (text mode corrupts or
    # fails to decode it) and close the file handle deterministically.
    with open(path, "rb") as img_file:
        img_data = img_file.read()
    response.setHeader("content-type", "image/png")
    response.setHeader("content-disposition", 'inline; filename="DefaultAvatar.png"')
    response.setHeader("content-length", len(img_data))
    return img_data
@implementer(IDiazoAppTemplate, IBlocksTransformEnabled)
class UserProfileView(UserProfileViewForm):
    """View for user profile."""

    # Every tab the profile can show, in display order
    _default_tabs = (
        u"userprofile-view",
        u"userprofile-info",
        u"userprofile-followers",
        u"userprofile-following",
        u"userprofile-documents",
        u"userprofile-workspaces",
    )

    @property
    @memoize
    def allowed_tabs(self):
        """ Return the default tabs that are not hidden by the registry

        The hidden tabs are read from the registry record
        ``ploneintranet.userprofile.userprofile_hidden_info``; when the
        record does not exist nothing is hidden.
        The special value ``userprofile-follow*`` hides both the followers
        and the following tab.
        """
        try:
            banned_tabs = api.portal.get_registry_record(
                "ploneintranet.userprofile.userprofile_hidden_info"
            )
        except api.exc.InvalidParameterError:
            banned_tabs = ()
        # Work on our own tuple: the registry value must never be extended
        # in place (it could be a list) and may be empty/None
        banned_tabs = tuple(banned_tabs or ())
        if "userprofile-follow*" in banned_tabs:
            banned_tabs += (u"userprofile-followers", u"userprofile-following")
        return [tab for tab in self._default_tabs if tab not in banned_tabs]

    @property
    @memoize
    def display_tabs(self):
        """ Check if the navigation should be displayed,
        i.e. if more than one tab is allowed
        """
        return len(self.allowed_tabs) > 1

    @property
    @memoize
    def default_tab(self):
        """ Return the tab that should be selected

        This is the ``tab`` value of the request form when it is an allowed
        tab, otherwise the first allowed tab.
        An empty string is returned when no tab is allowed.
        """
        allowed_tabs = self.allowed_tabs
        if not allowed_tabs:
            return u""
        tab = self.request.form.get("tab")
        if tab in allowed_tabs:
            return tab
        return allowed_tabs[0]

    @property
    @memoize
    def display_followers(self):
        """ Check if we should display the followers informations
        """
        return u"userprofile-followers" in self.allowed_tabs

    @property
    @memoize
    def display_following(self):
        """ Check if we should display the following informations
        """
        return u"userprofile-following" in self.allowed_tabs

    @property
    @memoize
    def display_more_info_link(self):
        """ The more information link does not make sense
        if the only tab available is the userprofile-info
        or if userprofile-info is not between the allowed tabs
        """
        return self.display_tabs and "userprofile-info" in self.allowed_tabs

    @property
    @memoize
    def display_change_personal_image(self):
        """ If portrait is listed in the read_only_fields registry record
        we do not display the form to change it
        """
        ro_fields = api.portal.get_registry_record(
            "ploneintranet.userprofile.read_only_fields", default=()
        )
        return "portrait" not in ro_fields

    @memoize
    def is_ajax(self):
        """ Check if we have an ajax call,
        i.e. if the X-Requested-With header is XMLHttpRequest
        """
        requested_with = self.request.environ.get("HTTP_X_REQUESTED_WITH")
        return requested_with == "XMLHttpRequest"

    def disable_diazo(self):
        """ Disable diazo for this response

        The header is set unconditionally: it is up to the caller
        to check first if this is an ajax call.
        """
        self.request.response.setHeader("X-Theme-Disabled", "1")

    @property
    @memoize_contextless
    def pas_view(self):
        """ The graph containing the relations
        between the principals managed by PAS
        """
        return api.content.get_view("pas_view", self.context, self.request)

    @property
    @memoize
    def ancestorspasuids(self):
        """ Return the PAS graph uids of the ancestors this user has
        """
        pasuid = "userprofile:{}".format(self.context.UID())
        return self.pas_view.ancestors(pasuid)

    @property
    @memoize
    def grouppasuids(self):
        """ Return the PAS graph uids of the groups this user is member of,
        i.e. the ancestor uids prefixed with ``workgroup:``
        """
        return {
            uid
            for uid in self.ancestorspasuids
            if uid.partition(":")[0] == "workgroup"
        }

    @property
    @memoize
    def workspacepasuids(self):
        """ Return the PAS graph uids of the workspaces
        this user is member of

        Superspaces, objects outside of the ``workspaces`` container
        and archived workspaces are not counted.
        A KeyError is raised if an ancestor uid is not known
        to the ``uid2path`` mapping of the pas_view.
        """
        result = set()
        workspaces = self.portal.get("workspaces")
        if not workspaces:
            return result
        # use this to filter out templates
        allowed_plonepath = "/".join(workspaces.getPhysicalPath())
        uid2path = self.pas_view.uid2path
        # mark as seen the uids of the userspaces (we do not count them)
        brains = api.content.find(portal_type="ploneintranet.workspace.superspace")
        seen_ploneuids = {brain.UID for brain in brains}
        for pasuid in self.ancestorspasuids:
            ploneuid = pasuid.rpartition(":")[-1]
            if ploneuid not in seen_ploneuids:
                # Given we have multiple groups for every workspaces
                # we consider just the first one
                seen_ploneuids.add(ploneuid)
                plonepath = uid2path[pasuid].rpartition(":")[-1]
                if plonepath.startswith(allowed_plonepath):
                    result.add(pasuid)
        if not result:
            # Nothing can be archived: do not bother the search utility
            # with an empty UID filter and a step of 0
            return result
        sitesearch = getUtility(ISiteSearch)
        solr_result = sitesearch.query(
            filters={
                "UID": [pasuid.rpartition(":")[-1] for pasuid in result],
                "is_archived": True,
            },
            step=len(result),
        )
        # A set makes the membership test below cheap
        archived_uids = {r.UID for r in solr_result}
        return {
            pasuid
            for pasuid in result
            if pasuid.rpartition(":")[-1] not in archived_uids
        }

    @property
    @memoize
    def my_groups(self):
        """ Return the groups this user is member of

        The groups are resolved through the PAS graph and sorted by title.
        If the graph lookup raises a KeyError we fall back to the attribute
        _my_groups, if needed invoking the function
        _get_my_groups_and_workspaces to set it
        """
        try:
            return sorted(
                imap(self.pas_view.resolvepasuid, self.grouppasuids),
                key=lambda obj: obj.Title(),
            )
        except KeyError:
            pass
        try:
            return self._my_groups
        except AttributeError:
            self._get_my_groups_and_workspaces()
        return self._my_groups

    @property
    @memoize
    def my_workspaces(self):
        """ Return the workspaces this user is member of

        The workspaces are resolved through the PAS graph
        and sorted by title.
        If the graph lookup raises a KeyError we fall back to the attribute
        _my_workspaces, if needed invoking the function
        _get_my_groups_and_workspaces to set it.
        NOTE(review): the fallback returns catalog brains in catalog order,
        not the title-sorted values resolved from the graph.
        """
        try:
            return sorted(
                imap(self.pas_view.resolvepasuid, self.workspacepasuids),
                key=lambda obj: obj.Title(),
            )
        except KeyError:
            pass
        try:
            return self._my_workspaces
        except AttributeError:
            self._get_my_groups_and_workspaces()
        return self._my_workspaces

    @property
    @memoize
    def num_groups(self):
        """ Return the number of groups this user is member of
        """
        try:
            return len(self.grouppasuids)
        except KeyError:
            return len(self.my_groups)

    @property
    @memoize
    def num_workspaces(self):
        """ Return the number of workspaces this user is member of
        """
        try:
            return len(self.workspacepasuids)
        except KeyError:
            return len(self.my_workspaces)

    def update(self):
        """ Record the visit in the recent contacts of the current user,
        but only when we are rendering the view or the info tab
        """
        if self.__name__ not in ("userprofile-info", "userprofile-view"):
            return
        self._update_recent_contacts()

    def is_me(self):
        """Does this user profile belong to the current user"""
        # .username is actually the userid see #1043
        return self.context.username == api.user.get_current().getId()

    def can_edit(self):
        """ Check current user permissions to edit this object

        Editing is always denied when ``skip_edit`` is in the request form
        and always granted to the owner of the profile.
        """
        if self.request.form.get("skip_edit"):
            return False
        if self.is_me():
            return True
        return api.user.has_permission(ModifyPortalContent, obj=self.context)

    def following(self):
        """Users this profile is following"""
        graph = getUtility(INetworkTool)
        return self._user_details(graph.get_following("user", self.context.username))

    def followers(self):
        """Users who are following this profile"""
        graph = getUtility(INetworkTool)
        return self._user_details(graph.get_followers("user", self.context.username))

    @property
    @memoize_contextless
    def portal(self):
        """ Returns the portal object
        """
        return api.portal.get()

    @property
    @memoize_contextless
    def group_container(self):
        """ Returns the group_container or an empty dict
        """
        return self.portal.get("groups", {})

    @property
    @memoize_contextless
    def workspace_container(self):
        """ The main workspace container (or an empty dict)
        """
        return self.portal.get("workspaces", {})

    @property
    @memoize_contextless
    def workspace_container_view(self):
        """ The main workspace container view
        """
        return api.content.get_view(
            "workspaces.html", self.workspace_container, self.request
        )

    @property
    @memoize
    def group_ids(self):
        """ Return the set of the ids of the groups of this user
        as known by portal_groups, without the trivial ones
        """
        pg = api.portal.get_tool("portal_groups")
        user = api.user.get(username=self.context.username)
        # Filter out some trivial groups
        trivial_groupids = {"Members", "AuthenticatedUsers"}
        my_groupids = {
            groupid
            for groupid in pg.getGroupsForPrincipal(user)
            if groupid not in trivial_groupids
        }
        return my_groupids

    def _get_my_groups_and_workspaces(self):
        """
        Find all the groups and all the workspaces the user is a member of.

        Since workspaces can also act as groups, only count those items as
        groups which are not also a workspace.

        The results are stored in the attributes _my_groups (objects)
        and _my_workspaces (catalog brains).
        """
        # group_ids is memoized: work on a copy, otherwise the removals
        # below would alter the cached value for every later reader
        my_groupids = set(self.group_ids)
        workspace_uids = set()
        groups = []
        group_container = self.group_container

        # Remove the collective.workspace groups
        for groupid in list(my_groupids):
            group, uid = groupid.partition(":")[::2]
            if group in AVAILABLE_GROUPS and len(uid) >= 32:
                workspace_uids.add(uid)
                my_groupids.remove(groupid)

        mt = api.portal.get_tool("membrane_tool")
        workspaces_path = "/".join(self.workspace_container.getPhysicalPath())
        groups_path = "/".join(group_container.getPhysicalPath())
        brains = mt(
            exact_getGroupId=list(my_groupids), path=[workspaces_path, groups_path]
        )
        for brain in brains:
            brain_path = brain.getPath()
            # discard instead of remove: the brain was matched on its
            # group id, which is not guaranteed to be the value we drop
            # here, and a missing value must not abort the whole lookup
            if brain_path.startswith(workspaces_path):
                workspace_uids.add(brain.UID)
                my_groupids.discard(brain.getId)
            else:
                group = brain.getObject()
                groups.append(group)
                my_groupids.discard(group.getGroupId())

        self._my_groups = groups

        # Keep only the workspace that are in the right place
        self._my_workspaces = [
            brain
            for brain in self.portal.portal_catalog(
                path=workspaces_path,
                UID=list(workspace_uids),
                is_archived=False,
            )
            if "superspace" not in brain.portal_type
        ]

    def _user_details(self, userids):
        """Basic user details for the given userids

        Returns a list of dicts with the keys title, url and userid.
        The userids without a profile are skipped.
        """
        details = []
        for userid in userids:
            profile = pi_api.userprofile.get(userid)
            if profile is None:
                continue
            details.append(
                {
                    "title": profile.fullname,
                    "url": profile.absolute_url(),
                    "userid": userid,
                }
            )
        return details

    def _update_recent_contacts(self):
        """ Update, if needed, the list of the last twenty profiles
        that we have visited

        Nothing happens if there is no current profile
        or if we are visiting our own profile.
        """
        my_profile = pi_api.userprofile.get_current()
        contact = self.context.username
        if not my_profile or my_profile.username == contact:
            return
        # Work on a copy to be able to compare it with the stored value
        recent_contacts = copy(my_profile.recent_contacts or [])
        # If the contact is already the first on the list, we have nothing todo
        try:
            if recent_contacts.index(contact) == 0:
                return
        except ValueError:
            pass
        # Otherwise we want it to be the first on the list
        try:
            recent_contacts.remove(contact)
        except ValueError:
            pass
        recent_contacts.insert(0, contact)
        # We limit ourselves
        recent_contacts = recent_contacts[:20]
        # Do not touch the DB if nothing has changed
        if my_profile.recent_contacts == recent_contacts:
            return
        # Mark the write as safe before modifying the profile
        # (this may happen while serving a plain page view)
        safeWrite(my_profile, self.request)
        my_profile.recent_contacts = recent_contacts

    def fields_for_display(self):
        """ Return what get_fields_for_template computes for this view
        """
        return get_fields_for_template(self)

    @memoize
    def user_search_placeholder(self):
        """ Return the translatable placeholder for the search
        in the documents of this user
        """
        msg = _(
            u"user_search_placeholder",
            default=u"Search ${user_name}'s documents",
            mapping={"user_name": self.context.fullname},
        )
        return msg
class UserProfileTabView(UserProfileView):
    """ User profile tab view that is not transformed by diazo
    when it is requested through an ajax call
    """

    def __call__(self):
        """ Switch diazo off for ajax requests, then render as usual
        """
        ajax_request = self.is_ajax()
        if ajax_request:
            self.disable_diazo()
        rendered = super(UserProfileTabView, self).__call__()
        return rendered
class AuthorView(BaseAuthorView):
    """Author view override that sends the visitor to the PI profile"""

    def __call__(self):
        """Redirect to the profile of ``self.username``.

        Raises NotFound when no profile exists for that user.
        """
        profile = pi_api.userprofile.get(self.username)
        if profile is None:
            raise NotFound
        response = self.request.response
        return response.redirect(profile.absolute_url())
class MyProfileView(BrowserView):
    """Helper view redirecting to the profile page of the current user"""

    def __call__(self):
        """Redirect to the current user's profile.

        Raises Unauthorized when there is no current profile.
        """
        profile = pi_api.userprofile.get_current()
        if profile is None:
            raise Unauthorized
        response = self.request.response
        return response.redirect(profile.absolute_url())
def stream_avatar_data(profile, size, request):
    """Stream the portrait of ``profile`` scaled to the named ``size``.

    This is a utility function used by the avatar browser views.

    The default avatar is returned instead when there is no profile, when
    ``size`` is not a key of AVATAR_SIZES, when scaling raises a TypeError
    or when no scale is produced.
    """
    response = request.response
    if not profile:
        return default_avatar(response)

    # The view is looked up before the size is validated, as it always was
    imaging = api.content.get_view(request=request, context=profile, name="images")
    if size not in AVATAR_SIZES:
        return default_avatar(response)

    edge = AVATAR_SIZES[size]
    try:
        scale = imaging.scale(
            fieldname="portrait", width=edge, height=edge, direction="down"
        )
    except TypeError:
        # No image found
        return default_avatar(response)

    if scale is None:
        return default_avatar(response)

    data = scale.data
    response.setHeader("Last-Modified", rfc1123_date(profile._p_mtime))
    # Imported only once we know there is something to stream
    from plone.namedfile.utils import set_headers, stream_data

    set_headers(data, response)
    return stream_data(data)
@implementer(IPublishTraverse)
class AvatarsView(BrowserView):
    """Helper view to render a user's avatar image

    This view is designed to mimic Plone's default portrait setup.
    Where portraits are accessed via:

        /plone/portal_memberdata/portraits/userid

    this can be replaced with:

        /plone/@@avatars/userid

    This allows you to easily link to an avatar without first looking up
    the user profile object.
    """

    def publishTraverse(self, request, name):
        """Consume the path ``@@avatars/userid/size``.

        ``name`` is stored as the userid. The size is popped from the
        remaining traversal stack, defaulting to "stream" when the stack is
        empty. The traversal stack is then cleared so this view is the
        published object.
        """
        self.userid = name
        remaining = request.get("TraversalRequestNameStack", [])
        self.size = remaining.pop() if remaining else "stream"
        request["TraversalRequestNameStack"] = []
        return self

    def __call__(self):
        """Stream the avatar of the traversed userid at the traversed size"""
        return stream_avatar_data(
            pi_api.userprofile.get(self.userid), self.size, self.request
        )
class MyAvatar(BrowserView):
    """Helper view to render a user's avatar image

    This view is designed to be used on the end of a user profile URL,
    e.g. in search results or listings

        /path/to/profile/avatar.jpg
    """

    def __call__(self):
        """Stream the avatar of the context at the "stream" size"""
        profile = self.context
        return stream_avatar_data(profile, "stream", self.request)

    def avatar_profile(self):
        """Stream the avatar of the context at the "profile" size"""
        profile = self.context
        return stream_avatar_data(profile, "profile", self.request)