blob: 7c06f157ea63255d3f9c0f696cbaaf812affeb5a [file] [log] [blame]
/*
Copyright (c) 2018-2019 Nokia.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include <arpa/inet.h>
#include <boost/algorithm/string.hpp>
#include <iostream>
#include <string>
#include <vector>
#include <sdl/asyncstorage.hpp>
#include "private/abort.hpp"
#include "private/hostandport.hpp"
#include "private/redis/asyncsentineldatabasediscovery.hpp"
#include "private/redis/asynccommanddispatcher.hpp"
#include "private/redis/contents.hpp"
#include "private/redis/contentsbuilder.hpp"
#include "private/redis/reply.hpp"
using namespace shareddatalayer;
using namespace shareddatalayer::redis;
namespace
{
std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
const DatabaseInfo& databaseInfo,
std::shared_ptr<ContentsBuilder> contentsBuilder,
std::shared_ptr<Logger> logger,
bool usePermanentCommandCallbacks);
struct SubscribeReply
{
enum class Type { UNKNOWN, SUBSCRIBE_REPLY, NOTIFICATION };
Type type;
std::string message;
SubscribeReply(): type(Type::UNKNOWN) { }
};
std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger);
std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger);
std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger);
}
AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
std::shared_ptr<Logger> logger,
const HostAndPort& sentinelAddress,
const std::string& sentinelMasterName):
AsyncSentinelDatabaseDiscovery(engine,
logger,
sentinelAddress,
sentinelMasterName,
::asyncCommandDispatcherCreator,
std::make_shared<redis::ContentsBuilder>(AsyncStorage::SEPARATOR))
{
}
AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
std::shared_ptr<Logger> logger,
const HostAndPort& sentinelAddress,
const std::string& sentinelMasterName,
const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
std::shared_ptr<redis::ContentsBuilder> contentsBuilder):
engine(engine),
logger(logger),
databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({sentinelAddress}),
DatabaseInfo::Type::SINGLE,
boost::none,
DatabaseInfo::Discovery::SENTINEL})),
sentinelMasterName(sentinelMasterName),
contentsBuilder(contentsBuilder),
subscribeRetryTimer(*engine),
subscribeRetryTimerDuration(std::chrono::seconds(1)),
masterInquiryRetryTimer(*engine),
masterInquiryRetryTimerDuration(std::chrono::seconds(1))
{
subscriber = asyncCommandDispatcherCreator(*engine,
databaseInfo,
contentsBuilder,
logger,
true);
dispatcher = asyncCommandDispatcherCreator(*engine,
databaseInfo,
contentsBuilder,
logger,
false);
}
AsyncSentinelDatabaseDiscovery::~AsyncSentinelDatabaseDiscovery()
{
if (subscriber)
subscriber->disableCommandCallbacks();
if (dispatcher)
dispatcher->disableCommandCallbacks();
stateChangedCb = nullptr;
}
void AsyncSentinelDatabaseDiscovery::setStateChangedCb(const StateChangedCb& cb)
{
stateChangedCb = cb;
subscriber->registerDisconnectCb([this]()
{
subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
});
subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
}
void AsyncSentinelDatabaseDiscovery::clearStateChangedCb()
{
stateChangedCb = nullptr;
}
void AsyncSentinelDatabaseDiscovery::subscribeNotifications()
{
subscriber->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeAck,
this,
std::placeholders::_1,
std::placeholders::_2),
"dummyNamespace", // Not meaningful for Sentinel
contentsBuilder->build("SUBSCRIBE", "+switch-master"));
}
void AsyncSentinelDatabaseDiscovery::subscribeAck(const std::error_code& error,
const Reply& reply)
{
if (!error)
{
auto subscribeReply = parseSubscribeReply(reply, *logger);
if (subscribeReply)
{
switch (subscribeReply->type)
{
case (SubscribeReply::Type::SUBSCRIBE_REPLY):
{
dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
break;
}
case (SubscribeReply::Type::NOTIFICATION):
{
auto hostAndPort = parseNotificationMessage(subscribeReply->message, *logger);
if (hostAndPort)
{
auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
DatabaseInfo::Type::SINGLE,
boost::none,
DatabaseInfo::Discovery::SENTINEL}));
if (stateChangedCb)
stateChangedCb(databaseInfo);
}
else
SHAREDDATALAYER_ABORT("Notification message parsing error.");
break;
}
case (SubscribeReply::Type::UNKNOWN):
{
logger->debug() << "Invalid SUBSCRIBE reply type." << std::endl;
SHAREDDATALAYER_ABORT("Invalid SUBSCRIBE command reply type.");
}
}
}
else
SHAREDDATALAYER_ABORT("SUBSCRIBE command reply parsing error.");
}
else
subscribeRetryTimer.arm(
subscribeRetryTimerDuration,
std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
}
void AsyncSentinelDatabaseDiscovery::sendMasterInquiry()
{
dispatcher->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::masterInquiryAck,
this,
std::placeholders::_1,
std::placeholders::_2),
"dummyNamespace", // Not meaningful for Sentinel
contentsBuilder->build("SENTINEL", "get-master-addr-by-name", sentinelMasterName));
}
void AsyncSentinelDatabaseDiscovery::masterInquiryAck(const std::error_code& error,
const Reply& reply)
{
if (!error)
{
auto hostAndPort = parseMasterInquiryReply(reply, *logger);
if (hostAndPort)
{
auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
DatabaseInfo::Type::SINGLE,
boost::none,
DatabaseInfo::Discovery::SENTINEL}));
if (stateChangedCb)
stateChangedCb(databaseInfo);
}
else
SHAREDDATALAYER_ABORT("Master inquiry reply parsing error.");
}
else
{
masterInquiryRetryTimer.arm(
masterInquiryRetryTimerDuration,
std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
}
}
namespace
{
std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
const DatabaseInfo& databaseInfo,
std::shared_ptr<ContentsBuilder> contentsBuilder,
std::shared_ptr<Logger> logger,
bool usePermanentCommandCallbacks)
{
return AsyncCommandDispatcher::create(engine,
databaseInfo,
contentsBuilder,
usePermanentCommandCallbacks,
logger,
true);
}
std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger)
{
// refer to: https://redis.io/topics/pubsub#format-of-pushed-messages
auto replyType = reply.getType();
if (replyType == Reply::Type::ARRAY)
{
auto& replyVector(*reply.getArray());
auto firstElementType = replyVector[0]->getType();
if (firstElementType == Reply::Type::STRING)
{
auto subscribeReply = std::unique_ptr<SubscribeReply>(new SubscribeReply());
auto kind(replyVector[0]->getString()->str);
if (kind == "subscribe")
{
subscribeReply->type = SubscribeReply::Type::SUBSCRIBE_REPLY;
return subscribeReply;
}
else if (kind == "message")
{
subscribeReply->type = SubscribeReply::Type::NOTIFICATION;
auto thirdElementType = replyVector[2]->getType();
if (thirdElementType == Reply::Type::STRING)
{
subscribeReply->message = replyVector[2]->getString()->str;
return subscribeReply;
}
else
logger.debug() << "Invalid message field type in SUBSCRIBE reply: " << kind << std::endl;
}
else
logger.debug() << "Invalid kind field in SUBSCRIBE reply: " << kind << std::endl;
}
else
logger.debug() << "Invalid first element type in SUBSCRIBE reply: "
<< static_cast<int>(firstElementType) << std::endl;
}
else
logger.debug() << "Invalid SUBSCRIBE reply type: "
<< static_cast<int>(replyType) << std::endl;
return nullptr;
}
std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger)
{
auto replyType = reply.getType();
if (replyType == Reply::Type::ARRAY)
{
auto& replyVector(*reply.getArray());
auto hostElementType = replyVector[0]->getType();
if (hostElementType == Reply::Type::STRING)
{
auto host(replyVector[0]->getString()->str);
auto portElementType = replyVector[1]->getType();
if (portElementType == Reply::Type::STRING)
{
auto port(replyVector[1]->getString()->str);
try
{
return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
}
catch (const std::exception& e)
{
logger.debug() << "Invalid host or port in master inquiry reply, host: "
<< host << ", port: " << port
<< ", exception: " << e.what() << std::endl;
}
}
else
logger.debug() << "Invalid port element type in master inquiry reply: "
<< static_cast<int>(portElementType) << std::endl;
}
else
logger.debug() << "Invalid host element type in master inquiry reply: "
<< static_cast<int>(hostElementType) << std::endl;
}
else
logger.debug() << "Invalid master inquiry reply type: "
<< static_cast<int>(replyType) << std::endl;
return nullptr;
}
std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger)
{
std::vector<std::string> splittedMessage;
boost::split(splittedMessage, message, boost::is_any_of(" "));
if (splittedMessage.size() == 5)
{
auto host = splittedMessage[3];
auto port = splittedMessage[4];
try
{
return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
}
catch (const std::exception& e)
{
logger.debug() << "Invalid host or port in notification message, host: "
<< host << ", port: " << port
<< ", exception: " << e.what() << std::endl;
}
}
else
logger.debug() << "Invalid structure in notification message, size: " << splittedMessage.size() << std::endl;
return nullptr;
}
}