Source code for ploneintranet.workspace.browser.tiles.sidebar

# coding=utf-8
from ...basecontent.utils import dexterity_update
from ...interfaces import IGroupingStorage
from ...utils import map_content_type
from ...utils import parent_workspace
from .events import format_event_date_for_title
from AccessControl import Unauthorized
from Acquisition import aq_base
from BTrees.OOBTree import OOBTree
from collections import OrderedDict
from collective.workspace.interfaces import IWorkspace
from copy import deepcopy
from DateTime import DateTime
from plone import api
from plone.app.contenttypes.interfaces import IEvent
from plone.app.event.base import localized_now
from plone.batching import Batch
from plone.behavior.interfaces import IBehaviorAssignable
from plone.i18n.normalizer import idnormalizer
from plone.protect.authenticator import createToken
from ploneintranet.core import ploneintranetCoreMessageFactory as _
from ploneintranet.layout.interfaces import IDiazoNoTemplate
from ploneintranet.layout.memoize.view import memoize
from ploneintranet.layout.memoize.view import memoize_contextless
from ploneintranet.search.interfaces import ISearchResponse
from ploneintranet.search.interfaces import ISiteSearch
from ploneintranet.todo.utils import update_task_status
from ploneintranet.workspace.basecontent.utils import get_selection_classes
from ploneintranet.workspace.browser.show_extra import set_show_extra_cookie
from ploneintranet.workspace.config import INTRANET_USERS_GROUP_ID
from ploneintranet.workspace.events import WorkspaceRosterChangedEvent
from ploneintranet.workspace.interfaces import ITrashed
from Products.CMFPlone.utils import safe_unicode
from Products.Five.browser.pagetemplatefile import ViewPageTemplateFile
from Products.statusmessages.interfaces import IStatusMessage
from scorched.dates import solr_date
from slc.mailrouter.interfaces import IFriendlyNameStorage
from slc.mailrouter.utils import store_name
from urllib import quote
from urllib import urlencode
from zope.annotation import IAnnotations
from zope.component import getAdapter
from zope.component import getUtility
from zope.event import notify
from zope.interface import implementer
from zope.lifecycleevent import ObjectModifiedEvent
from zope.publisher.browser import BrowserView
from zope.schema import getFieldNames
from zope.schema.interfaces import IVocabularyFactory
from ZTUtils import make_query

import logging


log = logging.getLogger(__name__)

# Name under which the divisions vocabulary factory is looked up
vocab = "ploneintranet.workspace.vocabularies.Divisions"

# Date buckets, kept in insertion order (most recent first).
# Each key maps to the translatable title and description of the bucket.
DATE_GROUPING_VALUES = OrderedDict()
DATE_GROUPING_VALUES["today"] = {
    "title": _(u"Today"),
    "description": _(u"Items modified today"),
}
DATE_GROUPING_VALUES["week"] = {
    "title": _(u"Last Week"),
    "description": _(u"Items modified last week"),
}
DATE_GROUPING_VALUES["month"] = {
    "title": _(u"Last Month"),
    "description": _(u"Items modified last month"),
}
DATE_GROUPING_VALUES["ever"] = {
    "title": _(u"All Time"),
    "description": _(u"Older"),
}


class BaseTile(BrowserView):
    """
    Shared baseclass for the sidebar tiles.

    Cave: many of the actual sidebar templates *also* bind to
    ploneintranet.workspace.browser.workspace.WorkspaceView
    to use methods defined there...
    """

    # Page template used by render(); concrete tiles override it
    index = None
    form_submitted = False
    general_settings_autoload = "trigger: autoload;"
    _user_groupings_key = "ploneintranet.workspace.user_groupings"
    # Number of items in one batch (page) of results
    _batch_size = 20
    # Template for the grouping options; copied and decorated with the
    # "selected" and "default" flags by the grouping_options property
    _grouping_options = [
        {"title": _("Group by folder"), "value": "folder"},
        {"title": _("Group by tag"), "value": "label"},
        {"title": _("Group by document type"), "value": "type"},
        {"title": _("Group by author"), "value": "author"},
        {"title": _("Group by date"), "value": "date"},
        {"title": _("Group by first letter"), "value": "first_letter"},
    ]

    @property
    @memoize
    def is_trashed(self):
        """ Check if the current document is in the trash can
        """
        return ITrashed.providedBy(self.context)

    @property
    @memoize
    def default_grouping_option(self):
        """ Override this one:
        the default can be set in the site settings, per context,
        per user or whatever

        This base implementation returns None.
        """

    @property
    @memoize
    def selected_grouping_option(self):
        """ Override this one:
        the selected value can be set in in the context,
        in the user or whatever

        This base implementation returns None.
        """

    @property
    @memoize
    def grouping_options(self):
        """ Options to set the sidebar grouping

        Returns a deep copy of `_grouping_options` where every option
        gets two extra keys:

        - "selected": the string "selected" for the active option,
          None for the others
        - "default": True for the default option, False for the others
        """
        # Work on a copy: the class level list must stay pristine
        options = deepcopy(self._grouping_options)
        default = self.default_grouping_option
        selected = self.selected_grouping_option or default
        for option in options:
            value = option["value"]
            option["selected"] = "selected" if value == selected else None
            option["default"] = value == default
        return options

    @property
    @memoize
    def root(self):
        """ The root object for the sidebar, typically a workspace,
        but the sidebar can also be used for apps, e.g. quaive.app.slides.
        """
        return parent_workspace(self.context)

    def page_number(self):
        """ Get current page number from the request

        Falls back to 1 when the `page` parameter is missing, is not
        an integer or is lower than 1.
        """
        try:
            page = int(self.request.form.get("page", 1))
        except (TypeError, ValueError):
            # TypeError: `page` was submitted more than once and is a list
            # ValueError: `page` is not a number
            page = 1
        # A page lower than 1 would produce a negative batch start
        return max(page, 1)

    def get_query_start(self):
        """ Fills the start parameter of the search query,
        i.e. the first element of the batch
        """
        return (self.page_number() - 1) * self._batch_size

    def next_page_number(self):
        """ Get page number for next page of search results

        Returns None when the current page is the last one.
        `items()` is not defined on this class, subclasses have to provide it.
        """
        page = self.page_number()
        total_items = len(self.items())
        if page * self._batch_size < total_items:
            return page + 1
        return None

    def next_page_url(self):
        """ Get url for the next page of results

        Returns None when there is no next page.
        """
        next_page_number = self.next_page_number()
        if not next_page_number:
            return
        # Keep the submitted parameters and override only the page
        new_query = make_query(self.request.form.copy(), {"page": next_page_number})
        return "{url}?{qs}".format(url=self.request.ACTUAL_URL, qs=new_query)

    @property
    @memoize
    def site_default_grouping(self):
        """ The site default grouping as specified in the registry
        ("folder" if the record is not there)
        """
        return api.portal.get_registry_record(
            "ploneintranet.workspace.default_grouping", default="folder"
        )

    @memoize
    def get_default_grouping(self):
        """ Get the default grouping for document navigation in the sidebar:
        the one set on the root wins over the site default
        """
        return self.root.default_grouping or self.site_default_grouping

    @property
    @memoize
    def current_user(self):
        """ Return the current authenticated user
        """
        return api.user.get_current()

    @property
    @memoize
    def current_userid(self):
        """ Return the current authenticated user id
        """
        return self.current_user.getId()

    def render(self):
        """ Render the template bound to `index`
        """
        return self.index()

    def __call__(self):
        return self.render()

    def notify_success(self):
        """ Notify success
        """
        msg = _("Attributes changed.")
        api.portal.show_message(msg, request=self.request, type="success")

    def notify_errors(self, errors):
        """ Notify errors upon save

        `errors` is interpolated in the message shown to the user.
        """
        msg = _(
            "sidebar_save_error_message",
            default=("There was a problem updating the content: ${errors}."),
            mapping={"errors": errors},
        )
        api.portal.show_message(msg, request=self.request, type="error")

    def _basic_save(self):
        """ Performs a simple save of entered attributes.
        Not all sidebars need this

        Nothing happens unless the request is a POST with form data
        coming from a user that can manage the workspace.
        """
        form = self.request.form
        if (
            not form
            or self.request.method != "POST"
            or not self.workspace_view.can_manage_workspace()
        ):
            return
        modified, errors = dexterity_update(self.context)

        if errors:
            return self.notify_errors(errors)
        if not modified:
            return
        self.notify_success()
        self.workspace().reindexObject()
        notify(ObjectModifiedEvent(self.context))
        # If we modified the hero image we want to rerender the workspace view
        if not form.get("hero_image", None):
            return
        return self.request.response.redirect(self.context.absolute_url())

    def status_messages(self):
        """ Returns status messages if any

        Every message gets an `id` attribute
        which is the normalized text of the message.
        """
        messages = IStatusMessage(self.request)
        m = messages.show()
        for item in m:
            item.id = idnormalizer.normalize(item.message)
        return m

    @memoize
    def workspace(self):
        """ Acquire the workspace of the current context
        """
        # NOTE(review): the view itself is passed here, while `root`
        # passes self.context -- confirm this is intended
        return parent_workspace(self)

    @property
    @memoize
    def workspace_view(self):
        """ Return the "view" browser view of the workspace
        """
        return api.content.get_view("view", self.workspace(), self.request)

    def can_add(self):
        """
        Is this user allowed to add content?
        Cave. We don't use the plone.app.contenttypes per-type permissions.
        Our workflows don't map those, only cmf.AddPortalContent.
        """
        # aq_base: the flag must be set on the context itself, not acquired
        if getattr(aq_base(self.context), "disable_add_from_sidebar", False):
            return False
        return self.current_user.has_permission("Add portal content", self.context)

    def can_edit(self):
        """
        Is this user allowed to modify the context?
        """
        return self.current_user.has_permission("Modify portal content", self.context)

    def can_change_label(self):
        """
        Is this user allowed to change a label, and can the label be edited?
        """
        # `grouping()` is not defined on this class,
        # presumably it is provided by the subclasses
        if self.can_edit():
            return self.grouping() == "folder"
        return False

    def can_delete(self, obj=None):
        """ Is this user allowed to delete an object?

        `obj` defaults to the context; anything with a `getObject` method
        (e.g. a catalog brain) is resolved to the object first.
        """
        if obj is None:
            obj = self.context
        elif hasattr(obj, "getObject"):
            obj = obj.getObject()
        return self.current_user.has_permission("Delete objects", obj)

    def can_subscribe(self):
        """ Check if we see the subscribe feature in the bulk actions
        """
        return api.portal.get_registry_record(
            "ploneintranet.workspace.allow_bulk_subscribe", default=False
        )

    def date_as_datetime(self, date):
        """
        The content items in the sidebar have a `DateTime` as their
        modification date. But search results have a `datetime`.

        Converts a `DateTime` to a `datetime`,
        any other value is returned untouched.
        """
        if isinstance(date, DateTime):
            date = date.asdatetime()
        return date
class SidebarSettingsGeneral(BaseTile):
    """ The tile for the general settings in the sidebar
    """

    index = ViewPageTemplateFile("templates/sidebar-settings-general.pt")

    def __call__(self):
        """ Save the submitted attributes (if any), then render the tile
        """
        self._basic_save()
        rendered = self.render()
        return rendered
class SidebarSettingsMembers(BaseTile):
    """ A view to serve as the member roster in the sidebar
    """

    index = ViewPageTemplateFile("templates/sidebar-settings-members.pt")

    def describe_permissions(self):
        """ Return a translatable sentence that describes what the
        participant policy of the context allows.

        The sentence is about all the intranet users when the context is
        accessible to them and the policy is a known one, otherwise it is
        about the listed users.
        For an unknown policy a warning is logged and None is returned.
        """
        policy = self.context.participant_policy
        if self.context.accessible_to_intranet_users:
            intranet_descriptions = (
                (
                    "consumers",
                    _(
                        "<strong>All intranet users</strong> can read content. "
                        "They cannot add, publish or edit content."
                    ),
                ),
                (
                    "producers",
                    _(
                        "<strong>All intranet users</strong> can read and add "
                        "content. They can neither publish nor edit content."
                    ),
                ),
                (
                    "publishers",
                    _(
                        "<strong>All intranet users</strong> can read, add and "
                        "publish content. They cannot edit other members content."
                    ),
                ),
                (
                    "moderators",
                    _(
                        "<strong>All intranet users</strong> can do everything. "
                        "Read, add, publish and edit content."
                    ),
                ),
            )
            for known_policy, description in intranet_descriptions:
                if policy == known_policy:
                    return description

        member_descriptions = (
            (
                "consumers",
                _(
                    "<strong>The users listed below</strong> can read content. "
                    "They cannot add, publish or edit content."
                ),
            ),
            (
                "producers",
                _(
                    "<strong>The users listed below</strong> can read and add "
                    "content. They can neither publish nor edit content."
                ),
            ),
            (
                "publishers",
                _(
                    "<strong>The users listed below</strong> can read, add and "
                    "publish content. They cannot edit other members content."
                ),
            ),
            (
                "moderators",
                _(
                    "<strong>The users listed below</strong> can do everything. "
                    "Read, add, publish and edit content."
                ),
            ),
        )
        for known_policy, description in member_descriptions:
            if policy == known_policy:
                return description

        log.warning("Unknown participation policy")

    def display_principal(self, principal):
        """ Check if we should display this principal

        Everything is displayed, with one exception: the group of all the
        intranet users is hidden as long as it has no roles.
        """
        try:
            groupid = principal.getGroupId()
        except Exception:
            # No usable group id: this is not the group we want to hide
            return True
        if groupid == INTRANET_USERS_GROUP_ID:
            # Display the group only if it has some roles
            return bool(self.workspace_view.get_principal_roles(principal))
        return True

    def execute_batch_function(self):
        """ Run the function named in the `batch-function` form parameter
        ("add", "remove" or "role") on the submitted `user_id` values.

        `user_id` can be a list or a comma separated string,
        nothing happens if it is empty.
        A status message is shown and a WorkspaceRosterChangedEvent is
        notified afterwards, even for an unknown function.

        Raises Unauthorized if the workspace view denies the operation.
        """

        def require_roster_manager():
            if self.workspace_view.can_manage_roster():
                return
            raise Unauthorized(
                _(
                    u"You do not have permission to change the workspace "
                    u"membership roster."
                )
            )

        def require_user_adder():
            if self.workspace_view.can_add_users():
                return
            raise Unauthorized(
                _(u"You do not have permission to add users to the " u"workspace.")
            )

        form = self.request.form
        user_ids = form.get("user_id")
        if not user_ids:
            return
        if isinstance(user_ids, basestring):
            user_ids = user_ids.split(",")

        ws = self.workspace()
        batch_function = form.get("batch-function")
        # Only the unknown function ends up with an error message
        msg_type = "success"
        if batch_function == "add":
            require_user_adder()
            for user_id in user_ids:
                IWorkspace(ws).add_to_team(user_id)
            msg = _(u"Member(s) added")
        elif batch_function == "remove":
            require_roster_manager()
            for user_id in user_ids:
                IWorkspace(ws).remove_from_team(user_id)
            msg = _(u"Member(s) removed")
        elif batch_function == "role":
            require_roster_manager()
            role = self.request.get("role")
            groups = {role} if role else None
            for user_id in user_ids:
                IWorkspace(ws).add_to_team(user_id, groups=groups)
            msg = _(u"Role updated")
        else:
            msg = _(u"Unknown function")
            msg_type = "error"

        api.portal.show_message(msg, self.request, msg_type)
        notify(WorkspaceRosterChangedEvent(self.context))

    def __call__(self):
        """ On POST run the batch function, then render the tile.
        A permission problem is reported as an error message.
        """
        if self.request.method != "POST":
            return self.render()
        try:
            self.execute_batch_function()
        except Unauthorized as err:
            api.portal.show_message(err, self.request, "error")
        return self.render()
class SidebarSettingsAdvanced(BaseTile):
    """ A view to serve as the advanced config in the sidebar
    """

    index = ViewPageTemplateFile("templates/sidebar-settings-advanced.pt")

    def can_delete_workspace(self):
        """ Check if the current user can delete the workspace
        that contains the context
        """
        ws = parent_workspace(self.context)
        return self.current_user.has_permission("Delete objects", ws)

    def can_be_division(self):
        """ Check if this object can be a division,
        i.e. has a division field in the schema or in a behavior
        """
        schema = self.context.getTypeInfo().lookupSchema()
        if "is_division" in getFieldNames(schema):
            return True

        behavior_assignable = IBehaviorAssignable(self.context)
        if behavior_assignable:
            for behavior in behavior_assignable.enumerateBehaviors():
                # Inspect the schema of the behavior:
                # the schema of the type has been checked already
                if "is_division" in getFieldNames(behavior.interface):
                    return True
        return False

    def _relation_targets(self, uids):
        """ Return the objects found in the catalog for the given uids
        """
        uids = list(uids)
        if not uids:
            return []
        return [brain.getObject() for brain in api.content.find(UID=uids)]

    def update_relation_targets(self, old_relations):
        """ Get old relations and new relations and update the targets
        to point to self.context.

        `old_relations` is the iterable of uids the context was related to
        before the update (a false value means no relation).
        The uid of the context is added to the `related_workspaces` of the
        newly related objects and removed from the ones of the objects
        that are not related anymore.
        """
        context_uid = self.context.UID()
        old_relations = set(old_relations or [])
        new_relations = set(self.context.related_workspaces or [])

        # The attribute is always reassigned and never mutated in place:
        # the ZODB does not notice the changes to a plain list
        for target in self._relation_targets(new_relations - old_relations):
            current = list(target.related_workspaces or [])
            if context_uid not in current:
                target.related_workspaces = current + [context_uid]

        for target in self._relation_targets(old_relations - new_relations):
            current = list(target.related_workspaces or [])
            if context_uid in current:
                target.related_workspaces = [
                    uid for uid in current if uid != context_uid
                ]

    @property
    @memoize
    def default_grouping_option(self):
        """ In the settings the default is defined in the registry
        """
        return self.site_default_grouping

    @property
    @memoize
    def selected_grouping_option(self):
        """ In the settings the selected grouping is stored in the root
        """
        return self.root.default_grouping

    def divisions(self):
        """ return available divisions
        """
        divisions = getUtility(IVocabularyFactory, vocab)(self.context)
        return divisions

    def get_selection_classes(self, field, default=None):
        """ Delegate to the homonymous function of
        ploneintranet.workspace.basecontent.utils passing the context
        """
        return get_selection_classes(self.context, field, default)

    def __call__(self):
        """ write attributes, if any, set state, render

        The attributes are written only for a POST with form data
        coming from a user that can manage the workspace.
        """
        form = self.request.form
        if self.request.method != "POST" or not form:
            return self.render()
        if not self.workspace_view.can_manage_workspace():
            return self.render()

        if "email" in form:
            old_email = self.context.email
            if form["email"]:
                if "@" in form["email"]:
                    # Only use the name part as the domain is fixed.
                    form["email"] = form["email"].split("@")[0]

        if "related_workspaces" in form and form["related_workspaces"]:
            # We defined this as a list of TextLine values.
            # Therefore, the value from the form must be passed
            # as a string with one value per line.
            value = form["related_workspaces"]
            # First, if this is a list, flatten to a comma-separated
            # string. A string is left alone, joining it would put a
            # comma between each of its characters.
            if not isinstance(value, basestring):
                value = ",".join([x for x in value if x])
            # Now, replace all commas with a new-line character
            value = value.replace(",", "\n")
            form["related_workspaces"] = value
            old_related_workspaces = self.context.related_workspaces
        else:
            old_related_workspaces = ""

        modified, errors = dexterity_update(self.context)
        self.update_relation_targets(old_related_workspaces)

        if "email" in form:
            if form["email"]:
                errors += store_name(self.context, form["email"]).values()
            elif old_email:
                # The email has been emptied: drop the stored friendly name
                storage = getUtility(IFriendlyNameStorage)
                storage.remove(storage.get(old_email))

        if modified and not errors:
            self.notify_success()
            notify(ObjectModifiedEvent(self.context))

        if errors:
            self.notify_errors(errors)
        return self.render()
class SidebarDocuments(Sidebar):
    """ Customized tile that shows only the Documents
    """

    index = ViewPageTemplateFile("templates/sidebar-documents.pt")
    can_slides = True

    @property
    @memoize
    def default_grouping_option(self):
        """ The grouping set on the root, if any,
        otherwise the site default from the registry
        """
        root_grouping = self.root.default_grouping
        return root_grouping or self.site_default_grouping

    @property
    @memoize
    def selected_grouping_option(self):
        """ In the document sidebar this is probably in the request
        or in the user preferences
        """
        return self.grouping()

    @property
    @memoize
    def show_trash_item(self):
        """ The trash will be shown only when browsing by folder
        and at the workspace root
        """
        if self.grouping() != "folder" or self.context != self.root:
            return False
        return True

    @property
    @memoize
    def current_label(self):
        """ Return the label (aka tag or subject) that we are exploring

        None is returned when we are not grouping by label, when the request
        has no `groupname` or when the group is the "Untagged" one.
        """
        if not self.grouping().startswith("label"):
            return None
        label = self.request.get("groupname")
        if not label or label == "Untagged":
            return None
        return label

    @property
    @memoize
    def autotag_bulk_uploaded(self):
        """ Check if we should activate the autotag feature
        on the bulk uploaded files: true when we are exploring a label
        """
        return self.current_label

    @property
    @memoize
    def bulk_upload_url(self):
        """ The url for the bulk upload, protected by an authenticator token.
        The current label is passed as `groupname` when autotagging is active.
        """
        params = {"_authenticator": createToken()}
        if self.autotag_bulk_uploaded:
            params["groupname"] = self.current_label
        query_string = urlencode(params)
        base_url = self.context.absolute_url()
        return "{url}/workspaceFileUpload?{qs}".format(url=base_url, qs=query_string)
@implementer(IDiazoNoTemplate)
class SidebarContentDocuments(SidebarDocuments):
    """ The document sidebar contents
    """

    index = ViewPageTemplateFile("templates/sidebar-content-documents.pt")

    @memoize
    def batched_items(self):
        """ Return the slice of the items that belongs to the current page

        XXX: This is not really smart but it is needed because we cannot
        always rely on solr for batching things
        """
        every_item = self.items()
        first = self.get_query_start()
        return every_item[first:first + self._batch_size]
class SidebarPaste(SidebarDocuments):
    """ Paste in the context, then go back to the documents sidebar
    """

    def do_paste(self):
        """ Reuse the cart-paste homonymous method
        """
        paste_view = api.content.get_view("cart-paste", self.context, self.request)
        return paste_view.do_paste()

    def __call__(self):
        """ Paste and redirect to the documents sidebar of the context
        """
        self.do_paste()
        target_url = self.context.absolute_url() + "/@@sidebar.documents"
        return self.request.response.redirect(target_url)
class SidebarEvents(Sidebar):
    """ Customized tile that shows only the Events
    """

    index = ViewPageTemplateFile("templates/sidebar-events.pt")

    def get_base_query(self):
        """ This is the base query for searching inside a workspace:
        the events below the root, sorted on their start
        """
        root_path = "/".join(self.root.getPhysicalPath())
        return {
            "path": root_path,
            "sort_on": "start",
            "object_provides": IEvent.__identifier__,
        }

    def events(self):
        """ Return the events in this workspace to be shown in the events
        section of the sidebar

        The result is a dict with two keys, "upcoming" and "older".
        """
        now = localized_now()

        # Current and future events
        upcoming_query = {
            "end": {"query": now, "range": "min"},
            "sort_order": "ascending",
            "step": 5,
        }
        upcoming_events = self.query_items(upcoming_query)

        # Events which have finished
        older_query = {
            "end": {"query": now, "range": "max"},
            "sort_order": "descending",
        }
        older_events = self.query_items(older_query)

        return {"upcoming": upcoming_events, "older": older_events}

    @memoize
    def items(self):
        """ This one will not return any item
        """
        return list()
@implementer(IDiazoNoTemplate)
class SidebarContentEvents(SidebarEvents):
    """ Customized tile that shows only the content in the event sidebar
    """

    index = ViewPageTemplateFile("templates/sidebar-content-events.pt")
    _batch_size = 5

    @property
    @memoize
    def category(self):
        """ Valid values are upcoming or older
        """
        return self.request.form.get("event_category")

    @memoize
    def items(self):
        """ Depending on the event_category request parameter
        return upcoming or older events

        Any other category returns an empty list.
        """
        category = self.category
        if category not in ("upcoming", "older"):
            return []
        now = localized_now()
        # The two categories differ only in the side of `now` on which
        # the event end has to be and in the direction of the sort
        if category == "upcoming":
            end_range, sort_order = "min", "ascending"
        else:
            end_range, sort_order = "max", "descending"
        query = {
            "end": {"query": now, "range": end_range},
            "sort_order": sort_order,
            "start": self.get_query_start(),
            "step": self._batch_size,
        }
        return self.query_items(query)
class SidebarTodos(Sidebar):
    """ Customized tile that shows only the Todos.

    Nothing but the template changes here.
    """

    index = ViewPageTemplateFile("templates/sidebar-todos.pt")
class SidebarTrash(SidebarDocuments):
    """ The trash sidebar
    """

    # Never show the trash entry while we are inside the trash
    show_trash_item = False

    def logical_parent(self):
        """ Needed to render the back button to a normal sidebar:
        a dict with the url and the title of the context
        """
        return {
            "url": self.context.absolute_url(),
            "title": self.context.Title(),
        }

    @memoize
    def items(self):
        """ The trash items contained in this workspace

        Only the batch for the current page is queried.
        The results are filtered with the `sidebar-search` request
        parameter, if any.
        """
        context = self.context
        query = {
            "object_provides": ITrashed.__identifier__,
            "path": "/".join(context.getPhysicalPath()),
            "start": self.get_query_start(),
            "step": self._batch_size,
            "trashed": True,
            "!UID": context.UID(),
        }
        search_text = self.request.get("sidebar-search", None)
        if search_text:
            query["SearchableText"] = search_text

        results = self._extract_attrs(self.query_items(query))
        return [self.result2item(result) for result in results]

    def batched_items(self):
        """ The items are already limited to the current batch by the query
        """
        return self.items()
@implementer(IDiazoNoTemplate)
class SidebarContentTrash(SidebarTrash):
    """ The trash sidebar items,
    rendered with the same template as the document sidebar contents
    """

    index = ViewPageTemplateFile("templates/sidebar-content-documents.pt")