Timothy Ebido | a439239 | 2020-08-12 10:49:26 +0900 | [diff] [blame] | 1 | # Copyright (c) 2020 Samsung Electronics |
| 2 | # |
| 3 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | # you may not use this file except in compliance with the License. |
| 5 | # You may obtain a copy of the License at |
| 6 | # |
| 7 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | # |
| 9 | # Unless required by applicable law or agreed to in writing, software |
| 10 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | # See the License for the specific language governing permissions and |
| 13 | # limitations under the License. |
| 14 | |
| 15 | # |
| 16 | # This source code is part of the near-RT RIC (RAN Intelligent Controller) |
| 17 | # platform project (RICP). |
| 18 | # |
| 19 | |
| 20 | """Examples on how to use Shared Data Layer (SDL) notification feature. |
| 21 | |
| 22 | Execution of these examples requires: |
| 23 | * Following Redis extension commands have been installed to runtime environment: |
| 24 | - MSETPUB |
| 25 | - SETIE |
Timo Tietavainen | 6589d4d | 2021-03-09 14:37:14 +0200 | [diff] [blame] | 26 | - SETIEMPUB |
| 27 | - SETNXMPUB |
| 28 | - DELMPUB |
Timothy Ebido | a439239 | 2020-08-12 10:49:26 +0900 | [diff] [blame] | 29 | - DELIE |
Timo Tietavainen | 6589d4d | 2021-03-09 14:37:14 +0200 | [diff] [blame] | 30 | - DELIEMPUB |
Timothy Ebido | a439239 | 2020-08-12 10:49:26 +0900 | [diff] [blame] | 31 | Redis v4.0 or greater is required. Older versions do not support extension modules. |
| 32 | Implementation of above commands is produced by RIC DBaaS: |
| 33 | https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/dbaas |
| 34 | In official RIC deployments these commands are installed by `dbaas` service to Redis |
| 35 | container(s). |
   In a development environment you may want to install the commands manually to the
   pod/container that is running Redis.
 * The following environment variables need to be set in the pod/container where the
   application utilizing SDL is going to run.
Timo Tietavainen | c6c9af2 | 2021-05-11 14:43:09 +0300 | [diff] [blame] | 40 | DBAAS_SERVICE_HOST = [DB service address] |
| 41 | DBAAS_SERVICE_PORT= [DB service port] |
| 42 | DBAAS_MASTER_NAME = [DB name]. Needed to set only if Redis sentinel is used to provide high |
| 43 | availability for Redis DB solution. |
| 44 | DBAAS_SERVICE_SENTINEL_PORT = [Redis sentinel port number]. Needed to set only if Redis |
| 45 | sentinel is in use. |
     DBAAS_CLUSTER_ADDR_LIST = [list of DB service addresses]. Is set only if more than one
                               Redis sentinel group is in use.
   In official RIC deployments the first four environment variables are defined in Helm configMaps
   of the DBaaS and these configurations can be loaded automatically as environment variables
   into application pods via `envFrom dbaas-appconfig` statement in the application's Helm Charts.
   The last environment variable is, for the time being, not in use in official RIC deployments,
   only in Nokia SEP deployments.
Timothy Ebido | a439239 | 2020-08-12 10:49:26 +0900 | [diff] [blame] | 53 | """ |
| 54 | |
| 55 | import threading |
| 56 | import time |
| 57 | |
| 58 | from ricsdl.syncstorage import SyncStorage |
| 59 | from ricsdl.exceptions import RejectedByBackend, NotConnected, BackendError |
Timo Tietavainen | db77539 | 2021-06-09 05:56:54 +0300 | [diff] [blame] | 60 | from typing import (Union, List) |
Timothy Ebido | a439239 | 2020-08-12 10:49:26 +0900 | [diff] [blame] | 61 | |
# Applications can handle notifications in one of two ways:
#   - EVENT_LISTENER is True:
#       The user calls sdl.start_event_listener(), which creates an SDL managed
#       event loop for handling messages.
#   - EVENT_LISTENER is False:
#       The user needs to call sdl.handle_events(), which returns the message.
#
# Note: In both cases, the given callback function will be executed.
EVENT_LISTENER = True

# Namespace and channel shared by all of the examples below.
MY_NS = 'my_ns'
MY_CHANNEL = "my_channel"

# Lock used only to demonstrate that the notification callback was called.
MY_LOCK = threading.Lock()
| 76 | |
| 77 | |
| 78 | def _try_func_return(func): |
| 79 | """ |
| 80 | Generic wrapper function to call SDL API function and handle exceptions if they are raised. |
| 81 | """ |
| 82 | try: |
| 83 | return func() |
| 84 | except RejectedByBackend as exp: |
| 85 | print(f'SDL function {func.__name__} failed: {str(exp)}') |
| 86 | # Permanent failure, just forward the exception |
| 87 | raise |
| 88 | except (NotConnected, BackendError) as exp: |
| 89 | print(f'SDL function {func.__name__} failed for a temporal error: {str(exp)}') |
| 90 | # Here we could have a retry logic |
| 91 | |
| 92 | |
def _try_func_callback_return(func):
    """Call an SDL API function and wait until the notification callback has run.

    MY_LOCK is acquired before the call and is expected to be released by the
    callback function; the lock is only used to demonstrate that the callback
    function was called.

    Args:
        func: Zero-argument callable wrapping the SDL call to execute.

    Returns:
        The value returned by ``_try_func_return(func)``.

    Raises:
        Any exception propagated by ``_try_func_return(func)``. MY_LOCK is
        released before the exception is forwarded, so that a later call does
        not block forever on ``MY_LOCK.acquire()``.
    """
    MY_LOCK.acquire()
    try:
        ret = _try_func_return(func)
    except Exception:
        # The call failed, so no callback is expected to release the lock.
        # Release it here to leave MY_LOCK usable for subsequent calls.
        if MY_LOCK.locked():
            MY_LOCK.release()
        raise
    # Poll until the callback has released the lock.
    while MY_LOCK.locked():
        time.sleep(0.01)
    return ret
| 105 | |
| 106 | |
# Most recent channel and message delivered to the notification callback.
last_cb_channel = ""
last_cb_message = ""

# Flag that lets the program stop the receive thread at the end of execution.
stop_thread = False

# Create the SDL instance. The call creates connection to the SDL database backend.
mysdl = _try_func_return(SyncStorage)
| 116 | |
| 117 | |
def cb(channel: str, message: List[str]):
    """Example callback that is invoked when an event list is received.

    Records the channel and the message in the module-level variables
    last_cb_channel and last_cb_message, then releases the global lock if it
    is currently held.

    Args:
        channel: Channel where the message was received
        message: Received message
    """
    global last_cb_channel, last_cb_message
    last_cb_channel, last_cb_message = channel, message
    # Release the lock so that the waiting example code can continue.
    if MY_LOCK.locked():
        MY_LOCK.release()
| 133 | |
| 134 | |
def listen_thread():
    """Example listener loop that keeps calling sdl.handle_events().

    Runs until the module-level flag stop_thread becomes true, sleeping
    briefly between the calls.
    """
    while not stop_thread:
        received = mysdl.handle_events()
        if received:
            # The received message could be processed here.
            pass
        time.sleep(0.001)
| 145 | |
# Subscribe cb to MY_CHANNEL. Every message received in the channel is
# expected to trigger a call to cb.
_try_func_return(lambda: mysdl.subscribe_channel(MY_NS, cb, MY_CHANNEL))

# Pick one of the two notification handling methods described above.
if not EVENT_LISTENER:
    # The application polls for events in its own thread.
    thread = threading.Thread(target=listen_thread)
    thread.start()
else:
    # SDL handles messages in its own managed event loop.
    _try_func_return(mysdl.start_event_listener)
| 157 | |
# Sets a value 'my_value' for a key 'my_key' under given namespace and publishes
# the event "SET PUBLISH" to MY_CHANNEL. Note that value type must be bytes and
# multiple key values can be set in one set function call.
_try_func_callback_return(
    lambda: mysdl.set_and_publish(MY_NS, {MY_CHANNEL: "SET PUBLISH"}, {'my_key': b'my_value'}))
# The callback stores the received events as a list, hence the index.
assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "SET PUBLISH"

# Sets a value 'my_value' for a key 'my_key' under given namespace. Note that value
# type must be bytes and multiple key values can be set in one set function call.
# Function publishes two events into one channel.
_try_func_callback_return(
    lambda: mysdl.set_and_publish(MY_NS, {MY_CHANNEL: ["SET PUBLISH1", "SET PUBLISH2"]}, {'my_key': b'my_value'}))
assert last_cb_channel == MY_CHANNEL and last_cb_message == ["SET PUBLISH1", "SET PUBLISH2"]

# Sets a value 'my_value2' for a key 'my_key' under given namespace only if the old value is
# 'my_value'.
was_set = _try_func_callback_return(lambda: mysdl.set_if_and_publish(
    MY_NS, {MY_CHANNEL: "SET IF PUBLISH"}, 'my_key', b'my_value', b'my_value2'))
assert was_set is True
assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "SET IF PUBLISH"
# Try again. This time value 'my_value2' won't be set, because the key has already 'my_value2'
# value. Callback function will not be called here, so the plain wrapper (which does not
# wait for the callback) is used.
was_set = _try_func_return(lambda: mysdl.set_if_and_publish(MY_NS, {MY_CHANNEL: "SET IF PUBLISH"},
                                                            'my_key', b'my_value', b'my_value2'))
assert was_set is False

# Sets a value 'my_value' for a key 'my_key2' under given namespace only if the key doesn't exist.
# Note that value types must be bytes.
was_set = _try_func_callback_return(lambda: mysdl.set_if_not_exists_and_publish(
    MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
assert was_set is True
assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "SET IF NOT PUBLISH"
# Try again. This time the key 'my_key2' already exists. Callback function will not be called here.
was_set = _try_func_return(lambda: mysdl.set_if_not_exists_and_publish(
    MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
assert was_set is False

# Removes a key 'my_key' under given namespace and verifies that it can no longer be read.
_try_func_callback_return(
    lambda: mysdl.remove_and_publish(MY_NS, {MY_CHANNEL: "REMOVE PUBLISH"}, 'my_key'))
my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, 'my_key'))
assert my_ret_dict == {}
assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "REMOVE PUBLISH"

# Removes a key 'my_key2' under given namespace only if the old value is 'my_value'.
was_removed = _try_func_callback_return(lambda: mysdl.remove_if_and_publish(
    MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
assert was_removed is True
assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "REMOVE IF PUBLISH"
# Try again to remove not anymore existing key 'my_key2'. Callback function will not be called here.
was_removed = _try_func_return(lambda: mysdl.remove_if_and_publish(
    MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
assert was_removed is False

# Removes all the keys under given namespace. A key is stored first (without
# publishing) so that there is something to remove.
_try_func_return(lambda: mysdl.set(MY_NS, {'my_key': b'something'}))
my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'}))
assert my_ret_dict != {}

_try_func_callback_return(
    lambda: mysdl.remove_all_and_publish(MY_NS, {MY_CHANNEL: "REMOVE ALL PUBLISH"}))
my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'}))
assert my_ret_dict == {}
assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "REMOVE ALL PUBLISH"
Timothy Ebido | a439239 | 2020-08-12 10:49:26 +0900 | [diff] [blame] | 221 | |
# Ask the polling thread (if one was started) to leave its loop.
stop_thread = True
if not EVENT_LISTENER:
    # listen_thread() may still be inside mysdl.handle_events(); wait for it to
    # finish so that the SDL instance is not closed while the thread uses it.
    thread.join()
mysdl.close()