# Source code for ploneintranet.workspace.browser.tiles.sidebar
# coding=utf-8
from ...basecontent.utils import dexterity_update
from ...interfaces import IGroupingStorage
from ...utils import map_content_type
from ...utils import parent_workspace
from .events import format_event_date_for_title
from AccessControl import Unauthorized
from Acquisition import aq_base
from BTrees.OOBTree import OOBTree
from collections import OrderedDict
from collective.workspace.interfaces import IWorkspace
from copy import deepcopy
from DateTime import DateTime
from plone import api
from plone.app.contenttypes.interfaces import IEvent
from plone.app.event.base import localized_now
from plone.batching import Batch
from plone.behavior.interfaces import IBehaviorAssignable
from plone.i18n.normalizer import idnormalizer
from plone.protect.authenticator import createToken
from ploneintranet.core import ploneintranetCoreMessageFactory as _
from ploneintranet.layout.interfaces import IDiazoNoTemplate
from ploneintranet.layout.memoize.view import memoize
from ploneintranet.layout.memoize.view import memoize_contextless
from ploneintranet.search.interfaces import ISearchResponse
from ploneintranet.search.interfaces import ISiteSearch
from ploneintranet.todo.utils import update_task_status
from ploneintranet.workspace.basecontent.utils import get_selection_classes
from ploneintranet.workspace.browser.show_extra import set_show_extra_cookie
from ploneintranet.workspace.config import INTRANET_USERS_GROUP_ID
from ploneintranet.workspace.events import WorkspaceRosterChangedEvent
from ploneintranet.workspace.interfaces import ITrashed
from Products.CMFPlone.utils import safe_unicode
from Products.Five.browser.pagetemplatefile import ViewPageTemplateFile
from Products.statusmessages.interfaces import IStatusMessage
from scorched.dates import solr_date
from slc.mailrouter.interfaces import IFriendlyNameStorage
from slc.mailrouter.utils import store_name
from urllib import quote
from urllib import urlencode
from zope.annotation import IAnnotations
from zope.component import getAdapter
from zope.component import getUtility
from zope.event import notify
from zope.interface import implementer
from zope.lifecycleevent import ObjectModifiedEvent
from zope.publisher.browser import BrowserView
from zope.schema import getFieldNames
from zope.schema.interfaces import IVocabularyFactory
from ZTUtils import make_query
import logging
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Name under which the divisions vocabulary factory is looked up.
vocab = "ploneintranet.workspace.vocabularies.Divisions"

# Headers offered when the sidebar groups by date, keyed by group id.
# An OrderedDict is used so that iteration yields them in this order.
DATE_GROUPING_VALUES = OrderedDict()
DATE_GROUPING_VALUES["today"] = {
    "title": _(u"Today"),
    "description": _(u"Items modified today"),
}
DATE_GROUPING_VALUES["week"] = {
    "title": _(u"Last Week"),
    "description": _(u"Items modified last week"),
}
DATE_GROUPING_VALUES["month"] = {
    "title": _(u"Last Month"),
    "description": _(u"Items modified last month"),
}
DATE_GROUPING_VALUES["ever"] = {
    "title": _(u"All Time"),
    "description": _(u"Older"),
}
class BaseTile(BrowserView):
    """
    Shared baseclass for the sidebar tiles.

    Cave: many of the actual sidebar templates *also* bind to
    ploneintranet.workspace.browser.workspace.WorkspaceView
    to use methods defined there...
    """

    # Subclasses in this module set this to a ViewPageTemplateFile.
    index = None
    form_submitted = False
    general_settings_autoload = "trigger: autoload;"
    _user_groupings_key = "ploneintranet.workspace.user_groupings"
    # Number of items per page, used by the paging helpers below.
    _batch_size = 20
    # Template for `grouping_options`; copied there, never mutated in place.
    _grouping_options = [
        {"title": _("Group by folder"), "value": "folder"},
        {"title": _("Group by tag"), "value": "label"},
        {"title": _("Group by document type"), "value": "type"},
        {"title": _("Group by author"), "value": "author"},
        {"title": _("Group by date"), "value": "date"},
        {"title": _("Group by first letter"), "value": "first_letter"},
    ]

    @property
    @memoize
    def is_trashed(self):
        """ Check if the current document is in the trash can,
        i.e. whether the context provides ITrashed
        """
        return ITrashed.providedBy(self.context)

    @property
    @memoize
    def default_grouping_option(self):
        """ Override this one: the default can be set in the site settings,
        per context, per user or whatever.
        The base implementation returns None.
        """
        return None

    @property
    @memoize
    def selected_grouping_option(self):
        """ Override this one: the selected value can be set
        in the context, in the user or whatever.
        The base implementation returns None.
        """
        return None

    @property
    @memoize
    def grouping_options(self):
        """ Options to set the sidebar grouping

        Returns a copy of `_grouping_options` where each option additionally
        carries `selected` ("selected" or None) and `default` (bool).
        """
        default = self.default_grouping_option
        # Without an explicit selection the default option is the selected one
        selected = self.selected_grouping_option or default
        options = deepcopy(self._grouping_options)
        for option in options:
            value = option["value"]
            option["selected"] = "selected" if value == selected else None
            option["default"] = value == default
        return options

    @property
    @memoize
    def root(self):
        """ The root object for the sidebar, typically a workspace, but the
        sidebar can also be used for apps, e.g. quaive.app.slides.
        """
        return parent_workspace(self.context)

    def page_number(self):
        """ Get the current page number from the request

        Falls back to 1 when the `page` parameter is missing, is not an
        integer, is not a scalar (e.g. it was passed several times and
        arrives as a list) or is lower than 1.
        """
        try:
            page = int(self.request.form.get("page", 1))
        except (TypeError, ValueError):
            return 1
        # A page lower than 1 would produce a negative query start
        return max(page, 1)

    def get_query_start(self):
        """ Fills the start parameter of the search query,
        i.e. the index of the first element of the batch
        """
        return (self.page_number() - 1) * self._batch_size

    def next_page_number(self):
        """ Get the page number for the next page of search results,
        or None when the current page is the last one
        """
        page = self.page_number()
        total_items = len(self.items())
        if page * self._batch_size < total_items:
            return page + 1
        return None

    def next_page_url(self):
        """ Get the url for the next page of results,
        or None when there is no next page
        """
        next_page_number = self.next_page_number()
        if not next_page_number:
            return None
        # Keep the submitted parameters and only replace the page number
        new_query = make_query(self.request.form.copy(), {"page": next_page_number})
        return "{url}?{qs}".format(url=self.request.ACTUAL_URL, qs=new_query)

    @property
    @memoize
    def site_default_grouping(self):
        """ The site default grouping as specified in the registry
        ("folder" when the record is not set)
        """
        return api.portal.get_registry_record(
            "ploneintranet.workspace.default_grouping", default="folder"
        )

    @memoize
    def get_default_grouping(self):
        """ Get the default grouping for document navigation in the sidebar:
        the one stored on the root, falling back to the site default
        """
        return self.root.default_grouping or self.site_default_grouping

    @property
    @memoize
    def current_user(self):
        """ Return the current user object as given by api.user.get_current
        """
        return api.user.get_current()

    @property
    @memoize
    def current_userid(self):
        """ Return the id of the current user
        """
        return self.current_user.getId()

    def __call__(self):
        # Calling the tile renders it
        return self.render()

    def notify_success(self):
        """ Show a success status message
        """
        msg = _("Attributes changed.")
        api.portal.show_message(msg, request=self.request, type="success")

    def notify_errors(self, errors):
        """ Show an error status message containing the given errors
        """
        msg = _(
            "sidebar_save_error_message",
            default=("There was a problem updating the content: ${errors}."),
            mapping={"errors": errors},
        )
        api.portal.show_message(msg, request=self.request, type="error")

    def _basic_save(self):
        """ Performs a simple save of entered attributes.
        Not all sidebars need this

        Nothing is done unless the request is a non empty POST and the
        workspace view allows managing the workspace.
        Returns the result of the redirect when a hero image was submitted,
        None otherwise.
        """
        form = self.request.form
        if (
            not form
            or self.request.method != "POST"
            or not self.workspace_view.can_manage_workspace()
        ):
            return
        modified, errors = dexterity_update(self.context)
        if errors:
            return self.notify_errors(errors)
        if not modified:
            return
        self.notify_success()
        self.workspace().reindexObject()
        notify(ObjectModifiedEvent(self.context))
        # If we modified the hero image we want to rerender the workspace view
        if not form.get("hero_image", None):
            return
        return self.request.response.redirect(self.context.absolute_url())

    def status_messages(self):
        """
        Returns status messages if any.
        Each message gets an `id` normalized from its text.
        """
        messages = IStatusMessage(self.request).show()
        for message in messages:
            message.id = idnormalizer.normalize(message.message)
        return messages

    @memoize
    def workspace(self):
        """ Acquire the workspace of the current context
        """
        return parent_workspace(self)

    @property
    @memoize
    def workspace_view(self):
        """ Return the "view" browser view of the workspace
        """
        return api.content.get_view("view", self.workspace(), self.request)

    def can_add(self):
        """
        Is this user allowed to add content?
        Cave. We don't use the plone.app.contenttypes per-type permissions.
        Our workflows don't map those, only cmf.AddPortalContent.
        """
        # aq_base: only honour a flag set on the context itself
        if getattr(aq_base(self.context), "disable_add_from_sidebar", False):
            return False
        return self.current_user.has_permission("Add portal content", self.context)

    def can_edit(self):
        """
        Is this user allowed to edit the context?
        """
        return self.current_user.has_permission("Modify portal content", self.context)

    def can_change_label(self):
        """
        Is this user allowed to change a label, and can the label be edited?
        True only for users that can edit while grouping by folder.
        """
        if not self.can_edit():
            return False
        return self.grouping() == "folder"

    def can_delete(self, obj=None):
        """ Is this user allowed to delete an object?

        obj defaults to the context; anything with a `getObject` method
        is resolved through it first.
        """
        if obj is None:
            obj = self.context
        elif hasattr(obj, "getObject"):
            obj = obj.getObject()
        return self.current_user.has_permission("Delete objects", obj)

    def can_subscribe(self):
        """ Check if we see the subscribe feature in the bulk actions
        (registry driven, off when the record is not set)
        """
        return api.portal.get_registry_record(
            "ploneintranet.workspace.allow_bulk_subscribe", default=False
        )

    def date_as_datetime(self, date):
        """
        The content items in the sidebar have a `DateTime` as their
        modification date. But search results have a `datetime`.
        Convert the former to the latter, pass anything else through.
        """
        if isinstance(date, DateTime):
            return date.asdatetime()
        return date
class SidebarSettingsGeneral(BaseTile):
    # Tile bound to the general settings template of the sidebar.
    index = ViewPageTemplateFile("templates/sidebar-settings-general.pt")

    def __call__(self):
        """ Save the submitted attributes, if any, then render the tile
        """
        self._basic_save()
        rendered = self.render()
        return rendered
class SidebarSettingsMembers(BaseTile):
    """
    A view to serve as the member roster in the sidebar
    """

    index = ViewPageTemplateFile("templates/sidebar-settings-members.pt")

    def describe_permissions(self):
        """
        Return a message describing what the participants may do, depending
        on the participant policy of the context and on whether the context
        is accessible to all intranet users.
        For an unknown policy a warning is logged and None is returned.
        """
        # NOTE: the messages are kept as literals inside _() on purpose,
        # presumably i18n extraction depends on it.
        policy = self.context.participant_policy
        if self.context.accessible_to_intranet_users:
            if policy == "consumers":
                return _(
                    "<strong>All intranet users</strong> can read content. "
                    "They cannot add, publish or edit content."
                )
            if policy == "producers":
                return _(
                    "<strong>All intranet users</strong> can read and add "
                    "content. They can neither publish nor edit content."
                )
            if policy == "publishers":
                return _(
                    "<strong>All intranet users</strong> can read, add and "
                    "publish content. They cannot edit other members content."
                )
            if policy == "moderators":
                return _(
                    "<strong>All intranet users</strong> can do everything. "
                    "Read, add, publish and edit content."
                )
        if policy == "consumers":
            return _(
                "<strong>The users listed below</strong> can read content. "
                "They cannot add, publish or edit content."
            )
        if policy == "producers":
            return _(
                "<strong>The users listed below</strong> can read and add "
                "content. They can neither publish nor edit content."
            )
        if policy == "publishers":
            return _(
                "<strong>The users listed below</strong> can read, add and "
                "publish content. They cannot edit other members content."
            )
        if policy == "moderators":
            return _(
                "<strong>The users listed below</strong> can do everything. "
                "Read, add, publish and edit content."
            )
        log.warning("Unknown participation policy")

    def display_principal(self, principal):
        """Check if we should display this principal

        We do not want to display the "All intranet users" group
        if it has no particular roles.
        """
        try:
            groupid = principal.getGroupId()
        except Exception:
            # Anything we cannot ask for a group id is displayed
            return True
        if groupid != INTRANET_USERS_GROUP_ID:
            return True
        # Display the group if it has some roles
        return bool(self.workspace_view.get_principal_roles(principal))

    def execute_batch_function(self):
        # Apply the batch function requested in the form ("add", "remove"
        # or "role") to the submitted user ids and show a status message.
        # Raises Unauthorized when the workspace view denies the operation.
        # The roster changed event is fired only when a known function ran:
        # an unknown function changes nothing, so there is nothing to notify.
        def _check_can_manage():
            if not self.workspace_view.can_manage_roster():
                msg = _(
                    u"You do not have permission to change the workspace "
                    u"membership roster."
                )
                raise Unauthorized(msg)

        def _check_can_add():
            if not self.workspace_view.can_add_users():
                msg = _(u"You do not have permission to add users to the workspace.")
                raise Unauthorized(msg)

        form = self.request.form
        user_ids = form.get("user_id")
        if not user_ids:
            return
        if isinstance(user_ids, basestring):
            # A single value may carry several comma separated ids
            user_ids = user_ids.split(",")

        ws = self.workspace()
        batch_function = form.get("batch-function")
        if batch_function == "add":
            _check_can_add()
            for user_id in user_ids:
                IWorkspace(ws).add_to_team(user_id)
            msg = _(u"Member(s) added")
        elif batch_function == "remove":
            _check_can_manage()
            for user_id in user_ids:
                IWorkspace(ws).remove_from_team(user_id)
            msg = _(u"Member(s) removed")
        elif batch_function == "role":
            _check_can_manage()
            role = self.request.get("role")
            groups = {role} if role else None
            for user_id in user_ids:
                IWorkspace(ws).add_to_team(user_id, groups=groups)
            msg = _(u"Role updated")
        else:
            api.portal.show_message(_(u"Unknown function"), self.request, "error")
            return

        api.portal.show_message(msg, self.request, "success")
        notify(WorkspaceRosterChangedEvent(self.context))

    def __call__(self):
        # On POST run the requested batch function; a denied operation is
        # reported as an error message instead of aborting the rendering.
        if self.request.method == "POST":
            try:
                self.execute_batch_function()
            except Unauthorized as err:
                api.portal.show_message(err, self.request, "error")
        return self.render()
class SidebarSettingsAdvanced(BaseTile):
    """
    A view to serve as the advanced config in the sidebar
    """

    index = ViewPageTemplateFile("templates/sidebar-settings-advanced.pt")

    def can_delete_workspace(self):
        # Whether the current user may delete the workspace of the context
        ws = parent_workspace(self.context)
        return self.current_user.has_permission("Delete objects", ws)

    def can_be_division(self):
        """ Check if this object can be a division,
        i.e. has a division field in the schema or in a behavior
        """
        schema = self.context.getTypeInfo().lookupSchema()
        if "is_division" in getFieldNames(schema):
            return True
        behavior_assignable = IBehaviorAssignable(self.context)
        if not behavior_assignable:
            return False
        # Inspect the interface of each behavior: checking the type schema
        # again here could never find a field contributed by a behavior.
        return any(
            "is_division" in getFieldNames(behavior.interface)
            for behavior in behavior_assignable.enumerateBehaviors()
        )

    def _relation_targets(self, uids):
        # Resolve the given UIDs to objects; no lookup is made without UIDs
        uids = list(uids)
        if not uids:
            return []
        return [brain.getObject() for brain in api.content.find(UID=uids)]

    def update_relation_targets(self, old_relations):
        """ Get old relations and new relations and update the targets
        to point to self.context.

        old_relations are the related workspace UIDs before the change (may
        be empty or None), the new ones are read from the context.
        Added targets get the context UID in their own related_workspaces,
        removed targets lose it.
        """
        context_uid = self.context.UID()
        old_relations = set(old_relations or [])
        new_relations = set(self.context.related_workspaces or [])

        # The lists are always reassigned and never mutated in place: the
        # persistence machinery may not notice an in-place change of a
        # plain list and the update would then be lost.
        for target in self._relation_targets(new_relations - old_relations):
            current = target.related_workspaces or []
            if context_uid not in current:
                target.related_workspaces = list(current) + [context_uid]

        for target in self._relation_targets(old_relations - new_relations):
            current = target.related_workspaces
            if current and context_uid in current:
                target.related_workspaces = [
                    uid for uid in current if uid != context_uid
                ]

    @property
    @memoize
    def default_grouping_option(self):
        """ In the settings the default is defined in the registry
        """
        return self.site_default_grouping

    @property
    @memoize
    def selected_grouping_option(self):
        """ In the settings the selected grouping is stored in the root
        """
        return self.root.default_grouping

    def divisions(self):
        """ Return the available divisions, i.e. the vocabulary created by
        the divisions vocabulary factory for the context
        """
        factory = getUtility(IVocabularyFactory, vocab)
        return factory(self.context)

    def get_selection_classes(self, field, default=None):
        """ Delegate to the homonymous basecontent helper for the context
        """
        return get_selection_classes(self.context, field, default)

    def _apply_settings(self, form):
        # Helper of __call__: normalize the submitted values, update the
        # context, keep related workspaces and friendly mail name in sync
        # and report the outcome with a status message.
        has_email = "email" in form
        if has_email:
            old_email = self.context.email
            if form["email"] and "@" in form["email"]:
                # Only use the name part as the domain is fixed.
                form["email"] = form["email"].split("@")[0]

        if form.get("related_workspaces"):
            # We defined this as a list of TextLine values.
            # Therefore, the value from the form must be passed
            # as a string with one value per line.
            value = form["related_workspaces"]
            if not isinstance(value, basestring):
                # A list is flattened to a comma-separated string. A plain
                # string must not be joined: that would split it in chars.
                value = ",".join([x for x in value if x])
            # Now, replace all commas with a new-line character
            form["related_workspaces"] = value.replace(",", "\n")
            old_related_workspaces = self.context.related_workspaces
        else:
            old_related_workspaces = ""

        modified, errors = dexterity_update(self.context)
        self.update_relation_targets(old_related_workspaces)

        if has_email:
            if form["email"]:
                errors += store_name(self.context, form["email"]).values()
            elif old_email:
                # The email was cleared: drop the stored friendly name
                storage = getUtility(IFriendlyNameStorage)
                storage.remove(storage.get(old_email))

        if modified and not errors:
            self.notify_success()
            notify(ObjectModifiedEvent(self.context))
        if errors:
            self.notify_errors(errors)

    def __call__(self):
        """ write attributes, if any, set state, render

        Attributes are written only for a non empty POST by a user that,
        according to the workspace view, can manage the workspace.
        """
        form = self.request.form
        if (
            self.request.method == "POST"
            and form
            and self.workspace_view.can_manage_workspace()
        ):
            self._apply_settings(form)
        return self.render()
[docs]class Sidebar(BaseTile):
"""
A view to serve as a sidebar navigation for workspaces
"""
index = ViewPageTemplateFile("templates/sidebar.pt")
drop_files_label = _(
u"drop_files_here", default=u"Drop files here or click to browse..."
)
section = "documents"
def __call__(self):
    """
    Write attributes, if any, set state, render

    A non empty POST without `skip_sidebar_submit` stores the grouping
    choice and the show_extra cookie, updates the task status for the
    "task" section and, for users that can manage the workspace, updates
    the context. The tile is always rendered, also when the update
    reported errors, so that the error message is shown with the sidebar
    instead of returning an empty response.
    """
    form = self.request.form
    if self.request.method == "POST" and form and "skip_sidebar_submit" not in form:
        ws = self.workspace()
        self.set_grouping_choice()
        section = form.get("section", self.section)
        set_show_extra_cookie(self.request, section)
        # Do the workflow transitions based on what tasks the user checked
        # or unchecked
        if section == "task":
            update_task_status(self)
        # Do the property editing (the form is known to be non empty here)
        if self.workspace_view.can_manage_workspace():
            modified, errors = dexterity_update(self.context)
            if errors:
                self.notify_errors(errors)
            elif modified:
                self.notify_success()
                notify(ObjectModifiedEvent(self.context))
                ws.reindexObject()
    return self.render()
@property
@memoize
def get_principal_title(self):
    """ Expose the `get_principal_title` method of the "view" view
    looked up on the sidebar root
    """
    root_view = api.content.get_view("view", self.root, self.request)
    return root_view.get_principal_title
def logical_parent(self):
    """
    Needed for the back button in the sidebar.

    Returns a dict describing where "one step back" leads (`title` and
    `url`, plus `grp_title` and `grp_description` outside of the trash),
    or None when there is nothing to go back to: the context is the root
    while grouping by folder, or no `groupname` is in the request.
    """
    if self.is_trashed:
        return {
            "title": _("Trash"),
            "url": self.root.absolute_url() + "/@@sidebar.trash",
        }

    grouping = self.grouping()
    if grouping == "folder":
        if self.context == self.root:
            return None
        container = self.request.PARENTS[1]
        return {
            "title": container.Title(),
            "url": container.absolute_url(),
            "grp_title": self.context.Title(),
            "grp_description": self.context.Description(),
        }

    groupname = self.request.get("groupname")
    if not groupname:
        return None

    # Defaults, refined below depending on the grouping
    group_title = safe_unicode(groupname)
    group_description = ""
    if grouping == "date":
        back_title = _(u"All Dates")
        entry = DATE_GROUPING_VALUES.get(groupname, {})
        group_title = entry.get("title", group_title)
        group_description = entry.get("description", group_description)
    elif grouping.startswith("label"):
        back_title = _(u"All Tags")
        if groupname == "Untagged":
            group_title = _(u"Untagged")
            group_description = _(u"All items without tags")
    elif grouping == "author":
        back_title = _(u"All Authors")
        group_title = self.get_principal_title(groupname)
    elif grouping == "type":
        back_title = _(u"All Types")
        if groupname == "other":
            group_title = _("grouping_label_other", default=u"Other")
            group_description = _(u"Any other document types")
        else:
            content_type = map_content_type(groupname)
            group_title = (content_type or group_title).capitalize()
    elif grouping == "first_letter":
        back_title = _(u"All Letters")
        group_title = group_title.capitalize()
    else:
        back_title = _(u"Back")
        group_title = self.context.Title()

    return {
        "title": back_title,
        "url": self.root.absolute_url(),
        "grp_title": group_title,
        "grp_description": group_description,
    }
def get_item_portal_type(self, item):
    """ Return the portal_type of whatever arrives here (a dict, a brain,
    a search result, ...): a truthy `portal_type` attribute wins,
    otherwise the item is asked through its `get` method
    """
    portal_type = getattr(item, "portal_type", None)
    if portal_type:
        return portal_type
    return item.get("portal_type")
@memoize
def is_folderish(self, item):
    """ Tell whether the portal_type of the item is one of the types
    the sidebar considers folderish
    """
    return self.get_item_portal_type(item) in self.folderish_types
def get_item_dpi(self, item):
    """ Item data-pat-inject options:
    folderish items are injected into the documents sidebar,
    everything else into the document body (recording history)
    """
    if self.is_folderish(item):
        # What exactly do we need to inject, and where?
        return "source: #workspace-documents; target: #workspace-documents "
    return "target: #document-body; source: #document-body; history: record"
def get_item_dps(self, item):
    """ Item data-pat-switch options: None for folderish items,
    otherwise the switch that focuses the document and restores the
    normal sidebar size
    """
    if self.is_folderish(item):
        return None
    focus_document = (
        "selector: #application-body; remove: focus-*; add: focus-document"
    )
    normal_sidebar = (
        "selector: #application-body; remove: sidebar-large; add: sidebar-normal"
    )
    return focus_document + " && " + normal_sidebar
@property
@memoize
def view_types(self):
    """ The value of the `plone.types_use_view_action_in_listings`
    registry record; `get_item_url` appends "/view" to the url of
    items whose portal_type is listed here
    """
    record = "plone.types_use_view_action_in_listings"
    return api.portal.get_registry_record(record)
def get_item_url(self, item):
    """ Item url: folderish items link to their documents sidebar,
    items of a type listed in `view_types` get "/view" appended unless
    the url already ends with a view segment
    """
    url = item.getURL()
    if self.is_folderish(item):
        return url + "/@@sidebar.documents"
    last_segment = url.rsplit("/", 1)[-1]
    already_a_view = last_segment in ("view", "@@view")
    if not already_a_view and item["portal_type"] in self.view_types:
        url += "/view"
    return url
def _extract_attrs(self, catalog_results):
    """
    The items to show in the sidebar may come from the current folder or
    a grouping storage for quick access. To allow unified handling, the
    results of a query are converted here to the same kind of dicts.

    Returns a list with one dict per result; the result itself is kept
    under the `context` key.
    """

    def as_item(result):
        content_type = map_content_type(result.mimetype, result.portal_type)
        structural_type = "group" if self.is_folderish(result) else "document"
        return {
            "title": result["Title"],
            "description": result["Description"],
            "id": result["getId"],
            "structural_type": structural_type,
            "content_type": content_type,
            "dpi": self.get_item_dpi(result),
            "dps": self.get_item_dps(result),
            "url": self.get_item_url(result),
            "creator": result["Creator"],
            "modified": result["modified"],
            "subject": result["Subject"],
            "UID": result["UID"],
            "path": result.getPath(),
            "is_archived": result.get("is_archived", False),
            "context": result,
        }

    return [as_item(result) for result in catalog_results]
def item2ctype(self, item):
    """ Return the ctype (content type) class for an item coming from
    `self._extract_attrs` or `self.get_headers_for_group`.

    A missing `content_type` counts as "code". An empty one falls back
    to "type-folder" for groups and to "document" for anything else.
    """
    content_type = item.get("content_type", "code")
    if not content_type:
        is_group = item.get("structural_type") == "group"
        return "type-folder" if is_group else "document"
    return "type-{0}".format(content_type)
def update_with_show_extra(self, query):
    """ Update the query (in place) with the values from show_extra:
    restrict to the current user's documents when requested, and filter
    out archived and trashed items unless they are explicitly shown
    """
    extras = self.show_extra
    if "my_documents" in extras:
        query["Creator"] = self.current_userid
    for flag, index in (
        ("archived_documents", "is_archived"),
        ("deleted_documents", "trashed"),
    ):
        if flag not in extras:
            query[index] = False
@property
@memoize_contextless
def folderish_types(self):
    """ The portal_types considered as folderish for the sidebar, read
    from the registry (only "Folder" when the record is not set).
    The email type, e.g., is folderish but is treated by the UI
    as a non folderish one.
    """
    return api.portal.get_registry_record(
        "ploneintranet.workspace.sidebar.folderish_types", default=("Folder",)
    )
@property
@memoize_contextless
def blacklisted_item_types(self):
    """ The portal_types that will not be included in the sidebar,
    returned as a list
    """
    get_record = api.portal.get_registry_record
    # We do not want the workspaces
    workspace_filters = get_record(
        "ploneintranet.workspace.workspace_type_filters", default={}
    )
    excluded = set(workspace_filters.keys())
    # Just in case the registry is misconfigured, add the root portal_type
    excluded.add(self.root.portal_type)
    # Some other types have their own sidebar
    excluded.update(
        get_record(
            "ploneintranet.workspace.sidebar.blacklisted_types",
            default=("Event", "todo"),
        )
    )
    # When not grouping by folder we do want only the documents
    if self.grouping() != u"folder":
        excluded.update(self.folderish_types)
    return list(excluded)
@property
@memoize_contextless
def portal_types_filter(self):
    """ The portal_types to query for: everything the portal_types tool
    yields minus the blacklisted ones.
    Some contents are treated differently, e.g. todo and events, and
    folderish types are wanted only when grouping by folder.
    """
    known_types = set(api.portal.get_tool("portal_types"))
    allowed_types = known_types.difference(self.blacklisted_item_types)
    return list(allowed_types)
def get_item_class(self, item):
    """ The classes that we should apply to the item

    item can be a dictionary or a search result; its `context`, when
    present, should be a search result (headers have none).
    Returns the space separated classes: "item", "group" or "document",
    the ctype class, the description marker and, if the context is
    flagged as trashed, "trashed".
    """
    missing = object()
    context = item.get("context")
    # Without a context we are dealing with a header, i.e. a group
    is_folderish = self.is_folderish(context) if context else True
    parts = [
        "item",
        "group" if is_folderish else "document",
        self.item2ctype(item),
        "has-description" if item.get("description") else "has-no-description",
    ]

    # The context may be an attribute or a key of the item
    holder = getattr(item, "context", missing)
    if holder is missing:
        holder = context
    if holder is None:
        holder = {}
    # Likewise `trashed` may be an attribute or a key of the context.
    # The mapping is consulted only as a fallback, so a context that is
    # None or lacks a `get` method no longer raises an AttributeError.
    trashed = getattr(holder, "trashed", missing)
    if trashed is missing:
        lookup = getattr(holder, "get", None)
        trashed = lookup("trashed", False) if lookup is not None else False
    if trashed:
        parts.append("trashed")
    return " ".join(parts)
def result2item(self, result):
    """ Make an item out of a result, which should be a dict

    The very same dict is completed in place and returned: it gets the
    `cls` and `mime-type` keys, a cleaned up `url` and, unless already
    there, the `dpi` (data-pat-inject) options.
    """
    # BBB: For the time being item is a dictionary that contains everything
    # this should change
    item = result
    # Do checks to set the right classes for icons and candy
    item["cls"] = self.get_item_class(result)
    item["mime-type"] = ""
    # Work around a solr-quirk: We might end up with "/view" being
    # appended twice to the URL. See #1056
    if item["url"].endswith("/view/view"):
        item["url"] = item["url"][: -len("/view")]
    if "dpi" not in item:
        # default to sidebar injection
        if item.get("structural_type", "item") == "group":
            template = (
                "source: #workspace-documents; target: #workspace-documents; url: %s"
            )
        else:
            template = (
                "source: #document-documents; target: #document-documents; "
                "history: record;url: %s"
            )
        item["dpi"] = template % item["url"]
    return item
def _maybe_resort(self, results):
    """ Make sure we first show the group-elements (= folders or similar)
    before any content items, each of them ordered by lowercased title.
    The results are returned untouched when grouping by "date" or
    "label_custom".
    XXX: This has to go away if we want to implement proper batching
    """
    if self.grouping() in ["date", "label_custom"]:
        return results

    def groups_first_then_title(entry):
        # Note: since False==0, groups get sorted before the rest!
        is_not_group = entry["structural_type"] != "group"
        return (is_not_group, safe_unicode(entry["title"]).lower())

    resorted = list(results)
    resorted.sort(key=groups_first_then_title)
    return resorted
[docs] @memoize
def items(self):
"""
This is called in the template and returns the items to show for
the current context, each one being a dict passed through
self.result2item.
It returns the items based on the selected grouping (and later may
take sorting, archiving etc into account):
- the results of a search when `sidebar-search` is in the request
- the children of a group when a grouping value is selected
- the headers of the selected grouping otherwise
"""
# NOTE(review): the indentation of this listing is lost, so the code below
# is left untouched. In particular it cannot be told from here whether the
# call to self._maybe_resort belongs to the last `else` only or follows the
# whole `if` chain - TODO confirm against the real module.
sidebar_search = self.request.get("sidebar-search", None)
if sidebar_search:
# When searching, grouping is disregarded
if isinstance(sidebar_search, basestring):
# A string gets a trailing wildcard
sidebar_search += "*"
query = {
"portal_type": self.portal_types_filter,
"path": "/".join(self.context.getPhysicalPath()),
"SearchableText": sidebar_search,
}
self.update_with_show_extra(query)
response = self.query_items(query)
results = self._extract_attrs(response)
elif self.grouping_value():
# User has clicked on a specific group header
results = self._extract_attrs(self.get_group_children())
else:
# User has selected a grouping and now gets the headers for that
results = self.get_headers_for_group()
# XXX resorting will soon go away to implement batching
results = self._maybe_resort(results)
return map(self.result2item, results)
def group_url(self, groupname):
    """ Return the url for this group: the documents sidebar of the root
    with the url-quoted group name as `groupname` parameter
    """
    template = (
        "{context_url}/@@sidebar.documents?"
        "groupname={groupname}#workspace-documents"
    )
    root_url = self.root.absolute_url()
    return template.format(context_url=root_url, groupname=quote(groupname))
def get_headers_for_group(self):
    """
    Return the entries according to a particular grouping
    (e.g. label, author, type, first_letter)

    Grouping by folder returns the direct children of the context,
    grouping by date a fixed list of headers. The other groupings
    read their headers from the grouping storage of the root.
    An empty list is returned when there is no root or the current
    user may not view it.
    """
    root = self.root
    # if the user may not view the workspace, don't bother with
    # getting groups
    user = self.current_user
    if not root or not user.has_permission("View", root):
        return []

    grouping = self.grouping()
    if grouping == "folder":
        # Group by Folder - use list contents
        folder_query = {
            "sort_on": "sortable_title",
            "path": {"query": "/".join(self.context.getPhysicalPath()), "depth": 1},
            "portal_type": self.portal_types_filter,
        }
        return self._extract_attrs(self.query_items(folder_query))

    if grouping == "date":
        # Group by Date, this is a manual list
        date_headers = []
        for group_id, entry in DATE_GROUPING_VALUES.items():
            header = entry.copy()
            header["id"] = group_id
            header["structural_type"] = "group"
            header["content_type"] = "date"
            header["url"] = self.group_url(group_id)
            date_headers.append(header)
        return date_headers

    # All other groupings come from the grouping storage,
    # so retrieve that now.
    storage = getAdapter(root, IGroupingStorage)

    # In the grouping storage, all entries are accessible under their
    # respective grouping headers. Fetch them for the selected grouping.
    by_label = grouping.startswith("label")
    if by_label:
        include_archived = self.archived_tags_shown()
    else:
        include_archived = self.archived_documents_shown()

    if by_label:
        # Show all labels stored in the grouping storage
        headers = storage.get_order_for(
            "label", include_archived=include_archived, alphabetical=False
        )
        for header in headers:
            header["url"] = self.group_url(header["id"])
            header["content_type"] = "tag"
        # In case of grouping by label, we also must show all
        # unlabeled entries
        headers.append(
            {
                "title": _("Untagged"),
                "description": _("All items without tags"),
                "id": "untagged",
                "url": self.group_url("Untagged"),
                "content_type": "tag",
                "archived": False,
            }
        )
    elif grouping == "author":
        # Show all authors stored in the grouping storage
        # XXX May come soon in UI: when the filter is for documents by the
        # current user only, return just the current user as a grouping.
        headers = storage.get_order_for(
            grouping, include_archived=include_archived, alphabetical=True
        )
        for header in headers:
            userid = header["id"]
            header["title"] = self.get_principal_title(userid)
            header["url"] = self.group_url(userid)
            header["content_type"] = "user"
    elif grouping == "type":
        # Show all types stored in the grouping storage
        headers = storage.get_order_for(
            grouping, include_archived=include_archived, alphabetical=True
        )
        for header in headers:
            content_type = map_content_type(header["id"])
            header["title"] = (content_type or header["title"]).capitalize()
            header["url"] = self.group_url(header["id"])
            header["content_type"] = content_type
        # Document types using mimetypes
        headers.append(
            {
                "title": _("grouping_label_other", default=u"Other"),
                "description": _(u"Any other document types"),
                "id": "other",
                "url": self.group_url("other"),
            }
        )
    elif grouping == "first_letter":
        # Show all items by first letter stored in the grouping storage
        headers = storage.get_order_for(
            grouping, include_archived=include_archived, alphabetical=True
        )
        for header in headers:
            header["title"] = header["title"].upper()
            header["url"] = self.group_url(header["id"])
            header["content_type"] = "none"
    else:
        # The exception case. Should not happen.
        headers = [
            {
                "title": "Ungrouped",
                "description": "Other",
                "url": self.group_url(""),
                "content_type": "none",
                "id": "none",
            }
        ]

    # All headers here are to be displayed as group
    for header in headers:
        header["structural_type"] = "group"
    return headers
def get_groupings(self):
    """ Return the groupings held by the grouping storage of the root
    """
    return IGroupingStorage(self.root).get_groupings()
def get_grouping_by_name(self, name=None):
    """ Get the grouping from the grouping storage given a name
    (something like 'label', 'author', ...).
    If no name is given default to the one returned by self.grouping().
    An empty dict is returned for a name not in the storage.
    """
    if name is None:
        # NOTE(review): this used to call `self.groupings()`, which is not
        # defined in the visible part of this class; the currently selected
        # grouping, as used everywhere else here, is `self.grouping()`.
        name = self.grouping()
    return self.get_groupings().get(name, {})
def uids_in_grouping(self, name, value):
    """ Get, as a new list, what the grouping holds for the given value
    name is something like: 'label', 'author', ...
    value is something like 'tag1', 'john.doe', ...
    """
    members = self.get_grouping_by_name(name).get(value, [])
    return list(members)
def get_base_query(self):
    """ Build the query every search inside the workspace starts from

    The results are restricted to the path of the workspace root and to
    self.portal_types_filter. They are sorted on the user selected
    sorting: descending for "modified", ascending for anything else.
    The query is finally passed through self.update_with_show_extra.
    """
    sort_on = self.sorting()
    if sort_on == "modified":
        direction = "descending"
    else:
        direction = "ascending"
    query = dict(
        path="/".join(self.root.getPhysicalPath()),
        sort_on=sort_on,
        sort_order=direction,
        portal_type=self.portal_types_filter,
    )
    self.update_with_show_extra(query)
    return query
def _make_solr_query(self, query):
    """ Transform a catalog query into a solr query

    XXX: This is a compatibility method that has to die.
    Use it at your own risk

    query: catalog style query dict. It is modified in place: the keys
    that get translated are popped from it and whatever is left becomes
    the "filters" entry of the returned dict.

    Returns a dict with the keys "phrase", "filters", "start", "sort"
    and "step".
    """
    sort = query.pop("sort_on", None)
    # sort_order is a sorting directive, not a filter: take it out of the
    # query unconditionally so that it cannot end up in the filters when
    # no sort_on was given
    sort_order = query.pop("sort_order", None)
    phrase = query.pop("SearchableText", None)
    if sort and sort_order == "descending":
        sort = "-{}".format(sort)
    if isinstance(query.get("path"), dict):
        path = query.pop("path")["query"]
        query["path"] = path
        # number of "/" separated segments of the path, plus one
        query["path_depth"] = len(path.split("/")) + 1
    # XXX: This is a nasty trick to avoid the need for a Solr reindex
    # of the `trashed` and `is_archived` fields: the negated value is
    # stored under a "!" prefixed key
    for flag in ("trashed", "is_archived"):
        if flag in query:
            query["!%s" % flag] = not query.pop(flag)
    for field in ("end", "modified"):
        if not isinstance(query.get(field), dict):
            continue
        value = query.pop(field)
        qrange = value["range"]
        value = value["query"]
        if qrange == "max":
            query["%s__lt" % field] = solr_date(self.date_as_datetime(value))
        elif qrange == "min":
            query["%s__gt" % field] = solr_date(self.date_as_datetime(value))
        elif qrange == "min:max":
            query["%s__range" % field] = (
                solr_date(self.date_as_datetime(value[0])),
                solr_date(self.date_as_datetime(value[1])),
            )
        # any other range value: the field is dropped from the query
    start = query.pop("start", 0)
    step = query.pop("step", 99999)
    return {
        "phrase": phrase,
        "filters": query,
        "start": start,
        "sort": sort,
        "step": step,
    }
def query_items(self, additional_query=None):
    """ Return all the documents in the group for the given query

    additional_query: optional dict merged on top of the base query
    returned by self.get_base_query().

    The ISiteSearch utility is tried first. If that raises, a warning is
    logged and the same query is run against portal_catalog instead.
    """
    query = self.get_base_query()
    if additional_query:
        query.update(additional_query)
    # XXX this will go away when in the test suite we will fully
    # support Solr in all the tests
    # _make_solr_query modifies the query in place, so keep a pristine
    # copy around for the catalog fallback
    backup_query = deepcopy(query)
    try:
        search_util = getUtility(ISiteSearch)
        response = search_util.query(**self._make_solr_query(query))
    except Exception:
        # Do not catch BaseException here: KeyboardInterrupt and
        # SystemExit must not trigger the fallback
        log.warning("Fallback to the catalog, cannot query: %r", backup_query)
        catalog = api.portal.get_tool(name="portal_catalog")
        brains = catalog(**backup_query)
        response = list(ISearchResponse(Batch(brains, len(brains), 0)))
    return response
def get_group_children_by_label(self):
    """ Return the items carrying the currently selected label

    For the special value "Untagged" the whole base query is run and only
    the items with an empty Subject() are kept. For any other value the
    UIDs stored for that label are queried; no UIDs means no items.
    """
    label = self.grouping_value()
    if label == "Untagged":
        return [item for item in self.query_items() if not item.Subject()]
    uids = self.uids_in_grouping("label", label)
    return self.query_items({"UID": uids}) if uids else []
def get_group_children_by_date(self):
    """ Return the items modified in the currently selected period

    The known periods are today, the last week, the last month and
    everything older ("ever"). Every period starts where the more recent
    one ends, so e.g. the last week does not list today's items again.
    For any other value the query is run with an empty string as the
    "modified" criterion.
    """
    today_start = DateTime(DateTime().Date())
    today_end = today_start + 1
    week_start = today_start - 6
    # FIXME: Last month should probably be the last literal month,
    # not the last 30 days.
    month_start = today_start - 30
    periods = (
        ("today", {"range": "min:max", "query": (today_start, today_end)}),
        ("week", {"range": "min:max", "query": (week_start, today_start)}),
        ("month", {"range": "min:max", "query": (month_start, week_start)}),
        ("ever", {"range": "max", "query": month_start}),
    )
    selected = self.grouping_value()
    modified = ""
    for period_id, date_range in periods:
        if selected == period_id:
            modified = date_range
            break
    return self.query_items({"modified": modified})
def get_group_children_by_type(self):
    """ Return the items of the currently selected type

    For the special value "other" the whole base query is run and only
    the results for which map_content_type gives nothing are kept. For
    any other value the UIDs stored for it in the current grouping are
    queried; no UIDs means no items.
    """
    type_id = self.grouping_value()
    if type_id == "other":
        unmapped = []
        for result in self.query_items():
            if not map_content_type(result.mimetype, result.portal_type):
                unmapped.append(result)
        return unmapped
    uids = self.uids_in_grouping(self.grouping(), type_id)
    if uids:
        return self.query_items({"UID": uids})
    return []
def get_group_children(self):
    """ Return the items for the selected grouping and grouping value

    Groupings starting with "label" and the "date" and "type" groupings
    have a dedicated method. Everything else is resolved by querying
    the UIDs stored for the grouping value.
    """
    grouping = self.grouping()
    if grouping.startswith("label"):
        handler = self.get_group_children_by_label
    elif grouping == "date":
        handler = self.get_group_children_by_date
    elif grouping == "type":
        handler = self.get_group_children_by_type
    else:
        handler = None
    if handler is not None:
        return handler()
    # When no fancy things are needed, the stored UIDs are enough
    uids = self.uids_in_grouping(grouping, self.grouping_value())
    if uids:
        return self.query_items({"UID": uids})
    return []
def set_grouping_choice(self):
    """ Persist the grouping the user asked for in the workspace annotations

    Nothing happens unless the form carries "grouping.submitted".
    Otherwise "grouping" is popped from the form and stored for the
    current user; an empty value removes the stored choice instead.
    """
    form = self.request.form
    if "grouping.submitted" not in form:
        return
    chosen = form.pop("grouping", "")
    annotations = IAnnotations(self.root)
    key = self._user_groupings_key
    if key not in annotations:
        annotations[key] = OOBTree()
    user_choices = annotations[key]
    if chosen:
        user_choices[self.current_userid] = chosen
    else:
        user_choices.pop(self.current_userid, None)
def get_from_request_or_cookie(self, key, cookie_name, default):
    """
    Return the request value stored under key, or else the one stored
    under cookie_name, or else the given default
    """
    request = self.request
    for candidate in (key, cookie_name):
        if candidate in request:
            return request.get(candidate)
    return default
@memoize
def grouping(self):
    """ Return the grouping to use

    The request form wins, then comes the choice stored for the current
    user in the annotations of the workspace root, and finally whatever
    self.get_default_grouping() returns.
    """
    form = self.request.form
    if "grouping" in form:
        return form["grouping"]
    per_user = IAnnotations(self.root).get(self._user_groupings_key, {})
    return per_user.get(self.current_userid, "") or self.get_default_grouping()
@memoize
def grouping_value(self):
    """
    Return the grouping value the user selected, i.e. the "groupname"
    found in the request (None when it is missing)
    """
    groupname = self.request.get("groupname")
    return groupname
def sorting(self):
    """
    Return the sorting the user selected: the "sorting" request value,
    or the per section and per user cookie, or "modified"
    """
    name_parts = (self.section, self.current_userid)
    cookie_name = "%s-sort-on-%s" % name_parts
    return self.get_from_request_or_cookie("sorting", cookie_name, "modified")
@property
@memoize
def show_extra(self):
    """ The "|" separated values of the per section and per user
    show-extra entry of the request, as a list
    """
    name_parts = (self.section, self.current_userid)
    cookie_name = "%s-show-extra-%s" % name_parts
    raw_value = self.request.get(cookie_name, "")
    return raw_value.split("|")
def archived_documents_shown(self):
    """
    True when "archived_documents" is one of the show_extra values
    """
    extras = self.show_extra
    return "archived_documents" in extras
def archived_tags_shown(self):
    """
    True when "archived_tags" is one of the show_extra values
    """
    extras = self.show_extra
    return "archived_tags" in extras
class SidebarDocuments(Sidebar):
    """ Sidebar tile variant rendered with the documents template
    """

    index = ViewPageTemplateFile("templates/sidebar-documents.pt")
    can_slides = True

    @property
    @memoize
    def default_grouping_option(self):
        """ The default_grouping of the workspace root or, when that is
        empty, self.site_default_grouping
        """
        workspace_default = self.root.default_grouping
        return workspace_default or self.site_default_grouping

    @property
    @memoize
    def selected_grouping_option(self):
        """ The grouping currently in use, as returned by self.grouping()
        """
        return self.grouping()

    @property
    @memoize
    def show_trash_item(self):
        """ The trash is shown only when browsing by folder
        and only at the workspace root
        """
        return not (self.grouping() != "folder" or self.context != self.root)

    @property
    @memoize
    def current_label(self):
        """ Return the label (aka tag or subject) that we are exploring

        None is returned when the grouping does not start with "label",
        when the request has no groupname or when it is "Untagged".
        """
        if not self.grouping().startswith("label"):
            return None
        groupname = self.request.get("groupname")
        if not groupname or groupname == "Untagged":
            return None
        return groupname

    @property
    @memoize
    def autotag_bulk_uploaded(self):
        """ Tell if the bulk uploaded files should be tagged automatically:
        this is the current label, so it is falsy when there is none
        """
        return self.current_label

    @property
    @memoize
    def bulk_upload_url(self):
        """ URL of workspaceFileUpload on the context, carrying a fresh
        authenticator token and, when autotagging, the current label
        """
        params = {"_authenticator": createToken()}
        if self.autotag_bulk_uploaded:
            params.update(groupname=self.current_label)
        query_string = urlencode(params)
        return "{url}/workspaceFileUpload?{qs}".format(
            url=self.context.absolute_url(), qs=query_string
        )
@implementer(IDiazoNoTemplate)
class SidebarContentDocuments(SidebarDocuments):
    """ The contents of the documents sidebar
    """

    index = ViewPageTemplateFile("templates/sidebar-content-documents.pt")

    @memoize
    def batched_items(self):
        """ Return the slice of self.items() for the current batch

        XXX: This is not really smart but it is needed because we cannot
        always rely on solr for batching things
        """
        all_items = self.items()
        first = self.get_query_start()
        return all_items[first:first + self._batch_size]
class SidebarPaste(SidebarDocuments):
    """ Paste into the context, then redirect to the documents sidebar
    """

    def do_paste(self):
        """ Delegate to the do_paste method of the cart-paste view
        """
        paste_view = api.content.get_view("cart-paste", self.context, self.request)
        return paste_view.do_paste()

    def __call__(self):
        """ Run the paste and redirect to @@sidebar.documents on the context
        """
        self.do_paste()
        target = self.context.absolute_url() + "/@@sidebar.documents"
        return self.request.response.redirect(target)
class SidebarEvents(Sidebar):
    """ Sidebar tile variant rendered with the events template
    """

    index = ViewPageTemplateFile("templates/sidebar-events.pt")

    def get_base_query(self):
        """ Base query for the events: everything providing IEvent below
        the workspace root, sorted on the start date
        """
        workspace_path = "/".join(self.root.getPhysicalPath())
        return {
            "path": workspace_path,
            "sort_on": "start",
            "object_provides": IEvent.__identifier__,
        }

    def events(self):
        """ Return the events of this workspace for the events section
        of the sidebar, as a dict with the keys "upcoming" and "older"
        """
        now = localized_now()
        # Current and future events: only the first 5 are requested
        upcoming_query = {
            "end": {"query": now, "range": "min"},
            "sort_order": "ascending",
            "step": 5,
        }
        # Events which have finished
        older_query = {
            "end": {"query": now, "range": "max"},
            "sort_order": "descending",
        }
        upcoming = self.query_items(upcoming_query)
        older = self.query_items(older_query)
        return dict(upcoming=upcoming, older=older)
@implementer(IDiazoNoTemplate)
class SidebarContentEvents(SidebarEvents):
    """ Tile that renders only the content of the events sidebar
    """

    index = ViewPageTemplateFile("templates/sidebar-content-events.pt")
    _batch_size = 5

    @property
    @memoize
    def category(self):
        """ The event_category found in the request form.
        Valid values are upcoming or older
        """
        return self.request.form.get("event_category")

    @memoize
    def items(self):
        """ Return one batch of upcoming or older events, depending on
        the category; any other category gives an empty list
        """
        category = self.category
        if category not in ("upcoming", "older"):
            return []
        if category == "upcoming":
            end_range, sort_order = "min", "ascending"
        else:
            end_range, sort_order = "max", "descending"
        query = {
            "end": {"query": localized_now(), "range": end_range},
            "sort_order": sort_order,
            "start": self.get_query_start(),
            "step": self._batch_size,
        }
        return self.query_items(query)
class SidebarTodos(Sidebar):
    """ Sidebar tile variant rendered with the todos template
    """

    index = ViewPageTemplateFile("templates/sidebar-todos.pt")
class SidebarTrash(SidebarDocuments):
    """ The sidebar listing the trashed items
    """

    show_trash_item = False

    def logical_parent(self):
        """ URL and title of the context, needed to render the back
        button to a normal sidebar
        """
        context = self.context
        return {"url": context.absolute_url(), "title": context.Title()}

    @memoize
    def items(self):
        """ One batch of the trashed items found below the context

        The context itself is excluded and the results are optionally
        narrowed down by the "sidebar-search" request value.
        """
        context = self.context
        query = {
            "object_provides": ITrashed.__identifier__,
            "path": "/".join(context.getPhysicalPath()),
            "start": self.get_query_start(),
            "step": self._batch_size,
            "trashed": True,
            "!UID": context.UID(),
        }
        search_text = self.request.get("sidebar-search", None)
        if search_text:
            query["SearchableText"] = search_text
        results = self._extract_attrs(self.query_items(query))
        to_item = self.result2item
        return [to_item(result) for result in results]
@implementer(IDiazoNoTemplate)
class SidebarContentTrash(SidebarTrash):
    """ The items of the trash sidebar, rendered with the same template
    as the contents of the documents sidebar
    """

    index = ViewPageTemplateFile("templates/sidebar-content-documents.pt")