blob: 05815e24e53e5a8925a999554aa7479a5f941178 [file] [log] [blame]
Rolf Badorek8324d022019-09-17 16:47:20 +03001/*
2 Copyright (c) 2018-2019 Nokia.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#include <arpa/inet.h>
18#include <iostream>
19#include <string>
20#include <sdl/asyncstorage.hpp>
21#include "private/abort.hpp"
22#include "private/hostandport.hpp"
23#include "private/redis/asyncsentineldatabasediscovery.hpp"
24#include "private/redis/asynccommanddispatcher.hpp"
25#include "private/redis/contents.hpp"
26#include "private/redis/contentsbuilder.hpp"
27#include "private/redis/reply.hpp"
28
29using namespace shareddatalayer;
30using namespace shareddatalayer::redis;
31
32namespace
33{
34 std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
35 const DatabaseInfo& databaseInfo,
36 std::shared_ptr<ContentsBuilder> contentsBuilder,
37 std::shared_ptr<Logger> logger);
38
39 std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger);
40}
41
42AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
43 std::shared_ptr<Logger> logger):
44 AsyncSentinelDatabaseDiscovery(engine,
45 logger,
46 ::asyncCommandDispatcherCreator,
47 std::make_shared<redis::ContentsBuilder>(AsyncStorage::SEPARATOR))
48{
49}
50
51AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
52 std::shared_ptr<Logger> logger,
53 const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
54 std::shared_ptr<redis::ContentsBuilder> contentsBuilder):
55 engine(engine),
56 logger(logger),
57 // @TODO Make configurable.
58 databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({HostAndPort("dbaas-ha", htons(26379U))}),
59 DatabaseInfo::Type::SINGLE,
60 boost::none,
61 DatabaseInfo::Discovery::SENTINEL})),
62 contentsBuilder(contentsBuilder),
63 masterInquiryRetryTimer(*engine),
64 masterInquiryRetryTimerDuration(std::chrono::seconds(1))
65{
66 dispatcher = asyncCommandDispatcherCreator(*engine,
67 databaseInfo,
68 contentsBuilder,
69 logger);
70}
71
72void AsyncSentinelDatabaseDiscovery::setStateChangedCb(const StateChangedCb& cb)
73{
74 stateChangedCb = cb;
75 dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
76}
77
78void AsyncSentinelDatabaseDiscovery::clearStateChangedCb()
79{
80 stateChangedCb = nullptr;
81}
82
83void AsyncSentinelDatabaseDiscovery::sendMasterInquiry()
84{
85 dispatcher->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::masterInquiryAck,
86 this,
87 std::placeholders::_1,
88 std::placeholders::_2),
89 "dummyNamespace", // Not meaningful for SENTINEL commands
90 contentsBuilder->build("SENTINEL", "get-master-addr-by-name", "mymaster")); //@TODO Make master name configurable
91}
92
93void AsyncSentinelDatabaseDiscovery::masterInquiryAck(const std::error_code& error,
94 const Reply& reply)
95{
96 if (!error)
97 {
98 auto hostAndPort = parseMasterInquiryReply(reply, *logger);
99 if (hostAndPort)
100 {
101 auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
102 DatabaseInfo::Type::SINGLE,
103 boost::none,
104 DatabaseInfo::Discovery::SENTINEL}));
105 if (stateChangedCb)
106 stateChangedCb(databaseInfo);
107 }
108 else
109 SHAREDDATALAYER_ABORT("Master inquiry reply parsing error.");
110 }
111 else
112 {
113 masterInquiryRetryTimer.arm(
114 masterInquiryRetryTimerDuration,
115 std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
116 }
117}
118
119namespace
120{
121 std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
122 const DatabaseInfo& databaseInfo,
123 std::shared_ptr<ContentsBuilder> contentsBuilder,
124 std::shared_ptr<Logger> logger)
125 {
126 return AsyncCommandDispatcher::create(engine,
127 databaseInfo,
128 contentsBuilder,
129 false,
130 logger,
131 true);
132 }
133
134 std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger)
135 {
136 auto replyType = reply.getType();
137 if (replyType == Reply::Type::ARRAY)
138 {
139 auto& replyVector(*reply.getArray());
140 auto hostElementType = replyVector[0]->getType();
141 if (hostElementType == Reply::Type::STRING)
142 {
143 auto host(replyVector[0]->getString()->str);
144 auto portElementType = replyVector[1]->getType();
145 if (portElementType == Reply::Type::STRING)
146 {
147 auto port(replyVector[1]->getString()->str);
148 try
149 {
150 return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
151 }
152 catch (const std::exception& e)
153 {
154 logger.debug() << "Invalid host or port in master inquiry reply, host: "
155 << host << ", port: " << port
156 << ", exception: " << e.what() << std::endl;
157 }
158 }
159 else
160 logger.debug() << "Invalid port element type in master inquiry reply: "
161 << static_cast<int>(portElementType) << std::endl;
162 }
163 else
164 logger.debug() << "Invalid host element type in master inquiry reply: "
165 << static_cast<int>(hostElementType) << std::endl;
166 }
167 else
168 logger.debug() << "Invalid master inquiry reply type: "
169 << static_cast<int>(replyType) << std::endl;
170 return nullptr;
171 }
172}