blob: 7c06f157ea63255d3f9c0f696cbaaf812affeb5a [file] [log] [blame]
Rolf Badorek8324d022019-09-17 16:47:20 +03001/*
2 Copyright (c) 2018-2019 Nokia.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#include <arpa/inet.h>
Rolf Badorekb7f49712019-09-23 14:14:56 +030018#include <boost/algorithm/string.hpp>
Rolf Badorek8324d022019-09-17 16:47:20 +030019#include <iostream>
20#include <string>
Rolf Badorekb7f49712019-09-23 14:14:56 +030021#include <vector>
Rolf Badorek8324d022019-09-17 16:47:20 +030022#include <sdl/asyncstorage.hpp>
23#include "private/abort.hpp"
24#include "private/hostandport.hpp"
25#include "private/redis/asyncsentineldatabasediscovery.hpp"
26#include "private/redis/asynccommanddispatcher.hpp"
27#include "private/redis/contents.hpp"
28#include "private/redis/contentsbuilder.hpp"
29#include "private/redis/reply.hpp"
30
31using namespace shareddatalayer;
32using namespace shareddatalayer::redis;
33
34namespace
35{
36 std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
37 const DatabaseInfo& databaseInfo,
38 std::shared_ptr<ContentsBuilder> contentsBuilder,
Rolf Badorekb7f49712019-09-23 14:14:56 +030039 std::shared_ptr<Logger> logger,
40 bool usePermanentCommandCallbacks);
41
42 struct SubscribeReply
43 {
44 enum class Type { UNKNOWN, SUBSCRIBE_REPLY, NOTIFICATION };
45 Type type;
46 std::string message;
47
48 SubscribeReply(): type(Type::UNKNOWN) { }
49 };
50
51 std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger);
Rolf Badorek8324d022019-09-17 16:47:20 +030052
53 std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger);
Rolf Badorekb7f49712019-09-23 14:14:56 +030054
55 std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger);
Rolf Badorek8324d022019-09-17 16:47:20 +030056}
57
58AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
Rolf Badorek2dcf9402019-10-01 18:33:58 +030059 std::shared_ptr<Logger> logger,
60 const HostAndPort& sentinelAddress,
61 const std::string& sentinelMasterName):
Rolf Badorek8324d022019-09-17 16:47:20 +030062 AsyncSentinelDatabaseDiscovery(engine,
63 logger,
Rolf Badorek2dcf9402019-10-01 18:33:58 +030064 sentinelAddress,
65 sentinelMasterName,
Rolf Badorek8324d022019-09-17 16:47:20 +030066 ::asyncCommandDispatcherCreator,
67 std::make_shared<redis::ContentsBuilder>(AsyncStorage::SEPARATOR))
68{
69}
70
71AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
72 std::shared_ptr<Logger> logger,
Rolf Badorek2dcf9402019-10-01 18:33:58 +030073 const HostAndPort& sentinelAddress,
74 const std::string& sentinelMasterName,
Rolf Badorek8324d022019-09-17 16:47:20 +030075 const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
76 std::shared_ptr<redis::ContentsBuilder> contentsBuilder):
77 engine(engine),
78 logger(logger),
Rolf Badorek2dcf9402019-10-01 18:33:58 +030079 databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({sentinelAddress}),
Rolf Badorek8324d022019-09-17 16:47:20 +030080 DatabaseInfo::Type::SINGLE,
81 boost::none,
82 DatabaseInfo::Discovery::SENTINEL})),
Rolf Badorek2dcf9402019-10-01 18:33:58 +030083 sentinelMasterName(sentinelMasterName),
Rolf Badorek8324d022019-09-17 16:47:20 +030084 contentsBuilder(contentsBuilder),
Rolf Badorekb7f49712019-09-23 14:14:56 +030085 subscribeRetryTimer(*engine),
86 subscribeRetryTimerDuration(std::chrono::seconds(1)),
Rolf Badorek8324d022019-09-17 16:47:20 +030087 masterInquiryRetryTimer(*engine),
88 masterInquiryRetryTimerDuration(std::chrono::seconds(1))
89{
Rolf Badorekb7f49712019-09-23 14:14:56 +030090 subscriber = asyncCommandDispatcherCreator(*engine,
91 databaseInfo,
92 contentsBuilder,
93 logger,
94 true);
Rolf Badorek8324d022019-09-17 16:47:20 +030095 dispatcher = asyncCommandDispatcherCreator(*engine,
96 databaseInfo,
97 contentsBuilder,
Rolf Badorekb7f49712019-09-23 14:14:56 +030098 logger,
99 false);
100}
101
102AsyncSentinelDatabaseDiscovery::~AsyncSentinelDatabaseDiscovery()
103{
104 if (subscriber)
105 subscriber->disableCommandCallbacks();
106 if (dispatcher)
107 dispatcher->disableCommandCallbacks();
108 stateChangedCb = nullptr;
Rolf Badorek8324d022019-09-17 16:47:20 +0300109}
110
111void AsyncSentinelDatabaseDiscovery::setStateChangedCb(const StateChangedCb& cb)
112{
113 stateChangedCb = cb;
Rolf Badorekb7f49712019-09-23 14:14:56 +0300114 subscriber->registerDisconnectCb([this]()
115 {
116 subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
117 });
118 subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
Rolf Badorek8324d022019-09-17 16:47:20 +0300119}
120
121void AsyncSentinelDatabaseDiscovery::clearStateChangedCb()
122{
123 stateChangedCb = nullptr;
124}
125
Rolf Badorekb7f49712019-09-23 14:14:56 +0300126void AsyncSentinelDatabaseDiscovery::subscribeNotifications()
127{
128 subscriber->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeAck,
129 this,
130 std::placeholders::_1,
131 std::placeholders::_2),
132 "dummyNamespace", // Not meaningful for Sentinel
133 contentsBuilder->build("SUBSCRIBE", "+switch-master"));
134}
135
136void AsyncSentinelDatabaseDiscovery::subscribeAck(const std::error_code& error,
137 const Reply& reply)
138{
139 if (!error)
140 {
141 auto subscribeReply = parseSubscribeReply(reply, *logger);
142 if (subscribeReply)
143 {
144 switch (subscribeReply->type)
145 {
146 case (SubscribeReply::Type::SUBSCRIBE_REPLY):
147 {
148 dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
149 break;
150 }
151 case (SubscribeReply::Type::NOTIFICATION):
152 {
153 auto hostAndPort = parseNotificationMessage(subscribeReply->message, *logger);
154 if (hostAndPort)
155 {
156 auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
157 DatabaseInfo::Type::SINGLE,
158 boost::none,
159 DatabaseInfo::Discovery::SENTINEL}));
160 if (stateChangedCb)
161 stateChangedCb(databaseInfo);
162 }
163 else
164 SHAREDDATALAYER_ABORT("Notification message parsing error.");
165 break;
166 }
167 case (SubscribeReply::Type::UNKNOWN):
168 {
169 logger->debug() << "Invalid SUBSCRIBE reply type." << std::endl;
170 SHAREDDATALAYER_ABORT("Invalid SUBSCRIBE command reply type.");
171 }
172 }
173 }
174 else
175 SHAREDDATALAYER_ABORT("SUBSCRIBE command reply parsing error.");
176 }
177 else
178 subscribeRetryTimer.arm(
179 subscribeRetryTimerDuration,
180 std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
181}
182
Rolf Badorek8324d022019-09-17 16:47:20 +0300183void AsyncSentinelDatabaseDiscovery::sendMasterInquiry()
184{
185 dispatcher->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::masterInquiryAck,
186 this,
187 std::placeholders::_1,
188 std::placeholders::_2),
Rolf Badorekb7f49712019-09-23 14:14:56 +0300189 "dummyNamespace", // Not meaningful for Sentinel
Rolf Badorek2dcf9402019-10-01 18:33:58 +0300190 contentsBuilder->build("SENTINEL", "get-master-addr-by-name", sentinelMasterName));
Rolf Badorek8324d022019-09-17 16:47:20 +0300191}
192
193void AsyncSentinelDatabaseDiscovery::masterInquiryAck(const std::error_code& error,
194 const Reply& reply)
195{
196 if (!error)
197 {
198 auto hostAndPort = parseMasterInquiryReply(reply, *logger);
199 if (hostAndPort)
200 {
201 auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
202 DatabaseInfo::Type::SINGLE,
203 boost::none,
204 DatabaseInfo::Discovery::SENTINEL}));
205 if (stateChangedCb)
206 stateChangedCb(databaseInfo);
207 }
208 else
209 SHAREDDATALAYER_ABORT("Master inquiry reply parsing error.");
210 }
211 else
212 {
213 masterInquiryRetryTimer.arm(
214 masterInquiryRetryTimerDuration,
215 std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
216 }
217}
218
219namespace
220{
221 std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
222 const DatabaseInfo& databaseInfo,
223 std::shared_ptr<ContentsBuilder> contentsBuilder,
Rolf Badorekb7f49712019-09-23 14:14:56 +0300224 std::shared_ptr<Logger> logger,
225 bool usePermanentCommandCallbacks)
Rolf Badorek8324d022019-09-17 16:47:20 +0300226 {
227 return AsyncCommandDispatcher::create(engine,
228 databaseInfo,
229 contentsBuilder,
Rolf Badorekb7f49712019-09-23 14:14:56 +0300230 usePermanentCommandCallbacks,
Rolf Badorek8324d022019-09-17 16:47:20 +0300231 logger,
232 true);
233 }
234
Rolf Badorekb7f49712019-09-23 14:14:56 +0300235 std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger)
236 {
237 // refer to: https://redis.io/topics/pubsub#format-of-pushed-messages
238 auto replyType = reply.getType();
239 if (replyType == Reply::Type::ARRAY)
240 {
241 auto& replyVector(*reply.getArray());
242 auto firstElementType = replyVector[0]->getType();
243 if (firstElementType == Reply::Type::STRING)
244 {
245 auto subscribeReply = std::unique_ptr<SubscribeReply>(new SubscribeReply());
246 auto kind(replyVector[0]->getString()->str);
247 if (kind == "subscribe")
248 {
249 subscribeReply->type = SubscribeReply::Type::SUBSCRIBE_REPLY;
250 return subscribeReply;
251 }
252 else if (kind == "message")
253 {
254 subscribeReply->type = SubscribeReply::Type::NOTIFICATION;
255 auto thirdElementType = replyVector[2]->getType();
256 if (thirdElementType == Reply::Type::STRING)
257 {
258 subscribeReply->message = replyVector[2]->getString()->str;
259 return subscribeReply;
260 }
261 else
262 logger.debug() << "Invalid message field type in SUBSCRIBE reply: " << kind << std::endl;
263 }
264 else
265 logger.debug() << "Invalid kind field in SUBSCRIBE reply: " << kind << std::endl;
266 }
267 else
268 logger.debug() << "Invalid first element type in SUBSCRIBE reply: "
269 << static_cast<int>(firstElementType) << std::endl;
270 }
271 else
272 logger.debug() << "Invalid SUBSCRIBE reply type: "
273 << static_cast<int>(replyType) << std::endl;
274 return nullptr;
275 }
276
Rolf Badorek8324d022019-09-17 16:47:20 +0300277 std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger)
278 {
279 auto replyType = reply.getType();
280 if (replyType == Reply::Type::ARRAY)
281 {
282 auto& replyVector(*reply.getArray());
283 auto hostElementType = replyVector[0]->getType();
284 if (hostElementType == Reply::Type::STRING)
285 {
286 auto host(replyVector[0]->getString()->str);
287 auto portElementType = replyVector[1]->getType();
288 if (portElementType == Reply::Type::STRING)
289 {
290 auto port(replyVector[1]->getString()->str);
291 try
292 {
293 return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
294 }
295 catch (const std::exception& e)
296 {
297 logger.debug() << "Invalid host or port in master inquiry reply, host: "
298 << host << ", port: " << port
299 << ", exception: " << e.what() << std::endl;
300 }
301 }
302 else
303 logger.debug() << "Invalid port element type in master inquiry reply: "
304 << static_cast<int>(portElementType) << std::endl;
305 }
306 else
307 logger.debug() << "Invalid host element type in master inquiry reply: "
308 << static_cast<int>(hostElementType) << std::endl;
309 }
310 else
311 logger.debug() << "Invalid master inquiry reply type: "
312 << static_cast<int>(replyType) << std::endl;
313 return nullptr;
314 }
Rolf Badorekb7f49712019-09-23 14:14:56 +0300315
316 std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger)
317 {
318 std::vector<std::string> splittedMessage;
319 boost::split(splittedMessage, message, boost::is_any_of(" "));
320 if (splittedMessage.size() == 5)
321 {
322 auto host = splittedMessage[3];
323 auto port = splittedMessage[4];
324 try
325 {
326 return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
327 }
328 catch (const std::exception& e)
329 {
330 logger.debug() << "Invalid host or port in notification message, host: "
331 << host << ", port: " << port
332 << ", exception: " << e.what() << std::endl;
333 }
334 }
335 else
336 logger.debug() << "Invalid structure in notification message, size: " << splittedMessage.size() << std::endl;
337 return nullptr;
338 }
Rolf Badorek8324d022019-09-17 16:47:20 +0300339}