/*
   Copyright (c) 2018-2019 Nokia.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

#include "private/redis/asynchiredisclustercommanddispatcher.hpp"
#include <algorithm>
#include <cstring>
#include <cerrno>
#include <sstream>
#include "private/abort.hpp"
#include "private/createlogger.hpp"
#include "private/error.hpp"
#include "private/logger.hpp"
#include "private/redis/asyncredisreply.hpp"
#include "private/redis/reply.hpp"
#include "private/redis/hiredisclustersystem.hpp"
#include "private/engine.hpp"
#include "private/redis/hiredisclusterepolladapter.hpp"
#include "private/redis/contents.hpp"
#include "private/redis/redisgeneral.hpp"

using namespace shareddatalayer;
using namespace shareddatalayer::redis;

namespace
{
    void connectCb(const redisClusterAsyncContext*, const redisAsyncContext* ac, int status)
    {
        if (!status)
        {
            std::ostringstream msg;
            msg << "redis cluster instance connected, fd: " << ac->c.fd;
            logDebugOnce(msg.str());
        }
    }

    void disconnectCb(const redisClusterAsyncContext* acc, const redisAsyncContext* ac, int status)
    {
        if (status)
        {
            std::ostringstream msg;
            msg << "redis cluster instance disconnected, fd: " << ac->c.fd
                << ", status: " << ac->err;
            logDebugOnce(msg.str());
        }
        auto instance(static_cast<AsyncHiredisClusterCommandDispatcher*>(acc->data));
        instance->handleDisconnect(ac);
    }

    void cb(redisClusterAsyncContext* acc, void* rr, void* pd)
    {
        auto instance(static_cast<AsyncHiredisClusterCommandDispatcher*>(acc->data));
        auto reply(static_cast<redisReply*>(rr));
        auto cb(static_cast<AsyncHiredisClusterCommandDispatcher::CommandCb*>(pd));
        if (instance->isClientCallbacksEnabled())
            instance->handleReply(*cb, getRedisError(acc->err, acc->errstr, reply), reply);
    }
}

AsyncHiredisClusterCommandDispatcher::AsyncHiredisClusterCommandDispatcher(Engine& engine,
                                                                           const boost::optional<std::string>& ns,
                                                                           const DatabaseConfiguration::Addresses& addresses,
                                                                           std::shared_ptr<ContentsBuilder> contentsBuilder,
                                                                           bool usePermanentCommandCallbacks,
                                                                           std::shared_ptr<Logger> logger):
    AsyncHiredisClusterCommandDispatcher(engine,
                                         ns,
                                         addresses,
                                         contentsBuilder,
                                         usePermanentCommandCallbacks,
                                         HiredisClusterSystem::getInstance(),
                                         std::make_shared<HiredisClusterEpollAdapter>(engine),
                                         logger)
{
}

AsyncHiredisClusterCommandDispatcher::AsyncHiredisClusterCommandDispatcher(Engine& engine,
                                                                           const boost::optional<std::string>& ns,
                                                                           const DatabaseConfiguration::Addresses& addresses,
                                                                           std::shared_ptr<ContentsBuilder> contentsBuilder,
                                                                           bool usePermanentCommandCallbacks,
                                                                           HiredisClusterSystem& hiredisClusterSystem,
                                                                           std::shared_ptr<HiredisClusterEpollAdapter> adapter,
                                                                           std::shared_ptr<Logger> logger):
    engine(engine),
    initialNamespace(ns),
    addresses(addresses),
    contentsBuilder(contentsBuilder),
    usePermanentCommandCallbacks(usePermanentCommandCallbacks),
    hiredisClusterSystem(hiredisClusterSystem),
    adapter(adapter),
    acc(nullptr),
    serviceState(ServiceState::DISCONNECTED),
    clientCallbacksEnabled(true),
    connectionRetryTimer(engine),
    connectionRetryTimerDuration(std::chrono::seconds(1)),
    logger(logger)
{
    connect();
}

AsyncHiredisClusterCommandDispatcher::~AsyncHiredisClusterCommandDispatcher()
{
    disconnectHiredisCluster();
}

void AsyncHiredisClusterCommandDispatcher::connect()
{
    // Disconnect and free possibly (in re-connecting scenarios) already existing Redis cluster context.
    disconnectHiredisCluster();
    acc = hiredisClusterSystem.redisClusterAsyncConnect(formatToClusterSyntax(addresses).c_str(),
                                                        HIRCLUSTER_FLAG_ROUTE_USE_SLOTS);
    if (acc == nullptr)
    {
        logger->error() << "SDL: connecting to redis cluster failed, null context returned";
        armConnectionRetryTimer();
        return;
    }
    if (acc->err)
    {
        logger->error() << "SDL: connecting to redis cluster failed, error: " << acc->err;
        armConnectionRetryTimer();
        return;
    }
    acc->data = this;
    adapter->setup(acc);
    hiredisClusterSystem.redisClusterAsyncSetConnectCallback(acc, connectCb);
    hiredisClusterSystem.redisClusterAsyncSetDisconnectCallback(acc, disconnectCb);
    verifyConnection();
}

void AsyncHiredisClusterCommandDispatcher::verifyConnection()
{
    /* redisClusterAsyncConnect only queries available cluster nodes but it does
     * not connect to any cluster node (as it does not know to which node it should connect to).
     * As we use same cluster node for one SDL namespace (namespace is a hash tag), it is already
     * determined to which cluster node this instance will connect to. We do initial operation
     * to get connection to right redis node established already now. This also verifies that
     * connection really works. When Redis has max amount of users, it will still accept new
     * connections but is will close them immediately. Therefore, we need to verify that just
     * established connection really works.
     */
    /* Connection setup/verification is now done by doing redis command list query. Because we anyway
     * need to verify that Redis has required commands, we can now combine these two operations
     * (command list query and connection setup/verification). If either one of the functionalities
     * is not needed in the future and it is removed, remember to still leave the other one.
     */
    /* Non namespace-specific command list query can be used for connection setup purposes,
     * because SDL uses redisClusterAsyncCommandArgvWithKey which adds namespace to all
     * commands dispacthed.
     */

    /* If initial namespace was not given during dispatcher creation (multi namespace API),
     * verification is sent to hardcoded namespace. This works for verification purposes
     * because in our environment cluster is configured to operate only if all nodes
     * are working (so we can do verification to any node). However, this is not optimal
     * because we do not necessarily connect to such cluster node which will later be
     * used by client. Also our cluster configuration can change. This needs to be
     * optimized later (perhaps to connect to all nodes). */
    std::string nsForVerification;
    if (initialNamespace)
        nsForVerification = *initialNamespace;
    else
        nsForVerification = "namespace";

    dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcher::verifyConnectionReply,
                            this,
                            std::placeholders::_1,
                            std::placeholders::_2),
                  nsForVerification,
                  contentsBuilder->build("COMMAND"),
                  false);
}

void AsyncHiredisClusterCommandDispatcher::verifyConnectionReply(const std::error_code& error, const redis::Reply& reply)
{
    if(error)
    {
        logger->error() << "AsyncHiredisClusterCommandDispatcher: connection verification failed: "
                        << error.message();
        armConnectionRetryTimer();
    }
    else
    {
        if (checkRedisModuleCommands(parseCommandListReply(reply)))
            setConnected();
        else
            SHAREDDATALAYER_ABORT("Required Redis module extension commands not available.");
    }
}

void AsyncHiredisClusterCommandDispatcher::waitConnectedAsync(const ConnectAck& connectAck)
{
    this->connectAck = connectAck;
    if (serviceState == ServiceState::CONNECTED)
        engine.postCallback(connectAck);
}

void AsyncHiredisClusterCommandDispatcher::registerDisconnectCb(const DisconnectCb& disconnectCb)
{
    disconnectCallback = disconnectCb;
}

void AsyncHiredisClusterCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
                                                         const AsyncConnection::Namespace& ns,
                                                         const Contents& contents)
{
    dispatchAsync(commandCb, ns, contents, true);
}

void AsyncHiredisClusterCommandDispatcher::dispatchAsync(const CommandCb& commandCb,
                                                         const AsyncConnection::Namespace& ns,
                                                         const Contents& contents,
                                                         bool checkConnectionState)
{
    if (checkConnectionState && serviceState != ServiceState::CONNECTED)
    {
        engine.postCallback(std::bind(&AsyncHiredisClusterCommandDispatcher::callCommandCbWithError,
                                       this,
                                       commandCb,
                                       std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED)));
        return;
    }
    cbs.push_back(commandCb);
    std::vector<const char*> chars;
    std::transform(contents.stack.begin(), contents.stack.end(),
                   std::back_inserter(chars), [](const std::string& str){ return str.c_str(); });
    if (hiredisClusterSystem.redisClusterAsyncCommandArgvWithKey(acc, cb, &cbs.back(), ns.c_str(), static_cast<int>(ns.size()),
                                                                 static_cast<int>(contents.stack.size()), &chars[0],
                                                                 &contents.sizes[0]) != REDIS_OK)
    {
        removeCb(cbs.back());
        engine.postCallback(std::bind(&AsyncHiredisClusterCommandDispatcher::callCommandCbWithError,
                                       this,
                                       commandCb,
                                       getRedisError(acc->err, acc->errstr, nullptr)));
    }
}

void AsyncHiredisClusterCommandDispatcher::disableCommandCallbacks()
{
    clientCallbacksEnabled = false;
}

void AsyncHiredisClusterCommandDispatcher::callCommandCbWithError(const CommandCb& commandCb, const std::error_code& error)
{
    commandCb(error, AsyncRedisReply());
}

void AsyncHiredisClusterCommandDispatcher::setConnected()
{
    serviceState = ServiceState::CONNECTED;

    if (connectAck)
    {
        connectAck();
        connectAck = ConnectAck();
    }
}

void AsyncHiredisClusterCommandDispatcher::armConnectionRetryTimer()
{
    connectionRetryTimer.arm(connectionRetryTimerDuration,
                             [this] () { connect(); });

}

void AsyncHiredisClusterCommandDispatcher::handleReply(const CommandCb& commandCb,
                                                       const std::error_code& error,
                                                       const redisReply* rr)
{
    if (!isValidCb(commandCb))
        SHAREDDATALAYER_ABORT("Invalid callback function.");
    if (error)
        commandCb(error, AsyncRedisReply());
    else
        commandCb(error, AsyncRedisReply(*rr));
    if (!usePermanentCommandCallbacks)
        removeCb(commandCb);
}

bool AsyncHiredisClusterCommandDispatcher::isClientCallbacksEnabled() const
{
    return clientCallbacksEnabled;
}

bool AsyncHiredisClusterCommandDispatcher::isValidCb(const CommandCb& commandCb)
{
    for (auto i(cbs.begin()); i != cbs.end(); ++i)
        if (&*i == &commandCb)
            return true;
    return false;
}

void AsyncHiredisClusterCommandDispatcher::removeCb(const CommandCb& commandCb)
{
    for (auto i(cbs.begin()); i != cbs.end(); ++i)
        if (&*i == &commandCb)
        {
            cbs.erase(i);
            break;
        }
}

void AsyncHiredisClusterCommandDispatcher::handleDisconnect(const redisAsyncContext* ac)
{
    adapter->detach(ac);

    if (disconnectCallback)
        disconnectCallback();
}

void AsyncHiredisClusterCommandDispatcher::disconnectHiredisCluster()
{
    /* hiredis sometimes crashes if redisClusterAsyncFree is called without being connected (even
     * if acc is a valid pointer).
     */
    if (serviceState == ServiceState::CONNECTED)
        hiredisClusterSystem.redisClusterAsyncFree(acc);

    serviceState = ServiceState::DISCONNECTED;
}
