Add Redis Sentinel based database discovery

This is first step to support forthcoming Redis HA (Sentinel) DBaaS
deployment.

If sentinel-based database discovery is used (currently still disabled
by configure option), current master is asked from Sentinel.

In case that Sentinel can't be connected, re-try will be triggered
after one second delay. If reply parsing fails, it is considered as
non-recoverable bug and execution is aborted.

Currently, Sentinel address and Redis master name are still hard coded,
will be made configurable in a separate commit soon. Also ordering
change notifications from Sentinel will be implemented separately.

Added new discovery type "SENTINEL" to 'sdltool test-connectivity'
command output.

Refactoring for 'AsyncStorageImpl' class unit tests, so that those will
use database discovery mock implementation. Earlier implementation did
have assumptions for database discovery behavior, which were not
fulfilled any more when sentinel database discovery is used.

Added option to 'AsyncCommandDispatcher' which defines if commands
will be sent to Redis or to Sentinel. In latter case existence checking
for Redis module extension commands is skipped.

Signed-off-by: Rolf Badorek <rolf.badorek@nokia.com>
Change-Id: Id7507844c9b74115e52d6f8eaf9cb18198c5dc63
diff --git a/src/asyncstorageimpl.cpp b/src/asyncstorageimpl.cpp
index b17fcbd..59d7b22 100644
--- a/src/asyncstorageimpl.cpp
+++ b/src/asyncstorageimpl.cpp
@@ -23,11 +23,24 @@
 #include "private/engine.hpp"
 #include "private/logger.hpp"
 #if HAVE_REDIS
-#include "private/redis/asyncdatabasediscovery.hpp"
 #include "private/redis/asyncredisstorage.hpp"
 #endif
 
 using namespace shareddatalayer;
+using namespace shareddatalayer::redis;
+
+namespace
+{
+        std::shared_ptr<AsyncDatabaseDiscovery> asyncDatabaseDiscoveryCreator(std::shared_ptr<Engine> engine,
+                                                                              const DatabaseConfiguration& databaseConfiguration,
+                                                                              std::shared_ptr<Logger> logger)
+        {
+            return AsyncDatabaseDiscovery::create(engine,
+                                                  boost::none,
+                                                  databaseConfiguration,
+                                                  logger);
+        }
+}
 
 AsyncStorageImpl::AsyncStorageImpl(std::shared_ptr<Engine> engine,
                                    const boost::optional<PublisherId>& pId,
@@ -36,7 +49,8 @@
     databaseConfiguration(std::make_shared<DatabaseConfigurationImpl>()),
     namespaceConfigurations(std::make_shared<NamespaceConfigurationsImpl>()),
     publisherId(pId),
-    logger(logger)
+    logger(logger),
+    asyncDatabaseDiscoveryCreator(::asyncDatabaseDiscoveryCreator)
 {
     ConfigurationReader configurationReader(logger);
     configurationReader.readDatabaseConfiguration(std::ref(*databaseConfiguration));
@@ -48,12 +62,14 @@
                                    const boost::optional<PublisherId>& pId,
                                    std::shared_ptr<DatabaseConfiguration> databaseConfiguration,
                                    std::shared_ptr<NamespaceConfigurations> namespaceConfigurations,
-                                   std::shared_ptr<Logger> logger):
+                                   std::shared_ptr<Logger> logger,
+                                   const AsyncDatabaseDiscoveryCreator& asyncDatabaseDiscoveryCreator):
     engine(engine),
     databaseConfiguration(databaseConfiguration),
     namespaceConfigurations(namespaceConfigurations),
     publisherId(pId),
-    logger(logger)
+    logger(logger),
+    asyncDatabaseDiscoveryCreator(asyncDatabaseDiscoveryCreator)
 {
 }
 
@@ -61,9 +77,8 @@
 {
 #if HAVE_REDIS
     static AsyncRedisStorage redisHandler{engine,
-                                          redis::AsyncDatabaseDiscovery::create(
+                                          asyncDatabaseDiscoveryCreator(
                                               engine,
-                                              boost::none,
                                               std::ref(*databaseConfiguration),
                                               logger),
                                           publisherId,
diff --git a/src/cli/testconnectivitycommand.cpp b/src/cli/testconnectivitycommand.cpp
index 7c4b2ad..87ac8c0 100644
--- a/src/cli/testconnectivitycommand.cpp
+++ b/src/cli/testconnectivitycommand.cpp
@@ -131,6 +131,10 @@
                 out << "Discovery type:: HIREDIS" << std::endl;
                 PrintStaticConfiguration(out);
                 break;
+            case DatabaseInfo::Discovery::SENTINEL:
+                out << "Discovery type:: SENTINEL" << std::endl;
+                PrintStaticConfiguration(out);
+                break;
         }
     }
 
diff --git a/src/redis/asynccommanddispatcher.cpp b/src/redis/asynccommanddispatcher.cpp
index 25ec890..1a40f90 100644
--- a/src/redis/asynccommanddispatcher.cpp
+++ b/src/redis/asynccommanddispatcher.cpp
@@ -33,9 +33,11 @@
                                                                        const DatabaseInfo& databaseInfo,
                                                                        std::shared_ptr<ContentsBuilder> contentsBuilder,
                                                                        bool usePermanentCommandCallbacks,
-                                                                       std::shared_ptr<Logger> logger)
+                                                                       std::shared_ptr<Logger> logger,
+                                                                       bool usedForSentinel)
 {
 #if HAVE_HIREDIS_VIP
+    static_cast<void>(usedForSentinel);
     if (databaseInfo.type == DatabaseInfo::Type::CLUSTER)
     {
         return std::make_shared<AsyncHiredisClusterCommandDispatcher>(engine,
@@ -60,7 +62,8 @@
                                                            databaseInfo.hosts.at(0).getPort(),
                                                            contentsBuilder,
                                                            usePermanentCommandCallbacks,
-                                                           logger);
+                                                           logger,
+                                                           usedForSentinel);
 #else
     SHAREDDATALAYER_ABORT("Not implemented.");
 #endif
diff --git a/src/redis/asyncdatabasediscovery.cpp b/src/redis/asyncdatabasediscovery.cpp
index 892e7cc..63f2f12 100644
--- a/src/redis/asyncdatabasediscovery.cpp
+++ b/src/redis/asyncdatabasediscovery.cpp
@@ -22,6 +22,9 @@
 #if HAVE_HIREDIS
 #include "private/redis/asynchiredisdatabasediscovery.hpp"
 #endif
+#if HAVE_SENTINEL
+#include "private/redis/asyncsentineldatabasediscovery.hpp"
+#endif
 #include "private/abort.hpp"
 
 using namespace shareddatalayer::redis;
@@ -49,15 +52,22 @@
         SHAREDDATALAYER_ABORT("No Hiredis vip for Redis cluster configuration");
 #endif
     else
+    {
 #if HAVE_HIREDIS
+#if HAVE_SENTINEL
+        static_cast<void>(ns);
+        return std::make_shared<AsyncSentinelDatabaseDiscovery>(engine,
+                                                                logger);
+#else
         return std::make_shared<AsyncHiredisDatabaseDiscovery>(engine,
                                                                ns,
                                                                DatabaseInfo::Type::SINGLE,
                                                                staticAddresses,
                                                                logger);
+#endif
 #else
         static_cast<void>(logger);
         SHAREDDATALAYER_ABORT("No Hiredis");
 #endif
+    }
 }
-
diff --git a/src/redis/asynchirediscommanddispatcher.cpp b/src/redis/asynchirediscommanddispatcher.cpp
index e4195a9..026da34 100644
--- a/src/redis/asynchirediscommanddispatcher.cpp
+++ b/src/redis/asynchirediscommanddispatcher.cpp
@@ -80,7 +80,8 @@
                                                              uint16_t port,
                                                              std::shared_ptr<ContentsBuilder> contentsBuilder,
                                                              bool usePermanentCommandCallbacks,
-                                                             std::shared_ptr<Logger> logger):
+                                                             std::shared_ptr<Logger> logger,
+                                                             bool usedForSentinel):
     AsyncHiredisCommandDispatcher(engine,
                                   address,
                                   port,
@@ -88,7 +89,8 @@
                                   usePermanentCommandCallbacks,
                                   HiredisSystem::getHiredisSystem(),
                                   std::make_shared<HiredisEpollAdapter>(engine),
-                                  logger)
+                                  logger,
+                                  usedForSentinel)
 {
 }
 
@@ -99,7 +101,8 @@
                                                              bool usePermanentCommandCallbacks,
                                                              HiredisSystem& hiredisSystem,
                                                              std::shared_ptr<HiredisEpollAdapter> adapter,
-                                                             std::shared_ptr<Logger> logger):
+                                                             std::shared_ptr<Logger> logger,
+                                                             bool usedForSentinel):
     engine(engine),
     address(address),
     port(ntohs(port)),
@@ -113,7 +116,8 @@
     connectionRetryTimer(engine),
     connectionRetryTimerDuration(std::chrono::seconds(1)),
     connectionVerificationRetryTimerDuration(std::chrono::seconds(10)),
-    logger(logger)
+    logger(logger),
+    usedForSentinel(usedForSentinel)
 
 {
     connect();
@@ -140,28 +144,33 @@
 
 void AsyncHiredisCommandDispatcher::verifyConnection()
 {
-   /* When Redis has max amount of users, it will still accept new connections but will
-    * close them immediately. Therefore, we need to verify that just established connection
-    * really works. This prevents calling client readyAck callback for a connection that
-    * will be terminated immediately.
-    */
-    /* Connection verification is now done by doing redis command list query. Because we anyway
-     * need to verify that Redis has required commands, we can now combine these two operations
-     * (command list query and connection verification). If either one of the functionalities
-     * is not needed in the future and it is removed, remember to still leave the other one.
-     */
-    serviceState = ServiceState::CONNECTION_VERIFICATION;
-    /* Disarm retry timer as now we are connected to hiredis. This ensures timer disarm if
-     * we are spontaneously connected to redis while timer is running. If connection verification
-     * fails, timer is armed again (normal handling in connection verification).
-     */
-    connectionRetryTimer.disarm();
-    dispatchAsync(std::bind(&AsyncHiredisCommandDispatcher::verifyConnectionReply,
-                            this,
-                            std::placeholders::_1,
-                            std::placeholders::_2),
-                  contentsBuilder->build("COMMAND"),
-                  false);
+    if (usedForSentinel)
+        setConnected();
+    else
+    {
+       /* When Redis has max amount of users, it will still accept new connections but will
+        * close them immediately. Therefore, we need to verify that just established connection
+        * really works. This prevents calling client readyAck callback for a connection that
+        * will be terminated immediately.
+        */
+        /* Connection verification is now done by doing redis command list query. Because we anyway
+         * need to verify that Redis has required commands, we can now combine these two operations
+         * (command list query and connection verification). If either one of the functionalities
+         * is not needed in the future and it is removed, remember to still leave the other one.
+         */
+        serviceState = ServiceState::CONNECTION_VERIFICATION;
+        /* Disarm retry timer as now we are connected to hiredis. This ensures timer disarm if
+         * we are spontaneously connected to redis while timer is running. If connection verification
+         * fails, timer is armed again (normal handling in connection verification).
+         */
+        connectionRetryTimer.disarm();
+        dispatchAsync(std::bind(&AsyncHiredisCommandDispatcher::verifyConnectionReply,
+                                this,
+                                std::placeholders::_1,
+                                std::placeholders::_2),
+                      contentsBuilder->build("COMMAND"),
+                      false);
+    }
 }
 
 void AsyncHiredisCommandDispatcher::verifyConnectionReply(const std::error_code& error,
diff --git a/src/redis/asyncredisstorage.cpp b/src/redis/asyncredisstorage.cpp
index 22d4b2b..f6bf63d 100644
--- a/src/redis/asyncredisstorage.cpp
+++ b/src/redis/asyncredisstorage.cpp
@@ -53,7 +53,8 @@
                                               databaseInfo,
                                               contentsBuilder,
                                               false,
-                                              logger);
+                                              logger,
+                                              false);
     }
 
     class AsyncRedisStorageErrorCategory: public std::error_category
diff --git a/src/redis/asyncsentineldatabasediscovery.cpp b/src/redis/asyncsentineldatabasediscovery.cpp
new file mode 100644
index 0000000..05815e2
--- /dev/null
+++ b/src/redis/asyncsentineldatabasediscovery.cpp
@@ -0,0 +1,172 @@
+/*
+   Copyright (c) 2018-2019 Nokia.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+#include <arpa/inet.h>
+#include <iostream>
+#include <string>
+#include <sdl/asyncstorage.hpp>
+#include "private/abort.hpp"
+#include "private/hostandport.hpp"
+#include "private/redis/asyncsentineldatabasediscovery.hpp"
+#include "private/redis/asynccommanddispatcher.hpp"
+#include "private/redis/contents.hpp"
+#include "private/redis/contentsbuilder.hpp"
+#include "private/redis/reply.hpp"
+
+using namespace shareddatalayer;
+using namespace shareddatalayer::redis;
+
+namespace
+{
+    std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
+                                                                          const DatabaseInfo& databaseInfo,
+                                                                          std::shared_ptr<ContentsBuilder> contentsBuilder,
+                                                                          std::shared_ptr<Logger> logger);
+
+    std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger);
+}
+
+AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
+                                                               std::shared_ptr<Logger> logger):
+        AsyncSentinelDatabaseDiscovery(engine,
+                                       logger,
+                                       ::asyncCommandDispatcherCreator,
+                                       std::make_shared<redis::ContentsBuilder>(AsyncStorage::SEPARATOR))
+{
+}
+
+AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
+                                                               std::shared_ptr<Logger> logger,
+                                                               const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
+                                                               std::shared_ptr<redis::ContentsBuilder> contentsBuilder):
+        engine(engine),
+        logger(logger),
+        // @TODO Make configurable.
+        databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({HostAndPort("dbaas-ha", htons(26379U))}),
+                     DatabaseInfo::Type::SINGLE,
+                     boost::none,
+                     DatabaseInfo::Discovery::SENTINEL})),
+        contentsBuilder(contentsBuilder),
+        masterInquiryRetryTimer(*engine),
+        masterInquiryRetryTimerDuration(std::chrono::seconds(1))
+{
+    dispatcher = asyncCommandDispatcherCreator(*engine,
+                                               databaseInfo,
+                                               contentsBuilder,
+                                               logger);
+}
+
+void AsyncSentinelDatabaseDiscovery::setStateChangedCb(const StateChangedCb& cb)
+{
+    stateChangedCb = cb;
+    dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
+}
+
+void AsyncSentinelDatabaseDiscovery::clearStateChangedCb()
+{
+    stateChangedCb = nullptr;
+}
+
+void AsyncSentinelDatabaseDiscovery::sendMasterInquiry()
+{
+    dispatcher->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::masterInquiryAck,
+                                        this,
+                                        std::placeholders::_1,
+                                        std::placeholders::_2),
+                              "dummyNamespace", // Not meaningful for SENTINEL commands
+                              contentsBuilder->build("SENTINEL", "get-master-addr-by-name", "mymaster")); //@TODO Make master name configurable
+}
+
+void AsyncSentinelDatabaseDiscovery::masterInquiryAck(const std::error_code& error,
+                                                      const Reply& reply)
+{
+    if (!error)
+    {
+        auto hostAndPort = parseMasterInquiryReply(reply, *logger);
+        if (hostAndPort)
+        {
+            auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
+                                           DatabaseInfo::Type::SINGLE,
+                                           boost::none,
+                                           DatabaseInfo::Discovery::SENTINEL}));
+            if (stateChangedCb)
+                stateChangedCb(databaseInfo);
+        }
+        else
+            SHAREDDATALAYER_ABORT("Master inquiry reply parsing error.");
+    }
+    else
+    {
+        masterInquiryRetryTimer.arm(
+                masterInquiryRetryTimerDuration,
+                std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
+    }
+}
+
+namespace
+{
+    std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
+                                                                          const DatabaseInfo& databaseInfo,
+                                                                          std::shared_ptr<ContentsBuilder> contentsBuilder,
+                                                                          std::shared_ptr<Logger> logger)
+    {
+        return AsyncCommandDispatcher::create(engine,
+                                              databaseInfo,
+                                              contentsBuilder,
+                                              false,
+                                              logger,
+                                              true);
+    }
+
+    std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger)
+    {
+        auto replyType = reply.getType();
+        if (replyType == Reply::Type::ARRAY)
+        {
+            auto& replyVector(*reply.getArray());
+            auto hostElementType = replyVector[0]->getType();
+            if (hostElementType == Reply::Type::STRING)
+            {
+                auto host(replyVector[0]->getString()->str);
+                auto portElementType = replyVector[1]->getType();
+                if (portElementType == Reply::Type::STRING)
+                {
+                    auto port(replyVector[1]->getString()->str);
+                    try
+                    {
+                        return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
+                    }
+                    catch (const std::exception& e)
+                    {
+                        logger.debug() << "Invalid host or port in master inquiry reply, host: "
+                                       << host << ", port: " << port
+                                       << ", exception: " << e.what() << std::endl;
+                    }
+                }
+                else
+                    logger.debug() << "Invalid port element type in master inquiry reply: "
+                                   << static_cast<int>(portElementType) << std::endl;
+            }
+            else
+                logger.debug() << "Invalid host element type in master inquiry reply: "
+                               << static_cast<int>(hostElementType) << std::endl;
+        }
+        else
+            logger.debug() << "Invalid master inquiry reply type: "
+                           << static_cast<int>(replyType) << std::endl;
+        return nullptr;
+    }
+}