blob: 47d9b04a150f332866d993d26b6efa94a4daffab [file] [log] [blame]
Timothy Ebidoa4392392020-08-12 10:49:26 +09001# Copyright (c) 2020 Samsung Electronics
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15#
16# This source code is part of the near-RT RIC (RAN Intelligent Controller)
17# platform project (RICP).
18#
19
20"""Examples on how to use Shared Data Layer (SDL) notification feature.
21
22Execution of these examples requires:
23 * Following Redis extension commands have been installed to runtime environment:
24 - MSETPUB
25 - SETIE
Timo Tietavainen6589d4d2021-03-09 14:37:14 +020026 - SETIEMPUB
27 - SETNXMPUB
28 - DELMPUB
Timothy Ebidoa4392392020-08-12 10:49:26 +090029 - DELIE
Timo Tietavainen6589d4d2021-03-09 14:37:14 +020030 - DELIEMPUB
Timothy Ebidoa4392392020-08-12 10:49:26 +090031 Redis v4.0 or greater is required. Older versions do not support extension modules.
32 Implementation of above commands is produced by RIC DBaaS:
33 https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/dbaas
34 In official RIC deployments these commands are installed by `dbaas` service to Redis
35 container(s).
36 In a development environment you may want to install the commands manually into the
37 pod/container that is running Redis.
38 * Following environment variables are needed to set to the pod/container where the application
39 utilizing SDL is going to be run.
Timo Tietavainenc6c9af22021-05-11 14:43:09 +030040 DBAAS_SERVICE_HOST = [DB service address]
Timo Tietavainene67b9ab2022-03-14 07:27:30 +020041 DBAAS_SERVICE_PORT= [Comma separated list of DB service ports]. Only one port supported in
42 RIC deployments, Nokia SEP deployments can have multiple ports.
43 DBAAS_MASTER_NAME = [Comma separated list of DB names]. Needed to set only if Redis
44 sentinel is used to provide high availability for Redis DB solution. Only one DB name
45 supported in RIC deployments, Nokia SEP deployments can have multiple DB names.
46 DBAAS_SERVICE_SENTINEL_PORT = [Comma separated list of Redis sentinel port number]. Needed
47 to set only if Redis sentinel is in use. Only one port supported in RIC deployments, Nokia
48 SEP deployments can have multiple ports.
49 DBAAS_CLUSTER_ADDR_LIST = [Comma separated list of DB service addresses]. Is set only if
50 more than one Redis sentinel groups are in use. Only in use in Nokia SEP deployments.
Timo Tietavainenc6c9af22021-05-11 14:43:09 +030051 In official RIC deployments the first four environment variables are defined in Helm configMaps
52 of the DBaaS and these configurations can be loaded automatically as environment variables
53 into application pods via `envFrom dbaas-appconfig` statement in an application Helm Charts.
54 The last environment variable is, for the time being, not in use in official RIC deployments, only
55 in Nokia SEP deployments.
Timothy Ebidoa4392392020-08-12 10:49:26 +090056"""
57
58import threading
59import time
60
61from ricsdl.syncstorage import SyncStorage
62from ricsdl.exceptions import RejectedByBackend, NotConnected, BackendError
Timo Tietavainendb775392021-06-09 05:56:54 +030063from typing import (Union, List)
Timothy Ebidoa4392392020-08-12 10:49:26 +090064
# Applications can handle notifications in one of two ways:
#   - EVENT_LISTENER is True:
#       The user calls sdl.start_event_listener(), which creates an SDL managed
#       event loop for handling messages.
#   - EVENT_LISTENER is False:
#       The user needs to call sdl.handle_events(), which returns the message.
#
# Note: In both cases, the given callback function will be executed.
EVENT_LISTENER = True

# Constants used in the examples below.
MY_NS, MY_CHANNEL = "my_ns", "my_channel"
MY_LOCK = threading.Lock()
79
80
81def _try_func_return(func):
82 """
83 Generic wrapper function to call SDL API function and handle exceptions if they are raised.
84 """
85 try:
86 return func()
87 except RejectedByBackend as exp:
88 print(f'SDL function {func.__name__} failed: {str(exp)}')
89 # Permanent failure, just forward the exception
90 raise
91 except (NotConnected, BackendError) as exp:
92 print(f'SDL function {func.__name__} failed for a temporal error: {str(exp)}')
93 # Here we could have a retry logic
94
95
def _try_func_callback_return(func):
    """Call an SDL API function and wait until the notification callback has run.

    MY_LOCK is acquired before the call and is expected to be released by the
    callback function; the lock is only used to demonstrate that the callback
    function was called.

    Args:
        func: Callable taking no arguments that performs the SDL API call.

    Returns:
        Whatever _try_func_return(func) returns.

    Raises:
        Any exception that _try_func_return(func) lets through. MY_LOCK is
        released before the exception is forwarded.
    """
    MY_LOCK.acquire()
    try:
        ret = _try_func_return(func)
    except Exception:
        # The call failed, so no callback is going to release the lock. Release
        # it here, otherwise the next call would block forever in acquire().
        if MY_LOCK.locked():
            MY_LOCK.release()
        raise
    # Poll until the callback has released the lock.
    # NOTE(review): when _try_func_return() swallows a temporal error, presumably
    # no event is published and this wait never ends -- confirm whether a
    # timeout is wanted here.
    while MY_LOCK.locked():
        time.sleep(0.01)
    return ret
108
109
# Create the SDL instance. The call creates a connection to the SDL database backend.
mysdl = _try_func_return(SyncStorage)

# Channel and message most recently handed to the callback function.
last_cb_channel = last_cb_message = ""

# Flag that lets the program stop the receive thread at the end of execution.
stop_thread = False
119
120
def cb(channel: str, message: List[str]):
    """Example callback that is invoked when an event list is received.

    Stores the channel and the message into the module-level last_cb_channel
    and last_cb_message variables and, if the global lock is currently locked,
    unlocks it.

    Args:
        channel: Channel where the message was received
        message: Received message
    """
    global last_cb_channel, last_cb_message
    last_cb_channel, last_cb_message = channel, message
    if not MY_LOCK.locked():
        return
    MY_LOCK.release()
136
137
def listen_thread():
    """Example listener thread body: keeps calling sdl.handle_events() until stop_thread is set."""
    while not stop_thread:
        event = mysdl.handle_events()
        if event:
            # The received message could be processed here
            pass
        time.sleep(0.001)
148
# Set only when the application itself runs the receive loop (EVENT_LISTENER is false).
thread = None

# All example steps run inside try/finally, so that the receive thread is stopped and the
# SDL connection is closed also when a step raises or one of the asserts fails.
try:
    # Subscribe to MY_CHANNEL. We expect that anytime we receive a message in the
    # channel, cb function will be called.
    _try_func_return(lambda: mysdl.subscribe_channel(MY_NS, cb, MY_CHANNEL))

    # As mentioned above, there are two available methods for applications to
    # handle notifications
    if EVENT_LISTENER:
        _try_func_return(mysdl.start_event_listener)
    else:
        thread = threading.Thread(target=listen_thread)
        thread.start()

    # Sets a value 'my_value' for a key 'my_key' under given namespace. Note that value
    # type must be bytes and multiple key values can be set in one set function call.
    _try_func_callback_return(
        lambda: mysdl.set_and_publish(MY_NS, {MY_CHANNEL: "SET PUBLISH"}, {'my_key': b'my_value'}))
    assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "SET PUBLISH"

    # Sets a value 'my_value' for a key 'my_key' under given namespace. Note that value
    # type must be bytes and multiple key values can be set in one set function call.
    # Function publishes two events into one channel.
    _try_func_callback_return(
        lambda: mysdl.set_and_publish(
            MY_NS, {MY_CHANNEL: ["SET PUBLISH1", "SET PUBLISH2"]}, {'my_key': b'my_value'}))
    assert last_cb_channel == MY_CHANNEL and last_cb_message == ["SET PUBLISH1", "SET PUBLISH2"]

    # Sets a value 'my_value2' for a key 'my_key' under given namespace only if the old value is
    # 'my_value'.
    was_set = _try_func_callback_return(lambda: mysdl.set_if_and_publish(
        MY_NS, {MY_CHANNEL: "SET IF PUBLISH"}, 'my_key', b'my_value', b'my_value2'))
    assert was_set is True
    assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "SET IF PUBLISH"
    # Try again. This time value 'my_value2' won't be set, because the key has already
    # 'my_value2' value. Callback function will not be called here.
    was_set = _try_func_return(lambda: mysdl.set_if_and_publish(
        MY_NS, {MY_CHANNEL: "SET IF PUBLISH"}, 'my_key', b'my_value', b'my_value2'))
    assert was_set is False

    # Sets a value 'my_value' for a key 'my_key2' under given namespace only if the key doesn't
    # exist. Note that value types must be bytes.
    was_set = _try_func_callback_return(lambda: mysdl.set_if_not_exists_and_publish(
        MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
    assert was_set is True
    assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "SET IF NOT PUBLISH"
    # Try again. This time the key 'my_key2' already exists. Callback function will not be
    # called here.
    was_set = _try_func_return(lambda: mysdl.set_if_not_exists_and_publish(
        MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
    assert was_set is False

    # Removes a key 'my_key' under given namespace.
    _try_func_callback_return(
        lambda: mysdl.remove_and_publish(MY_NS, {MY_CHANNEL: "REMOVE PUBLISH"}, 'my_key'))
    my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, 'my_key'))
    assert my_ret_dict == {}
    assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "REMOVE PUBLISH"

    # Removes a key 'my_key2' under given namespace only if the old value is 'my_value'.
    was_removed = _try_func_callback_return(lambda: mysdl.remove_if_and_publish(
        MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
    assert was_removed is True
    assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "REMOVE IF PUBLISH"
    # Try again to remove the no longer existing key 'my_key2'. Callback function will not be
    # called here.
    was_removed = _try_func_return(lambda: mysdl.remove_if_and_publish(
        MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
    assert was_removed is False

    # Removes all the keys under given namespace. First store a key so that there is
    # something to remove.
    _try_func_return(lambda: mysdl.set(MY_NS, {'my_key': b'something'}))
    my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'}))
    assert my_ret_dict != {}

    _try_func_callback_return(
        lambda: mysdl.remove_all_and_publish(MY_NS, {MY_CHANNEL: "REMOVE ALL PUBLISH"}))
    my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'}))
    assert my_ret_dict == {}
    assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "REMOVE ALL PUBLISH"
finally:
    # Ask the receive loop in listen_thread() to exit.
    stop_thread = True
    if thread is not None:
        # Wait for the receive loop to finish before closing the connection it polls.
        thread.join()
    mysdl.close()