blob: 03ca0122f24e31cf3cdbc2dc671ec70e58c9690a [file] [log] [blame]
/*
Copyright (c) 2018-2019 Nokia.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
* This source code is part of the near-RT RIC (RAN Intelligent Controller)
* platform project (RICP).
*/
#include "private/redis/asynchirediscommanddispatcher.hpp"
#include <algorithm>
#include <cstring>
#include <cerrno>
#include <sstream>
#include <arpa/inet.h>
#include "private/abort.hpp"
#include "private/createlogger.hpp"
#include "private/engine.hpp"
#include "private/error.hpp"
#include "private/logger.hpp"
#include "private/redis/asyncredisreply.hpp"
#include "private/redis/reply.hpp"
#include "private/redis/hiredissystem.hpp"
#include "private/redis/hiredisepolladapter.hpp"
#include "private/redis/contents.hpp"
#include "private/redis/redisgeneral.hpp"
using namespace shareddatalayer;
using namespace shareddatalayer::redis;
namespace
{
void connectCb(const redisAsyncContext* ac, int status)
{
bool isConnected = !status;
auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
if (isConnected)
{
std::ostringstream msg;
msg << "redis connected, fd: " << ac->c.fd;
logInfoOnce(msg.str());
instance->verifyConnection();
}
else
instance->setDisconnected();
}
void disconnectCb(const redisAsyncContext* ac, int status)
{
if (status) {
std::ostringstream msg;
msg << "redis disconnected, status: " << ac->err << ", " << ac->errstr << ", fd: " << ac->c.fd;
logErrorOnce(msg.str());
}
auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
instance->setDisconnected();
}
void cb(redisAsyncContext* ac, void* rr, void* pd)
{
auto instance(static_cast<AsyncHiredisCommandDispatcher*>(ac->data));
auto reply(static_cast<redisReply*>(rr));
auto cb(static_cast<AsyncHiredisCommandDispatcher::CommandCb*>(pd));
if (instance->isClientCallbacksEnabled())
instance->handleReply(*cb, getRedisError(ac->err, ac->errstr, reply), reply);
}
}
AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine,
const std::string& address,
uint16_t port,
std::shared_ptr<ContentsBuilder> contentsBuilder,
bool usePermanentCommandCallbacks,
std::shared_ptr<Logger> logger,
bool usedForSentinel):
AsyncHiredisCommandDispatcher(engine,
address,
port,
contentsBuilder,
usePermanentCommandCallbacks,
HiredisSystem::getHiredisSystem(),
std::make_shared<HiredisEpollAdapter>(engine),
logger,
usedForSentinel)
{
}
AsyncHiredisCommandDispatcher::AsyncHiredisCommandDispatcher(Engine& engine,
const std::string& address,
uint16_t port,
std::shared_ptr<ContentsBuilder> contentsBuilder,
bool usePermanentCommandCallbacks,
HiredisSystem& hiredisSystem,
std::shared_ptr<HiredisEpollAdapter> adapter,
std::shared_ptr<Logger> logger,
bool usedForSentinel):
engine(engine),
address(address),
port(ntohs(port)),
contentsBuilder(contentsBuilder),
usePermanentCommandCallbacks(usePermanentCommandCallbacks),
hiredisSystem(hiredisSystem),
adapter(adapter),
ac(nullptr),
serviceState(ServiceState::DISCONNECTED),
clientCallbacksEnabled(true),
connectionRetryTimer(engine),
connectionRetryTimerDuration(std::chrono::seconds(1)),
connectionVerificationRetryTimerDuration(std::chrono::seconds(10)),
logger(logger),
usedForSentinel(usedForSentinel)
{
connect();
}
AsyncHiredisCommandDispatcher::~AsyncHiredisCommandDispatcher()
{
disconnectHiredis();
}
void AsyncHiredisCommandDispatcher::connect()
{
ac = hiredisSystem.redisAsyncConnect(address.c_str(), port);
if (ac == nullptr || ac->err)
{
setDisconnected();
return;
}
ac->data = this;
adapter->attach(ac);
hiredisSystem.redisAsyncSetConnectCallback(ac, connectCb);
hiredisSystem.redisAsyncSetDisconnectCallback(ac, disconnectCb);
}
void AsyncHiredisCommandDispatcher::verifyConnection()
{
if (usedForSentinel)
setConnected();
else
{
/* When Redis has max amount of users, it will still accept new connections but will
* close them immediately. Therefore, we need to verify that just established connection
* really works. This prevents calling client readyAck callback for a connection that
* will be terminated immediately.
*/
/* Connection verification is now done by doing redis command list query. Because we anyway
* need to verify that Redis has required commands, we can now combine these two operations
* (command list query and connection verification). If either one of the functionalities
* is not needed in the future and it is removed, remember to still leave the other one.
*/
serviceState = ServiceState::CONNECTION_VERIFICATION;
/* Disarm retry timer as now we are connected to hiredis. This ensures timer disarm if
* we are spontaneously connected to redis while timer is running. If connection verification
* fails, timer is armed again (normal handling in connection verification).
*/
connectionRetryTimer.disarm();
dispatchAsync(std::bind(&AsyncHiredisCommandDispatcher::verifyConnectionReply,
this,
std::placeholders::_1,
std::placeholders::_2),
contentsBuilder->build("COMMAND"),
false);
}
}
void AsyncHiredisCommandDispatcher::verifyConnectionReply(const std::error_code& error,
const redis::Reply& reply)
{
if(error)
{
logger->error() << "AsyncHiredisCommandDispatcher: connection verification failed: "
<< error.message();
if (!connectionRetryTimer.isArmed())
{
/* Typically if connection verification fails, hiredis will call disconnect callback and
* whole connection establishment procedure will be restarted via that. To ensure that
* we will retry verification even if connection would not be disconnected this timer
* is set. If connection is later disconnected, this timer is disarmed (when disconnect
* callback handling arms this timer again).
*/
armConnectionRetryTimer(connectionVerificationRetryTimerDuration,
std::bind(&AsyncHiredisCommandDispatcher::verifyConnection, this));
}
}
else
{
if (checkRedisModuleCommands(parseCommandListReply(reply)))
setConnected();
else
SHAREDDATALAYER_ABORT("Required Redis module extension commands not available.");
}
}
void AsyncHiredisCommandDispatcher::waitConnectedAsync(const ConnectAck& connectAck)
{
this->connectAck = connectAck;
if (serviceState == ServiceState::CONNECTED)
engine.postCallback(connectAck);
}
void AsyncHiredisCommandDispatcher::registerDisconnectCb(const DisconnectCb& disconnectCb)
{
disconnectCallback = disconnectCb;
}
void AsyncHiredisCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
const AsyncConnection::Namespace&,
const Contents& contents)
{
dispatchAsync(commandCb, contents, true);
}
void AsyncHiredisCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
const Contents& contents,
bool checkConnectionState)
{
if (checkConnectionState && serviceState != ServiceState::CONNECTED)
{
engine.postCallback(std::bind(&AsyncHiredisCommandDispatcher::callCommandCbWithError,
this,
commandCb,
std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED)));
return;
}
cbs.push_back(commandCb);
std::vector<const char*> chars;
std::transform(contents.stack.begin(), contents.stack.end(),
std::back_inserter(chars), [](const std::string& str){ return str.c_str(); });
if (hiredisSystem.redisAsyncCommandArgv(ac, cb, &cbs.back(), static_cast<int>(contents.stack.size()),
&chars[0], &contents.sizes[0]) != REDIS_OK)
{
removeCb(cbs.back());
engine.postCallback(std::bind(&AsyncHiredisCommandDispatcher::callCommandCbWithError,
this,
commandCb,
getRedisError(ac->err, ac->errstr, nullptr)));
}
}
void AsyncHiredisCommandDispatcher::disableCommandCallbacks()
{
clientCallbacksEnabled = false;
}
void AsyncHiredisCommandDispatcher::callCommandCbWithError(const CommandCb& commandCb,
const std::error_code& error)
{
commandCb(error, AsyncRedisReply());
}
void AsyncHiredisCommandDispatcher::setConnected()
{
serviceState = ServiceState::CONNECTED;
if (connectAck)
{
connectAck();
connectAck = ConnectAck();
}
}
void AsyncHiredisCommandDispatcher::setDisconnected()
{
serviceState = ServiceState::DISCONNECTED;
if (disconnectCallback)
disconnectCallback();
armConnectionRetryTimer(connectionRetryTimerDuration,
std::bind(&AsyncHiredisCommandDispatcher::connect, this));
}
void AsyncHiredisCommandDispatcher::handleReply(const CommandCb& commandCb,
const std::error_code& error,
const redisReply* rr)
{
if (!isValidCb(commandCb))
SHAREDDATALAYER_ABORT("Invalid callback function.");
if (error)
commandCb(error, AsyncRedisReply());
else
commandCb(error, AsyncRedisReply(*rr));
if (!usePermanentCommandCallbacks)
removeCb(commandCb);
}
bool AsyncHiredisCommandDispatcher::isClientCallbacksEnabled() const
{
return clientCallbacksEnabled;
}
bool AsyncHiredisCommandDispatcher::isValidCb(const CommandCb& commandCb)
{
for (auto i(cbs.begin()); i != cbs.end(); ++i)
if (&*i == &commandCb)
return true;
return false;
}
void AsyncHiredisCommandDispatcher::removeCb(const CommandCb& commandCb)
{
for (auto i(cbs.begin()); i != cbs.end(); ++i)
if (&*i == &commandCb)
{
cbs.erase(i);
break;
}
}
void AsyncHiredisCommandDispatcher::disconnectHiredis()
{
/* hiredis sometimes crashes if redisAsyncFree is called without being connected (even
* if ac is a valid pointer).
*/
if (serviceState == ServiceState::CONNECTED || serviceState == ServiceState::CONNECTION_VERIFICATION)
hiredisSystem.redisAsyncFree(ac);
//disconnect callback handler will update serviceState
}
void AsyncHiredisCommandDispatcher::armConnectionRetryTimer(Timer::Duration duration,
std::function<void()> retryAction)
{
connectionRetryTimer.arm(duration,
[retryAction] () { retryAction(); });
}