/*
   Copyright (c) 2018-2019 Nokia.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/
| 16 | |
| 17 | #include <arpa/inet.h> |
| 18 | #include <iostream> |
| 19 | #include <string> |
| 20 | #include <sdl/asyncstorage.hpp> |
| 21 | #include "private/abort.hpp" |
| 22 | #include "private/hostandport.hpp" |
| 23 | #include "private/redis/asyncsentineldatabasediscovery.hpp" |
| 24 | #include "private/redis/asynccommanddispatcher.hpp" |
| 25 | #include "private/redis/contents.hpp" |
| 26 | #include "private/redis/contentsbuilder.hpp" |
| 27 | #include "private/redis/reply.hpp" |
| 28 | |
| 29 | using namespace shareddatalayer; |
| 30 | using namespace shareddatalayer::redis; |
| 31 | |
| 32 | namespace |
| 33 | { |
| 34 | std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine, |
| 35 | const DatabaseInfo& databaseInfo, |
| 36 | std::shared_ptr<ContentsBuilder> contentsBuilder, |
| 37 | std::shared_ptr<Logger> logger); |
| 38 | |
| 39 | std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger); |
| 40 | } |
| 41 | |
| 42 | AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine, |
| 43 | std::shared_ptr<Logger> logger): |
| 44 | AsyncSentinelDatabaseDiscovery(engine, |
| 45 | logger, |
| 46 | ::asyncCommandDispatcherCreator, |
| 47 | std::make_shared<redis::ContentsBuilder>(AsyncStorage::SEPARATOR)) |
| 48 | { |
| 49 | } |
| 50 | |
| 51 | AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine, |
| 52 | std::shared_ptr<Logger> logger, |
| 53 | const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator, |
| 54 | std::shared_ptr<redis::ContentsBuilder> contentsBuilder): |
| 55 | engine(engine), |
| 56 | logger(logger), |
| 57 | // @TODO Make configurable. |
| 58 | databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({HostAndPort("dbaas-ha", htons(26379U))}), |
| 59 | DatabaseInfo::Type::SINGLE, |
| 60 | boost::none, |
| 61 | DatabaseInfo::Discovery::SENTINEL})), |
| 62 | contentsBuilder(contentsBuilder), |
| 63 | masterInquiryRetryTimer(*engine), |
| 64 | masterInquiryRetryTimerDuration(std::chrono::seconds(1)) |
| 65 | { |
| 66 | dispatcher = asyncCommandDispatcherCreator(*engine, |
| 67 | databaseInfo, |
| 68 | contentsBuilder, |
| 69 | logger); |
| 70 | } |
| 71 | |
| 72 | void AsyncSentinelDatabaseDiscovery::setStateChangedCb(const StateChangedCb& cb) |
| 73 | { |
| 74 | stateChangedCb = cb; |
| 75 | dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this)); |
| 76 | } |
| 77 | |
| 78 | void AsyncSentinelDatabaseDiscovery::clearStateChangedCb() |
| 79 | { |
| 80 | stateChangedCb = nullptr; |
| 81 | } |
| 82 | |
| 83 | void AsyncSentinelDatabaseDiscovery::sendMasterInquiry() |
| 84 | { |
| 85 | dispatcher->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::masterInquiryAck, |
| 86 | this, |
| 87 | std::placeholders::_1, |
| 88 | std::placeholders::_2), |
| 89 | "dummyNamespace", // Not meaningful for SENTINEL commands |
| 90 | contentsBuilder->build("SENTINEL", "get-master-addr-by-name", "mymaster")); //@TODO Make master name configurable |
| 91 | } |
| 92 | |
| 93 | void AsyncSentinelDatabaseDiscovery::masterInquiryAck(const std::error_code& error, |
| 94 | const Reply& reply) |
| 95 | { |
| 96 | if (!error) |
| 97 | { |
| 98 | auto hostAndPort = parseMasterInquiryReply(reply, *logger); |
| 99 | if (hostAndPort) |
| 100 | { |
| 101 | auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}), |
| 102 | DatabaseInfo::Type::SINGLE, |
| 103 | boost::none, |
| 104 | DatabaseInfo::Discovery::SENTINEL})); |
| 105 | if (stateChangedCb) |
| 106 | stateChangedCb(databaseInfo); |
| 107 | } |
| 108 | else |
| 109 | SHAREDDATALAYER_ABORT("Master inquiry reply parsing error."); |
| 110 | } |
| 111 | else |
| 112 | { |
| 113 | masterInquiryRetryTimer.arm( |
| 114 | masterInquiryRetryTimerDuration, |
| 115 | std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this)); |
| 116 | } |
| 117 | } |
| 118 | |
| 119 | namespace |
| 120 | { |
| 121 | std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine, |
| 122 | const DatabaseInfo& databaseInfo, |
| 123 | std::shared_ptr<ContentsBuilder> contentsBuilder, |
| 124 | std::shared_ptr<Logger> logger) |
| 125 | { |
| 126 | return AsyncCommandDispatcher::create(engine, |
| 127 | databaseInfo, |
| 128 | contentsBuilder, |
| 129 | false, |
| 130 | logger, |
| 131 | true); |
| 132 | } |
| 133 | |
| 134 | std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger) |
| 135 | { |
| 136 | auto replyType = reply.getType(); |
| 137 | if (replyType == Reply::Type::ARRAY) |
| 138 | { |
| 139 | auto& replyVector(*reply.getArray()); |
| 140 | auto hostElementType = replyVector[0]->getType(); |
| 141 | if (hostElementType == Reply::Type::STRING) |
| 142 | { |
| 143 | auto host(replyVector[0]->getString()->str); |
| 144 | auto portElementType = replyVector[1]->getType(); |
| 145 | if (portElementType == Reply::Type::STRING) |
| 146 | { |
| 147 | auto port(replyVector[1]->getString()->str); |
| 148 | try |
| 149 | { |
| 150 | return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));; |
| 151 | } |
| 152 | catch (const std::exception& e) |
| 153 | { |
| 154 | logger.debug() << "Invalid host or port in master inquiry reply, host: " |
| 155 | << host << ", port: " << port |
| 156 | << ", exception: " << e.what() << std::endl; |
| 157 | } |
| 158 | } |
| 159 | else |
| 160 | logger.debug() << "Invalid port element type in master inquiry reply: " |
| 161 | << static_cast<int>(portElementType) << std::endl; |
| 162 | } |
| 163 | else |
| 164 | logger.debug() << "Invalid host element type in master inquiry reply: " |
| 165 | << static_cast<int>(hostElementType) << std::endl; |
| 166 | } |
| 167 | else |
| 168 | logger.debug() << "Invalid master inquiry reply type: " |
| 169 | << static_cast<int>(replyType) << std::endl; |
| 170 | return nullptr; |
| 171 | } |
| 172 | } |