blob: 3d444dbe1bb83182d985a1c32625eb750f4710cb [file] [log] [blame]
Rolf Badorek8324d022019-09-17 16:47:20 +03001/*
2 Copyright (c) 2018-2019 Nokia.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
Timo Tietavainena0745d22019-11-28 09:55:22 +020017/*
18 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 * platform project (RICP).
20*/
21
Rolf Badorek8324d022019-09-17 16:47:20 +030022#include <arpa/inet.h>
Rolf Badorekb7f49712019-09-23 14:14:56 +030023#include <boost/algorithm/string.hpp>
Rolf Badorek8324d022019-09-17 16:47:20 +030024#include <iostream>
25#include <string>
Rolf Badorekb7f49712019-09-23 14:14:56 +030026#include <vector>
Rolf Badorek8324d022019-09-17 16:47:20 +030027#include <sdl/asyncstorage.hpp>
28#include "private/abort.hpp"
29#include "private/hostandport.hpp"
30#include "private/redis/asyncsentineldatabasediscovery.hpp"
31#include "private/redis/asynccommanddispatcher.hpp"
32#include "private/redis/contents.hpp"
33#include "private/redis/contentsbuilder.hpp"
34#include "private/redis/reply.hpp"
35
36using namespace shareddatalayer;
37using namespace shareddatalayer::redis;
38
39namespace
40{
41 std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
42 const DatabaseInfo& databaseInfo,
43 std::shared_ptr<ContentsBuilder> contentsBuilder,
Rolf Badorekb7f49712019-09-23 14:14:56 +030044 std::shared_ptr<Logger> logger,
45 bool usePermanentCommandCallbacks);
46
47 struct SubscribeReply
48 {
49 enum class Type { UNKNOWN, SUBSCRIBE_REPLY, NOTIFICATION };
50 Type type;
51 std::string message;
52
53 SubscribeReply(): type(Type::UNKNOWN) { }
54 };
55
56 std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger);
Rolf Badorek8324d022019-09-17 16:47:20 +030057
58 std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger);
Rolf Badorekb7f49712019-09-23 14:14:56 +030059
60 std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger);
Rolf Badorek8324d022019-09-17 16:47:20 +030061}
62
63AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
Rolf Badorek2dcf9402019-10-01 18:33:58 +030064 std::shared_ptr<Logger> logger,
65 const HostAndPort& sentinelAddress,
66 const std::string& sentinelMasterName):
Rolf Badorek8324d022019-09-17 16:47:20 +030067 AsyncSentinelDatabaseDiscovery(engine,
68 logger,
Rolf Badorek2dcf9402019-10-01 18:33:58 +030069 sentinelAddress,
70 sentinelMasterName,
Rolf Badorek8324d022019-09-17 16:47:20 +030071 ::asyncCommandDispatcherCreator,
72 std::make_shared<redis::ContentsBuilder>(AsyncStorage::SEPARATOR))
73{
74}
75
76AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
77 std::shared_ptr<Logger> logger,
Rolf Badorek2dcf9402019-10-01 18:33:58 +030078 const HostAndPort& sentinelAddress,
79 const std::string& sentinelMasterName,
Rolf Badorek8324d022019-09-17 16:47:20 +030080 const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
81 std::shared_ptr<redis::ContentsBuilder> contentsBuilder):
82 engine(engine),
83 logger(logger),
Rolf Badorek2dcf9402019-10-01 18:33:58 +030084 databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({sentinelAddress}),
Rolf Badorek8324d022019-09-17 16:47:20 +030085 DatabaseInfo::Type::SINGLE,
86 boost::none,
87 DatabaseInfo::Discovery::SENTINEL})),
Rolf Badorek2dcf9402019-10-01 18:33:58 +030088 sentinelMasterName(sentinelMasterName),
Rolf Badorek8324d022019-09-17 16:47:20 +030089 contentsBuilder(contentsBuilder),
Rolf Badorekb7f49712019-09-23 14:14:56 +030090 subscribeRetryTimer(*engine),
91 subscribeRetryTimerDuration(std::chrono::seconds(1)),
Rolf Badorek8324d022019-09-17 16:47:20 +030092 masterInquiryRetryTimer(*engine),
93 masterInquiryRetryTimerDuration(std::chrono::seconds(1))
94{
Rolf Badorekb7f49712019-09-23 14:14:56 +030095 subscriber = asyncCommandDispatcherCreator(*engine,
96 databaseInfo,
97 contentsBuilder,
98 logger,
99 true);
Rolf Badorek8324d022019-09-17 16:47:20 +0300100 dispatcher = asyncCommandDispatcherCreator(*engine,
101 databaseInfo,
102 contentsBuilder,
Rolf Badorekb7f49712019-09-23 14:14:56 +0300103 logger,
104 false);
105}
106
107AsyncSentinelDatabaseDiscovery::~AsyncSentinelDatabaseDiscovery()
108{
109 if (subscriber)
110 subscriber->disableCommandCallbacks();
111 if (dispatcher)
112 dispatcher->disableCommandCallbacks();
113 stateChangedCb = nullptr;
Rolf Badorek8324d022019-09-17 16:47:20 +0300114}
115
116void AsyncSentinelDatabaseDiscovery::setStateChangedCb(const StateChangedCb& cb)
117{
118 stateChangedCb = cb;
Rolf Badorekb7f49712019-09-23 14:14:56 +0300119 subscriber->registerDisconnectCb([this]()
120 {
121 subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
122 });
123 subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
Rolf Badorek8324d022019-09-17 16:47:20 +0300124}
125
126void AsyncSentinelDatabaseDiscovery::clearStateChangedCb()
127{
128 stateChangedCb = nullptr;
129}
130
Rolf Badorekb7f49712019-09-23 14:14:56 +0300131void AsyncSentinelDatabaseDiscovery::subscribeNotifications()
132{
133 subscriber->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeAck,
134 this,
135 std::placeholders::_1,
136 std::placeholders::_2),
137 "dummyNamespace", // Not meaningful for Sentinel
138 contentsBuilder->build("SUBSCRIBE", "+switch-master"));
139}
140
141void AsyncSentinelDatabaseDiscovery::subscribeAck(const std::error_code& error,
142 const Reply& reply)
143{
144 if (!error)
145 {
146 auto subscribeReply = parseSubscribeReply(reply, *logger);
147 if (subscribeReply)
148 {
149 switch (subscribeReply->type)
150 {
151 case (SubscribeReply::Type::SUBSCRIBE_REPLY):
152 {
153 dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
154 break;
155 }
156 case (SubscribeReply::Type::NOTIFICATION):
157 {
158 auto hostAndPort = parseNotificationMessage(subscribeReply->message, *logger);
159 if (hostAndPort)
160 {
161 auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
162 DatabaseInfo::Type::SINGLE,
163 boost::none,
164 DatabaseInfo::Discovery::SENTINEL}));
165 if (stateChangedCb)
166 stateChangedCb(databaseInfo);
167 }
168 else
169 SHAREDDATALAYER_ABORT("Notification message parsing error.");
170 break;
171 }
172 case (SubscribeReply::Type::UNKNOWN):
173 {
174 logger->debug() << "Invalid SUBSCRIBE reply type." << std::endl;
175 SHAREDDATALAYER_ABORT("Invalid SUBSCRIBE command reply type.");
176 }
177 }
178 }
179 else
180 SHAREDDATALAYER_ABORT("SUBSCRIBE command reply parsing error.");
181 }
182 else
183 subscribeRetryTimer.arm(
184 subscribeRetryTimerDuration,
185 std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
186}
187
Rolf Badorek8324d022019-09-17 16:47:20 +0300188void AsyncSentinelDatabaseDiscovery::sendMasterInquiry()
189{
190 dispatcher->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::masterInquiryAck,
191 this,
192 std::placeholders::_1,
193 std::placeholders::_2),
Rolf Badorekb7f49712019-09-23 14:14:56 +0300194 "dummyNamespace", // Not meaningful for Sentinel
Rolf Badorek2dcf9402019-10-01 18:33:58 +0300195 contentsBuilder->build("SENTINEL", "get-master-addr-by-name", sentinelMasterName));
Rolf Badorek8324d022019-09-17 16:47:20 +0300196}
197
198void AsyncSentinelDatabaseDiscovery::masterInquiryAck(const std::error_code& error,
199 const Reply& reply)
200{
201 if (!error)
202 {
203 auto hostAndPort = parseMasterInquiryReply(reply, *logger);
204 if (hostAndPort)
205 {
206 auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
207 DatabaseInfo::Type::SINGLE,
208 boost::none,
209 DatabaseInfo::Discovery::SENTINEL}));
210 if (stateChangedCb)
211 stateChangedCb(databaseInfo);
212 }
213 else
214 SHAREDDATALAYER_ABORT("Master inquiry reply parsing error.");
215 }
216 else
217 {
218 masterInquiryRetryTimer.arm(
219 masterInquiryRetryTimerDuration,
220 std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
221 }
222}
223
224namespace
225{
226 std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
227 const DatabaseInfo& databaseInfo,
228 std::shared_ptr<ContentsBuilder> contentsBuilder,
Rolf Badorekb7f49712019-09-23 14:14:56 +0300229 std::shared_ptr<Logger> logger,
230 bool usePermanentCommandCallbacks)
Rolf Badorek8324d022019-09-17 16:47:20 +0300231 {
232 return AsyncCommandDispatcher::create(engine,
233 databaseInfo,
234 contentsBuilder,
Rolf Badorekb7f49712019-09-23 14:14:56 +0300235 usePermanentCommandCallbacks,
Rolf Badorek8324d022019-09-17 16:47:20 +0300236 logger,
237 true);
238 }
239
Rolf Badorekb7f49712019-09-23 14:14:56 +0300240 std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger)
241 {
242 // refer to: https://redis.io/topics/pubsub#format-of-pushed-messages
243 auto replyType = reply.getType();
244 if (replyType == Reply::Type::ARRAY)
245 {
246 auto& replyVector(*reply.getArray());
247 auto firstElementType = replyVector[0]->getType();
248 if (firstElementType == Reply::Type::STRING)
249 {
250 auto subscribeReply = std::unique_ptr<SubscribeReply>(new SubscribeReply());
251 auto kind(replyVector[0]->getString()->str);
252 if (kind == "subscribe")
253 {
254 subscribeReply->type = SubscribeReply::Type::SUBSCRIBE_REPLY;
255 return subscribeReply;
256 }
257 else if (kind == "message")
258 {
259 subscribeReply->type = SubscribeReply::Type::NOTIFICATION;
260 auto thirdElementType = replyVector[2]->getType();
261 if (thirdElementType == Reply::Type::STRING)
262 {
263 subscribeReply->message = replyVector[2]->getString()->str;
264 return subscribeReply;
265 }
266 else
267 logger.debug() << "Invalid message field type in SUBSCRIBE reply: " << kind << std::endl;
268 }
269 else
270 logger.debug() << "Invalid kind field in SUBSCRIBE reply: " << kind << std::endl;
271 }
272 else
273 logger.debug() << "Invalid first element type in SUBSCRIBE reply: "
274 << static_cast<int>(firstElementType) << std::endl;
275 }
276 else
277 logger.debug() << "Invalid SUBSCRIBE reply type: "
278 << static_cast<int>(replyType) << std::endl;
279 return nullptr;
280 }
281
Rolf Badorek8324d022019-09-17 16:47:20 +0300282 std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger)
283 {
284 auto replyType = reply.getType();
285 if (replyType == Reply::Type::ARRAY)
286 {
287 auto& replyVector(*reply.getArray());
288 auto hostElementType = replyVector[0]->getType();
289 if (hostElementType == Reply::Type::STRING)
290 {
291 auto host(replyVector[0]->getString()->str);
292 auto portElementType = replyVector[1]->getType();
293 if (portElementType == Reply::Type::STRING)
294 {
295 auto port(replyVector[1]->getString()->str);
296 try
297 {
298 return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
299 }
300 catch (const std::exception& e)
301 {
302 logger.debug() << "Invalid host or port in master inquiry reply, host: "
303 << host << ", port: " << port
304 << ", exception: " << e.what() << std::endl;
305 }
306 }
307 else
308 logger.debug() << "Invalid port element type in master inquiry reply: "
309 << static_cast<int>(portElementType) << std::endl;
310 }
311 else
312 logger.debug() << "Invalid host element type in master inquiry reply: "
313 << static_cast<int>(hostElementType) << std::endl;
314 }
315 else
316 logger.debug() << "Invalid master inquiry reply type: "
317 << static_cast<int>(replyType) << std::endl;
318 return nullptr;
319 }
Rolf Badorekb7f49712019-09-23 14:14:56 +0300320
321 std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger)
322 {
323 std::vector<std::string> splittedMessage;
324 boost::split(splittedMessage, message, boost::is_any_of(" "));
325 if (splittedMessage.size() == 5)
326 {
327 auto host = splittedMessage[3];
328 auto port = splittedMessage[4];
329 try
330 {
331 return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
332 }
333 catch (const std::exception& e)
334 {
335 logger.debug() << "Invalid host or port in notification message, host: "
336 << host << ", port: " << port
337 << ", exception: " << e.what() << std::endl;
338 }
339 }
340 else
341 logger.debug() << "Invalid structure in notification message, size: " << splittedMessage.size() << std::endl;
342 return nullptr;
343 }
Rolf Badorek8324d022019-09-17 16:47:20 +0300344}