Source code for ploneintranet.microblog.tests.test_statuscontainer_threaded

import unittest
from zope.interface import implements

from ploneintranet.microblog.interfaces import IStatusContainer
from ploneintranet.microblog.interfaces import IStatusUpdate
from ploneintranet.microblog import statuscontainer
from ploneintranet.microblog import statusupdate


[docs]class StatusContainer(statuscontainer.BaseStatusContainer): """Override actual implementation with unittest features""" implements(IStatusContainer)
[docs] def _check_add_permission(self, statusupdate): pass
[docs] def _blacklist_microblogcontext_uuids(self): return []
[docs]class StatusUpdate(statusupdate.StatusUpdate): """Override actual implementation with unittest features""" implements(IStatusUpdate) def __init__(self, text, userid, creator=None, thread_id=None, tags=None): statusupdate.StatusUpdate.__init__(self, text, tags=tags) self.userid = userid self.thread_id = thread_id if creator: self.creator = creator else: self.creator = userid
[docs] def _init_userid(self): pass
[docs] def _init_creator(self): pass
[docs]class TestStatusContainer_Threaded(unittest.TestCase): """This is about 'conversation threading' not about 'process threading'."""
[docs] def test_keys_self(self): container = StatusContainer() # add normal status update status = StatusUpdate("test", "arnold") container.add(status) (key, value) = list(container.items())[0] self.assertEqual(key, value.id) self.assertEqual(status, value) keys = [x for x in container.thread_keys(thread_id=status.id)] self.assertEqual([status.id], keys)
[docs] def test_keys(self): container = StatusContainer() # add normal status update status = StatusUpdate("test", "arnold") container.add(status) (key, value) = list(container.items())[0] self.assertEqual(key, value.id) self.assertEqual(status, value) # add reply to status.id sa = StatusUpdate("test reply a", "arnold", thread_id=status.id) container.add(sa) sb = StatusUpdate("test reply b", "bernard", thread_id=status.id) container.add(sb) sc = StatusUpdate("test reply c", "cary", thread_id=status.id) container.add(sc) # get all thread items from parent status.id keys = [x for x in container.thread_keys(thread_id=status.id)] # comments are reversed by default (like status updates) self.assertEqual([sc.id, sb.id, sa.id, status.id], keys)
[docs] def test_values(self): container = StatusContainer() # add normal status update status = StatusUpdate("test", "arnold") container.add(status) (key, value) = list(container.items())[0] self.assertEqual(key, value.id) self.assertEqual(status, value) # add reply to status.id sa = StatusUpdate("test reply a", "arnold", thread_id=status.id) container.add(sa) sb = StatusUpdate("test reply b", "bernard", thread_id=status.id) container.add(sb) sc = StatusUpdate("test reply c", "cary", thread_id=status.id) container.add(sc) # get all thread items from parent status.id values = [x for x in container.thread_values(thread_id=status.id)] # comments are reversed by default (like status updates) self.assertEqual([sc, sb, sa, status], values)
[docs] def test_items(self): container = StatusContainer() # add normal status update status = StatusUpdate("test", "arnold") container.add(status) (key, value) = list(container.items())[0] self.assertEqual(key, value.id) self.assertEqual(status, value) # add reply to status.id sa = StatusUpdate("test reply a", "arnold", thread_id=status.id) container.add(sa) sb = StatusUpdate("test reply b", "bernard", thread_id=status.id) container.add(sb) sc = StatusUpdate("test reply c", "cary", thread_id=status.id) container.add(sc) # get all thread items from parent status.id values = [x[1] for x in container.thread_items(thread_id=status.id)] self.assertEqual([sc, sb, sa, status], values)
[docs] def test_get_thread_by_thread_item(self): container = StatusContainer() # add normal status update status = StatusUpdate("test", "arnold") container.add(status) (key, value) = list(container.items())[0] self.assertEqual(key, value.id) self.assertEqual(status, value) # add reply to status.id sa = StatusUpdate("test reply a", "arnold", thread_id=status.id) container.add(sa) sb = StatusUpdate("test reply b", "bernard", thread_id=status.id) container.add(sb) sc = StatusUpdate("test reply c", "cary", thread_id=status.id) container.add(sc) # get all thread items from thread item sa.id si = container.get(sa.id) if getattr(si, "thread_id"): values = [x[1] for x in container.thread_items(thread_id=si.thread_id)] self.assertEqual([sc, sb, sa, status], values)
[docs] def test_get_nothing_by_none_thread_item(self): container = StatusContainer() # add normal status update status = StatusUpdate("test", "arnold") container.add(status) (key, value) = list(container.items())[0] self.assertEqual(key, value.id) self.assertEqual(status, value) # add reply to status.id sa = StatusUpdate("test reply a", "arnold", thread_id=status.id) container.add(sa) sb = StatusUpdate("test reply b", "bernard", thread_id=status.id) container.add(sb) sc = StatusUpdate("test reply c", "cary", thread_id=status.id) container.add(sc) # test by giving none values = [x[1] for x in container.thread_items(thread_id=None)] self.assertEqual([], values)
[docs] def test_is_most_recent_in_thread(self): container = StatusContainer() # add normal status update SA = StatusUpdate("test A", "arnold") container.add(SA) SB = StatusUpdate("test B", "arnold") container.add(SB) SC = StatusUpdate("test C", "arnold") container.add(SC) # add replies sa1 = StatusUpdate("test reply a", "arnold", thread_id=SA.id) container.add(sa1) sb1 = StatusUpdate("test reply b1", "bernard", thread_id=SB.id) container.add(sb1) sb2 = StatusUpdate("test reply b2", "cary", thread_id=SB.id) container.add(sb2) self.assertFalse(container.is_most_recent_in_thread(SA)) self.assertTrue(container.is_most_recent_in_thread(sa1)) self.assertFalse(container.is_most_recent_in_thread(SB)) self.assertFalse(container.is_most_recent_in_thread(sb1)) self.assertTrue(container.is_most_recent_in_thread(sb2)) self.assertTrue(container.is_most_recent_in_thread(SC))
[docs] def test_is_most_recent_in_thread_tagged(self): container = StatusContainer() # add normal status update SA = StatusUpdate("test A", "arnold", tags=["foo"]) container.add(SA) SB = StatusUpdate("test B", "arnold", tags=["foo", "bar"]) container.add(SB) SC = StatusUpdate("test C", "arnold", tags=None) container.add(SC) # add replies sa1 = StatusUpdate("test reply a", "arnold", thread_id=SA.id, tags=None) container.add(sa1) sb1 = StatusUpdate( "test reply b1", "bernard", thread_id=SB.id, tags=["foo", "bar"] ) container.add(sb1) sb2 = StatusUpdate("test reply b2", "cary", thread_id=SB.id, tags=["flux"]) container.add(sb2) self.assertTrue(container.is_most_recent_in_thread(SA, tags=["foo"])) self.assertTrue(container.is_most_recent_in_thread(SA, tags=["foo", "bar"])) self.assertFalse(container.is_most_recent_in_thread(sa1, tags=["foo"])) self.assertFalse(container.is_most_recent_in_thread(SB, tags=["foo", "bar"])) self.assertTrue(container.is_most_recent_in_thread(sb1, tags=["foo", "bar"])) self.assertTrue(container.is_most_recent_in_thread(sb1, tags=["foo"])) self.assertFalse(container.is_most_recent_in_thread(sb2, tags=["foo", "bar"])) self.assertFalse(container.is_most_recent_in_thread(sb2, tags=["foo"])) self.assertTrue(container.is_most_recent_in_thread(sb2, tags=["flux"])) # This is a weird one, but a standalone toplevel is *always* the most # recent in its thread, even if it does not have any tags. self.assertTrue(container.is_most_recent_in_thread(SC, tags=["foo"]))