import unittest

from zope.interface import implementer
from zope.interface import implements

from ploneintranet.microblog import statuscontainer
from ploneintranet.microblog import statusupdate
from ploneintranet.microblog.interfaces import IStatusContainer
from ploneintranet.microblog.interfaces import IStatusUpdate
@implementer(IStatusContainer)
class StatusContainer(statuscontainer.BaseStatusContainer):
    """Override actual implementation with unittest features.

    The interface is declared with the ``implementer`` class decorator
    rather than the ``implements()`` class advice, because the advice form
    is rejected by zope.interface when running on Python 3.

    Both hooks below are replaced by trivial stubs so the container can be
    used in plain unit tests.
    """

    def _check_add_permission(self, statusupdate):
        """Do nothing, so adding ``statusupdate`` is never refused here."""
        pass

    def _blacklist_microblogcontext_uuids(self):
        """Return an empty list.

        NOTE(review): presumably this means no microblog context is
        filtered out -- confirm against ``BaseStatusContainer``.
        """
        return []
@implementer(IStatusUpdate)
class StatusUpdate(statusupdate.StatusUpdate):
    """Override actual implementation with unittest features.

    The interface is declared with the ``implementer`` class decorator
    rather than the ``implements()`` class advice, because the advice form
    is rejected by zope.interface when running on Python 3.

    ``userid``, ``creator`` and ``thread_id`` are taken straight from the
    constructor arguments, and the ``_init_userid`` / ``_init_creator``
    hooks are turned into no-ops.
    """

    def __init__(self, text, userid, creator=None, thread_id=None, tags=None):
        """Create a status update with explicitly supplied attributes.

        :param text: passed on to the base class constructor.
        :param userid: stored as ``self.userid``.
        :param creator: stored as ``self.creator``; when falsy, ``userid``
            is used instead.
        :param thread_id: stored as ``self.thread_id``.
        :param tags: passed on to the base class constructor.
        """
        statusupdate.StatusUpdate.__init__(self, text, tags=tags)
        # Assigned after the base constructor has run, so these values are
        # the ones that end up on the instance.
        self.userid = userid
        self.thread_id = thread_id
        self.creator = creator or userid

    def _init_userid(self):
        """Do nothing; ``userid`` is assigned in ``__init__`` instead.

        NOTE(review): presumably invoked by the base constructor -- confirm.
        """
        pass

    def _init_creator(self):
        """Do nothing; ``creator`` is assigned in ``__init__`` instead.

        NOTE(review): presumably invoked by the base constructor -- confirm.
        """
        pass
class TestStatusContainer_Threaded(unittest.TestCase):
    """Tests for the thread accessors of the status container.

    This is about 'conversation threading' not about 'process threading'.
    """

    def _make_container_with_status(self):
        """Return ``(container, status)`` with one top-level update added.

        Also checks that the container stores the update under its own id
        and hands back the very update that was added.
        """
        container = StatusContainer()
        # add normal status update
        status = StatusUpdate("test", "arnold")
        container.add(status)
        (key, value) = list(container.items())[0]
        self.assertEqual(key, value.id)
        self.assertEqual(status, value)
        return container, status

    def _add_replies(self, container, status):
        """Add three replies to ``status`` and return them as ``(sa, sb, sc)``.

        The replies are added in that order, each with
        ``thread_id=status.id``.
        """
        sa = StatusUpdate("test reply a", "arnold", thread_id=status.id)
        container.add(sa)
        sb = StatusUpdate("test reply b", "bernard", thread_id=status.id)
        container.add(sb)
        sc = StatusUpdate("test reply c", "cary", thread_id=status.id)
        container.add(sc)
        return sa, sb, sc

    def test_keys_self(self):
        """A top-level update without replies is the only key of its thread."""
        container, status = self._make_container_with_status()
        keys = list(container.thread_keys(thread_id=status.id))
        self.assertEqual([status.id], keys)

    def test_keys(self):
        """``thread_keys`` yields the ids of all replies and of the parent."""
        container, status = self._make_container_with_status()
        sa, sb, sc = self._add_replies(container, status)
        # get all thread items from parent status.id
        keys = list(container.thread_keys(thread_id=status.id))
        # comments are reversed by default (like status updates)
        self.assertEqual([sc.id, sb.id, sa.id, status.id], keys)

    def test_values(self):
        """``thread_values`` yields all replies and the parent update."""
        container, status = self._make_container_with_status()
        sa, sb, sc = self._add_replies(container, status)
        # get all thread items from parent status.id
        values = list(container.thread_values(thread_id=status.id))
        # comments are reversed by default (like status updates)
        self.assertEqual([sc, sb, sa, status], values)

    def test_items(self):
        """``thread_items`` yields pairs whose second element is the update."""
        container, status = self._make_container_with_status()
        sa, sb, sc = self._add_replies(container, status)
        # get all thread items from parent status.id
        values = [x[1] for x in container.thread_items(thread_id=status.id)]
        self.assertEqual([sc, sb, sa, status], values)

    def test_get_thread_by_thread_item(self):
        """The whole thread can be found via the ``thread_id`` of a reply."""
        container, status = self._make_container_with_status()
        sa, sb, sc = self._add_replies(container, status)
        # get all thread items from thread item sa.id
        si = container.get(sa.id)
        # Assert the precondition outright instead of guarding the lookup
        # with an ``if``: a reply without a thread_id must fail right here,
        # not skip the check or trip over an undefined variable later.
        self.assertTrue(si.thread_id, "reply lost its thread_id")
        values = [x[1] for x in container.thread_items(thread_id=si.thread_id)]
        self.assertEqual([sc, sb, sa, status], values)

    def test_get_nothing_by_none_thread_item(self):
        """Asking for ``thread_id=None`` yields no items at all."""
        container, status = self._make_container_with_status()
        self._add_replies(container, status)
        # test by giving none
        values = [x[1] for x in container.thread_items(thread_id=None)]
        self.assertEqual([], values)

    def test_is_most_recent_in_thread(self):
        """Only the update added last to a thread is its most recent one."""
        container = StatusContainer()
        # add normal status update
        SA = StatusUpdate("test A", "arnold")
        container.add(SA)
        SB = StatusUpdate("test B", "arnold")
        container.add(SB)
        SC = StatusUpdate("test C", "arnold")
        container.add(SC)
        # add replies
        sa1 = StatusUpdate("test reply a", "arnold", thread_id=SA.id)
        container.add(sa1)
        sb1 = StatusUpdate("test reply b1", "bernard", thread_id=SB.id)
        container.add(sb1)
        sb2 = StatusUpdate("test reply b2", "cary", thread_id=SB.id)
        container.add(sb2)
        most_recent = container.is_most_recent_in_thread
        self.assertFalse(most_recent(SA))
        self.assertTrue(most_recent(sa1))
        self.assertFalse(most_recent(SB))
        self.assertFalse(most_recent(sb1))
        self.assertTrue(most_recent(sb2))
        self.assertTrue(most_recent(SC))

    def test_is_most_recent_in_thread_tagged(self):
        """Check ``is_most_recent_in_thread`` when a ``tags`` filter is given."""
        container = StatusContainer()
        # add normal status update
        SA = StatusUpdate("test A", "arnold", tags=["foo"])
        container.add(SA)
        SB = StatusUpdate("test B", "arnold", tags=["foo", "bar"])
        container.add(SB)
        SC = StatusUpdate("test C", "arnold", tags=None)
        container.add(SC)
        # add replies
        sa1 = StatusUpdate("test reply a", "arnold", thread_id=SA.id, tags=None)
        container.add(sa1)
        sb1 = StatusUpdate(
            "test reply b1", "bernard", thread_id=SB.id, tags=["foo", "bar"]
        )
        container.add(sb1)
        sb2 = StatusUpdate("test reply b2", "cary", thread_id=SB.id, tags=["flux"])
        container.add(sb2)
        most_recent = container.is_most_recent_in_thread
        self.assertTrue(most_recent(SA, tags=["foo"]))
        self.assertTrue(most_recent(SA, tags=["foo", "bar"]))
        self.assertFalse(most_recent(sa1, tags=["foo"]))
        self.assertFalse(most_recent(SB, tags=["foo", "bar"]))
        self.assertTrue(most_recent(sb1, tags=["foo", "bar"]))
        self.assertTrue(most_recent(sb1, tags=["foo"]))
        self.assertFalse(most_recent(sb2, tags=["foo", "bar"]))
        self.assertFalse(most_recent(sb2, tags=["foo"]))
        self.assertTrue(most_recent(sb2, tags=["flux"]))
        # This is a weird one, but a standalone toplevel is *always* the most
        # recent in its thread, even if it does not have any tags.
        self.assertTrue(most_recent(SC, tags=["foo"]))