blob: b50b971ef3d6d11ee24726cab5fc9bc7e275b8b9 [file] [log] [blame]
/*
   Copyright (c) 2018-2019 Nokia.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/
16
/*
 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
 * platform project (RICP).
*/
21
Rolf Badorekef2bf512019-08-20 11:17:15 +030022#ifndef SHAREDDATALAYER_REDIS_ASYNCHIREDISCLUSTERCOMMANDDISPATCHER_HPP_
23#define SHAREDDATALAYER_REDIS_ASYNCHIREDISCLUSTERCOMMANDDISPATCHER_HPP_
24
25#include "private/redis/asynccommanddispatcher.hpp"
26#include "private/databaseconfiguration.hpp"
27#include "private/logger.hpp"
28#include "private/timer.hpp"
29#include <string>
30#include <set>
31#include <list>
32#include <vector>
33#include <map>
34#include <memory>
35#include <queue>
36#include <boost/optional.hpp>
37
38extern "C"
39{
40 struct redisReply;
41 struct redisClusterAsyncContext;
42 struct redisAsyncContext;
43}
44
45namespace shareddatalayer
46{
47 class Engine;
48
49 namespace redis
50 {
51 class HiredisClusterSystem;
52 class HiredisClusterEpollAdapter;
53 class Reply;
54
55 class AsyncHiredisClusterCommandDispatcher: public AsyncCommandDispatcher
56 {
57 public:
58 AsyncHiredisClusterCommandDispatcher(const AsyncHiredisClusterCommandDispatcher&) = delete;
59
60 AsyncHiredisClusterCommandDispatcher& operator = (const AsyncHiredisClusterCommandDispatcher&) = delete;
61
62 AsyncHiredisClusterCommandDispatcher(Engine& engine,
63 const boost::optional<std::string>& ns,
64 const DatabaseConfiguration::Addresses& addresses,
65 std::shared_ptr<ContentsBuilder> contentsBuilder,
66 bool usePermanentCommandCallbacks,
67 std::shared_ptr<Logger> logger);
68
69 AsyncHiredisClusterCommandDispatcher(Engine& engine,
70 const boost::optional<std::string>& ns,
71 const DatabaseConfiguration::Addresses& addresses,
72 std::shared_ptr<ContentsBuilder> contentsBuilder,
73 bool usePermanentCommandCallbacks,
74 HiredisClusterSystem& hiredisClusterSystem,
75 std::shared_ptr<HiredisClusterEpollAdapter> adapter,
76 std::shared_ptr<Logger> logger);
77
78 ~AsyncHiredisClusterCommandDispatcher() override;
79
80 void waitConnectedAsync(const ConnectAck& connectAck) override;
81
82 void registerDisconnectCb(const DisconnectCb& disconnectCb) override;
83
84 void dispatchAsync(const CommandCb& commandCb, const AsyncConnection::Namespace& ns, const Contents& contents) override;
85
86 void disableCommandCallbacks() override;
87
88 void handleReply(const CommandCb& commandCb, const std::error_code& error, const redisReply* rr);
89
90 bool isClientCallbacksEnabled() const;
91
92 void handleDisconnect(const redisAsyncContext* ac);
93
94 private:
95 enum class ServiceState
96 {
97 DISCONNECTED,
98 CONNECTED
99 };
100
101 using Callback = std::function<void(const Reply&)>;
102
103 Engine& engine;
104 const boost::optional<std::string> initialNamespace;
105 const DatabaseConfiguration::Addresses addresses;
106 std::shared_ptr<ContentsBuilder> contentsBuilder;
107 bool usePermanentCommandCallbacks;
108 HiredisClusterSystem& hiredisClusterSystem;
109 std::shared_ptr<HiredisClusterEpollAdapter> adapter;
110 redisClusterAsyncContext* acc;
111 ConnectAck connectAck;
112 DisconnectCb disconnectCallback;
113 ServiceState serviceState;
114 std::list<CommandCb> cbs;
115 bool clientCallbacksEnabled;
116 Timer connectionRetryTimer;
117 Timer::Duration connectionRetryTimerDuration;
118 std::shared_ptr<Logger> logger;
119
120 void connect();
121
122 bool isValidCb(const CommandCb& commandCb);
123
124 void removeCb(const CommandCb& commandCb);
125
126 void callCommandCbWithError(const CommandCb& commandCb, const std::error_code& error);
127
128 void dispatchAsync(const CommandCb& commandCb, const AsyncConnection::Namespace& ns, const Contents& contents, bool checkConnectionState);
129
130 void verifyConnection();
131
132 void verifyConnectionReply(const std::error_code& error, const redis::Reply& reply);
133
134 void setConnected();
135
136 void armConnectionRetryTimer();
137
138 void disconnectHiredisCluster();
139 };
140 }
141}
142
143#endif