Timothy Ebido | a439239 | 2020-08-12 10:49:26 +0900 | [diff] [blame] | 1 | # Copyright (c) 2020 Samsung Electronics |
| 2 | # |
| 3 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | # you may not use this file except in compliance with the License. |
| 5 | # You may obtain a copy of the License at |
| 6 | # |
| 7 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | # |
| 9 | # Unless required by applicable law or agreed to in writing, software |
| 10 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | # See the License for the specific language governing permissions and |
| 13 | # limitations under the License. |
| 14 | |
| 15 | # |
| 16 | # This source code is part of the near-RT RIC (RAN Intelligent Controller) |
| 17 | # platform project (RICP). |
| 18 | # |
| 19 | |
| 20 | """Examples on how to use Shared Data Layer (SDL) notification feature. |
| 21 | |
| 22 | Execution of these examples requires: |
| 23 | * Following Redis extension commands have been installed to runtime environment: |
| 24 | - MSETPUB |
| 25 | - SETIE |
Timo Tietavainen | 6589d4d | 2021-03-09 14:37:14 +0200 | [diff] [blame] | 26 | - SETIEMPUB |
| 27 | - SETNXMPUB |
| 28 | - DELMPUB |
Timothy Ebido | a439239 | 2020-08-12 10:49:26 +0900 | [diff] [blame] | 29 | - DELIE |
Timo Tietavainen | 6589d4d | 2021-03-09 14:37:14 +0200 | [diff] [blame] | 30 | - DELIEMPUB |
Timothy Ebido | a439239 | 2020-08-12 10:49:26 +0900 | [diff] [blame] | 31 | Redis v4.0 or greater is required. Older versions do not support extension modules. |
| 32 | Implementation of above commands is produced by RIC DBaaS: |
| 33 | https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/dbaas |
| 34 | In official RIC deployments these commands are installed by `dbaas` service to Redis |
| 35 | container(s). |
| 36 | In development environment you may want install commands manually to pod/container, which is |
| 37 | running Redis. |
| 38 | * Following environment variables are needed to set to the pod/container where the application |
| 39 | utilizing SDL is going to be run. |
Timo Tietavainen | c6c9af2 | 2021-05-11 14:43:09 +0300 | [diff] [blame] | 40 | DBAAS_SERVICE_HOST = [DB service address] |
Timo Tietavainen | e67b9ab | 2022-03-14 07:27:30 +0200 | [diff] [blame^] | 41 | DBAAS_SERVICE_PORT= [Comma separated list of DB service ports]. Only one port supported in |
| 42 | RIC deployments, Nokia SEP deployments can have multiple ports. |
| 43 | DBAAS_MASTER_NAME = [Comma separated list of DB names]. Needed to set only if Redis |
| 44 | sentinel is used to provide high availability for Redis DB solution. Only one DB name |
| 45 | supported in RIC deployments, Nokia SEP deployments can have multiple DB names. |
| 46 | DBAAS_SERVICE_SENTINEL_PORT = [Comma separated list of Redis sentinel port number]. Needed |
| 47 | to set only if Redis sentinel is in use. Only one port supported in RIC deployments, Nokia |
| 48 | SEP deployments can have multiple ports. |
| 49 | DBASS_CLUSTER_ADDR_LIST = [Comma separated list of DB service addresses]. Is set only if |
| 50 | more than one Redis sentinel groups are in use. Only in use in Nokia SEP deployments. |
Timo Tietavainen | c6c9af2 | 2021-05-11 14:43:09 +0300 | [diff] [blame] | 51 | In official RIC deployments four first environment variables are defined in Helm configMaps |
| 52 | of the DBaaS and these configurations can be loaded automatically as environment variables |
| 53 | into application pods via `envFrom dbaas-appconfig` statement in an application Helm Charts. |
| 54 | The last environment variable is not for time being in use in official RIC deployments, only |
| 55 | in Nokia SEP deployments. |
Timothy Ebido | a439239 | 2020-08-12 10:49:26 +0900 | [diff] [blame] | 56 | """ |
| 57 | |
| 58 | import threading |
| 59 | import time |
| 60 | |
| 61 | from ricsdl.syncstorage import SyncStorage |
| 62 | from ricsdl.exceptions import RejectedByBackend, NotConnected, BackendError |
Timo Tietavainen | db77539 | 2021-06-09 05:56:54 +0300 | [diff] [blame] | 63 | from typing import (Union, List) |
Timothy Ebido | a439239 | 2020-08-12 10:49:26 +0900 | [diff] [blame] | 64 | |
| 65 | # There are two available methods for applications to handle notifications: |
| 66 | # - EVENT_LISTENER (true): |
| 67 | # - User calls sdl.start_event_listener() which will create an SDL managed |
| 68 | # event loop for handling messages. |
| 69 | # - EVENT_LISTENER (false): |
Timo Tietavainen | db77539 | 2021-06-09 05:56:54 +0300 | [diff] [blame] | 70 | # - User need to call sdl.handle_events() which will return the message |
Timothy Ebido | a439239 | 2020-08-12 10:49:26 +0900 | [diff] [blame] | 71 | # |
| 72 | # Note: In both cases, the given callback function will be executed. |
| 73 | EVENT_LISTENER = True |
| 74 | |
| 75 | # Constants used in the examples below. |
| 76 | MY_NS = 'my_ns' |
| 77 | MY_CHANNEL = "my_channel" |
| 78 | MY_LOCK = threading.Lock() |
| 79 | |
| 80 | |
| 81 | def _try_func_return(func): |
| 82 | """ |
| 83 | Generic wrapper function to call SDL API function and handle exceptions if they are raised. |
| 84 | """ |
| 85 | try: |
| 86 | return func() |
| 87 | except RejectedByBackend as exp: |
| 88 | print(f'SDL function {func.__name__} failed: {str(exp)}') |
| 89 | # Permanent failure, just forward the exception |
| 90 | raise |
| 91 | except (NotConnected, BackendError) as exp: |
| 92 | print(f'SDL function {func.__name__} failed for a temporal error: {str(exp)}') |
| 93 | # Here we could have a retry logic |
| 94 | |
| 95 | |
| 96 | def _try_func_callback_return(func): |
| 97 | """Generic wrapper function for testing SDL APIs with callback functions. |
| 98 | |
| 99 | threading.Lock is unlocked in the callback function and threading.Lock is |
| 100 | only used to demonstrate that the callback function was called. |
| 101 | """ |
| 102 | global MY_LOCK |
| 103 | MY_LOCK.acquire() |
| 104 | ret = _try_func_return(func) |
| 105 | while MY_LOCK.locked(): |
| 106 | time.sleep(0.01) |
| 107 | return ret |
| 108 | |
| 109 | |
| 110 | # Creates SDL instance. The call creates connection to the SDL database backend. |
| 111 | mysdl = _try_func_return(SyncStorage) |
| 112 | |
| 113 | # Stores the last received channel and message |
| 114 | last_cb_channel = "" |
| 115 | last_cb_message = "" |
| 116 | |
| 117 | # Allows program to stop receive thread at the end of execution |
| 118 | stop_thread = False |
| 119 | |
| 120 | |
Timo Tietavainen | f5af904 | 2021-06-10 15:54:37 +0300 | [diff] [blame] | 121 | def cb(channel: str, message: List[str]): |
| 122 | """An example of function that will be called when an event list is received. |
Timothy Ebido | a439239 | 2020-08-12 10:49:26 +0900 | [diff] [blame] | 123 | |
| 124 | This function sets last_cb_channel and last_cb_message as channel and |
| 125 | message respectively. This also unlocks the global lock variable. |
| 126 | |
| 127 | Args: |
| 128 | channel: Channel where the message was received |
| 129 | message: Received message |
| 130 | """ |
| 131 | global last_cb_channel, last_cb_message, MY_LOCK |
| 132 | last_cb_channel = channel |
| 133 | last_cb_message = message |
| 134 | if MY_LOCK.locked(): |
| 135 | MY_LOCK.release() |
| 136 | |
| 137 | |
| 138 | def listen_thread(): |
| 139 | """An example of a listener thread that continuously calls sdl.handle_events().""" |
| 140 | global mysdl |
| 141 | global stop_thread |
| 142 | while not stop_thread: |
| 143 | message = mysdl.handle_events() |
| 144 | if message: |
| 145 | # You could process message here |
| 146 | pass |
| 147 | time.sleep(0.001) |
| 148 | |
Timo Tietavainen | 6589d4d | 2021-03-09 14:37:14 +0200 | [diff] [blame] | 149 | # Subscribe to MY_CHANNEL. We expect that anytime we receive a message in the |
| 150 | # channel, cb function will be called. |
| 151 | _try_func_return(lambda: mysdl.subscribe_channel(MY_NS, cb, MY_CHANNEL)) |
Timothy Ebido | a439239 | 2020-08-12 10:49:26 +0900 | [diff] [blame] | 152 | |
| 153 | # As mentioned above, there are two available methods for applications to |
| 154 | # handle notifications |
| 155 | if EVENT_LISTENER: |
| 156 | _try_func_return(mysdl.start_event_listener) |
| 157 | else: |
| 158 | thread = threading.Thread(target=listen_thread) |
| 159 | thread.start() |
| 160 | |
Timothy Ebido | a439239 | 2020-08-12 10:49:26 +0900 | [diff] [blame] | 161 | # Sets a value 'my_value' for a key 'my_key' under given namespace. Note that value |
| 162 | # type must be bytes and multiple key values can be set in one set function call. |
| 163 | _try_func_callback_return( |
| 164 | lambda: mysdl.set_and_publish(MY_NS, {MY_CHANNEL: "SET PUBLISH"}, {'my_key': b'my_value'})) |
Timo Tietavainen | f5af904 | 2021-06-10 15:54:37 +0300 | [diff] [blame] | 165 | assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "SET PUBLISH" |
Timothy Ebido | a439239 | 2020-08-12 10:49:26 +0900 | [diff] [blame] | 166 | |
Timo Tietavainen | db77539 | 2021-06-09 05:56:54 +0300 | [diff] [blame] | 167 | # Sets a value 'my_value' for a key 'my_key' under given namespace. Note that value |
| 168 | # type must be bytes and multiple key values can be set in one set function call. |
| 169 | # Function publishes two events into one channel. |
| 170 | _try_func_callback_return( |
| 171 | lambda: mysdl.set_and_publish(MY_NS, {MY_CHANNEL: ["SET PUBLISH1", "SET PUBLISH2"]}, {'my_key': b'my_value'})) |
| 172 | assert last_cb_channel == MY_CHANNEL and last_cb_message == ["SET PUBLISH1", "SET PUBLISH2"] |
| 173 | |
Timothy Ebido | a439239 | 2020-08-12 10:49:26 +0900 | [diff] [blame] | 174 | # Sets a value 'my_value2' for a key 'my_key' under given namespace only if the old value is |
| 175 | # 'my_value'. |
| 176 | was_set = _try_func_callback_return(lambda: mysdl.set_if_and_publish( |
| 177 | MY_NS, {MY_CHANNEL: "SET IF PUBLISH"}, 'my_key', b'my_value', b'my_value2')) |
| 178 | assert was_set is True |
Timo Tietavainen | f5af904 | 2021-06-10 15:54:37 +0300 | [diff] [blame] | 179 | assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "SET IF PUBLISH" |
Timothy Ebido | a439239 | 2020-08-12 10:49:26 +0900 | [diff] [blame] | 180 | # Try again. This time value 'my_value2' won't be set, because the key has already 'my_value2' |
| 181 | # value. Callback function will not be called here. |
| 182 | was_set = _try_func_return(lambda: mysdl.set_if_and_publish(MY_NS, {MY_CHANNEL: "SET IF PUBLISH"}, |
| 183 | 'my_key', b'my_value', b'my_value2')) |
| 184 | assert was_set is False |
| 185 | |
| 186 | # Sets a value 'my_value' for a key 'my_key2' under given namespace only if the key doesn't exist. |
| 187 | # Note that value types must be bytes. |
| 188 | was_set = _try_func_callback_return(lambda: mysdl.set_if_not_exists_and_publish( |
| 189 | MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value')) |
| 190 | assert was_set is True |
Timo Tietavainen | f5af904 | 2021-06-10 15:54:37 +0300 | [diff] [blame] | 191 | assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "SET IF NOT PUBLISH" |
Timothy Ebido | a439239 | 2020-08-12 10:49:26 +0900 | [diff] [blame] | 192 | # Try again. This time the key 'my_key2' already exists. Callback function will not be called here. |
| 193 | was_set = _try_func_return(lambda: mysdl.set_if_not_exists_and_publish( |
| 194 | MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value')) |
| 195 | assert was_set is False |
| 196 | |
| 197 | # Removes a key 'my_key' under given namespace. |
| 198 | _try_func_callback_return( |
| 199 | lambda: mysdl.remove_and_publish(MY_NS, {MY_CHANNEL: "REMOVE PUBLISH"}, 'my_key')) |
| 200 | my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, 'my_key')) |
| 201 | assert my_ret_dict == {} |
Timo Tietavainen | f5af904 | 2021-06-10 15:54:37 +0300 | [diff] [blame] | 202 | assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "REMOVE PUBLISH" |
Timothy Ebido | a439239 | 2020-08-12 10:49:26 +0900 | [diff] [blame] | 203 | |
| 204 | # Removes a key 'my_key' under given namespace only if the old value is 'my_value'. |
| 205 | was_removed = _try_func_callback_return(lambda: mysdl.remove_if_and_publish( |
| 206 | MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value')) |
| 207 | assert was_removed is True |
Timo Tietavainen | f5af904 | 2021-06-10 15:54:37 +0300 | [diff] [blame] | 208 | assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "REMOVE IF PUBLISH" |
Timothy Ebido | a439239 | 2020-08-12 10:49:26 +0900 | [diff] [blame] | 209 | # Try again to remove not anymore existing key 'my_key'. Callback function will not be called here. |
| 210 | was_removed = _try_func_return(lambda: mysdl.remove_if_and_publish( |
| 211 | MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value')) |
| 212 | assert was_removed is False |
| 213 | |
| 214 | # Removes all the keys under given namespace. |
| 215 | _try_func_return(lambda: mysdl.set(MY_NS, {'my_key': b'something'})) |
| 216 | my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'})) |
| 217 | assert my_ret_dict != {} |
| 218 | |
| 219 | _try_func_callback_return( |
| 220 | lambda: mysdl.remove_all_and_publish(MY_NS, {MY_CHANNEL: "REMOVE ALL PUBLISH"})) |
| 221 | my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'})) |
| 222 | assert my_ret_dict == {} |
Timo Tietavainen | f5af904 | 2021-06-10 15:54:37 +0300 | [diff] [blame] | 223 | assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "REMOVE ALL PUBLISH" |
Timothy Ebido | a439239 | 2020-08-12 10:49:26 +0900 | [diff] [blame] | 224 | |
| 225 | stop_thread = True |
| 226 | mysdl.close() |