blob: 22d4b2b969335f4955d7aa76178881bebd8283a6 [file] [log] [blame]
Rolf Badorekef2bf512019-08-20 11:17:15 +03001/*
2 Copyright (c) 2018-2019 Nokia.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#include "config.h"
18#include <sstream>
19#include "private/error.hpp"
20#include <sdl/emptynamespace.hpp>
21#include <sdl/invalidnamespace.hpp>
22#include <sdl/publisherid.hpp>
23#include "private/abort.hpp"
24#include "private/createlogger.hpp"
25#include "private/engine.hpp"
26#include "private/logger.hpp"
27#include "private/namespacevalidator.hpp"
28#include "private/configurationreader.hpp"
29#include "private/redis/asynccommanddispatcher.hpp"
30#include "private/redis/asyncdatabasediscovery.hpp"
31#include "private/redis/asyncredisstorage.hpp"
32#include "private/redis/contents.hpp"
33#include "private/redis/contentsbuilder.hpp"
34#include "private/redis/redisgeneral.hpp"
35#include "private/redis/reply.hpp"
36
37using namespace shareddatalayer;
38using namespace shareddatalayer::redis;
39
40/* TODO: This implementation contains lot of duplicated code with old API (asyncRedisConnection).
41 * When this new API is fully ready and tested old API implementation could be changed to utilize this
42 * (bit like sync API utilizes async API).
43 */
44
45namespace
46{
47 std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
48 const DatabaseInfo& databaseInfo,
49 std::shared_ptr<ContentsBuilder> contentsBuilder,
50 std::shared_ptr<Logger> logger)
51 {
52 return AsyncCommandDispatcher::create(engine,
53 databaseInfo,
54 contentsBuilder,
55 false,
56 logger);
57 }
58
59 class AsyncRedisStorageErrorCategory: public std::error_category
60 {
61 public:
62 AsyncRedisStorageErrorCategory() = default;
63
64 const char* name() const noexcept override;
65
66 std::string message(int condition) const override;
67
68 std::error_condition default_error_condition(int condition) const noexcept override;
69 };
70
71 const char* AsyncRedisStorageErrorCategory::name() const noexcept
72 {
73 return "asyncredisstorage";
74 }
75
76 std::string AsyncRedisStorageErrorCategory::message(int condition) const
77 {
78 switch (static_cast<AsyncRedisStorage::ErrorCode>(condition))
79 {
80 case AsyncRedisStorage::ErrorCode::SUCCESS:
81 return std::error_code().message();
82 case AsyncRedisStorage::ErrorCode::REDIS_NOT_YET_DISCOVERED:
83 return "connection to the underlying data storage not yet available";
84 case AsyncRedisStorage::ErrorCode::INVALID_NAMESPACE:
85 return "invalid namespace identifier passed to SDL API";
86 case AsyncRedisStorage::ErrorCode::END_MARKER:
87 logErrorOnce("AsyncRedisStorage::ErrorCode::END_MARKER is not meant to be queried (it is only for enum loop control)");
88 return "unsupported error code for message()";
89 default:
90 return "description missing for AsyncRedisStorageErrorCategory error: " + std::to_string(condition);
91 }
92 }
93
94 std::error_condition AsyncRedisStorageErrorCategory::default_error_condition(int condition) const noexcept
95 {
96 switch (static_cast<AsyncRedisStorage::ErrorCode>(condition))
97 {
98 case AsyncRedisStorage::ErrorCode::SUCCESS:
99 return InternalError::SUCCESS;
100 case AsyncRedisStorage::ErrorCode::REDIS_NOT_YET_DISCOVERED:
101 return InternalError::SDL_NOT_READY;
102 case AsyncRedisStorage::ErrorCode::INVALID_NAMESPACE:
103 return InternalError::SDL_RECEIVED_INVALID_PARAMETER;
104 case AsyncRedisStorage::ErrorCode::END_MARKER:
105 logErrorOnce("AsyncRedisStorage::ErrorCode::END_MARKER is not meant to be mapped to InternalError (it is only for enum loop control)");
106 return InternalError::SDL_ERROR_CODE_LOGIC_ERROR;
107 default:
108 std::ostringstream msg;
109 msg << "default error condition missing for AsyncRedisStorageErrorCategory error: "
110 << condition;
111 logErrorOnce(msg.str());
112 return InternalError::SDL_ERROR_CODE_LOGIC_ERROR;
113 }
114 }
115
116 AsyncStorage::DataMap buildDataMap(const AsyncStorage::Keys& keys, const Reply::ReplyVector& replyVector)
117 {
118 AsyncStorage::DataMap dataMap;
119 auto i(0U);
120 for (const auto& j : keys)
121 {
122 if (replyVector[i]->getType() == Reply::Type::STRING)
123 {
124 AsyncStorage::Data data;
125 auto dataStr(replyVector[i]->getString());
126 for (ReplyStringLength k(0); k < dataStr->len; ++k)
127 data.push_back(static_cast<uint8_t>(dataStr->str[static_cast<size_t>(k)]));
128 dataMap.insert({ j, data });
129 }
130 ++i;
131 }
132 return dataMap;
133 }
134
135 AsyncStorage::Key getKey(const Reply::DataItem& item)
136 {
137 std::string str(item.str.c_str(), static_cast<size_t>(item.len));
138 auto res(str.find(AsyncRedisStorage::SEPARATOR));
139 return str.substr(res + 1);
140 }
141
142 AsyncStorage::Keys getKeys(const Reply::ReplyVector& replyVector)
143 {
144 AsyncStorage::Keys keys;
145 for (const auto& i : replyVector)
146 {
147 if (i->getType() == Reply::Type::STRING)
148 keys.insert(getKey(*i->getString()));
149 }
150 return keys;
151 }
152
153 void escapeRedisSearchPatternCharacters(std::string& stringToProcess)
154 {
155 const std::string redisSearchPatternCharacters = R"(*?[]\)";
156
157 std::size_t foundPosition = stringToProcess.find_first_of(redisSearchPatternCharacters);
158
159 while (foundPosition != std::string::npos)
160 {
161 stringToProcess.insert(foundPosition, R"(\)");
162 foundPosition = stringToProcess.find_first_of(redisSearchPatternCharacters, foundPosition + 2);
163 }
164 }
165}
166
167AsyncRedisStorage::ErrorCode& shareddatalayer::operator++ (AsyncRedisStorage::ErrorCode& ecEnum)
168{
169 if (ecEnum == AsyncRedisStorage::ErrorCode::END_MARKER)
170 throw std::out_of_range("for AsyncRedisStorage::ErrorCode& operator ++");
171 ecEnum = AsyncRedisStorage::ErrorCode(static_cast<std::underlying_type<AsyncRedisStorage::ErrorCode>::type>(ecEnum) + 1);
172 return ecEnum;
173}
174
175std::error_code shareddatalayer::make_error_code(AsyncRedisStorage::ErrorCode errorCode)
176{
177 return std::error_code(static_cast<int>(errorCode), AsyncRedisStorage::errorCategory());
178}
179
180const std::error_category& AsyncRedisStorage::errorCategory() noexcept
181{
182 static const AsyncRedisStorageErrorCategory theAsyncRedisStorageErrorCategory;
183 return theAsyncRedisStorageErrorCategory;
184}
185
186AsyncRedisStorage::AsyncRedisStorage(std::shared_ptr<Engine> engine,
187 std::shared_ptr<AsyncDatabaseDiscovery> discovery,
188 const boost::optional<PublisherId>& pId,
189 std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
190 std::shared_ptr<Logger> logger):
191 AsyncRedisStorage(engine,
192 discovery,
193 pId,
194 namespaceConfigurations,
195 ::asyncCommandDispatcherCreator,
196 std::make_shared<redis::ContentsBuilder>(SEPARATOR),
197 logger)
198{
199}
200
201AsyncRedisStorage::AsyncRedisStorage(std::shared_ptr<Engine> engine,
202 std::shared_ptr<redis::AsyncDatabaseDiscovery> discovery,
203 const boost::optional<PublisherId>& pId,
204 std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
205 const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
206 std::shared_ptr<redis::ContentsBuilder> contentsBuilder,
207 std::shared_ptr<Logger> logger):
208 engine(engine),
209 dispatcher(nullptr),
210 discovery(discovery),
211 publisherId(pId),
212 asyncCommandDispatcherCreator(asyncCommandDispatcherCreator),
213 contentsBuilder(contentsBuilder),
214 namespaceConfigurations(namespaceConfigurations),
215 logger(logger)
216{
217 if(publisherId && (*publisherId).empty())
218 {
219 throw std::invalid_argument("AsyncRedisStorage: empty publisher ID string given");
220 }
221
222 discovery->setStateChangedCb([this](const redis::DatabaseInfo& databaseInfo)
223 {
224 serviceStateChanged(databaseInfo);
225 });
226}
227
228AsyncRedisStorage::~AsyncRedisStorage()
229{
230 if (discovery)
231 discovery->clearStateChangedCb();
232 if (dispatcher)
233 dispatcher->disableCommandCallbacks();
234}
235
236redis::DatabaseInfo& AsyncRedisStorage::getDatabaseInfo()
237{
238 return dbInfo;
239}
240
241void AsyncRedisStorage::serviceStateChanged(const redis::DatabaseInfo& newDatabaseInfo)
242{
243 dispatcher = asyncCommandDispatcherCreator(*engine,
244 newDatabaseInfo,
245 contentsBuilder,
246 logger);
247 if (readyAck)
248 dispatcher->waitConnectedAsync([this]()
249 {
250 readyAck(std::error_code());
251 readyAck = ReadyAck();
252 });
253 dbInfo = newDatabaseInfo;
254}
255
256int AsyncRedisStorage::fd() const
257{
258 return engine->fd();
259}
260
261void AsyncRedisStorage::handleEvents()
262{
263 engine->handleEvents();
264}
265
266bool AsyncRedisStorage::canOperationBePerformed(const Namespace& ns,
267 boost::optional<bool> noKeysGiven,
268 std::error_code& ecToReturn)
269{
270 if (!::isValidNamespace(ns))
271 {
272 logErrorOnce("Invalid namespace identifier: " + ns + " passed to SDL");
273 ecToReturn = std::error_code(ErrorCode::INVALID_NAMESPACE);
274 return false;
275 }
276 if (noKeysGiven && *noKeysGiven)
277 {
278 ecToReturn = std::error_code();
279 return false;
280 }
281 if (!dispatcher)
282 {
283 ecToReturn = std::error_code(ErrorCode::REDIS_NOT_YET_DISCOVERED);
284 return false;
285 }
286
287 ecToReturn = std::error_code();
288 return true;
289}
290
291void AsyncRedisStorage::waitReadyAsync(const Namespace&,
292 const ReadyAck& readyAck)
293{
294 if (dispatcher)
295 dispatcher->waitConnectedAsync([readyAck]()
296 {
297 readyAck(std::error_code());
298 });
299 else
300 this->readyAck = readyAck;
301}
302
303void AsyncRedisStorage::setAsync(const Namespace& ns,
304 const DataMap& dataMap,
305 const ModifyAck& modifyAck)
306{
307 std::error_code ec;
308
309 if (!canOperationBePerformed(ns, dataMap.empty(), ec))
310 {
311 engine->postCallback(std::bind(modifyAck, ec));
312 return;
313 }
314
315 if (namespaceConfigurations->areNotificationsEnabled(ns))
316 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
317 this,
318 std::placeholders::_1,
319 std::placeholders::_2,
320 modifyAck),
321 ns,
322 contentsBuilder->build("MSETPUB", ns, dataMap, ns, getPublishMessage()));
323 else
324 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
325 this,
326 std::placeholders::_1,
327 std::placeholders::_2,
328 modifyAck),
329 ns,
330 contentsBuilder->build("MSET", ns, dataMap));
331}
332
333void AsyncRedisStorage::modificationCommandCallback(const std::error_code& error,
334 const Reply&,
335 const ModifyAck& modifyAck )
336{
337 modifyAck(error);
338}
339
340void AsyncRedisStorage::conditionalCommandCallback(const std::error_code& error,
341 const Reply& reply,
342 const ModifyIfAck& modifyIfAck)
343{
344 auto type(reply.getType());
345 if (error ||
346 (type == Reply::Type::NIL) || // SETIE(PUB)
347 ((type == Reply::Type::INTEGER) && (reply.getInteger() != 1))) // SETNX(PUB) and DELIE(PUB)
348 modifyIfAck(error, false);
349 else
350 modifyIfAck(error, true);
351}
352
353void AsyncRedisStorage::setIfAsync(const Namespace& ns,
354 const Key& key,
355 const Data& oldData,
356 const Data& newData,
357 const ModifyIfAck& modifyIfAck)
358{
359 std::error_code ec;
360
361 if (!canOperationBePerformed(ns, boost::none, ec))
362 {
363 engine->postCallback(std::bind(modifyIfAck, ec, false));
364 return;
365 }
366
367 if (namespaceConfigurations->areNotificationsEnabled(ns))
368 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
369 this,
370 std::placeholders::_1,
371 std::placeholders::_2,
372 modifyIfAck),
373 ns,
374 contentsBuilder->build("SETIEPUB", ns, key, newData, oldData, ns, getPublishMessage()));
375 else
376 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
377 this,
378 std::placeholders::_1,
379 std::placeholders::_2,
380 modifyIfAck),
381 ns,
382 contentsBuilder->build("SETIE", ns, key, newData, oldData));
383}
384
385void AsyncRedisStorage::removeIfAsync(const Namespace& ns,
386 const Key& key,
387 const Data& data,
388 const ModifyIfAck& modifyIfAck)
389{
390 std::error_code ec;
391
392 if (!canOperationBePerformed(ns, boost::none, ec))
393 {
394 engine->postCallback(std::bind(modifyIfAck, ec, false));
395 return;
396 }
397
398 if (namespaceConfigurations->areNotificationsEnabled(ns))
399 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
400 this,
401 std::placeholders::_1,
402 std::placeholders::_2,
403 modifyIfAck),
404 ns,
405 contentsBuilder->build("DELIEPUB", ns, key, data, ns, getPublishMessage()));
406 else
407 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
408 this,
409 std::placeholders::_1,
410 std::placeholders::_2,
411 modifyIfAck),
412 ns,
413 contentsBuilder->build("DELIE", ns, key, data));
414}
415
416std::string AsyncRedisStorage::getPublishMessage() const
417{
418 if(publisherId)
419 return *publisherId;
420 else
421 return NO_PUBLISHER;
422}
423
424void AsyncRedisStorage::setIfNotExistsAsync(const Namespace& ns,
425 const Key& key,
426 const Data& data,
427 const ModifyIfAck& modifyIfAck)
428{
429 std::error_code ec;
430
431 if (!canOperationBePerformed(ns, boost::none, ec))
432 {
433 engine->postCallback(std::bind(modifyIfAck, ec, false));
434 return;
435 }
436
437 if (namespaceConfigurations->areNotificationsEnabled(ns))
438 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
439 this,
440 std::placeholders::_1,
441 std::placeholders::_2,
442 modifyIfAck),
443 ns,
444 contentsBuilder->build("SETNXPUB", ns, key, data, ns ,getPublishMessage()));
445 else
446 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::conditionalCommandCallback,
447 this,
448 std::placeholders::_1,
449 std::placeholders::_2,
450 modifyIfAck),
451 ns,
452 contentsBuilder->build("SETNX", ns, key, data));
453}
454
455void AsyncRedisStorage::getAsync(const Namespace& ns,
456 const Keys& keys,
457 const GetAck& getAck)
458{
459 std::error_code ec;
460
461 if (!canOperationBePerformed(ns, keys.empty(), ec))
462 {
463 engine->postCallback(std::bind(getAck, ec, DataMap()));
464 return;
465 }
466
467 dispatcher->dispatchAsync([getAck, keys](const std::error_code& error,
468 const Reply& reply)
469 {
470 if (error)
471 getAck(error, DataMap());
472 else
473 getAck(std::error_code(), buildDataMap(keys, *reply.getArray()));
474 },
475 ns,
476 contentsBuilder->build("MGET", ns, keys));
477}
478
479void AsyncRedisStorage::removeAsync(const Namespace& ns,
480 const Keys& keys,
481 const ModifyAck& modifyAck)
482{
483 std::error_code ec;
484
485 if (!canOperationBePerformed(ns, keys.empty(), ec))
486 {
487 engine->postCallback(std::bind(modifyAck, ec));
488 return;
489 }
490
491 if (namespaceConfigurations->areNotificationsEnabled(ns))
492 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
493 this,
494 std::placeholders::_1,
495 std::placeholders::_2,
496 modifyAck),
497 ns,
498 contentsBuilder->build("DELPUB", ns, keys, ns, getPublishMessage()));
499 else
500 dispatcher->dispatchAsync(std::bind(&AsyncRedisStorage::modificationCommandCallback,
501 this,
502 std::placeholders::_1,
503 std::placeholders::_2,
504 modifyAck),
505 ns,
506 contentsBuilder->build("DEL", ns, keys));
507}
508
509void AsyncRedisStorage::findKeysAsync(const Namespace& ns,
510 const std::string& keyPrefix,
511 const FindKeysAck& findKeysAck)
512{
513 //TODO: update to more optimal solution than current KEYS-based one.
514 std::error_code ec;
515
516 if (!canOperationBePerformed(ns, boost::none, ec))
517 {
518 engine->postCallback(std::bind(findKeysAck, ec, Keys()));
519 return;
520 }
521
522 dispatcher->dispatchAsync([findKeysAck](const std::error_code& error, const Reply& reply)
523 {
524 if (error)
525 findKeysAck(error, Keys());
526 else
527 findKeysAck(std::error_code(), getKeys(*reply.getArray()));
528 },
529 ns,
530 contentsBuilder->build("KEYS", buildKeyPrefixSearchPattern(ns, keyPrefix)));
531}
532
533void AsyncRedisStorage::removeAllAsync(const Namespace& ns,
534 const ModifyAck& modifyAck)
535{
536 std::error_code ec;
537
538 if (!canOperationBePerformed(ns, boost::none, ec))
539 {
540 engine->postCallback(std::bind(modifyAck, ec));
541 return;
542 }
543
544 dispatcher->dispatchAsync([this, modifyAck, ns](const std::error_code& error, const Reply& reply)
545 {
546 if (error)
547 {
548 modifyAck(error);
549 return;
550 }
551 const auto& array(*reply.getArray());
552 if (array.empty())
553 modifyAck(std::error_code());
554 else
555 {
556 removeAsync(ns, getKeys(array), modifyAck);
557 }
558 },
559 ns,
560 contentsBuilder->build("KEYS", buildKeyPrefixSearchPattern(ns, "")));
561}
562
563std::string AsyncRedisStorage::buildKeyPrefixSearchPattern(const Namespace& ns, const std::string& keyPrefix) const
564{
565 std::string escapedKeyPrefix = keyPrefix;
566 escapeRedisSearchPatternCharacters(escapedKeyPrefix);
567 std::ostringstream oss;
568 oss << '{' << ns << '}' << SEPARATOR << escapedKeyPrefix << "*";
569 return oss.str();
570}