blob: 7a466c90f4f7945546c045073a5c067e0a3cf2be [file] [log] [blame]
Rolf Badorekef2bf512019-08-20 11:17:15 +03001/*
2 Copyright (c) 2018-2019 Nokia.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef SHAREDDATALAYER_REDIS_ASYNCHIREDISCOMMANDDISPATCHER_HPP_
18#define SHAREDDATALAYER_REDIS_ASYNCHIREDISCOMMANDDISPATCHER_HPP_
19
20#include "private/redis/asynccommanddispatcher.hpp"
21#include <string>
22#include <list>
23#include <vector>
24#include <map>
25#include <memory>
26#include <queue>
27#include "private/logger.hpp"
28#include "private/timer.hpp"
29
30extern "C"
31{
32 struct redisReply;
33 struct redisAsyncContext;
34}
35
36namespace shareddatalayer
37{
38 class Engine;
39
40 namespace redis
41 {
42 class HiredisSystem;
43 class HiredisEpollAdapter;
44 class Reply;
45
46 class AsyncHiredisCommandDispatcher: public AsyncCommandDispatcher
47 {
48 public:
49 AsyncHiredisCommandDispatcher(const AsyncHiredisCommandDispatcher&) = delete;
50
51 AsyncHiredisCommandDispatcher& operator = (const AsyncHiredisCommandDispatcher&) = delete;
52
53 AsyncHiredisCommandDispatcher(Engine& engine,
54 const std::string& address,
55 uint16_t port,
56 std::shared_ptr<ContentsBuilder> contentsBuilder,
57 bool usePermanentCommandCallbacks,
58 std::shared_ptr<Logger> logger);
59
60 AsyncHiredisCommandDispatcher(Engine& engine,
61 const std::string& address,
62 uint16_t port,
63 std::shared_ptr<ContentsBuilder> contentsBuilder,
64 bool usePermanentCommandCallbacks,
65 HiredisSystem& hiredisSystem,
66 std::shared_ptr<HiredisEpollAdapter> adapter,
67 std::shared_ptr<Logger> logger);
68
69 ~AsyncHiredisCommandDispatcher() override;
70
71 void waitConnectedAsync(const ConnectAck& connectAck) override;
72
73 void registerDisconnectCb(const DisconnectCb& disconnectCb) override;
74
75 void dispatchAsync(const CommandCb& commandCb, const AsyncConnection::Namespace& ns,
76 const Contents& contents) override;
77
78 void disableCommandCallbacks() override;
79
80 void setConnected();
81
82 void setDisconnected();
83
84 void handleReply(const CommandCb& commandCb,
85 const std::error_code& error,
86 const redisReply* rr);
87
88 bool isClientCallbacksEnabled() const;
89
90 void verifyConnection();
91
92 void disconnectHiredis();
93
94 void armConnectionRetryTimer(Timer::Duration duration,
95 std::function<void()> retryAction);
96
97 private:
98 enum class ServiceState
99 { DISCONNECTED,
100 CONNECTION_VERIFICATION,
101 CONNECTED
102 };
103
104 using Callback = std::function<void(const Reply&)>;
105
106 Engine& engine;
107 std::string address;
108 uint16_t port;
109 std::shared_ptr<ContentsBuilder> contentsBuilder;
110 bool usePermanentCommandCallbacks;
111 HiredisSystem& hiredisSystem;
112 std::shared_ptr<HiredisEpollAdapter> adapter;
113 redisAsyncContext* ac;
114 ConnectAck connectAck;
115 DisconnectCb disconnectCallback;
116 ServiceState serviceState;
117 std::list<CommandCb> cbs;
118 bool clientCallbacksEnabled;
119 Timer connectionRetryTimer;
120 Timer::Duration connectionRetryTimerDuration;
121 Timer::Duration connectionVerificationRetryTimerDuration;
122 std::shared_ptr<Logger> logger;
123
124 void connect();
125
126 bool isValidCb(const CommandCb& commandCb);
127
128 void removeCb(const CommandCb& commandCb);
129
130 void callCommandCbWithError(const CommandCb& commandCb, const std::error_code& error);
131
132 void dispatchAsync(const CommandCb& commandCb, const Contents& contents, bool checkConnectionState);
133
134 void verifyConnectionReply(const std::error_code& error, const redis::Reply& reply);
135 };
136 }
137}
138
139#endif