Add Sentinel change notification handling logic

When a new Redis master has been promoted, Sentinel publishes a
notification to the '+switch-master' channel.

Refer to the web page below for further details:
https://redis.io/topics/sentinel

'AsyncSentinelDatabaseDiscovery' now subscribes to notifications on the
'+switch-master' channel. Each notification carries the address of the
new Redis master, which is parsed from the message and passed to the
upper layer via the 'StateChangedCb' callback (if the callback is set).
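
As a rough illustration of the parsing step, the sketch below assumes the
payload layout used by the unit tests in this commit:
"<master name> <old host> <old port> <new host> <new port>". The names
'MasterAddress' and 'parseSwitchMasterMessage' are hypothetical and do not
come from the SDL sources. The sketch returns false on malformed input,
whereas the tests in this commit expect the real code to abort.

```cpp
#include <cstddef>
#include <cstdint>
#include <exception>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical holder for the new master address (not an SDL type).
struct MasterAddress
{
    std::string host;
    uint16_t port;
};

// Hypothetical helper: extracts the new master address from a
// '+switch-master' payload. Returns false if the payload is malformed.
bool parseSwitchMasterMessage(const std::string& message, MasterAddress& out)
{
    std::istringstream stream(message);
    std::vector<std::string> fields;
    std::string field;
    while (stream >> field)
        fields.push_back(field);
    // Expected fields: name, old host, old port, new host, new port.
    if (fields.size() != 5)
        return false;
    try
    {
        std::size_t consumed = 0;
        const unsigned long port = std::stoul(fields[4], &consumed);
        // Reject trailing garbage and values outside the 16-bit port range.
        if (consumed != fields[4].size() || port > 65535)
            return false;
        out.host = fields[3];
        out.port = static_cast<uint16_t>(port);
        return true;
    }
    catch (const std::exception&)
    {
        // std::stoul throws when the port field is not numeric.
        return false;
    }
}
```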

When notifications are subscribed to from Redis (from Sentinel in this
case), the connection enters the "subscribed state", in which only some
pub/sub related commands are allowed.

For this reason, we have two connections (command dispatchers): one
for subscribing to notifications and the other for Sentinel commands,
such as the master inquiry.

Refer to the web page below for further details:
https://redis.io/topics/pubsub
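
Replies arriving on the subscriber connection are three-element arrays
whose first element names the kind of reply: "subscribe" for the
subscription acknowledgement and "message" for a published notification.
The sketch below shows one way to tell them apart. It is a simplification
that assumes all three elements have already been flattened to strings
(in Redis the third element of a "subscribe" reply is an integer), and
'PushKind' and 'classifyPush' are hypothetical names, not part of SDL.

```cpp
#include <string>
#include <vector>

// Hypothetical classification of replies seen on a subscribed connection.
enum class PushKind { SubscribeAck, Message, Unknown };

// Hypothetical helper: inspects the first array element to decide
// whether the reply acknowledges SUBSCRIBE or carries a notification.
PushKind classifyPush(const std::vector<std::string>& reply)
{
    if (reply.size() != 3)
        return PushKind::Unknown;
    if (reply[0] == "subscribe")
        return PushKind::SubscribeAck;
    if (reply[0] == "message")
        return PushKind::Message;
    return PushKind::Unknown;
}
```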

If the subscriber connection goes down, the subscription is renewed
once the connection to Sentinel is working again. An extra master
inquiry is then made, because notifications might have been missed
while the connection was down. The latest master address is reported
via 'StateChangedCb' even if it has not changed since the last reported
address. This could be optimized, but the situation is very rare, so
the extra logic was not considered worthwhile.

If the other connection (used for Sentinel commands) is cut, the
related command dispatcher will reconnect in the background. Any
Sentinel commands issued while the connection is down will fail and
trigger a retry after a short delay (per the existing implementation).

If notifications are missed for some reason other than a connection
cut, SDL might end up in a state where operations are sent to a Redis
slave. In this situation write operations will fail with a new internal
'AsyncRedisCommandDispatcherErrorCode::WRITING_TO_SLAVE' error code,
which is mapped to 'shareddatalayer::Error::BACKEND_FAILURE'. The
recovery instructions have been adjusted so that re-creating the SDL
API instance is no longer an optional recovery step (it is currently
the only way to recover from the situation described above).

Sentinel support is still disabled; the missing implementation will be
added soon in separate commit(s).

Signed-off-by: Rolf Badorek <rolf.badorek@nokia.com>
Change-Id: I1bb9e121985ee22278e780e50ab13f88acdc65c5
diff --git a/tst/asyncsentineldatabasediscovery_test.cpp b/tst/asyncsentineldatabasediscovery_test.cpp
index dcf35c7..ab48284 100644
--- a/tst/asyncsentineldatabasediscovery_test.cpp
+++ b/tst/asyncsentineldatabasediscovery_test.cpp
@@ -16,6 +16,7 @@
 
 #include <gtest/gtest.h>
 #include <arpa/inet.h>
+#include <string>
 #include <sdl/asyncstorage.hpp>
 #include "private/createlogger.hpp"
 #include "private/hostandport.hpp"
@@ -39,60 +40,131 @@
     public:
         std::unique_ptr<AsyncSentinelDatabaseDiscovery> asyncSentinelDatabaseDiscovery;
         std::shared_ptr<StrictMock<EngineMock>> engineMock;
+        std::shared_ptr<StrictMock<AsyncCommandDispatcherMock>> subscriberMock;
         std::shared_ptr<StrictMock<AsyncCommandDispatcherMock>> dispatcherMock;
         std::shared_ptr<StrictMock<ContentsBuilderMock>> contentsBuilderMock;
         std::shared_ptr<Logger> logger;
         Contents contents;
+        AsyncCommandDispatcher::ConnectAck subscriberConnectAck;
+        AsyncCommandDispatcher::DisconnectCb subscriberDisconnectCb;
         AsyncCommandDispatcher::ConnectAck dispatcherConnectAck;
-        AsyncCommandDispatcher::CommandCb savedCommandCb;
-        ReplyMock replyMock;
+        AsyncCommandDispatcher::CommandCb savedSubscriberCommandCb;
+        AsyncCommandDispatcher::CommandCb savedDispatcherCommandCb;
+        ReplyMock masterInquiryReplyMock;
         std::string someHost;
         uint16_t somePort;
+        std::string someOtherHost;
+        uint16_t someOtherPort;
         Reply::DataItem hostDataItem;
         Reply::DataItem portDataItem;
         std::shared_ptr<ReplyMock> masterInquiryReplyHost;
         std::shared_ptr<ReplyMock> masterInquiryReplyPort;
         Reply::ReplyVector masterInquiryReply;
         Timer::Duration expectedMasterInquiryRetryTimerDuration;
-        Timer::Callback savedConnectionRetryTimerCallback;
+        Timer::Callback savedMasterInquiryRetryTimerCallback;
+        // Mocks for SUBSCRIBE command replies are a bit complicated, because a reply may have one of
+        // several meanings/structures: https://redis.io/topics/pubsub#format-of-pushed-messages
+        ReplyMock subscribeReplyMock;
+        std::shared_ptr<ReplyMock> subscribeReplyArrayElement0;
+        std::shared_ptr<ReplyMock> subscribeReplyArrayElement1;
+        std::shared_ptr<ReplyMock> subscribeReplyArrayElement2;
+        Reply::ReplyVector subscribeReplyVector;
+        Reply::DataItem subscribeDataItem;
+        ReplyMock notificationReplyMock;
+        std::shared_ptr<ReplyMock> notificationReplyArrayElement0;
+        std::shared_ptr<ReplyMock> notificationReplyArrayElement1;
+        std::shared_ptr<ReplyMock> notificationReplyArrayElement2;
+        Reply::ReplyVector notificationReplyVector;
+        Reply::DataItem notificationDataItem;
+        std::string notificationMessage;
+        Reply::DataItem notificationMessageDataItem;
+        Timer::Duration expectedSubscribeRetryTimerDuration;
+        Timer::Callback savedSubscribeRetryTimerCallback;
 
         AsyncSentinelDatabaseDiscoveryBaseTest():
             engineMock(std::make_shared<StrictMock<EngineMock>>()),
-            dispatcherMock(std::make_shared<StrictMock<AsyncCommandDispatcherMock>>()),
             contentsBuilderMock(std::make_shared<StrictMock<ContentsBuilderMock>>(AsyncStorage::SEPARATOR)),
             logger(createLogger(SDL_LOG_PREFIX)),
             contents({{"aaa","bbb"},{3,3}}),
             someHost("somehost"),
             somePort(1234),
+            someOtherHost("someotherhost"),
+            someOtherPort(5678),
             hostDataItem({someHost,ReplyStringLength(someHost.length())}),
             portDataItem({std::to_string(somePort),ReplyStringLength(std::to_string(somePort).length())}),
             masterInquiryReplyHost(std::make_shared<ReplyMock>()),
             masterInquiryReplyPort(std::make_shared<ReplyMock>()),
-            expectedMasterInquiryRetryTimerDuration(std::chrono::seconds(1))
+            expectedMasterInquiryRetryTimerDuration(std::chrono::seconds(1)),
+            subscribeReplyArrayElement0(std::make_shared<ReplyMock>()),
+            subscribeReplyArrayElement1(std::make_shared<ReplyMock>()),
+            subscribeReplyArrayElement2(std::make_shared<ReplyMock>()),
+            subscribeDataItem({"subscribe",9}),
+            notificationReplyArrayElement0(std::make_shared<ReplyMock>()),
+            notificationReplyArrayElement1(std::make_shared<ReplyMock>()),
+            notificationReplyArrayElement2(std::make_shared<ReplyMock>()),
+            notificationDataItem({"message",7}),
+            notificationMessage("mymaster " + someHost + " " + std::to_string(somePort) + " " + someOtherHost + " " + std::to_string(someOtherPort)),
+            notificationMessageDataItem({notificationMessage, ReplyStringLength(notificationMessage.length())}),
+            expectedSubscribeRetryTimerDuration(std::chrono::seconds(1))
         {
             masterInquiryReply.push_back(masterInquiryReplyHost);
             masterInquiryReply.push_back(masterInquiryReplyPort);
+            subscribeReplyVector.push_back(subscribeReplyArrayElement0);
+            subscribeReplyVector.push_back(subscribeReplyArrayElement1);
+            subscribeReplyVector.push_back(subscribeReplyArrayElement2);
+            notificationReplyVector.push_back(notificationReplyArrayElement0);
+            notificationReplyVector.push_back(notificationReplyArrayElement1);
+            notificationReplyVector.push_back(notificationReplyArrayElement2);
         }
 
         virtual ~AsyncSentinelDatabaseDiscoveryBaseTest()
         {
         }
 
-        std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine&,
-                                                                              const DatabaseInfo&,
-                                                                              std::shared_ptr<ContentsBuilder>)
+        std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator()
         {
             // @TODO Add database info checking when configuration support for sentinel is added.
-            newDispatcherCreated();
-            return dispatcherMock;
+            if (!subscriberMock)
+            {
+                subscriberMock = std::make_shared<StrictMock<AsyncCommandDispatcherMock>>();
+                newDispatcherCreated();
+                return subscriberMock;
+            }
+            if (!dispatcherMock)
+            {
+                dispatcherMock = std::make_shared<StrictMock<AsyncCommandDispatcherMock>>();
+                newDispatcherCreated();
+                return dispatcherMock;
+            }
+            return nullptr;
         }
 
         MOCK_METHOD0(newDispatcherCreated, void());
 
-        void expectNewDispatcherCreated()
+        void expectDispatchersCreated()
         {
             EXPECT_CALL(*this, newDispatcherCreated())
-                .Times(1);
+                .Times(2);
+        }
+
+        void expectSubscriberWaitConnectedAsync()
+        {
+            EXPECT_CALL(*subscriberMock, waitConnectedAsync(_))
+                .Times(1)
+                .WillOnce(Invoke([this](const AsyncCommandDispatcher::ConnectAck& connectAck)
+                        {
+                            subscriberConnectAck = connectAck;
+                        }));
+        }
+
+        void expectSubscriberRegisterDisconnectCb()
+        {
+            EXPECT_CALL(*subscriberMock, registerDisconnectCb(_))
+                .Times(1)
+                .WillOnce(Invoke([this](const AsyncCommandDispatcher::DisconnectCb& disconnectCb)
+                        {
+                            subscriberDisconnectCb = disconnectCb;
+                        }));
         }
 
         void expectDispatcherWaitConnectedAsync()
@@ -106,6 +178,14 @@
         }
 
         void expectContentsBuild(const std::string& string,
+                                 const std::string& string2)
+        {
+            EXPECT_CALL(*contentsBuilderMock, build(string, string2))
+                .Times(1)
+                .WillOnce(Return(contents));
+        }
+
+        void expectContentsBuild(const std::string& string,
                                  const std::string& string2,
                                  const std::string& string3)
         {
@@ -114,28 +194,41 @@
                 .WillOnce(Return(contents));
         }
 
-        void expectDispatchAsync()
+        void expectSubscriberDispatchAsync()
+        {
+            EXPECT_CALL(*subscriberMock, dispatchAsync(_, _, contents))
+                .Times(1)
+                .WillOnce(SaveArg<0>(&savedSubscriberCommandCb));
+        }
+
+        void expectDispatcherDispatchAsync()
         {
             EXPECT_CALL(*dispatcherMock, dispatchAsync(_, _, contents))
                 .Times(1)
-                .WillOnce(SaveArg<0>(&savedCommandCb));
+                .WillOnce(SaveArg<0>(&savedDispatcherCommandCb));
+        }
+
+        void expectSubscribeNotifications()
+        {
+            expectContentsBuild("SUBSCRIBE", "+switch-master");
+            expectSubscriberDispatchAsync();
         }
 
         void expectMasterInquiry()
         {
             expectContentsBuild("SENTINEL", "get-master-addr-by-name", "mymaster");
-            expectDispatchAsync();
+            expectDispatcherDispatchAsync();
         }
 
         MOCK_METHOD1(stateChangedCb, void(const DatabaseInfo&));
 
-        void expectStateChangedCb()
+        void expectStateChangedCb(const std::string& host, uint16_t port)
         {
             EXPECT_CALL(*this, stateChangedCb(_))
                 .Times(1)
-                .WillOnce(Invoke([this](const DatabaseInfo& databaseInfo)
+                .WillOnce(Invoke([this, host, port](const DatabaseInfo& databaseInfo)
                                  {
-                                     EXPECT_THAT(DatabaseConfiguration::Addresses({ HostAndPort(someHost, htons(somePort)) }),
+                                     EXPECT_THAT(DatabaseConfiguration::Addresses({ HostAndPort(host, htons(port)) }),
                                                  ContainerEq(databaseInfo.hosts));
                                      EXPECT_EQ(DatabaseInfo::Type::SINGLE, databaseInfo.type);
                                      EXPECT_EQ(boost::none, databaseInfo.ns);
@@ -143,49 +236,50 @@
                                  }));
         }
 
-        void expectGetReplyType(ReplyMock& mock, const Reply::Type& type)
-        {
-            EXPECT_CALL(mock, getType())
-                .Times(1)
-                .WillOnce(Return(type));
-        }
-
-        void expectGetReplyArray_ReturnMasterInquiryReply()
-        {
-            EXPECT_CALL(replyMock, getArray())
-                .Times(1)
-                .WillOnce(Return(&masterInquiryReply));
-        }
-
-        void expectGetReplyString(ReplyMock& mock, const Reply::DataItem& item)
-        {
-            EXPECT_CALL(mock, getString())
-                .Times(1)
-                .WillOnce(Return(&item));
-        }
-
         void expectMasterIquiryReply()
         {
-            expectGetReplyType(replyMock, Reply::Type::ARRAY);
-            expectGetReplyArray_ReturnMasterInquiryReply();
-            expectGetReplyType(*masterInquiryReplyHost, Reply::Type::STRING);
-            expectGetReplyString(*masterInquiryReplyHost, hostDataItem);
-            expectGetReplyType(*masterInquiryReplyPort, Reply::Type::STRING);
-            expectGetReplyString(*masterInquiryReplyPort, portDataItem);
+            expectGetType(masterInquiryReplyMock, Reply::Type::ARRAY);
+            expectGetArray(masterInquiryReplyMock, masterInquiryReply);
+            expectGetType(*masterInquiryReplyHost, Reply::Type::STRING);
+            expectGetString(*masterInquiryReplyHost, hostDataItem);
+            expectGetType(*masterInquiryReplyPort, Reply::Type::STRING);
+            expectGetString(*masterInquiryReplyPort, portDataItem);
         }
 
         void expectMasterInquiryRetryTimer()
         {
             EXPECT_CALL(*engineMock, armTimer(_, expectedMasterInquiryRetryTimerDuration, _))
                 .Times(1)
-                .WillOnce(SaveArg<2>(&savedConnectionRetryTimerCallback));
+                .WillOnce(SaveArg<2>(&savedMasterInquiryRetryTimerCallback));
+        }
+
+        void expectSubscribeRetryTimer()
+        {
+            EXPECT_CALL(*engineMock, armTimer(_, expectedSubscribeRetryTimerDuration, _))
+                .Times(1)
+                .WillOnce(SaveArg<2>(&savedSubscribeRetryTimerCallback));
+        }
+
+        void setStateChangedCbExpectsBeforeMasterInquiry()
+        {
+            expectSubscriberRegisterDisconnectCb();
+            expectSubscriberWaitConnectedAsync();
+            asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryBaseTest::stateChangedCb,
+                    this,
+                    std::placeholders::_1));
+            expectSubscribeNotifications();
+            subscriberConnectAck();
+            expectSubscribeReply();
+            expectDispatcherWaitConnectedAsync();
+            savedSubscriberCommandCb(std::error_code(), subscribeReplyMock);
+            expectMasterInquiry();
         }
 
         void setDefaultResponsesForMasterInquiryReplyParsing()
         {
-            ON_CALL(replyMock, getType())
+            ON_CALL(masterInquiryReplyMock, getType())
                 .WillByDefault(Return(Reply::Type::ARRAY));
-            ON_CALL(replyMock, getArray())
+            ON_CALL(masterInquiryReplyMock, getArray())
                 .WillByDefault(Return(&masterInquiryReply));
             ON_CALL(*masterInquiryReplyHost, getType())
                 .WillByDefault(Return(Reply::Type::STRING));
@@ -196,6 +290,68 @@
             ON_CALL(*masterInquiryReplyHost, getString())
                 .WillByDefault(Return(&portDataItem));
         }
+
+        void expectGetType(ReplyMock& mock, const Reply::Type& type)
+        {
+            EXPECT_CALL(mock, getType())
+                .Times(1)
+                .WillOnce(Return(type));
+        }
+
+        void expectGetString(ReplyMock& mock, const Reply::DataItem& item)
+        {
+            EXPECT_CALL(mock, getString())
+                .Times(1)
+                .WillOnce(Return(&item));
+        }
+
+        void expectGetInteger(ReplyMock& mock, int value)
+        {
+            EXPECT_CALL(mock, getInteger())
+                .Times(1)
+                .WillOnce(Return(value));
+        }
+
+        void expectGetArray(ReplyMock& mock, Reply::ReplyVector& replyVector)
+        {
+            EXPECT_CALL(mock, getArray())
+                .Times(1)
+                .WillOnce(Return(&replyVector));
+        }
+
+        void expectSubscribeReply()
+        {
+            expectGetType(subscribeReplyMock, Reply::Type::ARRAY);
+            expectGetArray(subscribeReplyMock, subscribeReplyVector);
+            expectGetType(*subscribeReplyArrayElement0, Reply::Type::STRING);
+            expectGetString(*subscribeReplyArrayElement0, subscribeDataItem);
+        }
+
+        void expectNotificationReply()
+        {
+            expectGetType(notificationReplyMock, Reply::Type::ARRAY);
+            expectGetArray(notificationReplyMock, notificationReplyVector);
+            expectGetType(*notificationReplyArrayElement0, Reply::Type::STRING);
+            expectGetString(*notificationReplyArrayElement0, notificationDataItem);
+            expectGetType(*notificationReplyArrayElement2, Reply::Type::STRING);
+            expectGetString(*notificationReplyArrayElement2, notificationMessageDataItem);
+        }
+
+        void setDefaultResponsesForNotificationReplyParsing()
+        {
+            ON_CALL(notificationReplyMock, getType())
+                .WillByDefault(Return(Reply::Type::ARRAY));
+            ON_CALL(notificationReplyMock, getArray())
+                .WillByDefault(Return(&notificationReplyVector));
+            ON_CALL(*notificationReplyArrayElement0, getType())
+                .WillByDefault(Return(Reply::Type::STRING));
+            ON_CALL(*notificationReplyArrayElement0, getString())
+                .WillByDefault(Return(&notificationDataItem));
+            ON_CALL(*notificationReplyArrayElement2, getType())
+                .WillByDefault(Return(Reply::Type::STRING));
+            ON_CALL(*notificationReplyArrayElement2, getString())
+                .WillByDefault(Return(&notificationMessageDataItem));
+        }
     };
 
     class AsyncSentinelDatabaseDiscoveryTest: public AsyncSentinelDatabaseDiscoveryBaseTest
@@ -203,21 +359,42 @@
     public:
         AsyncSentinelDatabaseDiscoveryTest()
         {
-            expectNewDispatcherCreated();
+            expectDispatchersCreated();
             asyncSentinelDatabaseDiscovery.reset(
                     new AsyncSentinelDatabaseDiscovery(
                             engineMock,
                             logger,
                             std::bind(&AsyncSentinelDatabaseDiscoveryBaseTest::asyncCommandDispatcherCreator,
-                                      this,
-                                      std::placeholders::_1,
-                                      std::placeholders::_2,
-                                      std::placeholders::_3),
+                                      this),
                             contentsBuilderMock));
         }
+
+        ~AsyncSentinelDatabaseDiscoveryTest()
+        {
+            EXPECT_CALL(*subscriberMock, disableCommandCallbacks())
+                .Times(1);
+            EXPECT_CALL(*dispatcherMock, disableCommandCallbacks())
+                .Times(1);
+        }
+    };
+
+    class AsyncSentinelDatabaseDiscoveryInListeningModeTest: public AsyncSentinelDatabaseDiscoveryTest
+    {
+    public:
+        AsyncSentinelDatabaseDiscoveryInListeningModeTest()
+        {
+            InSequence dummy;
+            setStateChangedCbExpectsBeforeMasterInquiry();
+            dispatcherConnectAck();
+            expectMasterIquiryReply();
+            expectStateChangedCb(someHost, somePort);
+            savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
+        }
     };
 
     using AsyncSentinelDatabaseDiscoveryDeathTest = AsyncSentinelDatabaseDiscoveryTest;
+
+    using AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest = AsyncSentinelDatabaseDiscoveryInListeningModeTest;
 }
 
 TEST_F(AsyncSentinelDatabaseDiscoveryBaseTest, IsNotCopyable)
@@ -233,115 +410,185 @@
     EXPECT_TRUE((std::is_base_of<AsyncDatabaseDiscovery, AsyncSentinelDatabaseDiscovery>::value));
 }
 
-TEST_F(AsyncSentinelDatabaseDiscoveryTest, RedisMasterIsInquiredFromSentinel)
+TEST_F(AsyncSentinelDatabaseDiscoveryTest, SettingChangedCallbackTriggersSentinelNotificationsSubscriptionAndMasterInquiry)
 {
     InSequence dummy;
-    expectDispatcherWaitConnectedAsync();
-    asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
-            this,
-            std::placeholders::_1));
-    expectMasterInquiry();
+    setStateChangedCbExpectsBeforeMasterInquiry();
     dispatcherConnectAck();
     expectMasterIquiryReply();
-    expectStateChangedCb();
-    savedCommandCb(std::error_code(), replyMock);
+    expectStateChangedCb(someHost, somePort);
+    savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
 }
 
-TEST_F(AsyncSentinelDatabaseDiscoveryTest, RedisMasterInquiryErrorTriggersRetry)
+TEST_F(AsyncSentinelDatabaseDiscoveryTest, MasterInquiryErrorTriggersRetry)
 {
     InSequence dummy;
-    expectDispatcherWaitConnectedAsync();
-    asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
-            this,
-            std::placeholders::_1));
-    expectMasterInquiry();
+    setStateChangedCbExpectsBeforeMasterInquiry();
     dispatcherConnectAck();
     expectMasterInquiryRetryTimer();
-    savedCommandCb(getWellKnownErrorCode(), replyMock);
+    savedDispatcherCommandCb(getWellKnownErrorCode(), masterInquiryReplyMock);
     expectMasterInquiry();
-    savedConnectionRetryTimerCallback();
+    savedMasterInquiryRetryTimerCallback();
     expectMasterIquiryReply();
-    expectStateChangedCb();
-    savedCommandCb(std::error_code(), replyMock);
+    expectStateChangedCb(someHost, somePort);
+    savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
 }
 
 TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_InvalidReplyType)
 {
     InSequence dummy;
-    expectDispatcherWaitConnectedAsync();
-    asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
-            this,
-            std::placeholders::_1));
-    expectMasterInquiry();
+    setStateChangedCbExpectsBeforeMasterInquiry();
     dispatcherConnectAck();
-    ON_CALL(replyMock, getType())
+    ON_CALL(masterInquiryReplyMock, getType())
         .WillByDefault(Return(Reply::Type::NIL));
-    EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
+    EXPECT_EXIT(savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
 }
 
 TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_InvalidHostElementType)
 {
     InSequence dummy;
-    expectDispatcherWaitConnectedAsync();
-    asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
-            this,
-            std::placeholders::_1));
-    expectMasterInquiry();
+    setStateChangedCbExpectsBeforeMasterInquiry();
     dispatcherConnectAck();
     setDefaultResponsesForMasterInquiryReplyParsing();
     ON_CALL(*masterInquiryReplyHost, getType())
         .WillByDefault(Return(Reply::Type::NIL));
-    EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
+    EXPECT_EXIT(savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
 }
 
 TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_InvalidPortElementType)
 {
     InSequence dummy;
-    expectDispatcherWaitConnectedAsync();
-    asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
-            this,
-            std::placeholders::_1));
-    expectMasterInquiry();
+    setStateChangedCbExpectsBeforeMasterInquiry();
     dispatcherConnectAck();
     setDefaultResponsesForMasterInquiryReplyParsing();
     ON_CALL(*masterInquiryReplyPort, getType())
         .WillByDefault(Return(Reply::Type::NIL));
-    EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
+    EXPECT_EXIT(savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
 }
 
 TEST_F(AsyncSentinelDatabaseDiscoveryDeathTest, MasterInquiryParsingErrorAborts_PortCantBeCastedToInt)
 {
     InSequence dummy;
-    expectDispatcherWaitConnectedAsync();
-    asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
-            this,
-            std::placeholders::_1));
-    expectMasterInquiry();
+    setStateChangedCbExpectsBeforeMasterInquiry();
     dispatcherConnectAck();
     setDefaultResponsesForMasterInquiryReplyParsing();
     std::string invalidPort("invalidPort");
     Reply::DataItem invalidPortDataItem({invalidPort,ReplyStringLength(invalidPort.length())});
     ON_CALL(*masterInquiryReplyPort, getString())
         .WillByDefault(Return(&invalidPortDataItem));
-    EXPECT_EXIT(savedCommandCb(std::error_code(), replyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
+    EXPECT_EXIT(savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock), KilledBySignal(SIGABRT), ".*Master inquiry reply parsing error");
 }
 
 TEST_F(AsyncSentinelDatabaseDiscoveryTest, CallbackIsNotCalledAfterCleared)
 {
     InSequence dummy;
-    expectDispatcherWaitConnectedAsync();
-    asyncSentinelDatabaseDiscovery->setStateChangedCb(std::bind(&AsyncSentinelDatabaseDiscoveryTest::stateChangedCb,
-            this,
-            std::placeholders::_1));
-    expectMasterInquiry();
+    setStateChangedCbExpectsBeforeMasterInquiry();
     dispatcherConnectAck();
     expectMasterInquiryRetryTimer();
-    savedCommandCb(getWellKnownErrorCode(), replyMock);
+    savedDispatcherCommandCb(getWellKnownErrorCode(), masterInquiryReplyMock);
     expectMasterInquiry();
-    savedConnectionRetryTimerCallback();
+    savedMasterInquiryRetryTimerCallback();
     expectMasterIquiryReply();
     asyncSentinelDatabaseDiscovery->clearStateChangedCb();
     EXPECT_CALL(*this, stateChangedCb(_))
         .Times(0);
-    savedCommandCb(std::error_code(), replyMock);
+    savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryTest, ChangeNotificationFromSentinel)
+{
+    InSequence dummy;
+    setStateChangedCbExpectsBeforeMasterInquiry();
+    dispatcherConnectAck();
+    expectMasterIquiryReply();
+    expectStateChangedCb(someHost, somePort);
+    savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
+    expectNotificationReply();
+    expectStateChangedCb(someOtherHost, someOtherPort);
+    savedSubscriberCommandCb(std::error_code(), notificationReplyMock);
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeTest, SubscribeCommandErrorTriggersRetry)
+{
+    InSequence dummy;
+    expectSubscribeRetryTimer();
+    savedSubscriberCommandCb(getWellKnownErrorCode(), subscribeReplyMock);
+    expectSubscribeNotifications();
+    savedSubscribeRetryTimerCallback();
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidReplyType)
+{
+    InSequence dummy;
+    ON_CALL(notificationReplyMock, getType())
+        .WillByDefault(Return(Reply::Type::NIL));
+    EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*SUBSCRIBE command reply parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidKindElementType)
+{
+    InSequence dummy;
+    setDefaultResponsesForNotificationReplyParsing();
+    ON_CALL(*notificationReplyArrayElement0, getType())
+        .WillByDefault(Return(Reply::Type::NIL));
+    EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*SUBSCRIBE command reply parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidKind)
+{
+    InSequence dummy;
+    setDefaultResponsesForNotificationReplyParsing();
+    std::string invalidKind("invalidKind");
+    Reply::DataItem invalidKindDataItem({invalidKind,ReplyStringLength(invalidKind.length())});
+    ON_CALL(*notificationReplyArrayElement0, getString())
+        .WillByDefault(Return(&invalidKindDataItem));
+    EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*SUBSCRIBE command reply parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidMessageElementType)
+{
+    InSequence dummy;
+    setDefaultResponsesForNotificationReplyParsing();
+    ON_CALL(*notificationReplyArrayElement2, getType())
+        .WillByDefault(Return(Reply::Type::NIL));
+    EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*SUBSCRIBE command reply parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidMessageStructure)
+{
+    InSequence dummy;
+    setDefaultResponsesForNotificationReplyParsing();
+    std::string invalidMessage("mymaster oldHost 1234 5678");
+    auto invalidMessageDataItem(Reply::DataItem({invalidMessage, ReplyStringLength(invalidMessage.length())}));
+    ON_CALL(*notificationReplyArrayElement2, getString())
+        .WillByDefault(Return(&invalidMessageDataItem));
+    EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*Notification message parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeDeathTest, SubscribeReplyParsingErrorAborts_InvalidPort)
+{
+    InSequence dummy;
+    setDefaultResponsesForNotificationReplyParsing();
+    std::string invalidMessage("mymaster oldHost 1234 newHost invalidPort");
+    auto invalidMessageDataItem(Reply::DataItem({invalidMessage, ReplyStringLength(invalidMessage.length())}));
+    ON_CALL(*notificationReplyArrayElement2, getString())
+        .WillByDefault(Return(&invalidMessageDataItem));
+    EXPECT_EXIT(savedSubscriberCommandCb(std::error_code(), notificationReplyMock), KilledBySignal(SIGABRT), ".*Notification message parsing error");
+}
+
+TEST_F(AsyncSentinelDatabaseDiscoveryInListeningModeTest, SubscriberDisconnectCallbackTriggersSubscriptionRenewal)
+{
+    InSequence dummy;
+    expectSubscriberWaitConnectedAsync();
+    subscriberDisconnectCb();
+    expectSubscribeNotifications();
+    subscriberConnectAck();
+    expectSubscribeReply();
+    expectDispatcherWaitConnectedAsync();
+    savedSubscriberCommandCb(std::error_code(), subscribeReplyMock);
+    expectMasterInquiry();
+    dispatcherConnectAck();
+    expectMasterIquiryReply();
+    expectStateChangedCb(someHost, somePort);
+    savedDispatcherCommandCb(std::error_code(), masterInquiryReplyMock);
 }