blob: 3041d80cbcf3a09f2955b4a7ad2bb41310fb5e79 [file] [log] [blame]
# Copyright (c) 2020 Samsung Electronics
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This source code is part of the near-RT RIC (RAN Intelligent Controller)
# platform project (RICP).
#
"""Examples on how to use Shared Data Layer (SDL) notification feature.
Execution of these examples requires:
* Following Redis extension commands have been installed to runtime environment:
- MSETPUB
- SETIE
- SETIEMPUB
- SETNXMPUB
- DELMPUB
- DELIE
- DELIEMPUB
Redis v4.0 or greater is required. Older versions do not support extension modules.
Implementation of above commands is produced by RIC DBaaS:
https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/dbaas
In official RIC deployments these commands are installed by `dbaas` service to Redis
container(s).
In a development environment you may want to install the commands manually to the pod/container
that is running Redis.
* The following environment variables need to be set in the pod/container where the application
utilizing SDL is going to be run.
DBAAS_SERVICE_HOST = [redis server address]
DBAAS_SERVICE_PORT = [redis server port]
DBAAS_MASTER_NAME = [master Redis sentinel name]. Needed to set only if sentinel is in use.
DBAAS_SERVICE_SENTINEL_PORT = [Redis sentinel port number]. Needed to set only if sentinel
is in use.
"""
import threading
import time
from ricsdl.syncstorage import SyncStorage
from ricsdl.exceptions import RejectedByBackend, NotConnected, BackendError
# There are two available methods for applications to handle notifications:
#   - EVENT_LISTENER (true):
#     - User calls sdl.start_event_listener() which will create an SDL managed
#       event loop for handling messages.
#   - EVENT_LISTENER (false):
#     - User needs to call sdl.handle_events() (see listen_thread() below), which
#       will return the message.
#
# Note: In both cases, the given callback function will be executed.
EVENT_LISTENER = True
# Constants used in the examples below.
MY_NS = 'my_ns'
MY_CHANNEL = "my_channel"
# Lock used only to demonstrate that the callback was called: it is acquired in
# _try_func_callback_return() before the SDL call and released in cb().
MY_LOCK = threading.Lock()
def _try_func_return(func):
"""
Generic wrapper function to call SDL API function and handle exceptions if they are raised.
"""
try:
return func()
except RejectedByBackend as exp:
print(f'SDL function {func.__name__} failed: {str(exp)}')
# Permanent failure, just forward the exception
raise
except (NotConnected, BackendError) as exp:
print(f'SDL function {func.__name__} failed for a temporal error: {str(exp)}')
# Here we could have a retry logic
def _try_func_callback_return(func):
    """Call an SDL API function and wait until the notification callback has run.

    MY_LOCK is acquired before the call and is expected to be released by the
    callback function; the lock is only used to demonstrate that the callback
    function was called.

    NOTE: if the call does not result in the callback being executed (for example
    nothing was published, or the call failed with a temporal error), this function
    keeps waiting forever.

    Args:
        func: Zero-argument callable that performs the SDL API call.

    Returns:
        The value returned by ``_try_func_return(func)``.

    Raises:
        Whatever ``_try_func_return(func)`` raises.
    """
    MY_LOCK.acquire()
    try:
        ret = _try_func_return(func)
    except BaseException:
        # Do not leave the lock held when the call fails, otherwise the next call of
        # this function would block forever in MY_LOCK.acquire().
        try:
            MY_LOCK.release()
        except RuntimeError:
            # The callback already released the lock.
            pass
        raise
    # Poll until the callback function has released the lock.
    while MY_LOCK.locked():
        time.sleep(0.01)
    return ret
# Creates SDL instance. The call creates connection to the SDL database backend.
# Note: _try_func_return() returns None when the call fails with NotConnected or
# BackendError, in which case mysdl is None.
mysdl = _try_func_return(SyncStorage)
# Stores the last received channel and message. Both are updated by cb().
last_cb_channel = ""
last_cb_message = ""
# Allows program to stop receive thread at the end of execution. The flag is
# polled by listen_thread().
stop_thread = False
def cb(channel: str, message: str):
    """Example notification callback, invoked when an event is received.

    Records the channel and the message into the module-level variables
    last_cb_channel and last_cb_message, and releases the global lock if it
    is currently held.

    Args:
        channel: Channel where the message was received
        message: Received message
    """
    global last_cb_channel, last_cb_message
    last_cb_channel, last_cb_message = channel, message
    # Nothing to release when nobody is holding the lock.
    if not MY_LOCK.locked():
        return
    MY_LOCK.release()
def listen_thread():
    """Example listener thread body: keeps calling sdl.handle_events().

    Runs until the module-level stop_thread flag becomes true.
    """
    while True:
        if stop_thread:
            break
        received = mysdl.handle_events()
        if received:
            # You could process message here
            pass
        # Short pause between consecutive polls.
        time.sleep(0.001)
# The examples are run inside try/finally so that the cleanup at the end is executed
# even when an example fails. Otherwise the non-daemon listener thread started below
# would never see stop_thread set and would keep the process alive.
thread = None
try:
    # Subscribe to MY_CHANNEL. We expect that anytime we receive a message in the
    # channel, cb function will be called.
    _try_func_return(lambda: mysdl.subscribe_channel(MY_NS, cb, MY_CHANNEL))

    # As mentioned above, there are two available methods for applications to
    # handle notifications
    if EVENT_LISTENER:
        _try_func_return(mysdl.start_event_listener)
    else:
        thread = threading.Thread(target=listen_thread)
        thread.start()

    # Sets a value 'my_value' for a key 'my_key' under given namespace. Note that value
    # type must be bytes and multiple key values can be set in one set function call.
    _try_func_callback_return(
        lambda: mysdl.set_and_publish(MY_NS, {MY_CHANNEL: "SET PUBLISH"}, {'my_key': b'my_value'}))
    assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET PUBLISH"

    # Sets a value 'my_value2' for a key 'my_key' under given namespace only if the old value is
    # 'my_value'.
    was_set = _try_func_callback_return(lambda: mysdl.set_if_and_publish(
        MY_NS, {MY_CHANNEL: "SET IF PUBLISH"}, 'my_key', b'my_value', b'my_value2'))
    assert was_set is True
    assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET IF PUBLISH"
    # Try again. This time value 'my_value2' won't be set, because the key already has the
    # value 'my_value2'. Callback function will not be called here.
    was_set = _try_func_return(lambda: mysdl.set_if_and_publish(MY_NS, {MY_CHANNEL: "SET IF PUBLISH"},
                                                                'my_key', b'my_value', b'my_value2'))
    assert was_set is False

    # Sets a value 'my_value' for a key 'my_key2' under given namespace only if the key doesn't
    # exist. Note that value types must be bytes.
    was_set = _try_func_callback_return(lambda: mysdl.set_if_not_exists_and_publish(
        MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
    assert was_set is True
    assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET IF NOT PUBLISH"
    # Try again. This time the key 'my_key2' already exists. Callback function will not be
    # called here.
    was_set = _try_func_return(lambda: mysdl.set_if_not_exists_and_publish(
        MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
    assert was_set is False

    # Removes a key 'my_key' under given namespace.
    _try_func_callback_return(
        lambda: mysdl.remove_and_publish(MY_NS, {MY_CHANNEL: "REMOVE PUBLISH"}, 'my_key'))
    my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, 'my_key'))
    assert my_ret_dict == {}
    assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE PUBLISH"

    # Removes a key 'my_key2' under given namespace only if the old value is 'my_value'.
    was_removed = _try_func_callback_return(lambda: mysdl.remove_if_and_publish(
        MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
    assert was_removed is True
    assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE IF PUBLISH"
    # Try again to remove the no longer existing key 'my_key2'. Callback function will not be
    # called here.
    was_removed = _try_func_return(lambda: mysdl.remove_if_and_publish(
        MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
    assert was_removed is False

    # Removes all the keys under given namespace.
    _try_func_return(lambda: mysdl.set(MY_NS, {'my_key': b'something'}))
    my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'}))
    assert my_ret_dict != {}
    _try_func_callback_return(
        lambda: mysdl.remove_all_and_publish(MY_NS, {MY_CHANNEL: "REMOVE ALL PUBLISH"}))
    my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'}))
    assert my_ret_dict == {}
    assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE ALL PUBLISH"
finally:
    # Ask listen_thread() to exit and wait for it, so that it cannot call
    # mysdl.handle_events() on an SDL instance that has already been closed.
    stop_thread = True
    if thread is not None:
        thread.join()
    # mysdl is None when creating the SDL instance failed with a temporal error.
    if mysdl is not None:
        mysdl.close()