Pack notifications to be compatible with SDL golang
In SDL Golang 'publish' API implementation packs all the events to single DB
notification, where events are separated by '___' characters. In SDL Golang
subscriber receives this packed DB notification and it splits the DB
notification by '___' characters and calls application notification handler
callback function with list of received events.
Current implementation of SDL Python is however different, is does not do any
packing for events and hence it calls application notification callback many
times, once for each event it has received. Also if SDL Python application is
used as event subscriber and SDL Golang application as event published,
Python application won't be able to handle those notifications what are packed
to one db notification by SDL Golang application.
With this commit implement notification packing to SDL Python.
Issue-ID: RIC-795
Signed-off-by: Timo Tietavainen <timo.tietavainen@nokia.com>
Change-Id: Ie494430cf46756ea476b98343a8c651a2fa1dbcd
diff --git a/ricsdl-package/examples/notify.py b/ricsdl-package/examples/notify.py
index 473dd73..7495bfe 100755
--- a/ricsdl-package/examples/notify.py
+++ b/ricsdl-package/examples/notify.py
@@ -57,13 +57,14 @@
from ricsdl.syncstorage import SyncStorage
from ricsdl.exceptions import RejectedByBackend, NotConnected, BackendError
+from typing import (Union, List)
# There are two available methods for applications to handle notifications:
# - EVENT_LISTENER (true):
# - User calls sdl.start_event_listener() which will create an SDL managed
# event loop for handling messages.
# - EVENT_LISTENER (false):
-# - User need to call sdl.handle_messages() which will return the message
+# - User need to call sdl.handle_events() which will return the message
#
# Note: In both cases, the given callback function will be executed.
EVENT_LISTENER = True
@@ -114,8 +115,9 @@
stop_thread = False
-def cb(channel: str, message: str):
- """An example of function that will be called when an event is received.
+def cb(channel: str, message: Union[str, List[str]]):
+ """An example of function that will be called when a single event or list of
+ events are received.
This function sets last_cb_channel and last_cb_message as channel and
message respectively. This also unlocks the global lock variable.
@@ -160,6 +162,13 @@
lambda: mysdl.set_and_publish(MY_NS, {MY_CHANNEL: "SET PUBLISH"}, {'my_key': b'my_value'}))
assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET PUBLISH"
+# Sets a value 'my_value' for a key 'my_key' under given namespace. Note that value
+# type must be bytes and multiple key values can be set in one set function call.
+# Function publishes two events into one channel.
+_try_func_callback_return(
+ lambda: mysdl.set_and_publish(MY_NS, {MY_CHANNEL: ["SET PUBLISH1", "SET PUBLISH2"]}, {'my_key': b'my_value'}))
+assert last_cb_channel == MY_CHANNEL and last_cb_message == ["SET PUBLISH1", "SET PUBLISH2"]
+
# Sets a value 'my_value2' for a key 'my_key' under given namespace only if the old value is
# 'my_value'.
was_set = _try_func_callback_return(lambda: mysdl.set_if_and_publish(
diff --git a/ricsdl-package/ricsdl/__init__.py b/ricsdl-package/ricsdl/__init__.py
index d969bec..81f7e72 100644
--- a/ricsdl-package/ricsdl/__init__.py
+++ b/ricsdl-package/ricsdl/__init__.py
@@ -31,7 +31,7 @@
)
-__version__ = '2.2.0'
+__version__ = '2.3.0'
__all__ = [
diff --git a/ricsdl-package/ricsdl/backend/dbbackend_abc.py b/ricsdl-package/ricsdl/backend/dbbackend_abc.py
index cdf0311..49944b5 100755
--- a/ricsdl-package/ricsdl/backend/dbbackend_abc.py
+++ b/ricsdl-package/ricsdl/backend/dbbackend_abc.py
@@ -161,7 +161,8 @@
pass
@abstractmethod
- def subscribe_channel(self, ns: str, cb: Callable[[str, str], None],
+ def subscribe_channel(self, ns: str,
+ cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
channels: List[str]) -> None:
"""
This takes a callback function and one or many channels to be subscribed.
@@ -184,10 +185,10 @@
pass
@abstractmethod
- def handle_events(self) -> Optional[Tuple[str, str]]:
+ def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
"""
handle_events is a non-blocking function that returns a tuple containing channel
- name and message received from notification.
+ name and message(s) received from notification.
"""
pass
diff --git a/ricsdl-package/ricsdl/backend/fake_dict_db.py b/ricsdl-package/ricsdl/backend/fake_dict_db.py
index 5a49f10..1a63ebd 100755
--- a/ricsdl-package/ricsdl/backend/fake_dict_db.py
+++ b/ricsdl-package/ricsdl/backend/fake_dict_db.py
@@ -149,15 +149,13 @@
data_map: Dict[str, bytes]) -> None:
self._db.update(data_map.copy())
for channel, events in channels_and_events.items():
- for event in events:
- self._queue.put((channel, event))
+ self._queue.put((channel, events))
def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
old_data: bytes, new_data: bytes) -> bool:
if self.set_if(ns, key, old_data, new_data):
for channel, events in channels_and_events.items():
- for event in events:
- self._queue.put((channel, event))
+ self._queue.put((channel, events))
return True
return False
@@ -165,8 +163,7 @@
key: str, data: bytes) -> bool:
if self.set_if_not_exists(ns, key, data):
for channel, events in channels_and_events.items():
- for event in events:
- self._queue.put((channel, event))
+ self._queue.put((channel, events))
return True
return False
@@ -175,15 +172,13 @@
for key in keys:
self._db.pop(key, None)
for channel, events in channels_and_events.items():
- for event in events:
- self._queue.put((channel, event))
+ self._queue.put((channel, events))
def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
data: bytes) -> bool:
if self.remove_if(ns, key, data):
for channel, events in channels_and_events.items():
- for event in events:
- self._queue.put((channel, event))
+ self._queue.put((channel, events))
return True
return False
@@ -191,10 +186,10 @@
# Note: Since fake db has only one namespace, this deletes all keys
self._db.clear()
for channel, events in channels_and_events.items():
- for event in events:
- self._queue.put((channel, event))
+ self._queue.put((channel, events))
- def subscribe_channel(self, ns: str, cb: Callable[[str, str], None],
+ def subscribe_channel(self, ns: str,
+ cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
channels: List[str]) -> None:
for channel in channels:
self._channel_cbs[channel] = cb
@@ -206,7 +201,7 @@
message = self._queue.get()
cb = self._channel_cbs.get(message[0], None)
if cb:
- cb(message[0], message[1])
+ cb(message[0], message[1][0] if (isinstance(message[1], list) and len(message[1]) == 1) else message[1])
time.sleep(0.001)
def unsubscribe_channel(self, ns: str, channels: List[str]) -> None:
@@ -220,7 +215,7 @@
self._listen_thread.start()
self._run_in_thread = True
- def handle_events(self) -> Optional[Tuple[str, str]]:
+ def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
if self._listen_thread.is_alive() or self._run_in_thread:
raise Exception("Event loop already started")
try:
@@ -228,9 +223,10 @@
except queue.Empty:
return None
cb = self._channel_cbs.get(message[0], None)
+ notifications = message[1][0] if (isinstance(message[1], list) and len(message[1]) == 1) else message[1]
if cb:
- cb(message[0], message[1])
- return (message[0], message[1])
+ cb(message[0], notifications)
+ return (message[0], notifications)
class FakeDictBackendLock(DbBackendLockAbc):
diff --git a/ricsdl-package/ricsdl/backend/redis.py b/ricsdl-package/ricsdl/backend/redis.py
index 12726a3..bc4b43b 100755
--- a/ricsdl-package/ricsdl/backend/redis.py
+++ b/ricsdl-package/ricsdl/backend/redis.py
@@ -57,6 +57,10 @@
class PubSub(redis.client.PubSub):
+ def __init__(self, event_separator, connection_pool, ignore_subscribe_messages=False):
+ super().__init__(connection_pool, shard_hint=None, ignore_subscribe_messages=ignore_subscribe_messages)
+ self.event_separator = event_separator
+
def handle_message(self, response, ignore_subscribe_messages=False):
"""
Parses a pub/sub message. If the channel or pattern was subscribed to
@@ -112,8 +116,10 @@
# message
message_channel = self._strip_ns_from_bin_key('', message['channel'])
message_data = message['data'].decode('utf-8')
- handler(message_channel, message_data)
- return message_channel, message_data
+ messages = message_data.split(self.event_separator)
+ notification = messages[0] if len(messages) == 1 else messages
+ handler(message_channel, notification)
+ return message_channel, notification
elif message_type != 'pong':
# this is a subscribe/unsubscribe message. ignore if we don't
# want them
@@ -148,6 +154,7 @@
def __init__(self, configuration: _Configuration) -> None:
super().__init__()
self.next_client_event = 0
+ self.event_separator = configuration.get_event_separator()
self.clients = list()
with _map_to_sdl_exception():
self.clients = self.__create_redis_clients(configuration)
@@ -323,7 +330,8 @@
*channels_and_events_prepared,
)
- def subscribe_channel(self, ns: str, cb: Callable[[str, str], None],
+ def subscribe_channel(self, ns: str,
+ cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
channels: List[str]) -> None:
channels = self.__add_keys_ns_prefix(ns, channels)
for channel in channels:
@@ -349,7 +357,7 @@
redis_ctx.pubsub_thread = redis_ctx.redis_pubsub.run_in_thread(sleep_time=0.001, daemon=True)
redis_ctx.run_in_thread = True
- def handle_events(self) -> Optional[Tuple[str, str]]:
+ def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
if self.next_client_event >= len(self.clients):
self.next_client_event = 0
redis_ctx = self.clients[self.next_client_event]
@@ -389,7 +397,7 @@
new_redis.set_response_callback('SETIE', lambda r: r and nativestr(r) == 'OK' or False)
new_redis.set_response_callback('DELIE', lambda r: r and int(r) == 1 or False)
- redis_pubsub = PubSub(new_redis.connection_pool, ignore_subscribe_messages=True)
+ redis_pubsub = PubSub(self.event_separator, new_redis.connection_pool, ignore_subscribe_messages=True)
pubsub_thread = threading.Thread(target=None)
run_in_thread = False
@@ -448,17 +456,20 @@
ret_keys.append(nskey[1])
return ret_keys
- @classmethod
- def _prepare_channels(cls, ns: str, channels_and_events: Dict[str,
- List[str]]) -> Tuple[List, int]:
+ def _prepare_channels(self, ns: str,
+ channels_and_events: Dict[str, List[str]]) -> Tuple[List, int]:
channels_and_events_prepared = []
- total_events = 0
for channel, events in channels_and_events.items():
+ one_channel_join_events = None
for event in events:
- channels_and_events_prepared.append(cls.__add_key_ns_prefix(ns, channel))
- channels_and_events_prepared.append(event)
- total_events += 1
- return channels_and_events_prepared, total_events
+ if one_channel_join_events is None:
+ channels_and_events_prepared.append(self.__add_key_ns_prefix(ns, channel))
+ one_channel_join_events = event
+ else:
+ one_channel_join_events = one_channel_join_events + self.event_separator + event
+ channels_and_events_prepared.append(one_channel_join_events)
+ pairs_cnt = int(len(channels_and_events_prepared) / 2)
+ return channels_and_events_prepared, pairs_cnt
def get_redis_connection(self, ns: str):
"""Return existing Redis database connection valid for the namespace."""
diff --git a/ricsdl-package/ricsdl/configuration.py b/ricsdl-package/ricsdl/configuration.py
index b1521da..e99e68e 100644
--- a/ricsdl-package/ricsdl/configuration.py
+++ b/ricsdl-package/ricsdl/configuration.py
@@ -76,3 +76,7 @@
db_sentinel_master_name=os.getenv('DBAAS_MASTER_NAME'),
db_cluster_addr_list=os.getenv('DBAAS_CLUSTER_ADDR_LIST'),
db_type=backend_type)
+
+ @classmethod
+ def get_event_separator(cls):
+ return "___"
diff --git a/ricsdl-package/ricsdl/syncstorage.py b/ricsdl-package/ricsdl/syncstorage.py
index 48b5e3d..55063e4 100755
--- a/ricsdl-package/ricsdl/syncstorage.py
+++ b/ricsdl-package/ricsdl/syncstorage.py
@@ -120,6 +120,7 @@
def __init__(self, fake_db_backend=None) -> None:
super().__init__()
self.__configuration = _Configuration(fake_db_backend)
+ self.event_separator = self.__configuration.get_event_separator()
self.__dbbackend = ricsdl.backend.get_backend_instance(self.__configuration)
def __del__(self):
@@ -265,7 +266,8 @@
self.__dbbackend.remove_all_and_publish(ns, channels_and_events)
@func_arg_checker(SdlTypeError, 1, ns=str, cb=Callable, channels=(str, builtins.set))
- def subscribe_channel(self, ns: str, cb: Callable[[str, str], None],
+ def subscribe_channel(self, ns: str,
+ cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
channels: Union[str, Set[str]]) -> None:
self._validate_callback(cb)
channels = [channels] if isinstance(channels, str) else list(channels)
@@ -279,7 +281,7 @@
def start_event_listener(self) -> None:
self.__dbbackend.start_event_listener()
- def handle_events(self) -> Optional[Tuple[str, str]]:
+ def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
return self.__dbbackend.handle_events()
@func_arg_checker(SdlTypeError, 1, ns=str, resource=str, expiration=(int, float))
@@ -302,8 +304,7 @@
if not isinstance(v, bytes):
raise SdlTypeError(r"Wrong dict value type: {}={}. Must be: bytes".format(v, type(v)))
- @classmethod
- def _validate_channels_events(cls, channels_and_events: Dict[Any, Any]):
+ def _validate_channels_events(self, channels_and_events: Dict[Any, Any]):
for channel, events in channels_and_events.items():
if not isinstance(channel, str):
raise SdlTypeError(r"Wrong channel type: {}={}. Must be: str".format(
@@ -316,6 +317,13 @@
if not isinstance(event, str):
raise SdlTypeError(r"Wrong event type: {}={}. Must be: str".format(
events, type(events)))
+ if self.event_separator in event:
+ raise SdlTypeError(r"Events {} contains illegal substring (\"{}\")".format(
+ events, self.event_separator))
+ else:
+ if self.event_separator in events:
+ raise SdlTypeError(r"Events {} contains illegal substring (\"{}\")".format(
+ events, self.event_separator))
@classmethod
def _validate_callback(cls, cb):
diff --git a/ricsdl-package/ricsdl/syncstorage_abc.py b/ricsdl-package/ricsdl/syncstorage_abc.py
index 61dd67f..1151979 100755
--- a/ricsdl-package/ricsdl/syncstorage_abc.py
+++ b/ricsdl-package/ricsdl/syncstorage_abc.py
@@ -829,7 +829,8 @@
pass
@abstractmethod
- def subscribe_channel(self, ns: str, cb: Callable[[str, str], None],
+ def subscribe_channel(self, ns: str,
+ cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
channels: Union[str, Set[str]]) -> None:
"""
Subscribes the client to the specified channels.
@@ -840,10 +841,10 @@
the events.
When subscribing for a channel, a callback function is given as a parameter.
- Whenever a notification is received from a channel, this callback is called
- with channel and notifications as parameter. A call to subscribe_channel function
- returns immediately, callbacks will be called synchronously from a dedicated
- thread.
+ Whenever single notification or many notifications are received from a channel,
+ this callback is called with channel and notifications as parameter. A call to
+ subscribe_channel function returns immediately, callbacks will be called
+ synchronously from a dedicated thread.
It is possible to subscribe to different channels using different callbacks. In
this case simply use subscribe_channel function separately for each channel.
@@ -855,7 +856,7 @@
Args:
ns: Namespace under which this operation is targeted.
- cb: A function that is called when an event on channel is received.
+ cb: A function that is called when events on channel are received.
channels: One channel or multiple channels to be subscribed.
Returns:
@@ -909,10 +910,10 @@
pass
@abstractmethod
- def handle_events(self) -> Optional[Tuple[str, str]]:
+ def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
"""
handle_events is a non-blocking function that returns a tuple containing channel
- name and message received from an event. The registered callback function will
+ name and message(s) received from an event. The registered callback function will
still be called when an event is received.
This function is called if SDL user decides to handle notifications in its own
@@ -924,7 +925,8 @@
events handling starts.
Returns:
- Tuple: (channel: str, message: str)
+ Tuple: (channel: str, message: str) or
+ Tuple: (channel: str, messages: list of str)
Raises:
SdlTypeError: If function's argument is of an inappropriate type.
diff --git a/ricsdl-package/tests/backend/test_redis.py b/ricsdl-package/tests/backend/test_redis.py
index 1d79752..7d2928b 100755
--- a/ricsdl-package/tests/backend/test_redis.py
+++ b/ricsdl-package/tests/backend/test_redis.py
@@ -28,6 +28,7 @@
from ricsdl.configuration import DbBackendType
import ricsdl.exceptions
+EVENT_SEPARATOR = "___"
def get_test_sdl_standby_config():
return _Configuration.Params(db_host='service-ricplt-dbaas-tcp-cluster-0.ricplt',
@@ -79,15 +80,17 @@
request.cls.group_redis = '{some-ns},some-group'
request.cls.groupmembers = set([b'm1', b'm2'])
request.cls.groupmember = b'm1'
- request.cls.channels = ['abs', 'gma']
- request.cls.channels_and_events = {'abs': ['cbn'], 'gma': ['jkl']}
- request.cls.channels_and_events_redis = ['{some-ns},abs', 'cbn', '{some-ns},gma', 'jkl']
+ request.cls.channels = ['ch1', 'ch2']
+ request.cls.channels_and_events = {'ch1': ['ev1'], 'ch2': ['ev2', 'ev3']}
+ request.cls.channels_and_events_redis = ['{some-ns},ch1', 'ev1',
+ '{some-ns},ch2', 'ev2' + EVENT_SEPARATOR + 'ev3']
yield
@pytest.fixture(params=['standalone', 'sentinel', 'sentinel_cluster'])
def redis_backend_fixture(request, redis_backend_common_fixture):
request.cls.configuration = Mock()
+ request.cls.configuration.get_event_separator.return_value = EVENT_SEPARATOR
request.cls.test_backend_type = request.param
if request.param == 'standalone':
cfg = get_test_sdl_standby_config()
@@ -103,7 +106,8 @@
request.cls.db = db
mock_redis.assert_called_once_with(db=0, host=cfg.db_host, max_connections=20, port=cfg.db_port)
- mock_pubsub.assert_called_once_with(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True)
+ mock_pubsub.assert_called_once_with(EVENT_SEPARATOR, request.cls.mock_redis.connection_pool,
+ ignore_subscribe_messages=True)
assert request.cls.mock_redis.set_response_callback.call_count == 2
assert request.cls.mock_redis.set_response_callback.call_args_list == [call('SETIE', ANY), call('DELIE', ANY)]
@@ -122,7 +126,8 @@
mock_sentinel.assert_called_once_with([(cfg.db_host, cfg.db_sentinel_port)])
mock_sentinel.master_for.called_once_with(cfg.db_sentinel_master_name)
- mock_pubsub.assert_called_once_with(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True)
+ mock_pubsub.assert_called_once_with(EVENT_SEPARATOR, request.cls.mock_redis.connection_pool,
+ ignore_subscribe_messages=True)
assert request.cls.mock_redis.set_response_callback.call_count == 2
assert request.cls.mock_redis.set_response_callback.call_args_list == [call('SETIE', ANY), call('DELIE', ANY)]
@@ -150,8 +155,8 @@
)
assert mock_pubsub.call_count == 2
mock_pubsub.assert_has_calls([
- call(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True),
- call(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True),
+ call(EVENT_SEPARATOR, request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True),
+ call(EVENT_SEPARATOR, request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True),
])
assert request.cls.mock_redis.set_response_callback.call_count == 4
assert request.cls.mock_redis.set_response_callback.call_args_list == [
@@ -543,7 +548,7 @@
def test_unsubscribe_channel_success(self):
self.db.unsubscribe_channel(self.ns, [self.channels[0]])
- self.mock_pubsub.unsubscribe.assert_called_with('{some-ns},abs')
+ self.mock_pubsub.unsubscribe.assert_called_with('{some-ns},ch1')
def test_unsubscribe_channel_can_map_redis_exception_to_sdl_exeception(self):
self.mock_pubsub.unsubscribe.side_effect = redis_exceptions.ResponseError('redis error!')
@@ -859,9 +864,9 @@
class TestRedisClient:
@classmethod
def setup_class(cls):
- cls.pubsub = ricsdl.backend.redis.PubSub(Mock())
- cls.pubsub.channels = {b'{some-ns},abs': Mock()}
+ cls.pubsub = ricsdl.backend.redis.PubSub(EVENT_SEPARATOR, Mock())
+ cls.pubsub.channels = {b'{some-ns},ch1': Mock()}
def test_handle_pubsub_message(self):
- assert self.pubsub.handle_message([b'message', b'{some-ns},abs', b'cbn']) == ('abs', 'cbn')
- self.pubsub.channels.get(b'{some-ns},abs').assert_called_once_with('abs', 'cbn')
+ assert self.pubsub.handle_message([b'message', b'{some-ns},ch1', b'cbn']) == ('ch1', 'cbn')
+ self.pubsub.channels.get(b'{some-ns},ch1').assert_called_once_with('ch1', 'cbn')
diff --git a/ricsdl-package/tests/test_configuration.py b/ricsdl-package/tests/test_configuration.py
index a9d24ba..1c89663 100644
--- a/ricsdl-package/tests/test_configuration.py
+++ b/ricsdl-package/tests/test_configuration.py
@@ -62,6 +62,9 @@
db_type=DbBackendType.FAKE_DICT)
assert expected_config == self.config.get_params()
+ def test_get_event_separator_function_return_expected_separator(self, config_fixture):
+ assert "___" == _Configuration.get_event_separator()
+
def test_get_params_function_can_raise_exception_if_wrong_fake_db_type(self):
with pytest.raises(ValueError, match=r"Configuration error"):
_Configuration(fake_db_backend='bad value')
diff --git a/ricsdl-package/tests/test_syncstorage.py b/ricsdl-package/tests/test_syncstorage.py
index 332a2fa..839d159 100755
--- a/ricsdl-package/tests/test_syncstorage.py
+++ b/ricsdl-package/tests/test_syncstorage.py
@@ -26,6 +26,7 @@
from ricsdl.syncstorage import func_arg_checker
from ricsdl.exceptions import (SdlTypeError, NotConnected)
+EVENT_SEPARATOR = "___"
@pytest.fixture()
def sync_storage_fixture(request):
@@ -44,7 +45,8 @@
request.cls.lock_int_expiration = 10
request.cls.lock_float_expiration = 1.1
request.cls.channels = {'abs', 'cbn'}
- request.cls.channels_and_events = {'abs': 'cbn'}
+ request.cls.channels_and_events = {'ch1': 'ev1', 'ch2': ['ev1', 'ev2', 'ev3']}
+ request.cls.ill_event = "illegal" + EVENT_SEPARATOR + "ev"
with patch('ricsdl.backend.get_backend_instance') as mock_db_backend:
storage = SyncStorage()
@@ -322,13 +324,25 @@
def test_set_and_publish_can_raise_exception_for_wrong_argument(self):
with pytest.raises(SdlTypeError):
- self.storage.set_and_publish(123, self.channels_and_events, {'a': b'v1'})
+ self.storage.set_and_publish(123, self.channels_and_events, self.dm)
with pytest.raises(SdlTypeError):
- self.storage.set_and_publish('ns', self.channels_and_events, [1, 2])
+ self.storage.set_and_publish(self.ns, None, self.dm)
with pytest.raises(SdlTypeError):
- self.storage.set_and_publish('ns', self.channels_and_events, {0xbad: b'v1'})
+ self.storage.set_and_publish(self.ns, {0xbad: "ev1"}, self.dm)
with pytest.raises(SdlTypeError):
- self.storage.set_and_publish('ns', self.channels_and_events, {'a': 0xbad})
+ self.storage.set_and_publish(self.ns, {"ch1": 0xbad}, self.dm)
+ with pytest.raises(SdlTypeError):
+ self.storage.set_and_publish(self.ns, {"ch1": self.ill_event}, self.dm)
+ with pytest.raises(SdlTypeError):
+ self.storage.set_and_publish(self.ns, {"ch1": ["ev1", 0xbad]}, self.dm)
+ with pytest.raises(SdlTypeError):
+ self.storage.set_and_publish(self.ns, {"ch1": ["ev1", self.ill_event]}, self.dm)
+ with pytest.raises(SdlTypeError):
+ self.storage.set_and_publish(self.ns, self.channels_and_events, [1, 2])
+ with pytest.raises(SdlTypeError):
+ self.storage.set_and_publish(self.ns, self.channels_and_events, {0xbad: b'v1'})
+ with pytest.raises(SdlTypeError):
+ self.storage.set_and_publish(self.ns, self.channels_and_events, {'a': 0xbad})
def test_set_if_and_publish_success(self):
self.mock_db_backend.set_if_and_publish.return_value = True
@@ -348,13 +362,35 @@
def test_set_if_and_publish_can_raise_exception_for_wrong_argument(self):
with pytest.raises(SdlTypeError):
- self.storage.set_if_and_publish(0xbad, self.channels_and_events, 'key', b'v1', b'v2')
+ self.storage.set_if_and_publish(0xbad, self.channels_and_events, self.key,
+ self.old_data, self.new_data)
with pytest.raises(SdlTypeError):
- self.storage.set_if_and_publish('ns', self.channels_and_events, 0xbad, b'v1', b'v2')
+ self.storage.set_if_and_publish(self.ns, None, self.key, self.old_data,
+ self.new_data)
with pytest.raises(SdlTypeError):
- self.storage.set_if_and_publish('ns', self.channels_and_events, 'key', 0xbad, b'v2')
+ self.storage.set_if_and_publish(self.ns, {0xbad: "ev1"}, self.key,
+ self.old_data, self.new_data)
with pytest.raises(SdlTypeError):
- self.storage.set_if_and_publish('ns', self.channels_and_events, 'key', b'v1', 0xbad)
+ self.storage.set_if_and_publish(self.ns, {"ch1": 0xbad}, self.key,
+ self.old_data, self.new_data)
+ with pytest.raises(SdlTypeError):
+ self.storage.set_if_and_publish(self.ns, {"ch1": self.ill_event}, self.key,
+ self.old_data, self.new_data)
+ with pytest.raises(SdlTypeError):
+ self.storage.set_if_and_publish(self.ns, {"ch1": ["ev1", 0xbad]}, self.key,
+ self.old_data, self.new_data)
+ with pytest.raises(SdlTypeError):
+ self.storage.set_if_and_publish(self.ns, {"ch1": ["ev1", self.ill_event]}, self.key,
+ self.old_data, self.new_data)
+ with pytest.raises(SdlTypeError):
+ self.storage.set_if_and_publish(self.ns, self.channels_and_events, 0xbad,
+ self.old_data, self.new_data)
+ with pytest.raises(SdlTypeError):
+ self.storage.set_if_and_publish(self.ns, self.channels_and_events, self.key,
+ 0xbad, self.new_data)
+ with pytest.raises(SdlTypeError):
+ self.storage.set_if_and_publish(self.ns, self.channels_and_events, self.key,
+ self.old_data, 0xbad)
def test_set_if_not_exists_and_publish_success(self):
self.mock_db_backend.set_if_not_exists_and_publish.return_value = True
@@ -374,12 +410,32 @@
def test_set_if_not_exists_and_publish_can_raise_exception_for_wrong_argument(self):
with pytest.raises(SdlTypeError):
- self.storage.set_if_not_exists_and_publish(0xbad, self.channels_and_events, 'key',
- b'v1')
+ self.storage.set_if_not_exists_and_publish(0xbad, self.channels_and_events,
+ self.key, self.new_data)
with pytest.raises(SdlTypeError):
- self.storage.set_if_not_exists_and_publish('ns', self.channels_and_events, 0xbad, b'v1')
+ self.storage.set_if_not_exists_and_publish(self.ns, None, self.key,
+ self.new_data)
with pytest.raises(SdlTypeError):
- self.storage.set_if_not_exists_and_publish('ns', self.channels_and_events, 'key', 0xbad)
+ self.storage.set_if_not_exists_and_publish(self.ns, {0xbad: "ev1"},
+ self.key, self.new_data)
+ with pytest.raises(SdlTypeError):
+ self.storage.set_if_not_exists_and_publish(self.ns, {"ch1": 0xbad},
+ self.key, self.new_data)
+ with pytest.raises(SdlTypeError):
+ self.storage.set_if_not_exists_and_publish(self.ns, {"ch1": self.ill_event},
+ self.key, self.new_data)
+ with pytest.raises(SdlTypeError):
+ self.storage.set_if_not_exists_and_publish(self.ns, {"ch1": ["ev1", 0xbad]},
+ self.key, self.new_data)
+ with pytest.raises(SdlTypeError):
+ self.storage.set_if_not_exists_and_publish(self.ns, {"ch1": ["ev1", self.ill_event]},
+ self.key, self.new_data)
+ with pytest.raises(SdlTypeError):
+ self.storage.set_if_not_exists_and_publish(self.ns, self.channels_and_events,
+ 0xbad, b'v1')
+ with pytest.raises(SdlTypeError):
+ self.storage.set_if_not_exists_and_publish(self.ns, self.channels_and_events,
+ self.key, 0xbad)
def test_remove_and_publish_function_success(self):
self.storage.remove_and_publish(self.ns, self.channels_and_events, self.keys)
@@ -390,7 +446,19 @@
with pytest.raises(SdlTypeError):
self.storage.remove_and_publish(0xbad, self.channels_and_events, self.keys)
with pytest.raises(SdlTypeError):
- self.storage.remove(self.ns, self.channels_and_events, 0xbad)
+ self.storage.remove_and_publish(self.ns, None, self.keys)
+ with pytest.raises(SdlTypeError):
+ self.storage.remove_and_publish(self.ns, {0xbad: "ev1"}, self.keys)
+ with pytest.raises(SdlTypeError):
+ self.storage.remove_and_publish(self.ns, {"ch1": 0xbad}, self.keys)
+ with pytest.raises(SdlTypeError):
+ self.storage.remove_and_publish(self.ns, {"ch1": self.ill_event}, self.keys)
+ with pytest.raises(SdlTypeError):
+ self.storage.remove_and_publish(self.ns, {"ch1": ["ev1", 0xbad]}, self.keys)
+ with pytest.raises(SdlTypeError):
+ self.storage.remove_and_publish(self.ns, {"ch1": ["ev1", self.ill_event]}, self.keys)
+ with pytest.raises(SdlTypeError):
+ self.storage.remove_and_publish(self.ns, self.channels_and_events, 0xbad)
def test_remove_if_and_publish_success(self):
self.mock_db_backend.remove_if_and_publish.return_value = True
@@ -413,6 +481,23 @@
self.storage.remove_if_and_publish(0xbad, self.channels_and_events, self.keys,
self.old_data)
with pytest.raises(SdlTypeError):
+ self.storage.remove_if_and_publish(self.ns, None, self.keys, self.old_data)
+ with pytest.raises(SdlTypeError):
+ self.storage.remove_if_and_publish(self.ns, {0xbad: "ev1"}, self.keys,
+ self.old_data)
+ with pytest.raises(SdlTypeError):
+ self.storage.remove_if_and_publish(self.ns, {"ch1": 0xbad}, self.keys,
+ self.old_data)
+ with pytest.raises(SdlTypeError):
+ self.storage.remove_if_and_publish(self.ns, {"ch1": self.ill_event}, self.keys,
+ self.old_data)
+ with pytest.raises(SdlTypeError):
+ self.storage.remove_if_and_publish(self.ns, {"ch1": ["ev1", 0xbad]}, self.keys,
+ self.old_data)
+ with pytest.raises(SdlTypeError):
+ self.storage.remove_if_and_publish(self.ns, {"ch1": ["ev1", self.ill_event]}, self.keys,
+ self.old_data)
+ with pytest.raises(SdlTypeError):
self.storage.remove_if_and_publish(self.ns, self.channels_and_events, 0xbad,
self.old_data)
with pytest.raises(SdlTypeError):
@@ -426,6 +511,18 @@
def test_remove_all_and_publish_can_raise_exception_for_wrong_argument(self):
with pytest.raises(SdlTypeError):
self.storage.remove_all_and_publish(0xbad, self.channels_and_events)
+ with pytest.raises(SdlTypeError):
+ self.storage.remove_all_and_publish(self.ns, None)
+ with pytest.raises(SdlTypeError):
+ self.storage.remove_all_and_publish(self.ns, {0xbad: "ev1"})
+ with pytest.raises(SdlTypeError):
+ self.storage.remove_all_and_publish(self.ns, {"ch1": 0xbad})
+ with pytest.raises(SdlTypeError):
+ self.storage.remove_all_and_publish(self.ns, {"ch1": self.ill_event})
+ with pytest.raises(SdlTypeError):
+ self.storage.remove_all_and_publish(self.ns, {"ch1": ["ev1", 0xbad]})
+ with pytest.raises(SdlTypeError):
+ self.storage.remove_all_and_publish(self.ns, {"ch1": ["ev1", self.ill_event]})
def test_subscribe_function_success(self):
def cb(channel, message):