blob: ac4fc34282b4c48be1b3d72e1b7ae0abee08d7e6 [file] [log] [blame]
Rolf Badorekef2bf512019-08-20 11:17:15 +03001/*
2 Copyright (c) 2018-2019 Nokia.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
Timo Tietavainena0745d22019-11-28 09:55:22 +020017/*
18 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 * platform project (RICP).
20*/
21
Rolf Badorekef2bf512019-08-20 11:17:15 +030022#include "private/redis/asynchiredisclustercommanddispatcher.hpp"
23#include <algorithm>
24#include <cstring>
25#include <cerrno>
26#include <sstream>
27#include "private/abort.hpp"
28#include "private/createlogger.hpp"
29#include "private/error.hpp"
30#include "private/logger.hpp"
31#include "private/redis/asyncredisreply.hpp"
32#include "private/redis/reply.hpp"
33#include "private/redis/hiredisclustersystem.hpp"
34#include "private/engine.hpp"
35#include "private/redis/hiredisclusterepolladapter.hpp"
36#include "private/redis/contents.hpp"
37#include "private/redis/redisgeneral.hpp"
38
39using namespace shareddatalayer;
40using namespace shareddatalayer::redis;
41
42namespace
43{
44 void connectCb(const redisClusterAsyncContext*, const redisAsyncContext* ac, int status)
45 {
46 if (!status)
47 {
48 std::ostringstream msg;
49 msg << "redis cluster instance connected, fd: " << ac->c.fd;
50 logDebugOnce(msg.str());
51 }
52 }
53
54 void disconnectCb(const redisClusterAsyncContext* acc, const redisAsyncContext* ac, int status)
55 {
56 if (status)
57 {
58 std::ostringstream msg;
59 msg << "redis cluster instance disconnected, fd: " << ac->c.fd
60 << ", status: " << ac->err;
61 logDebugOnce(msg.str());
62 }
63 auto instance(static_cast<AsyncHiredisClusterCommandDispatcher*>(acc->data));
64 instance->handleDisconnect(ac);
65 }
66
67 void cb(redisClusterAsyncContext* acc, void* rr, void* pd)
68 {
69 auto instance(static_cast<AsyncHiredisClusterCommandDispatcher*>(acc->data));
70 auto reply(static_cast<redisReply*>(rr));
71 auto cb(static_cast<AsyncHiredisClusterCommandDispatcher::CommandCb*>(pd));
72 if (instance->isClientCallbacksEnabled())
73 instance->handleReply(*cb, getRedisError(acc->err, acc->errstr, reply), reply);
74 }
75}
76
77AsyncHiredisClusterCommandDispatcher::AsyncHiredisClusterCommandDispatcher(Engine& engine,
78 const boost::optional<std::string>& ns,
79 const DatabaseConfiguration::Addresses& addresses,
80 std::shared_ptr<ContentsBuilder> contentsBuilder,
81 bool usePermanentCommandCallbacks,
82 std::shared_ptr<Logger> logger):
83 AsyncHiredisClusterCommandDispatcher(engine,
84 ns,
85 addresses,
86 contentsBuilder,
87 usePermanentCommandCallbacks,
88 HiredisClusterSystem::getInstance(),
89 std::make_shared<HiredisClusterEpollAdapter>(engine),
90 logger)
91{
92}
93
94AsyncHiredisClusterCommandDispatcher::AsyncHiredisClusterCommandDispatcher(Engine& engine,
95 const boost::optional<std::string>& ns,
96 const DatabaseConfiguration::Addresses& addresses,
97 std::shared_ptr<ContentsBuilder> contentsBuilder,
98 bool usePermanentCommandCallbacks,
99 HiredisClusterSystem& hiredisClusterSystem,
100 std::shared_ptr<HiredisClusterEpollAdapter> adapter,
101 std::shared_ptr<Logger> logger):
102 engine(engine),
103 initialNamespace(ns),
104 addresses(addresses),
105 contentsBuilder(contentsBuilder),
106 usePermanentCommandCallbacks(usePermanentCommandCallbacks),
107 hiredisClusterSystem(hiredisClusterSystem),
108 adapter(adapter),
109 acc(nullptr),
110 serviceState(ServiceState::DISCONNECTED),
111 clientCallbacksEnabled(true),
112 connectionRetryTimer(engine),
113 connectionRetryTimerDuration(std::chrono::seconds(1)),
114 logger(logger)
115{
116 connect();
117}
118
119AsyncHiredisClusterCommandDispatcher::~AsyncHiredisClusterCommandDispatcher()
120{
121 disconnectHiredisCluster();
122}
123
124void AsyncHiredisClusterCommandDispatcher::connect()
125{
126 // Disconnect and free possibly (in re-connecting scenarios) already existing Redis cluster context.
127 disconnectHiredisCluster();
128 acc = hiredisClusterSystem.redisClusterAsyncConnect(formatToClusterSyntax(addresses).c_str(),
129 HIRCLUSTER_FLAG_ROUTE_USE_SLOTS);
130 if (acc == nullptr)
131 {
132 logger->error() << "SDL: connecting to redis cluster failed, null context returned";
133 armConnectionRetryTimer();
134 return;
135 }
136 if (acc->err)
137 {
138 logger->error() << "SDL: connecting to redis cluster failed, error: " << acc->err;
139 armConnectionRetryTimer();
140 return;
141 }
142 acc->data = this;
143 adapter->setup(acc);
144 hiredisClusterSystem.redisClusterAsyncSetConnectCallback(acc, connectCb);
145 hiredisClusterSystem.redisClusterAsyncSetDisconnectCallback(acc, disconnectCb);
146 verifyConnection();
147}
148
149void AsyncHiredisClusterCommandDispatcher::verifyConnection()
150{
151 /* redisClusterAsyncConnect only queries available cluster nodes but it does
152 * not connect to any cluster node (as it does not know to which node it should connect to).
153 * As we use same cluster node for one SDL namespace (namespace is a hash tag), it is already
154 * determined to which cluster node this instance will connect to. We do initial operation
155 * to get connection to right redis node established already now. This also verifies that
156 * connection really works. When Redis has max amount of users, it will still accept new
157 * connections but is will close them immediately. Therefore, we need to verify that just
158 * established connection really works.
159 */
160 /* Connection setup/verification is now done by doing redis command list query. Because we anyway
161 * need to verify that Redis has required commands, we can now combine these two operations
162 * (command list query and connection setup/verification). If either one of the functionalities
163 * is not needed in the future and it is removed, remember to still leave the other one.
164 */
165 /* Non namespace-specific command list query can be used for connection setup purposes,
166 * because SDL uses redisClusterAsyncCommandArgvWithKey which adds namespace to all
167 * commands dispacthed.
168 */
169
170 /* If initial namespace was not given during dispatcher creation (multi namespace API),
171 * verification is sent to hardcoded namespace. This works for verification purposes
172 * because in our environment cluster is configured to operate only if all nodes
173 * are working (so we can do verification to any node). However, this is not optimal
174 * because we do not necessarily connect to such cluster node which will later be
175 * used by client. Also our cluster configuration can change. This needs to be
176 * optimized later (perhaps to connect to all nodes). */
177 std::string nsForVerification;
178 if (initialNamespace)
179 nsForVerification = *initialNamespace;
180 else
181 nsForVerification = "namespace";
182
183 dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcher::verifyConnectionReply,
184 this,
185 std::placeholders::_1,
186 std::placeholders::_2),
187 nsForVerification,
188 contentsBuilder->build("COMMAND"),
189 false);
190}
191
192void AsyncHiredisClusterCommandDispatcher::verifyConnectionReply(const std::error_code& error, const redis::Reply& reply)
193{
194 if(error)
195 {
196 logger->error() << "AsyncHiredisClusterCommandDispatcher: connection verification failed: "
197 << error.message();
198 armConnectionRetryTimer();
199 }
200 else
201 {
202 if (checkRedisModuleCommands(parseCommandListReply(reply)))
203 setConnected();
204 else
205 SHAREDDATALAYER_ABORT("Required Redis module extension commands not available.");
206 }
207}
208
209void AsyncHiredisClusterCommandDispatcher::waitConnectedAsync(const ConnectAck& connectAck)
210{
211 this->connectAck = connectAck;
212 if (serviceState == ServiceState::CONNECTED)
213 engine.postCallback(connectAck);
214}
215
216void AsyncHiredisClusterCommandDispatcher::registerDisconnectCb(const DisconnectCb& disconnectCb)
217{
218 disconnectCallback = disconnectCb;
219}
220
221void AsyncHiredisClusterCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
222 const AsyncConnection::Namespace& ns,
223 const Contents& contents)
224{
225 dispatchAsync(commandCb, ns, contents, true);
226}
227
228void AsyncHiredisClusterCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
229 const AsyncConnection::Namespace& ns,
230 const Contents& contents,
231 bool checkConnectionState)
232{
233 if (checkConnectionState && serviceState != ServiceState::CONNECTED)
234 {
235 engine.postCallback(std::bind(&AsyncHiredisClusterCommandDispatcher::callCommandCbWithError,
236 this,
237 commandCb,
238 std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED)));
239 return;
240 }
241 cbs.push_back(commandCb);
242 std::vector<const char*> chars;
243 std::transform(contents.stack.begin(), contents.stack.end(),
244 std::back_inserter(chars), [](const std::string& str){ return str.c_str(); });
245 if (hiredisClusterSystem.redisClusterAsyncCommandArgvWithKey(acc, cb, &cbs.back(), ns.c_str(), static_cast<int>(ns.size()),
246 static_cast<int>(contents.stack.size()), &chars[0],
247 &contents.sizes[0]) != REDIS_OK)
248 {
249 removeCb(cbs.back());
250 engine.postCallback(std::bind(&AsyncHiredisClusterCommandDispatcher::callCommandCbWithError,
251 this,
252 commandCb,
253 getRedisError(acc->err, acc->errstr, nullptr)));
254 }
255}
256
257void AsyncHiredisClusterCommandDispatcher::disableCommandCallbacks()
258{
259 clientCallbacksEnabled = false;
260}
261
262void AsyncHiredisClusterCommandDispatcher::callCommandCbWithError(const CommandCb& commandCb, const std::error_code& error)
263{
264 commandCb(error, AsyncRedisReply());
265}
266
267void AsyncHiredisClusterCommandDispatcher::setConnected()
268{
269 serviceState = ServiceState::CONNECTED;
270
271 if (connectAck)
272 {
273 connectAck();
274 connectAck = ConnectAck();
275 }
276}
277
278void AsyncHiredisClusterCommandDispatcher::armConnectionRetryTimer()
279{
280 connectionRetryTimer.arm(connectionRetryTimerDuration,
281 [this] () { connect(); });
282
283}
284
285void AsyncHiredisClusterCommandDispatcher::handleReply(const CommandCb& commandCb,
286 const std::error_code& error,
287 const redisReply* rr)
288{
289 if (!isValidCb(commandCb))
290 SHAREDDATALAYER_ABORT("Invalid callback function.");
291 if (error)
292 commandCb(error, AsyncRedisReply());
293 else
294 commandCb(error, AsyncRedisReply(*rr));
295 if (!usePermanentCommandCallbacks)
296 removeCb(commandCb);
297}
298
299bool AsyncHiredisClusterCommandDispatcher::isClientCallbacksEnabled() const
300{
301 return clientCallbacksEnabled;
302}
303
304bool AsyncHiredisClusterCommandDispatcher::isValidCb(const CommandCb& commandCb)
305{
306 for (auto i(cbs.begin()); i != cbs.end(); ++i)
307 if (&*i == &commandCb)
308 return true;
309 return false;
310}
311
312void AsyncHiredisClusterCommandDispatcher::removeCb(const CommandCb& commandCb)
313{
314 for (auto i(cbs.begin()); i != cbs.end(); ++i)
315 if (&*i == &commandCb)
316 {
317 cbs.erase(i);
318 break;
319 }
320}
321
322void AsyncHiredisClusterCommandDispatcher::handleDisconnect(const redisAsyncContext* ac)
323{
324 adapter->detach(ac);
325
326 if (disconnectCallback)
327 disconnectCallback();
328}
329
330void AsyncHiredisClusterCommandDispatcher::disconnectHiredisCluster()
331{
332 /* hiredis sometimes crashes if redisClusterAsyncFree is called without being connected (even
333 * if acc is a valid pointer).
334 */
335 if (serviceState == ServiceState::CONNECTED)
336 hiredisClusterSystem.redisClusterAsyncFree(acc);
337
338 serviceState = ServiceState::DISCONNECTED;
339}