blob: 3364497e0f8383e6f635a5b24a3f69adda59311e [file] [log] [blame]
Timo Tietavainendada8462019-11-27 11:50:01 +02001# Copyright (c) 2019 AT&T Intellectual Property.
2# Copyright (c) 2018-2019 Nokia.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16#
17# This source code is part of the near-RT RIC (RAN Intelligent Controller)
18# platform project (RICP).
19#
20
21
22"""The module provides implementation of Shared Data Layer (SDL) database backend interface."""
23import contextlib
24from typing import (Dict, Set, List, Union)
25from redis import Redis
26from redis.sentinel import Sentinel
27from redis.lock import Lock
28from redis._compat import nativestr
29from redis import exceptions as redis_exceptions
30from ricsdl.configuration import _Configuration
31from ricsdl.exceptions import (
32 RejectedByBackend,
33 NotConnected,
34 BackendError
35)
36from .dbbackend_abc import DbBackendAbc
37from .dbbackend_abc import DbBackendLockAbc
38
39
40@contextlib.contextmanager
41def _map_to_sdl_exception():
42 """Translates known redis exceptions into SDL exceptions."""
43 try:
44 yield
45 except(redis_exceptions.ResponseError) as exc:
46 raise RejectedByBackend("SDL backend rejected the request: {}".
47 format(str(exc))) from exc
48 except(redis_exceptions.ConnectionError, redis_exceptions.TimeoutError) as exc:
49 raise NotConnected("SDL not connected to backend: {}".
50 format(str(exc))) from exc
51 except(redis_exceptions.RedisError) as exc:
52 raise BackendError("SDL backend failed to process the request: {}".
53 format(str(exc))) from exc
54
55
56class RedisBackend(DbBackendAbc):
57 """
58 A class providing an implementation of database backend of Shared Data Layer (SDL), when
59 backend database solution is Redis.
60
61 Args:
62 configuration (_Configuration): SDL configuration, containing credentials to connect to
63 Redis database backend.
64 """
65 def __init__(self, configuration: _Configuration) -> None:
66 super().__init__()
67 with _map_to_sdl_exception():
68 if configuration.get_params().db_sentinel_port:
69 sentinel_node = (configuration.get_params().db_host,
70 configuration.get_params().db_sentinel_port)
71 master_name = configuration.get_params().db_sentinel_master_name
72 self.__sentinel = Sentinel([sentinel_node])
73 self.__redis = self.__sentinel.master_for(master_name)
74 else:
75 self.__redis = Redis(host=configuration.get_params().db_host,
76 port=configuration.get_params().db_port,
77 db=0,
78 max_connections=20)
79 self.__redis.set_response_callback('SETIE', lambda r: r and nativestr(r) == 'OK' or False)
80 self.__redis.set_response_callback('DELIE', lambda r: r and int(r) == 1 or False)
81
82 def __del__(self):
83 self.close()
84
85 def __str__(self):
86 return str(
87 {
Timo Tietavainen598ca392020-01-08 16:49:11 +020088 "DB type": "Redis",
Timo Tietavainendada8462019-11-27 11:50:01 +020089 "Redis connection": repr(self.__redis)
90 }
91 )
92
Timo Tietavainenc979c0d2020-01-21 21:57:17 +020093 def is_connected(self):
94 with _map_to_sdl_exception():
95 return self.__redis.ping()
96
Timo Tietavainendada8462019-11-27 11:50:01 +020097 def close(self):
98 self.__redis.close()
99
100 def set(self, ns: str, data_map: Dict[str, bytes]) -> None:
101 db_data_map = self._add_data_map_ns_prefix(ns, data_map)
102 with _map_to_sdl_exception():
103 self.__redis.mset(db_data_map)
104
105 def set_if(self, ns: str, key: str, old_data: bytes, new_data: bytes) -> bool:
106 db_key = self._add_key_ns_prefix(ns, key)
107 with _map_to_sdl_exception():
108 return self.__redis.execute_command('SETIE', db_key, new_data, old_data)
109
110 def set_if_not_exists(self, ns: str, key: str, data: bytes) -> bool:
111 db_key = self._add_key_ns_prefix(ns, key)
112 with _map_to_sdl_exception():
113 return self.__redis.setnx(db_key, data)
114
115 def get(self, ns: str, keys: List[str]) -> Dict[str, bytes]:
116 ret = dict()
117 db_keys = self._add_keys_ns_prefix(ns, keys)
118 with _map_to_sdl_exception():
119 values = self.__redis.mget(db_keys)
120 for idx, val in enumerate(values):
121 # return only key values, which has a value
122 if val:
123 ret[keys[idx]] = val
124 return ret
125
Timo Tietavainen276ed3c2019-12-15 20:16:23 +0200126 def find_keys(self, ns: str, key_pattern: str) -> List[str]:
127 db_key_pattern = self._add_key_ns_prefix(ns, key_pattern)
Timo Tietavainendada8462019-11-27 11:50:01 +0200128 with _map_to_sdl_exception():
Timo Tietavainen276ed3c2019-12-15 20:16:23 +0200129 ret = self.__redis.keys(db_key_pattern)
Timo Tietavainendada8462019-11-27 11:50:01 +0200130 return self._strip_ns_from_bin_keys(ns, ret)
131
Timo Tietavainen276ed3c2019-12-15 20:16:23 +0200132 def find_and_get(self, ns: str, key_pattern: str) -> Dict[str, bytes]:
Timo Tietavainendada8462019-11-27 11:50:01 +0200133 # todo: replace below implementation with redis 'NGET' module
134 ret = dict() # type: Dict[str, bytes]
135 with _map_to_sdl_exception():
Timo Tietavainen276ed3c2019-12-15 20:16:23 +0200136 matched_keys = self.find_keys(ns, key_pattern)
Timo Tietavainendada8462019-11-27 11:50:01 +0200137 if matched_keys:
138 ret = self.get(ns, matched_keys)
139 return ret
140
141 def remove(self, ns: str, keys: List[str]) -> None:
142 db_keys = self._add_keys_ns_prefix(ns, keys)
143 with _map_to_sdl_exception():
144 self.__redis.delete(*db_keys)
145
146 def remove_if(self, ns: str, key: str, data: bytes) -> bool:
147 db_key = self._add_key_ns_prefix(ns, key)
148 with _map_to_sdl_exception():
149 return self.__redis.execute_command('DELIE', db_key, data)
150
151 def add_member(self, ns: str, group: str, members: Set[bytes]) -> None:
152 db_key = self._add_key_ns_prefix(ns, group)
153 with _map_to_sdl_exception():
154 self.__redis.sadd(db_key, *members)
155
156 def remove_member(self, ns: str, group: str, members: Set[bytes]) -> None:
157 db_key = self._add_key_ns_prefix(ns, group)
158 with _map_to_sdl_exception():
159 self.__redis.srem(db_key, *members)
160
161 def remove_group(self, ns: str, group: str) -> None:
162 db_key = self._add_key_ns_prefix(ns, group)
163 with _map_to_sdl_exception():
164 self.__redis.delete(db_key)
165
166 def get_members(self, ns: str, group: str) -> Set[bytes]:
167 db_key = self._add_key_ns_prefix(ns, group)
168 with _map_to_sdl_exception():
169 return self.__redis.smembers(db_key)
170
171 def is_member(self, ns: str, group: str, member: bytes) -> bool:
172 db_key = self._add_key_ns_prefix(ns, group)
173 with _map_to_sdl_exception():
174 return self.__redis.sismember(db_key, member)
175
176 def group_size(self, ns: str, group: str) -> int:
177 db_key = self._add_key_ns_prefix(ns, group)
178 with _map_to_sdl_exception():
179 return self.__redis.scard(db_key)
180
181 @classmethod
182 def _add_key_ns_prefix(cls, ns: str, key: str):
183 return '{' + ns + '},' + key
184
185 @classmethod
186 def _add_keys_ns_prefix(cls, ns: str, keylist: List[str]) -> List[str]:
187 ret_nskeys = []
188 for k in keylist:
189 ret_nskeys.append('{' + ns + '},' + k)
190 return ret_nskeys
191
192 @classmethod
193 def _add_data_map_ns_prefix(cls, ns: str, data_dict: Dict[str, bytes]) -> Dict[str, bytes]:
194 ret_nsdict = {}
195 for key, val in data_dict.items():
196 ret_nsdict['{' + ns + '},' + key] = val
197 return ret_nsdict
198
199 @classmethod
200 def _strip_ns_from_bin_keys(cls, ns: str, nskeylist: List[bytes]) -> List[str]:
201 ret_keys = []
202 for k in nskeylist:
Timo Tietavainen276ed3c2019-12-15 20:16:23 +0200203 try:
204 redis_key = k.decode("utf-8")
205 except UnicodeDecodeError as exc:
206 msg = u'Namespace %s key conversion to string failed: %s' % (ns, str(exc))
207 raise RejectedByBackend(msg)
208 nskey = redis_key.split(',', 1)
Timo Tietavainendada8462019-11-27 11:50:01 +0200209 if len(nskey) != 2:
Timo Tietavainen276ed3c2019-12-15 20:16:23 +0200210 msg = u'Namespace %s key:%s has no namespace prefix' % (ns, redis_key)
Timo Tietavainendada8462019-11-27 11:50:01 +0200211 raise RejectedByBackend(msg)
212 ret_keys.append(nskey[1])
213 return ret_keys
214
Timo Tietavainendada8462019-11-27 11:50:01 +0200215 def get_redis_connection(self):
216 """Return existing Redis database connection."""
217 return self.__redis
218
219
220class RedisBackendLock(DbBackendLockAbc):
221 """
222 A class providing an implementation of database backend lock of Shared Data Layer (SDL), when
223 backend database solution is Redis.
224
225 Args:
226 ns (str): Namespace under which this lock is targeted.
227 name (str): Lock name, identifies the lock key in a Redis database backend.
228 expiration (int, float): Lock expiration time after which the lock is removed if it hasn't
229 been released earlier by a 'release' method.
230 redis_backend (RedisBackend): Database backend object containing connection to Redis
231 database.
232 """
233 lua_get_validity_time = None
234 # KEYS[1] - lock name
235 # ARGS[1] - token
236 # return < 0 in case of failure, otherwise return lock validity time in milliseconds.
237 LUA_GET_VALIDITY_TIME_SCRIPT = """
238 local token = redis.call('get', KEYS[1])
239 if not token then
240 return -10
241 end
242 if token ~= ARGV[1] then
243 return -11
244 end
245 return redis.call('pttl', KEYS[1])
246 """
247
248 def __init__(self, ns: str, name: str, expiration: Union[int, float],
249 redis_backend: RedisBackend) -> None:
250 super().__init__(ns, name)
251 self.__redis = redis_backend.get_redis_connection()
252 with _map_to_sdl_exception():
253 redis_lockname = '{' + ns + '},' + self._lock_name
254 self.__redis_lock = Lock(redis=self.__redis, name=redis_lockname, timeout=expiration)
255 self._register_scripts()
256
257 def __str__(self):
258 return str(
259 {
Timo Tietavainen598ca392020-01-08 16:49:11 +0200260 "lock DB type": "Redis",
Timo Tietavainendada8462019-11-27 11:50:01 +0200261 "lock namespace": self._ns,
262 "lock name": self._lock_name,
263 "lock status": self._lock_status_to_string()
264 }
265 )
266
267 def acquire(self, retry_interval: Union[int, float] = 0.1,
268 retry_timeout: Union[int, float] = 10) -> bool:
269 succeeded = False
270 self.__redis_lock.sleep = retry_interval
271 with _map_to_sdl_exception():
272 succeeded = self.__redis_lock.acquire(blocking_timeout=retry_timeout)
273 return succeeded
274
275 def release(self) -> None:
276 with _map_to_sdl_exception():
277 self.__redis_lock.release()
278
279 def refresh(self) -> None:
280 with _map_to_sdl_exception():
281 self.__redis_lock.reacquire()
282
283 def get_validity_time(self) -> Union[int, float]:
284 validity = 0
285 if self.__redis_lock.local.token is None:
286 msg = u'Cannot get validity time of an unlocked lock %s' % self._lock_name
287 raise RejectedByBackend(msg)
288
289 with _map_to_sdl_exception():
290 validity = self.lua_get_validity_time(keys=[self.__redis_lock.name],
291 args=[self.__redis_lock.local.token],
292 client=self.__redis)
293 if validity < 0:
294 msg = (u'Getting validity time of a lock %s failed with error code: %d'
295 % (self._lock_name, validity))
296 raise RejectedByBackend(msg)
297 ftime = validity / 1000.0
298 if ftime.is_integer():
299 return int(ftime)
300 return ftime
301
302 def _register_scripts(self):
303 cls = self.__class__
304 client = self.__redis
305 if cls.lua_get_validity_time is None:
306 cls.lua_get_validity_time = client.register_script(cls.LUA_GET_VALIDITY_TIME_SCRIPT)
307
308 def _lock_status_to_string(self) -> str:
309 try:
310 if self.__redis_lock.locked():
311 if self.__redis_lock.owned():
312 return 'locked'
313 return 'locked by someone else'
314 return 'unlocked'
315 except(redis_exceptions.RedisError) as exc:
316 return f'Error: {str(exc)}'