blob: 03ca0122f24e31cf3cdbc2dc671ec70e58c9690a [file] [log] [blame]
/*
   Copyright (c) 2018-2019 Nokia.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/
16
/*
 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
 * platform project (RICP).
*/
21
Rolf Badorekef2bf512019-08-20 11:17:15 +030022#include "private/redis/asynchirediscommanddispatcher.hpp"
23#include <algorithm>
24#include <cstring>
25#include <cerrno>
26#include <sstream>
27#include <arpa/inet.h>
28#include "private/abort.hpp"
29#include "private/createlogger.hpp"
30#include "private/engine.hpp"
31#include "private/error.hpp"
32#include "private/logger.hpp"
33#include "private/redis/asyncredisreply.hpp"
34#include "private/redis/reply.hpp"
35#include "private/redis/hiredissystem.hpp"
36#include "private/redis/hiredisepolladapter.hpp"
37#include "private/redis/contents.hpp"
38#include "private/redis/redisgeneral.hpp"
39
40using namespace shareddatalayer;
41using namespace shareddatalayer::redis;
42
43namespace
44{
45 void connectCb(const redisAsyncContext* ac, int status)
46 {
47 bool isConnected = !status;
48 auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
49
50 if (isConnected)
51 {
52 std::ostringstream msg;
53 msg << "redis connected, fd: " << ac->c.fd;
54 logInfoOnce(msg.str());
55 instance->verifyConnection();
56 }
57 else
58 instance->setDisconnected();
59
60 }
61
62 void disconnectCb(const redisAsyncContext* ac, int status)
63 {
64 if (status) {
65 std::ostringstream msg;
66 msg << "redis disconnected, status: " << ac->err << ", " << ac->errstr << ", fd: " << ac->c.fd;
67 logErrorOnce(msg.str());
68 }
69 auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
70 instance->setDisconnected();
71 }
72
73 void cb(redisAsyncContext* ac, void* rr, void* pd)
74 {
75 auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
76 auto reply(static_cast<redisReply*>(rr));
77 auto cb(static_cast<AsyncHiredisCommandDispatcher::CommandCb*>(pd));
78 if (instance->isClientCallbacksEnabled())
79 instance->handleReply(*cb, getRedisError(ac->err, ac->errstr, reply), reply);
80 }
81}
82
83AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine,
84 const std::string& address,
85 uint16_t port,
86 std::shared_ptr<ContentsBuilder> contentsBuilder,
87 bool usePermanentCommandCallbacks,
Rolf Badorek8324d022019-09-17 16:47:20 +030088 std::shared_ptr<Logger> logger,
89 bool usedForSentinel):
Rolf Badorekef2bf512019-08-20 11:17:15 +030090 AsyncHiredisCommandDispatcher(engine,
91 address,
92 port,
93 contentsBuilder,
94 usePermanentCommandCallbacks,
95 HiredisSystem::getHiredisSystem(),
96 std::make_shared<HiredisEpollAdapter>(engine),
Rolf Badorek8324d022019-09-17 16:47:20 +030097 logger,
98 usedForSentinel)
Rolf Badorekef2bf512019-08-20 11:17:15 +030099{
100}
101
102AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine,
103 const std::string& address,
104 uint16_t port,
105 std::shared_ptr<ContentsBuilder> contentsBuilder,
106 bool usePermanentCommandCallbacks,
107 HiredisSystem& hiredisSystem,
108 std::shared_ptr<HiredisEpollAdapter> adapter,
Rolf Badorek8324d022019-09-17 16:47:20 +0300109 std::shared_ptr<Logger> logger,
110 bool usedForSentinel):
Rolf Badorekef2bf512019-08-20 11:17:15 +0300111 engine(engine),
112 address(address),
113 port(ntohs(port)),
114 contentsBuilder(contentsBuilder),
115 usePermanentCommandCallbacks(usePermanentCommandCallbacks),
116 hiredisSystem(hiredisSystem),
117 adapter(adapter),
118 ac(nullptr),
119 serviceState(ServiceState::DISCONNECTED),
120 clientCallbacksEnabled(true),
121 connectionRetryTimer(engine),
122 connectionRetryTimerDuration(std::chrono::seconds(1)),
123 connectionVerificationRetryTimerDuration(std::chrono::seconds(10)),
Rolf Badorek8324d022019-09-17 16:47:20 +0300124 logger(logger),
125 usedForSentinel(usedForSentinel)
Rolf Badorekef2bf512019-08-20 11:17:15 +0300126
127{
128 connect();
129}
130
131AsyncHiredisCommandDispatcher::~AsyncHiredisCommandDispatcher()
132{
133 disconnectHiredis();
134}
135
136void AsyncHiredisCommandDispatcher::connect()
137{
138 ac = hiredisSystem.redisAsyncConnect(address.c_str(), port);
139 if (ac == nullptr || ac->err)
140 {
141 setDisconnected();
142 return;
143 }
144 ac->data = this;
145 adapter->attach(ac);
146 hiredisSystem.redisAsyncSetConnectCallback(ac, connectCb);
147 hiredisSystem.redisAsyncSetDisconnectCallback(ac, disconnectCb);
148}
149
150void AsyncHiredisCommandDispatcher::verifyConnection()
151{
Rolf Badorek8324d022019-09-17 16:47:20 +0300152 if (usedForSentinel)
153 setConnected();
154 else
155 {
156 /* When Redis has max amount of users, it will still accept new connections but will
157 * close them immediately. Therefore, we need to verify that just established connection
158 * really works. This prevents calling client readyAck callback for a connection that
159 * will be terminated immediately.
160 */
161 /* Connection verification is now done by doing redis command list query. Because we anyway
162 * need to verify that Redis has required commands, we can now combine these two operations
163 * (command list query and connection verification). If either one of the functionalities
164 * is not needed in the future and it is removed, remember to still leave the other one.
165 */
166 serviceState = ServiceState::CONNECTION_VERIFICATION;
167 /* Disarm retry timer as now we are connected to hiredis. This ensures timer disarm if
168 * we are spontaneously connected to redis while timer is running. If connection verification
169 * fails, timer is armed again (normal handling in connection verification).
170 */
171 connectionRetryTimer.disarm();
172 dispatchAsync(std::bind(&AsyncHiredisCommandDispatcher::verifyConnectionReply,
173 this,
174 std::placeholders::_1,
175 std::placeholders::_2),
176 contentsBuilder->build("COMMAND"),
177 false);
178 }
Rolf Badorekef2bf512019-08-20 11:17:15 +0300179}
180
181void AsyncHiredisCommandDispatcher::verifyConnectionReply(const std::error_code& error,
182 const redis::Reply& reply)
183{
184 if(error)
185 {
186 logger->error() << "AsyncHiredisCommandDispatcher: connection verification failed: "
187 << error.message();
188
189 if (!connectionRetryTimer.isArmed())
190 {
191 /* Typically if connection verification fails, hiredis will call disconnect callback and
192 * whole connection establishment procedure will be restarted via that. To ensure that
193 * we will retry verification even if connection would not be disconnected this timer
194 * is set. If connection is later disconnected, this timer is disarmed (when disconnect
195 * callback handling arms this timer again).
196 */
197 armConnectionRetryTimer(connectionVerificationRetryTimerDuration,
198 std::bind(&AsyncHiredisCommandDispatcher::verifyConnection, this));
199 }
200 }
201 else
202 {
203 if (checkRedisModuleCommands(parseCommandListReply(reply)))
204 setConnected();
205 else
206 SHAREDDATALAYER_ABORT("Required Redis module extension commands not available.");
207 }
208}
209
210void AsyncHiredisCommandDispatcher::waitConnectedAsync(const ConnectAck& connectAck)
211{
212 this->connectAck = connectAck;
213 if (serviceState == ServiceState::CONNECTED)
214 engine.postCallback(connectAck);
215}
216
217void AsyncHiredisCommandDispatcher::registerDisconnectCb(const DisconnectCb& disconnectCb)
218{
219 disconnectCallback = disconnectCb;
220}
221
222void AsyncHiredisCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
223 const AsyncConnection::Namespace&,
224 const Contents& contents)
225{
226 dispatchAsync(commandCb, contents, true);
227}
228
229void AsyncHiredisCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
230 const Contents& contents,
231 bool checkConnectionState)
232{
233 if (checkConnectionState && serviceState != ServiceState::CONNECTED)
234 {
235 engine.postCallback(std::bind(&AsyncHiredisCommandDispatcher::callCommandCbWithError,
236 this,
237 commandCb,
238 std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED)));
239 return;
240 }
241 cbs.push_back(commandCb);
242 std::vector<const char*> chars;
243 std::transform(contents.stack.begin(), contents.stack.end(),
244 std::back_inserter(chars), [](const std::string& str){ return str.c_str(); });
245 if (hiredisSystem.redisAsyncCommandArgv(ac, cb, &cbs.back(), static_cast<int>(contents.stack.size()),
246 &chars[0], &contents.sizes[0]) != REDIS_OK)
247 {
248 removeCb(cbs.back());
249 engine.postCallback(std::bind(&AsyncHiredisCommandDispatcher::callCommandCbWithError,
250 this,
251 commandCb,
252 getRedisError(ac->err, ac->errstr, nullptr)));
253 }
254}
255
256void AsyncHiredisCommandDispatcher::disableCommandCallbacks()
257{
258 clientCallbacksEnabled = false;
259}
260
261void AsyncHiredisCommandDispatcher::callCommandCbWithError(const CommandCb& commandCb,
262 const std::error_code& error)
263{
264 commandCb(error, AsyncRedisReply());
265}
266
267void AsyncHiredisCommandDispatcher::setConnected()
268{
269 serviceState = ServiceState::CONNECTED;
270
271 if (connectAck)
272 {
273 connectAck();
274 connectAck = ConnectAck();
275 }
276}
277
278void AsyncHiredisCommandDispatcher::setDisconnected()
279{
280 serviceState = ServiceState::DISCONNECTED;
281
282 if (disconnectCallback)
283 disconnectCallback();
284
285 armConnectionRetryTimer(connectionRetryTimerDuration,
286 std::bind(&AsyncHiredisCommandDispatcher::connect, this));
287}
288
289void AsyncHiredisCommandDispatcher::handleReply(const CommandCb& commandCb,
290 const std::error_code& error,
291 const redisReply* rr)
292{
293 if (!isValidCb(commandCb))
294 SHAREDDATALAYER_ABORT("Invalid callback function.");
295 if (error)
296 commandCb(error, AsyncRedisReply());
297 else
298 commandCb(error, AsyncRedisReply(*rr));
299 if (!usePermanentCommandCallbacks)
300 removeCb(commandCb);
301}
302
303bool AsyncHiredisCommandDispatcher::isClientCallbacksEnabled() const
304{
305 return clientCallbacksEnabled;
306}
307
308bool AsyncHiredisCommandDispatcher::isValidCb(const CommandCb& commandCb)
309{
310 for (auto i(cbs.begin()); i != cbs.end(); ++i)
311 if (&*i == &commandCb)
312 return true;
313 return false;
314}
315
316void AsyncHiredisCommandDispatcher::removeCb(const CommandCb& commandCb)
317{
318 for (auto i(cbs.begin()); i != cbs.end(); ++i)
319 if (&*i == &commandCb)
320 {
321 cbs.erase(i);
322 break;
323 }
324}
325
326void AsyncHiredisCommandDispatcher::disconnectHiredis()
327{
328 /* hiredis sometimes crashes if redisAsyncFree is called without being connected (even
329 * if ac is a valid pointer).
330 */
331 if (serviceState == ServiceState::CONNECTED || serviceState == ServiceState::CONNECTION_VERIFICATION)
332 hiredisSystem.redisAsyncFree(ac);
333
334 //disconnect callback handler will update serviceState
335}
336
337void AsyncHiredisCommandDispatcher::armConnectionRetryTimer(Timer::Duration duration,
338 std::function<void()> retryAction)
339{
340 connectionRetryTimer.arm(duration,
341 [retryAction] () { retryAction(); });
342}