/*
   Copyright (c) 2018-2019 Nokia.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/
16
17#include "private/redis/asynchiredisclustercommanddispatcher.hpp"
18#include <algorithm>
19#include <cstring>
20#include <cerrno>
21#include <sstream>
22#include "private/abort.hpp"
23#include "private/createlogger.hpp"
24#include "private/error.hpp"
25#include "private/logger.hpp"
26#include "private/redis/asyncredisreply.hpp"
27#include "private/redis/reply.hpp"
28#include "private/redis/hiredisclustersystem.hpp"
29#include "private/engine.hpp"
30#include "private/redis/hiredisclusterepolladapter.hpp"
31#include "private/redis/contents.hpp"
32#include "private/redis/redisgeneral.hpp"
33
34using namespace shareddatalayer;
35using namespace shareddatalayer::redis;
36
37namespace
38{
39 void connectCb(const redisClusterAsyncContext*, const redisAsyncContext* ac, int status)
40 {
41 if (!status)
42 {
43 std::ostringstream msg;
44 msg << "redis cluster instance connected, fd: " << ac->c.fd;
45 logDebugOnce(msg.str());
46 }
47 }
48
49 void disconnectCb(const redisClusterAsyncContext* acc, const redisAsyncContext* ac, int status)
50 {
51 if (status)
52 {
53 std::ostringstream msg;
54 msg << "redis cluster instance disconnected, fd: " << ac->c.fd
55 << ", status: " << ac->err;
56 logDebugOnce(msg.str());
57 }
58 auto instance(static_cast<AsyncHiredisClusterCommandDispatcher*>(acc->data));
59 instance->handleDisconnect(ac);
60 }
61
62 void cb(redisClusterAsyncContext* acc, void* rr, void* pd)
63 {
64 auto instance(static_cast<AsyncHiredisClusterCommandDispatcher*>(acc->data));
65 auto reply(static_cast<redisReply*>(rr));
66 auto cb(static_cast<AsyncHiredisClusterCommandDispatcher::CommandCb*>(pd));
67 if (instance->isClientCallbacksEnabled())
68 instance->handleReply(*cb, getRedisError(acc->err, acc->errstr, reply), reply);
69 }
70}
71
72AsyncHiredisClusterCommandDispatcher::AsyncHiredisClusterCommandDispatcher(Engine& engine,
73 const boost::optional<std::string>& ns,
74 const DatabaseConfiguration::Addresses& addresses,
75 std::shared_ptr<ContentsBuilder> contentsBuilder,
76 bool usePermanentCommandCallbacks,
77 std::shared_ptr<Logger> logger):
78 AsyncHiredisClusterCommandDispatcher(engine,
79 ns,
80 addresses,
81 contentsBuilder,
82 usePermanentCommandCallbacks,
83 HiredisClusterSystem::getInstance(),
84 std::make_shared<HiredisClusterEpollAdapter>(engine),
85 logger)
86{
87}
88
89AsyncHiredisClusterCommandDispatcher::AsyncHiredisClusterCommandDispatcher(Engine& engine,
90 const boost::optional<std::string>& ns,
91 const DatabaseConfiguration::Addresses& addresses,
92 std::shared_ptr<ContentsBuilder> contentsBuilder,
93 bool usePermanentCommandCallbacks,
94 HiredisClusterSystem& hiredisClusterSystem,
95 std::shared_ptr<HiredisClusterEpollAdapter> adapter,
96 std::shared_ptr<Logger> logger):
97 engine(engine),
98 initialNamespace(ns),
99 addresses(addresses),
100 contentsBuilder(contentsBuilder),
101 usePermanentCommandCallbacks(usePermanentCommandCallbacks),
102 hiredisClusterSystem(hiredisClusterSystem),
103 adapter(adapter),
104 acc(nullptr),
105 serviceState(ServiceState::DISCONNECTED),
106 clientCallbacksEnabled(true),
107 connectionRetryTimer(engine),
108 connectionRetryTimerDuration(std::chrono::seconds(1)),
109 logger(logger)
110{
111 connect();
112}
113
114AsyncHiredisClusterCommandDispatcher::~AsyncHiredisClusterCommandDispatcher()
115{
116 disconnectHiredisCluster();
117}
118
119void AsyncHiredisClusterCommandDispatcher::connect()
120{
121 // Disconnect and free possibly (in re-connecting scenarios) already existing Redis cluster context.
122 disconnectHiredisCluster();
123 acc = hiredisClusterSystem.redisClusterAsyncConnect(formatToClusterSyntax(addresses).c_str(),
124 HIRCLUSTER_FLAG_ROUTE_USE_SLOTS);
125 if (acc == nullptr)
126 {
127 logger->error() << "SDL: connecting to redis cluster failed, null context returned";
128 armConnectionRetryTimer();
129 return;
130 }
131 if (acc->err)
132 {
133 logger->error() << "SDL: connecting to redis cluster failed, error: " << acc->err;
134 armConnectionRetryTimer();
135 return;
136 }
137 acc->data = this;
138 adapter->setup(acc);
139 hiredisClusterSystem.redisClusterAsyncSetConnectCallback(acc, connectCb);
140 hiredisClusterSystem.redisClusterAsyncSetDisconnectCallback(acc, disconnectCb);
141 verifyConnection();
142}
143
144void AsyncHiredisClusterCommandDispatcher::verifyConnection()
145{
146 /* redisClusterAsyncConnect only queries available cluster nodes but it does
147 * not connect to any cluster node (as it does not know to which node it should connect to).
148 * As we use same cluster node for one SDL namespace (namespace is a hash tag), it is already
149 * determined to which cluster node this instance will connect to. We do initial operation
150 * to get connection to right redis node established already now. This also verifies that
151 * connection really works. When Redis has max amount of users, it will still accept new
152 * connections but is will close them immediately. Therefore, we need to verify that just
153 * established connection really works.
154 */
155 /* Connection setup/verification is now done by doing redis command list query. Because we anyway
156 * need to verify that Redis has required commands, we can now combine these two operations
157 * (command list query and connection setup/verification). If either one of the functionalities
158 * is not needed in the future and it is removed, remember to still leave the other one.
159 */
160 /* Non namespace-specific command list query can be used for connection setup purposes,
161 * because SDL uses redisClusterAsyncCommandArgvWithKey which adds namespace to all
162 * commands dispacthed.
163 */
164
165 /* If initial namespace was not given during dispatcher creation (multi namespace API),
166 * verification is sent to hardcoded namespace. This works for verification purposes
167 * because in our environment cluster is configured to operate only if all nodes
168 * are working (so we can do verification to any node). However, this is not optimal
169 * because we do not necessarily connect to such cluster node which will later be
170 * used by client. Also our cluster configuration can change. This needs to be
171 * optimized later (perhaps to connect to all nodes). */
172 std::string nsForVerification;
173 if (initialNamespace)
174 nsForVerification = *initialNamespace;
175 else
176 nsForVerification = "namespace";
177
178 dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcher::verifyConnectionReply,
179 this,
180 std::placeholders::_1,
181 std::placeholders::_2),
182 nsForVerification,
183 contentsBuilder->build("COMMAND"),
184 false);
185}
186
187void AsyncHiredisClusterCommandDispatcher::verifyConnectionReply(const std::error_code& error, const redis::Reply& reply)
188{
189 if(error)
190 {
191 logger->error() << "AsyncHiredisClusterCommandDispatcher: connection verification failed: "
192 << error.message();
193 armConnectionRetryTimer();
194 }
195 else
196 {
197 if (checkRedisModuleCommands(parseCommandListReply(reply)))
198 setConnected();
199 else
200 SHAREDDATALAYER_ABORT("Required Redis module extension commands not available.");
201 }
202}
203
204void AsyncHiredisClusterCommandDispatcher::waitConnectedAsync(const ConnectAck& connectAck)
205{
206 this->connectAck = connectAck;
207 if (serviceState == ServiceState::CONNECTED)
208 engine.postCallback(connectAck);
209}
210
211void AsyncHiredisClusterCommandDispatcher::registerDisconnectCb(const DisconnectCb& disconnectCb)
212{
213 disconnectCallback = disconnectCb;
214}
215
216void AsyncHiredisClusterCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
217 const AsyncConnection::Namespace& ns,
218 const Contents& contents)
219{
220 dispatchAsync(commandCb, ns, contents, true);
221}
222
223void AsyncHiredisClusterCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
224 const AsyncConnection::Namespace& ns,
225 const Contents& contents,
226 bool checkConnectionState)
227{
228 if (checkConnectionState && serviceState != ServiceState::CONNECTED)
229 {
230 engine.postCallback(std::bind(&AsyncHiredisClusterCommandDispatcher::callCommandCbWithError,
231 this,
232 commandCb,
233 std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED)));
234 return;
235 }
236 cbs.push_back(commandCb);
237 std::vector<const char*> chars;
238 std::transform(contents.stack.begin(), contents.stack.end(),
239 std::back_inserter(chars), [](const std::string& str){ return str.c_str(); });
240 if (hiredisClusterSystem.redisClusterAsyncCommandArgvWithKey(acc, cb, &cbs.back(), ns.c_str(), static_cast<int>(ns.size()),
241 static_cast<int>(contents.stack.size()), &chars[0],
242 &contents.sizes[0]) != REDIS_OK)
243 {
244 removeCb(cbs.back());
245 engine.postCallback(std::bind(&AsyncHiredisClusterCommandDispatcher::callCommandCbWithError,
246 this,
247 commandCb,
248 getRedisError(acc->err, acc->errstr, nullptr)));
249 }
250}
251
252void AsyncHiredisClusterCommandDispatcher::disableCommandCallbacks()
253{
254 clientCallbacksEnabled = false;
255}
256
257void AsyncHiredisClusterCommandDispatcher::callCommandCbWithError(const CommandCb& commandCb, const std::error_code& error)
258{
259 commandCb(error, AsyncRedisReply());
260}
261
262void AsyncHiredisClusterCommandDispatcher::setConnected()
263{
264 serviceState = ServiceState::CONNECTED;
265
266 if (connectAck)
267 {
268 connectAck();
269 connectAck = ConnectAck();
270 }
271}
272
273void AsyncHiredisClusterCommandDispatcher::armConnectionRetryTimer()
274{
275 connectionRetryTimer.arm(connectionRetryTimerDuration,
276 [this] () { connect(); });
277
278}
279
280void AsyncHiredisClusterCommandDispatcher::handleReply(const CommandCb& commandCb,
281 const std::error_code& error,
282 const redisReply* rr)
283{
284 if (!isValidCb(commandCb))
285 SHAREDDATALAYER_ABORT("Invalid callback function.");
286 if (error)
287 commandCb(error, AsyncRedisReply());
288 else
289 commandCb(error, AsyncRedisReply(*rr));
290 if (!usePermanentCommandCallbacks)
291 removeCb(commandCb);
292}
293
294bool AsyncHiredisClusterCommandDispatcher::isClientCallbacksEnabled() const
295{
296 return clientCallbacksEnabled;
297}
298
299bool AsyncHiredisClusterCommandDispatcher::isValidCb(const CommandCb& commandCb)
300{
301 for (auto i(cbs.begin()); i != cbs.end(); ++i)
302 if (&*i == &commandCb)
303 return true;
304 return false;
305}
306
307void AsyncHiredisClusterCommandDispatcher::removeCb(const CommandCb& commandCb)
308{
309 for (auto i(cbs.begin()); i != cbs.end(); ++i)
310 if (&*i == &commandCb)
311 {
312 cbs.erase(i);
313 break;
314 }
315}
316
317void AsyncHiredisClusterCommandDispatcher::handleDisconnect(const redisAsyncContext* ac)
318{
319 adapter->detach(ac);
320
321 if (disconnectCallback)
322 disconnectCallback();
323}
324
325void AsyncHiredisClusterCommandDispatcher::disconnectHiredisCluster()
326{
327 /* hiredis sometimes crashes if redisClusterAsyncFree is called without being connected (even
328 * if acc is a valid pointer).
329 */
330 if (serviceState == ServiceState::CONNECTED)
331 hiredisClusterSystem.redisClusterAsyncFree(acc);
332
333 serviceState = ServiceState::DISCONNECTED;
334}