/*
   Copyright (c) 2018-2019 Nokia.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

#include <type_traits>
#include <memory>
#include <cstring>
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <sys/eventfd.h>
#include <arpa/inet.h>
#include <gtest/gtest.h>
#include <async.h>
#include "private/createlogger.hpp"
#include "private/error.hpp"
#include "private/logger.hpp"
#include "private/redis/asynchiredisclustercommanddispatcher.hpp"
#include "private/redis/redisgeneral.hpp"
#include "private/redis/reply.hpp"
#include "private/redis/contents.hpp"
#include "private/tst/hiredisclustersystemmock.hpp"
#include "private/timer.hpp"
#include "private/tst/contentsbuildermock.hpp"
#include "private/tst/enginemock.hpp"
#include "private/tst/hiredisclusterepolladaptermock.hpp"
#include "private/tst/redisreplybuilder.hpp"

using namespace shareddatalayer;
using namespace shareddatalayer::redis;
using namespace shareddatalayer::tst;
using namespace testing;

namespace
{
    class AsyncHiredisClusterCommandDispatcherBaseTest: public testing::Test
    {
    public:
        std::shared_ptr<ContentsBuilderMock> contentsBuilderMock;
        StrictMock<EngineMock> engineMock;
        HiredisClusterSystemMock hiredisClusterSystemMock;
        std::shared_ptr<HiredisClusterEpollAdapterMock> adapterMock;
        redisClusterAsyncContext acc;
        redisAsyncContext ac;
        int hiredisFd;
        std::unique_ptr<AsyncHiredisClusterCommandDispatcher> dispatcher;
        void (*connected)(const redisClusterAsyncContext*, const redisAsyncContext*, int);
        void (*disconnected)(const redisClusterAsyncContext*, const redisAsyncContext*, int);
        Timer::Callback savedConnectionRetryTimerCallback;
        Timer::Duration expectedRetryTimerDuration;
        Contents contents;
        Contents clusterConnectionSetupContents;
        RedisReplyBuilder redisReplyBuilder;
        const AsyncConnection::Namespace defaultNamespace;
        std::shared_ptr<Logger> logger;

        AsyncHiredisClusterCommandDispatcherBaseTest():
            contentsBuilderMock(std::make_shared<ContentsBuilderMock>(AsyncConnection::SEPARATOR)),
            adapterMock(std::make_shared<HiredisClusterEpollAdapterMock>(engineMock, hiredisClusterSystemMock)),
            acc { },
            ac { },
            hiredisFd(3),
            connected(nullptr),
            disconnected(nullptr),
            expectedRetryTimerDuration(std::chrono::seconds(1)),
            contents { { "CMD", "key1", "value1", "key2", "value2" },
                       { 3, 4, 6, 4, 6 } },
            redisReplyBuilder { },
            defaultNamespace("namespace"),
            logger(createLogger(SDL_LOG_PREFIX))
        {
        }

        virtual ~AsyncHiredisClusterCommandDispatcherBaseTest()
        {
        }

        MOCK_METHOD0(connectAck, void());

        MOCK_METHOD0(disconnectCallback, void());

        MOCK_METHOD2(ack, void(const std::error_code&, const Reply&));

        void expectationsUntilConnect()
        {
            expectationsUntilConnect(acc);
        }

        void expectationsUntilConnect(redisClusterAsyncContext& acc)
        {
            expectRedisClusterAsyncConnect(acc);
        }

        void expectRedisClusterAsyncConnect()
        {
            expectRedisClusterAsyncConnect(acc);
        }

        void expectRedisClusterAsyncConnect(redisClusterAsyncContext& acc)
        {
            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncConnect(StrEq("addr1:28416,addr2:56832"),
                                                                           HIRCLUSTER_FLAG_ROUTE_USE_SLOTS))
                .Times(1)
                .WillOnce(InvokeWithoutArgs([this, &acc]()
                                            {
                                                return &acc;
                                            }));
        }

        void expectRedisClusterAsyncConnectReturnNullptr()
        {
            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncConnect(StrEq("addr1:28416,addr2:56832"),
                                                                           HIRCLUSTER_FLAG_ROUTE_USE_SLOTS))
                .Times(1)
                .WillOnce(InvokeWithoutArgs([this]()
                                            {
                                                return nullptr;
                                            }));
        }

        void expectRedisClusterAsyncSetConnectCallback()
        {
            expectRedisClusterAsyncSetConnectCallback(acc);
        }

        void expectRedisClusterAsyncSetConnectCallback(redisClusterAsyncContext& acc)
        {
            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncSetConnectCallback(&acc, _))
                .Times(1)
                .WillOnce(Invoke([this](const redisClusterAsyncContext*, redisClusterInstanceConnectCallback* cb)
                                 {
                                     connected = cb;
                                     return REDIS_OK;
                                 }));
        }

        void expectRedisClusterAsyncSetDisconnectCallback()
        {
            expectRedisClusterAsyncSetDisconnectCallback(acc);
        }

        void expectRedisClusterAsyncSetDisconnectCallback(redisClusterAsyncContext& acc)
        {
            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncSetDisconnectCallback(&acc, _))
                .Times(1)
                .WillOnce(Invoke([this](const redisClusterAsyncContext*, redisClusterInstanceDisconnectCallback* cb)
                                 {
                                     disconnected = cb;
                                     return REDIS_OK;
                                 }));
        }

        void expectCommandListQuery()
        {
            expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildCommandListQueryReply());
        }

        void expectCommandListQueryReturnError()
        {
            expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildErrorReply("SomeErrorForCommandListQuery"));
        }

        void expectAdapterSetup()
        {
            expectAdapterSetup(acc);
        }

        void expectAdapterSetup(redisClusterAsyncContext& acc)
        {
            EXPECT_CALL(*adapterMock, setup(&acc))
                .Times(1);
        }

        void expectAdapterDetach()
        {
            EXPECT_CALL(*adapterMock, detach(&ac))
                .Times(1);
        }

        void expectConnectAck()
        {
            EXPECT_CALL(*this, connectAck())
                .Times(1);
        }

        void expectDisconnectCallback()
        {
            EXPECT_CALL(*this, disconnectCallback())
                .Times(1);
        }

        void expectRedisClusterAsyncFree()
        {
            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncFree(&acc))
                .Times(1);
        }

        void expectRedisClusterAsyncDisconnect()
        {
            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncDisconnect(&acc))
                .Times(1);
        }

        void verifyAckErrorReply(const Reply& reply)
        {
            EXPECT_EQ(Reply::Type::NIL, reply.getType());
            EXPECT_EQ(0, reply.getInteger());
            EXPECT_TRUE(reply.getString()->str.empty());
            EXPECT_EQ(static_cast<ReplyStringLength>(0), reply.getString()->len);
            EXPECT_TRUE(reply.getArray()->empty());
        }

        void expectAckError()
        {
            EXPECT_CALL(*this, ack(Ne(std::error_code()), _))
                .Times(1)
                .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
                                 {
                                     verifyAckErrorReply(reply);
                                 }));
        }

        void expectAckError(const std::error_code& ec)
        {
            EXPECT_CALL(*this, ack(ec, _))
                .Times(1)
                .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
                                 {
                                     verifyAckErrorReply(reply);
                                 }));
        }

        void expectArmConnectionRetryTimer()
        {
            EXPECT_CALL(engineMock, armTimer(_, expectedRetryTimerDuration, _))
                .Times(1)
                .WillOnce(SaveArg<2>(&savedConnectionRetryTimerCallback));
        }

        void expectDisarmConnectionRetryTimer()
        {
            EXPECT_CALL(engineMock, disarmTimer(_))
                .Times(1);
        }

        void expectRedisClusterAsyncCommandArgv(redisReply& rr)
        {
            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
                .Times(1)
                .WillOnce(Invoke([&rr](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int,
                                       int, const char**, const size_t*)
                                 {
                                     cb(acc, &rr, pd);
                                     return REDIS_OK;
                                 }));
        }

        void expectAck()
        {
            EXPECT_CALL(*this, ack(std::error_code(), _))
                .Times(1);
        }

        void expectReplyError(const std::string& msg)
        {
            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
                .Times(1)
                .WillOnce(Invoke([this, msg](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*,
                                    int, int, const char**, const size_t*)
                                 {
                                     cb(acc, &redisReplyBuilder.buildErrorReply(msg), pd);
                                     return REDIS_OK;
                                 }));
        }

        void expectContextError(int code)
        {
            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
                .Times(1)
                .WillOnce(Invoke([code](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int,
                                        int, const char**, const size_t*)
                                 {
                                     acc->err = code;
                                     cb(acc, nullptr, pd);
                                     return REDIS_OK;
                                 }));
        }

        void expectRedisClusterAsyncFreeCallPendingCallback(redisClusterCallbackFn* cb, void* pd)
        {
            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncFree(&acc))
                .Times(1)
                .WillOnce(Invoke([this, cb, pd](redisClusterAsyncContext* acc)
                                 {
                                     cb(acc, &redisReplyBuilder.buildNilReply(), pd);
                                 }));
        }

        void expectAckNotCalled()
        {
            EXPECT_CALL(*this, ack(_,_))
                .Times(0);
        }

        void expectionsForSuccessfullConnectionSetup()
        {
            expectationsUntilConnect();
            expectAdapterSetup();
            expectRedisClusterAsyncSetConnectCallback();
            expectRedisClusterAsyncSetDisconnectCallback();
            expectCommandListQuery();
        }

        void callConnectionRetryTimerCallback()
        {
            ASSERT_NE(savedConnectionRetryTimerCallback, nullptr);
            savedConnectionRetryTimerCallback();
        }
    };

    class AsyncHiredisClusterCommandDispatcherDisconnectedTest: public AsyncHiredisClusterCommandDispatcherBaseTest
    {
    public:
        AsyncHiredisClusterCommandDispatcherDisconnectedTest()
        {
            InSequence dummy;
            expectationsUntilConnect();
            expectAdapterSetup();
            expectRedisClusterAsyncSetConnectCallback();
            expectRedisClusterAsyncSetDisconnectCallback();
            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
                .Times(1);
            dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
                                                                      defaultNamespace,
                                                                      { { "addr1", 111 }, { "addr2", 222 } },
                                                                      contentsBuilderMock,
                                                                      false,
                                                                      hiredisClusterSystemMock,
                                                                      adapterMock,
                                                                      logger));
        }

        ~AsyncHiredisClusterCommandDispatcherDisconnectedTest()
        {
        }
    };

    class AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest: public AsyncHiredisClusterCommandDispatcherBaseTest
    {
    public:
        AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest()
        {
            InSequence dummy;
            expectionsForSuccessfullConnectionSetup();
            dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
                                                                      defaultNamespace,
                                                                      { { "addr1", 111 }, { "addr2", 222 } },
                                                                      contentsBuilderMock,
                                                                      true,
                                                                      hiredisClusterSystemMock,
                                                                      adapterMock,
                                                                      logger));
        }

        ~AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest()
        {
            expectRedisClusterAsyncFree();
        }
    };

    class AsyncHiredisClusterCommandDispatcherConnectedTest: public AsyncHiredisClusterCommandDispatcherBaseTest
    {
    public:
        redisClusterCallbackFn* savedCb;
        void* savedPd;

        AsyncHiredisClusterCommandDispatcherConnectedTest():
            savedCb(nullptr),
            savedPd(nullptr)
        {
            InSequence dummy;
            expectionsForSuccessfullConnectionSetup();
            dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
                                                                      defaultNamespace,
                                                                      { { "addr1", 111 }, { "addr2", 222 } },
                                                                      contentsBuilderMock,
                                                                      false,
                                                                      hiredisClusterSystemMock,
                                                                      adapterMock,
                                                                      logger));
            connected(&acc, &ac, 0);
        }

        ~AsyncHiredisClusterCommandDispatcherConnectedTest()
        {
            expectRedisClusterAsyncFree();
        }

        void expectRedisClusterAsyncCommandArgvWithKey_SaveCb()
        {
            EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
                .Times(1)
                .WillOnce(Invoke([this](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
                                        const char*, int, int, const char**, const size_t*)
                                 {
                                     savedCb = cb;
                                     savedPd = pd;
                                     return REDIS_OK;
                                 }));
        }
    };

    using AsyncHiredisClusterCommandDispatcherDeathTest = AsyncHiredisClusterCommandDispatcherConnectedTest;
}

TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, IsNotCopyable)
{
    EXPECT_FALSE(std::is_copy_constructible<AsyncHiredisClusterCommandDispatcher>::value);
    EXPECT_FALSE(std::is_copy_assignable<AsyncHiredisClusterCommandDispatcher>::value);
}

TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, ImplementsAsyncRedisCommandDispatcher)
{
    EXPECT_TRUE((std::is_base_of<AsyncCommandDispatcher, AsyncHiredisClusterCommandDispatcher>::value));
}

TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, CannotDispatchCommandsIfDisconnected)
{
    Engine::Callback storedCallback;
    EXPECT_CALL(engineMock, postCallback(_))
        .Times(1)
        .WillOnce(SaveArg<0>(&storedCallback));
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDisconnectedTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                              defaultNamespace,
                              { });
    expectAckError(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED);
    storedCallback();
}

TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, ContextErrorInConnectArmsRetryTimer)
{
    InSequence dummy;
    acc.err = 123;
    expectationsUntilConnect();
    expectArmConnectionRetryTimer();
    expectDisarmConnectionRetryTimer();

    dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
                                                              defaultNamespace,
                                                              { { "addr1", 111 }, { "addr2", 222 } },
                                                              contentsBuilderMock,
                                                              false,
                                                              hiredisClusterSystemMock,
                                                              adapterMock,
                                                              logger));
}

TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, NullRedisContextInConnectArmsRetryTimer)
{
    InSequence dummy;
    expectRedisClusterAsyncConnectReturnNullptr();
    expectArmConnectionRetryTimer();
    expectDisarmConnectionRetryTimer();

    dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
                                                              defaultNamespace,
                                                              { { "addr1", 111 }, { "addr2", 222 } },
                                                              contentsBuilderMock,
                                                              false,
                                                              hiredisClusterSystemMock,
                                                              adapterMock,
                                                              logger));
}

TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, FailedCommandListQueryArmsRetryTimer)
{
    InSequence dummy;
    Engine::Callback storedCallback;
    expectationsUntilConnect();
    expectAdapterSetup();
    expectRedisClusterAsyncSetConnectCallback();
    expectRedisClusterAsyncSetDisconnectCallback();
    expectCommandListQueryReturnError();
    expectArmConnectionRetryTimer();

    dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
                                                              defaultNamespace,
                                                              { { "addr1", 111 }, { "addr2", 222 } },
                                                              contentsBuilderMock,
                                                              false,
                                                              hiredisClusterSystemMock,
                                                              adapterMock,
                                                              logger));

    expectDisarmConnectionRetryTimer();
}

TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, ConnectionSucceedsWithRetryTimer)
{
    InSequence dummy;
    expectRedisClusterAsyncConnectReturnNullptr();
    expectArmConnectionRetryTimer();

    dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
                                                              defaultNamespace,
                                                              { { "addr1", 111 }, { "addr2", 222 } },
                                                              contentsBuilderMock,
                                                              false,
                                                              hiredisClusterSystemMock,
                                                              adapterMock,
                                                              logger));

    expectionsForSuccessfullConnectionSetup();
    expectRedisClusterAsyncFree();

    callConnectionRetryTimerCallback();
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ConnectAckCalledIfConnected)
{
    Engine::Callback storedCallback;
    EXPECT_CALL(engineMock, postCallback(_))
        .Times(1)
        .WillOnce(SaveArg<0>(&storedCallback));
    dispatcher->waitConnectedAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDisconnectedTest::connectAck,
                                             this));
    expectConnectAck();
    storedCallback();
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanDispatchCommands)
{
    EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
        .Times(1)
        .WillOnce(Invoke([this](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char *key,
                                int keylen, int argc, const char** argv, const size_t* argvlen)
                         {
                             EXPECT_STREQ(defaultNamespace.c_str(), key);
                             EXPECT_EQ(9, keylen);
                             EXPECT_EQ((int)contents.stack.size(), argc);
                             EXPECT_EQ(contents.sizes[0], argvlen[0]);
                             EXPECT_EQ(contents.sizes[1], argvlen[1]);
                             EXPECT_EQ(contents.sizes[2], argvlen[2]);
                             EXPECT_EQ(contents.sizes[3], argvlen[3]);
                             EXPECT_EQ(contents.sizes[4], argvlen[4]);
                             EXPECT_FALSE(std::memcmp(argv[0], contents.stack[0].c_str(), contents.sizes[0]));
                             EXPECT_FALSE(std::memcmp(argv[1], contents.stack[1].c_str(), contents.sizes[1]));
                             EXPECT_FALSE(std::memcmp(argv[2], contents.stack[2].c_str(), contents.sizes[2]));
                             EXPECT_FALSE(std::memcmp(argv[3], contents.stack[3].c_str(), contents.sizes[3]));
                             EXPECT_FALSE(std::memcmp(argv[4], contents.stack[4].c_str(), contents.sizes[4]));
                             cb(acc, &redisReplyBuilder.buildNilReply(), pd);
                             return REDIS_OK;
                         }));
    expectAck();
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                              defaultNamespace,
                              contents);
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseNilReply)
{
    expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildNilReply());
    EXPECT_CALL(*this, ack(std::error_code(), _))
        .Times(1)
        .WillOnce(Invoke([](const std::error_code&, const Reply& reply)
                         {
                             EXPECT_EQ(Reply::Type::NIL, reply.getType());
                             EXPECT_EQ(0, reply.getInteger());
                             EXPECT_TRUE(reply.getString()->str.empty());
                             EXPECT_EQ(static_cast<ReplyStringLength>(0), reply.getString()->len);
                             EXPECT_TRUE(reply.getArray()->empty());
                         }));
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                              defaultNamespace,
                              contents);
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseIntegerReply)
{
    expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildIntegerReply());
    EXPECT_CALL(*this, ack(std::error_code(), _))
        .Times(1)
        .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
                         {
                             auto expected(redisReplyBuilder.buildIntegerReply());
                             EXPECT_EQ(Reply::Type::INTEGER, reply.getType());
                             EXPECT_EQ(expected.integer, reply.getInteger());
                         }));
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                              defaultNamespace,
                              contents);
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseStatusReply)
{
    expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildStatusReply());
    EXPECT_CALL(*this, ack(std::error_code(), _))
        .Times(1)
        .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
                         {
                             auto expected(redisReplyBuilder.buildStatusReply());
                             EXPECT_EQ(Reply::Type::STATUS, reply.getType());
                             EXPECT_EQ(expected.len, reply.getString()->len);
                             EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len));
                         }));
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                              defaultNamespace,
                              contents);
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseStringReply)
{
    expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildStringReply());
    EXPECT_CALL(*this, ack(std::error_code(), _))
        .Times(1)
        .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
                         {
                             auto expected(redisReplyBuilder.buildStringReply());
                             EXPECT_EQ(Reply::Type::STRING, reply.getType());
                             EXPECT_EQ(expected.len, reply.getString()->len);
                             EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len));
                         }));
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                              defaultNamespace,
                              contents);
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseArrayReply)
{
    expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildArrayReply());
    EXPECT_CALL(*this, ack(std::error_code(), _))
        .Times(1)
        .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
                         {
                             auto array(reply.getArray());
                             EXPECT_EQ(Reply::Type::ARRAY, reply.getType());
                             EXPECT_EQ(Reply::Type::STRING, (*array)[0]->getType());
                             EXPECT_EQ(Reply::Type::NIL, (*array)[1]->getType());
                         }));
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                              defaultNamespace,
                              contents);
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanHandleDispatchHiredisBufferErrors)
{
    EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
        .Times(1)
        .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn*, void*, const char*, int, int,
                            const char**, const size_t*)
                         {
                             acc->err = REDIS_ERR;
                             return REDIS_ERR;
                         }));
    Engine::Callback storedCallback;
    EXPECT_CALL(engineMock, postCallback(_))
        .Times(1)
        .WillOnce(SaveArg<0>(&storedCallback));
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                              defaultNamespace,
                              contents);
    expectAckError();
    storedCallback();
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanHandleDispatchHiredisCbErrors)
{
    EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
        .Times(1)
        .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int, int,
                            const char**, const size_t*)
                         {
                             cb(acc, nullptr, pd);
                             return REDIS_OK;
                         }));
    expectAckError();
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                              defaultNamespace,
                              contents);
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, DatasetStillBeingLoadedInMemoryIsRecognizedFromReply)
{
    expectReplyError("LOADING Redis is loading the dataset in memory");
    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::DATASET_LOADING), _))
        .Times(1);
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                              defaultNamespace,
                              contents);
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterDownIsRecognizedFromReply)
{
    //SDL checks only that reply starts with CLUSTERDOWN string
    expectReplyError("CLUSTERDOWN");
    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
        .Times(1);
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                                        defaultNamespace,
                                        contents);

    expectReplyError("CLUSTERDOWN The cluster is down");
    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
        .Times(1);
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                                        defaultNamespace,
                                        contents);

    expectReplyError("CLUSTERDOW");
    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
        .Times(1);
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                                        defaultNamespace,
                                        contents);
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ProtocolErrorIsRecognizedFromReply)
{
    expectReplyError("ERR Protocol error: invalid bulk length");
    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _))
        .Times(1);
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                                        defaultNamespace,
                                        contents);
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, UnrecognizedReplyErrorIsConvertedToUnknownError)
{
    expectReplyError("something sinister");
    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
        .Times(1);
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                                        defaultNamespace,
                                        contents);
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, EmptyReplyErrorIsConvertedToUnknownError)
{
    expectReplyError("");
    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
        .Times(1);
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                                        defaultNamespace,
                                        contents);
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, IOErrorInContext)
{
    EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
        .Times(1)
        .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int, int,
                            const char**, const size_t*)
                         {
                             acc->err = REDIS_ERR_IO;
                             errno = EINVAL;
                             cb(acc, nullptr, pd);
                             return REDIS_OK;
                         }));
    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::IO_ERROR), _))
        .Times(1);
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                                        defaultNamespace,
                                        contents);
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, EofErrorInContext)
{
    expectContextError(REDIS_ERR_EOF);
    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _))
        .Times(1);
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                                        defaultNamespace,
                                        contents);
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ProtocolErrorInContext)
{
    expectContextError(REDIS_ERR_PROTOCOL);
    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _))
        .Times(1);
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                                        defaultNamespace,
                                        contents);
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, OomErrorInContext)
{
    expectContextError(REDIS_ERR_OOM);
    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::OUT_OF_MEMORY), _))
        .Times(1);
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                                        defaultNamespace,
                                        contents);
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterErrorNotConnectedInContext)
{
    expectContextError(CLUSTER_ERROR_NOT_CONNECTED);
    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
        .Times(1);
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                                        defaultNamespace,
                                        contents);
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterErrorConnectionLostInContext)
{
    expectContextError(CLUSTER_ERROR_CONNECTION_LOST);
    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _))
        .Times(1);
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                                        defaultNamespace,
                                        contents);
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, UnrecognizedContextErrorIsConvertedToUnknownError)
{
    expectContextError(REDIS_ERR_OTHER);
    EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
        .Times(1);
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                                        defaultNamespace,
                                        contents);
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, PendingClientCallbacksAreNotCalledAfterDisabled)
{
    InSequence dummy;
    expectRedisClusterAsyncCommandArgvWithKey_SaveCb();
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                                        defaultNamespace,
                                        contents);
    expectAck();
    savedCb(&acc, &redisReplyBuilder.buildStringReply(), savedPd);
    dispatcher->disableCommandCallbacks();
    expectRedisClusterAsyncCommandArgvWithKey_SaveCb();
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                                        defaultNamespace,
                                        contents);
    expectAckNotCalled();
    savedCb(&acc, &redisReplyBuilder.buildStringReply(), savedPd);
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, DisconnectCallbackDetachesContextFromAdapter)
{
    InSequence dummy;
    expectAdapterDetach();
    disconnected(&acc, &ac, 0);
}

TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, RegisteredClientDisconnectCallbackIsCalled)
{
    InSequence dummy;
    dispatcher->registerDisconnectCb(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::disconnectCallback,
                                             this));
    expectAdapterDetach();
    expectDisconnectCallback();
    disconnected(&acc, &ac, 0);
}

TEST_F(AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest, CanHandleMultipleRepliesForSameRedisCommand)
{
    InSequence dummy;
    redisClusterCallbackFn* savedCb;
    void* savedPd;
    EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
        .Times(1)
        .WillOnce(Invoke([&savedCb, &savedPd](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
                                              const char*, int, int, const char**, const size_t*)
                         {
                             savedCb = cb;
                             savedPd = pd;
                             return REDIS_OK;
                         }));
    Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } });
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                              defaultNamespace,
                              contents);
    EXPECT_CALL(*this, ack(std::error_code(), _))
            .Times(3);
    redisReply rr;
    rr.type = REDIS_REPLY_NIL;
    savedCb(&acc, &rr, savedPd);
    savedCb(&acc, &rr, savedPd);
    savedCb(&acc, &rr, savedPd);
}

TEST_F(AsyncHiredisClusterCommandDispatcherDeathTest, CbRemovedAfterHiredisCb)
{
    InSequence dummy;
    redisClusterCallbackFn* savedCb;
    void* savedPd;
    EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
        .Times(1)
        .WillOnce(Invoke([this, &savedCb, &savedPd](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd,
                                                    const char*, int, int, const char**, const size_t*)
                         {
                             savedCb = cb;
                             savedPd = pd;
                             cb(acc, &redisReplyBuilder.buildNilReply(), pd);
                             return REDIS_OK;
                         }));
    expectAck();
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                                        defaultNamespace,
                                        contents);
    EXPECT_EXIT(savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), "");
}

TEST_F(AsyncHiredisClusterCommandDispatcherDeathTest, TooManyRepliesAborts)
{
    InSequence dummy;
    redisClusterCallbackFn* savedCb;
    void* savedPd;
    EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
        .Times(1)
        .WillOnce(Invoke([&savedCb, &savedPd](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
                                              const char*, int, int, const char**, const size_t*)
                         {
                             savedCb = cb;
                             savedPd = pd;
                             return REDIS_OK;
                         }));
    Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } });
    expectAck();
    dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
                                        this,
                                        std::placeholders::_1,
                                        std::placeholders::_2),
                                        defaultNamespace,
                                        contents);
    savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd);
    EXPECT_EXIT(savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), "");
}
