blob: 482d6d7456df497a50efaaa9f8e3685986482a4c [file] [log] [blame]
Rolf Badorekef2bf512019-08-20 11:17:15 +03001/*
2 Copyright (c) 2018-2019 Nokia.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
Timo Tietavainena0745d22019-11-28 09:55:22 +020017/*
18 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 * platform project (RICP).
20*/
21
Rolf Badorekef2bf512019-08-20 11:17:15 +030022#include "config.h"
23#include <sstream>
24#include "private/error.hpp"
25#include <sdl/emptynamespace.hpp>
26#include <sdl/invalidnamespace.hpp>
27#include <sdl/publisherid.hpp>
28#include "private/abort.hpp"
29#include "private/createlogger.hpp"
30#include "private/engine.hpp"
31#include "private/logger.hpp"
32#include "private/namespacevalidator.hpp"
33#include "private/configurationreader.hpp"
34#include "private/redis/asynccommanddispatcher.hpp"
35#include "private/redis/asyncdatabasediscovery.hpp"
36#include "private/redis/asyncredisstorage.hpp"
37#include "private/redis/contents.hpp"
38#include "private/redis/contentsbuilder.hpp"
39#include "private/redis/redisgeneral.hpp"
40#include "private/redis/reply.hpp"
41
42using namespace shareddatalayer;
43using namespace shareddatalayer::redis;
44
/* TODO: This implementation contains a lot of code duplicated from the old API (asyncRedisConnection).
 * Once this new API is fully ready and tested, the old API implementation could be changed to utilize it
 * (a bit like the sync API utilizes the async API).
 */
49
50namespace
51{
52 std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
53 const DatabaseInfo& databaseInfo,
54 std::shared_ptr<ContentsBuilder> contentsBuilder,
55 std::shared_ptr<Logger> logger)
56 {
57 return AsyncCommandDispatcher::create(engine,
58 databaseInfo,
59 contentsBuilder,
60 false,
Rolf Badorek8324d022019-09-17 16:47:20 +030061 logger,
62 false);
Rolf Badorekef2bf512019-08-20 11:17:15 +030063 }
64
/**
 * Error category for AsyncRedisStorage::ErrorCode values.
 *
 * Supplies the category name, a textual description for each error code and
 * the mapping from each error code to its default error condition.
 */
class AsyncRedisStorageErrorCategory: public std::error_category
{
public:
    AsyncRedisStorageErrorCategory() = default;

    /** @return The name identifying this error category. */
    const char* name() const noexcept override;

    /** @return Human readable description of the error code `condition`. */
    std::string message(int condition) const override;

    /** @return The error condition that the error code `condition` maps to. */
    std::error_condition default_error_condition(int condition) const noexcept override;
};
76
77 const char* AsyncRedisStorageErrorCategory::name() const noexcept
78 {
79 return "asyncredisstorage";
80 }
81
82 std::string AsyncRedisStorageErrorCategory::message(int condition) const
83 {
84 switch (static_cast<AsyncRedisStorage::ErrorCode>(condition))
85 {
86 case AsyncRedisStorage::ErrorCode::SUCCESS:
87 return std::error_code().message();
88 case AsyncRedisStorage::ErrorCode::REDIS_NOT_YET_DISCOVERED:
89 return "connection to the underlying data storage not yet available";
90 case AsyncRedisStorage::ErrorCode::INVALID_NAMESPACE:
91 return "invalid namespace identifier passed to SDL API";
92 case AsyncRedisStorage::ErrorCode::END_MARKER:
93 logErrorOnce("AsyncRedisStorage::ErrorCode::END_MARKER is not meant to be queried (it is only for enum loop control)");
94 return "unsupported error code for message()";
95 default:
96 return "description missing for AsyncRedisStorageErrorCategory error: " + std::to_string(condition);
97 }
98 }
99
100 std::error_condition AsyncRedisStorageErrorCategory::default_error_condition(int condition) const noexcept
101 {
102 switch (static_cast<AsyncRedisStorage::ErrorCode>(condition))
103 {
104 case AsyncRedisStorage::ErrorCode::SUCCESS:
105 return InternalError::SUCCESS;
106 case AsyncRedisStorage::ErrorCode::REDIS_NOT_YET_DISCOVERED:
107 return InternalError::SDL_NOT_READY;
108 case AsyncRedisStorage::ErrorCode::INVALID_NAMESPACE:
109 return InternalError::SDL_RECEIVED_INVALID_PARAMETER;
110 case AsyncRedisStorage::ErrorCode::END_MARKER:
111 logErrorOnce("AsyncRedisStorage::ErrorCode::END_MARKER is not meant to be mapped to InternalError (it is only for enum loop control)");
112 return InternalError::SDL_ERROR_CODE_LOGIC_ERROR;
113 default:
114 std::ostringstream msg;
115 msg << "default error condition missing for AsyncRedisStorageErrorCategory error: "
116 << condition;
117 logErrorOnce(msg.str());
118 return InternalError::SDL_ERROR_CODE_LOGIC_ERROR;
119 }
120 }
121
122 AsyncStorage::DataMap buildDataMap(const AsyncStorage::Keys& keys, const Reply::ReplyVector& replyVector)
123 {
124 AsyncStorage::DataMap dataMap;
125 auto i(0U);
126 for (const auto& j : keys)
127 {
128 if (replyVector[i]->getType() == Reply::Type::STRING)
129 {
130 AsyncStorage::Data data;
131 auto dataStr(replyVector[i]->getString());
132 for (ReplyStringLength k(0); k < dataStr->len; ++k)
133 data.push_back(static_cast<uint8_t>(dataStr->str[static_cast<size_t>(k)]));
134 dataMap.insert({ j, data });
135 }
136 ++i;
137 }
138 return dataMap;
139 }
140
141 AsyncStorage::Key getKey(const Reply::DataItem& item)
142 {
143 std::string str(item.str.c_str(), static_cast<size_t>(item.len));
144 auto res(str.find(AsyncRedisStorage::SEPARATOR));
145 return str.substr(res + 1);
146 }
147
148 AsyncStorage::Keys getKeys(const Reply::ReplyVector& replyVector)
149 {
150 AsyncStorage::Keys keys;
151 for (const auto& i : replyVector)
152 {
153 if (i->getType() == Reply::Type::STRING)
154 keys.insert(getKey(*i->getString()));
155 }
156 return keys;
157 }
158
/**
 * Escapes, in place, the characters that have a special meaning in a search
 * pattern: '*', '?', '[', ']' and '\'.
 *
 * Each such character gets a single backslash inserted in front of it; all
 * other characters are kept unchanged.
 *
 * The result is built in one linear pass into a separate buffer instead of
 * repeatedly inserting into the middle of the string, which would shift the
 * tail on every insertion.
 *
 * @param stringToProcess String to escape; replaced with the escaped version.
 */
void escapeRedisSearchPatternCharacters(std::string& stringToProcess)
{
    std::string escaped;
    escaped.reserve(stringToProcess.size());
    for (const char character : stringToProcess)
    {
        switch (character)
        {
            case '*':
            case '?':
            case '[':
            case ']':
            case '\\':
                escaped.push_back('\\');
                break;
            default:
                break;
        }
        escaped.push_back(character);
    }
    stringToProcess.swap(escaped);
}
171}
172
173AsyncRedisStorage::ErrorCode& shareddatalayer::operator++ (AsyncRedisStorage::ErrorCode& ecEnum)
174{
175 if (ecEnum == AsyncRedisStorage::ErrorCode::END_MARKER)
176 throw std::out_of_range("for AsyncRedisStorage::ErrorCode& operator ++");
177 ecEnum = AsyncRedisStorage::ErrorCode(static_cast<std::underlying_type<AsyncRedisStorage::ErrorCode>::type>(ecEnum) + 1);
178 return ecEnum;
179}
180
181std::error_code shareddatalayer::make_error_code(AsyncRedisStorage::ErrorCode errorCode)
182{
183 return std::error_code(static_cast<int>(errorCode), AsyncRedisStorage::errorCategory());
184}
185
186const std::error_category& AsyncRedisStorage::errorCategory() noexcept
187{
188 static const AsyncRedisStorageErrorCategory theAsyncRedisStorageErrorCategory;
189 return theAsyncRedisStorageErrorCategory;
190}
191
192AsyncRedisStorage::AsyncRedisStorage(std::shared_ptr<Engine> engine,
193 std::shared_ptr<AsyncDatabaseDiscovery> discovery,
194 const boost::optional<PublisherId>& pId,
195 std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
196 std::shared_ptr<Logger> logger):
197 AsyncRedisStorage(engine,
198 discovery,
199 pId,
200 namespaceConfigurations,
201 ::asyncCommandDispatcherCreator,
202 std::make_shared<redis::ContentsBuilder>(SEPARATOR),
203 logger)
204{
205}
206
207AsyncRedisStorage::AsyncRedisStorage(std::shared_ptr<Engine> engine,
208 std::shared_ptr<redis::AsyncDatabaseDiscovery> discovery,
209 const boost::optional<PublisherId>& pId,
210 std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
211 const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
212 std::shared_ptr<redis::ContentsBuilder> contentsBuilder,
213 std::shared_ptr<Logger> logger):
214 engine(engine),
215 dispatcher(nullptr),
216 discovery(discovery),
217 publisherId(pId),
218 asyncCommandDispatcherCreator(asyncCommandDispatcherCreator),
219 contentsBuilder(contentsBuilder),
220 namespaceConfigurations(namespaceConfigurations),
221 logger(logger)
222{
223 if(publisherId && (*publisherId).empty())
224 {
225 throw std::invalid_argument("AsyncRedisStorage: empty publisher ID string given");
226 }
227
228 discovery->setStateChangedCb([this](const redis::DatabaseInfo& databaseInfo)
229 {
230 serviceStateChanged(databaseInfo);
231 });
232}
233
234AsyncRedisStorage::~AsyncRedisStorage()
235{
236 if (discovery)
237 discovery->clearStateChangedCb();
238 if (dispatcher)
239 dispatcher->disableCommandCallbacks();
240}
241
242redis::DatabaseInfo& AsyncRedisStorage::getDatabaseInfo()
243{
244 return dbInfo;
245}
246
247void AsyncRedisStorage::serviceStateChanged(const redis::DatabaseInfo& newDatabaseInfo)
248{
249 dispatcher = asyncCommandDispatcherCreator(*engine,
250 newDatabaseInfo,
251 contentsBuilder,
252 logger);
253 if (readyAck)
254 dispatcher->waitConnectedAsync([this]()
255 {
256 readyAck(std::error_code());
257 readyAck = ReadyAck();
258 });
259 dbInfo = newDatabaseInfo;
260}
261
262int AsyncRedisStorage::fd() const
263{
264 return engine->fd();
265}
266
267void AsyncRedisStorage::handleEvents()
268{
269 engine->handleEvents();
270}
271
272bool AsyncRedisStorage::canOperationBePerformed(const Namespace& ns,
273 boost::optional<bool> noKeysGiven,
274 std::error_code& ecToReturn)
275{
276 if (!::isValidNamespace(ns))
277 {
278 logErrorOnce("Invalid namespace identifier: " + ns + " passed to SDL");
279 ecToReturn = std::error_code(ErrorCode::INVALID_NAMESPACE);
280 return false;
281 }
282 if (noKeysGiven && *noKeysGiven)
283 {
284 ecToReturn = std::error_code();
285 return false;
286 }
287 if (!dispatcher)
288 {
289 ecToReturn = std::error_code(ErrorCode::REDIS_NOT_YET_DISCOVERED);
290 return false;
291 }
292
293 ecToReturn = std::error_code();
294 return true;
295}
296
297void AsyncRedisStorage::waitReadyAsync(const Namespace&,
298 const ReadyAck& readyAck)
299{
300 if (dispatcher)
301 dispatcher->waitConnectedAsync([readyAck]()
302 {
303 readyAck(std::error_code());
304 });
305 else
306 this->readyAck = readyAck;
307}
308
309void AsyncRedisStorage::setAsync(const Namespace& ns,
310 const DataMap& dataMap,
311 const ModifyAck& modifyAck)
312{
313 std::error_code ec;
314
315 if (!canOperationBePerformed(ns, dataMap.empty(), ec))
316 {
317 engine->postCallback(std::bind(modifyAck, ec));
318 return;
319 }
320
321 if (namespaceConfigurations->areNotificationsEnabled(ns))
322 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
323 this,
324 std::placeholders::_1,
325 std::placeholders::_2,
326 modifyAck),
327 ns,
328 contentsBuilder->build("MSETPUB", ns, dataMap, ns, getPublishMessage()));
329 else
330 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
331 this,
332 std::placeholders::_1,
333 std::placeholders::_2,
334 modifyAck),
335 ns,
336 contentsBuilder->build("MSET", ns, dataMap));
337}
338
339void AsyncRedisStorage::modificationCommandCallback(const std::error_code& error,
340 const Reply&,
341 const ModifyAck& modifyAck )
342{
343 modifyAck(error);
344}
345
346void AsyncRedisStorage::conditionalCommandCallback(const std::error_code& error,
347 const Reply& reply,
348 const ModifyIfAck& modifyIfAck)
349{
350 auto type(reply.getType());
351 if (error ||
352 (type == Reply::Type::NIL) || // SETIE(PUB)
353 ((type == Reply::Type::INTEGER) && (reply.getInteger() != 1))) // SETNX(PUB) and DELIE(PUB)
354 modifyIfAck(error, false);
355 else
356 modifyIfAck(error, true);
357}
358
359void AsyncRedisStorage::setIfAsync(const Namespace& ns,
360 const Key& key,
361 const Data& oldData,
362 const Data& newData,
363 const ModifyIfAck& modifyIfAck)
364{
365 std::error_code ec;
366
367 if (!canOperationBePerformed(ns, boost::none, ec))
368 {
369 engine->postCallback(std::bind(modifyIfAck, ec, false));
370 return;
371 }
372
373 if (namespaceConfigurations->areNotificationsEnabled(ns))
374 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
375 this,
376 std::placeholders::_1,
377 std::placeholders::_2,
378 modifyIfAck),
379 ns,
380 contentsBuilder->build("SETIEPUB", ns, key, newData, oldData, ns, getPublishMessage()));
381 else
382 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
383 this,
384 std::placeholders::_1,
385 std::placeholders::_2,
386 modifyIfAck),
387 ns,
388 contentsBuilder->build("SETIE", ns, key, newData, oldData));
389}
390
391void AsyncRedisStorage::removeIfAsync(const Namespace& ns,
392 const Key& key,
393 const Data& data,
394 const ModifyIfAck& modifyIfAck)
395{
396 std::error_code ec;
397
398 if (!canOperationBePerformed(ns, boost::none, ec))
399 {
400 engine->postCallback(std::bind(modifyIfAck, ec, false));
401 return;
402 }
403
404 if (namespaceConfigurations->areNotificationsEnabled(ns))
405 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
406 this,
407 std::placeholders::_1,
408 std::placeholders::_2,
409 modifyIfAck),
410 ns,
411 contentsBuilder->build("DELIEPUB", ns, key, data, ns, getPublishMessage()));
412 else
413 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
414 this,
415 std::placeholders::_1,
416 std::placeholders::_2,
417 modifyIfAck),
418 ns,
419 contentsBuilder->build("DELIE", ns, key, data));
420}
421
422std::string AsyncRedisStorage::getPublishMessage() const
423{
424 if(publisherId)
425 return *publisherId;
426 else
427 return NO_PUBLISHER;
428}
429
430void AsyncRedisStorage::setIfNotExistsAsync(const Namespace& ns,
431 const Key& key,
432 const Data& data,
433 const ModifyIfAck& modifyIfAck)
434{
435 std::error_code ec;
436
437 if (!canOperationBePerformed(ns, boost::none, ec))
438 {
439 engine->postCallback(std::bind(modifyIfAck, ec, false));
440 return;
441 }
442
443 if (namespaceConfigurations->areNotificationsEnabled(ns))
444 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
445 this,
446 std::placeholders::_1,
447 std::placeholders::_2,
448 modifyIfAck),
449 ns,
450 contentsBuilder->build("SETNXPUB", ns, key, data, ns ,getPublishMessage()));
451 else
452 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
453 this,
454 std::placeholders::_1,
455 std::placeholders::_2,
456 modifyIfAck),
457 ns,
458 contentsBuilder->build("SETNX", ns, key, data));
459}
460
461void AsyncRedisStorage::getAsync(const Namespace& ns,
462 const Keys& keys,
463 const GetAck& getAck)
464{
465 std::error_code ec;
466
467 if (!canOperationBePerformed(ns, keys.empty(), ec))
468 {
469 engine->postCallback(std::bind(getAck, ec, DataMap()));
470 return;
471 }
472
473 dispatcher->dispatchAsync([getAck, keys](const std::error_code& error,
474 const Reply& reply)
475 {
476 if (error)
477 getAck(error, DataMap());
478 else
479 getAck(std::error_code(), buildDataMap(keys, *reply.getArray()));
480 },
481 ns,
482 contentsBuilder->build("MGET", ns, keys));
483}
484
485void AsyncRedisStorage::removeAsync(const Namespace& ns,
486 const Keys& keys,
487 const ModifyAck& modifyAck)
488{
489 std::error_code ec;
490
491 if (!canOperationBePerformed(ns, keys.empty(), ec))
492 {
493 engine->postCallback(std::bind(modifyAck, ec));
494 return;
495 }
496
497 if (namespaceConfigurations->areNotificationsEnabled(ns))
498 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
499 this,
500 std::placeholders::_1,
501 std::placeholders::_2,
502 modifyAck),
503 ns,
504 contentsBuilder->build("DELPUB", ns, keys, ns, getPublishMessage()));
505 else
506 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
507 this,
508 std::placeholders::_1,
509 std::placeholders::_2,
510 modifyAck),
511 ns,
512 contentsBuilder->build("DEL", ns, keys));
513}
514
515void AsyncRedisStorage::findKeysAsync(const Namespace& ns,
516 const std::string& keyPrefix,
517 const FindKeysAck& findKeysAck)
518{
519 //TODO: update to more optimal solution than current KEYS-based one.
520 std::error_code ec;
521
522 if (!canOperationBePerformed(ns, boost::none, ec))
523 {
524 engine->postCallback(std::bind(findKeysAck, ec, Keys()));
525 return;
526 }
527
528 dispatcher->dispatchAsync([findKeysAck](const std::error_code& error, const Reply& reply)
529 {
530 if (error)
531 findKeysAck(error, Keys());
532 else
533 findKeysAck(std::error_code(), getKeys(*reply.getArray()));
534 },
535 ns,
536 contentsBuilder->build("KEYS", buildKeyPrefixSearchPattern(ns, keyPrefix)));
537}
538
539void AsyncRedisStorage::removeAllAsync(const Namespace& ns,
540 const ModifyAck& modifyAck)
541{
542 std::error_code ec;
543
544 if (!canOperationBePerformed(ns, boost::none, ec))
545 {
546 engine->postCallback(std::bind(modifyAck, ec));
547 return;
548 }
549
550 dispatcher->dispatchAsync([this, modifyAck, ns](const std::error_code& error, const Reply& reply)
551 {
552 if (error)
553 {
554 modifyAck(error);
555 return;
556 }
557 const auto& array(*reply.getArray());
558 if (array.empty())
559 modifyAck(std::error_code());
560 else
561 {
562 removeAsync(ns, getKeys(array), modifyAck);
563 }
564 },
565 ns,
566 contentsBuilder->build("KEYS", buildKeyPrefixSearchPattern(ns, "")));
567}
568
569std::string AsyncRedisStorage::buildKeyPrefixSearchPattern(const Namespace& ns, const std::string& keyPrefix) const
570{
571 std::string escapedKeyPrefix = keyPrefix;
572 escapeRedisSearchPatternCharacters(escapedKeyPrefix);
573 std::ostringstream oss;
574 oss << '{' << ns << '}' << SEPARATOR << escapedKeyPrefix << "*";
575 return oss.str();
576}