blob: 10d3a57f8c24a290c7393e2d8a9d88c6504d79e0 [file] [log] [blame]
Rolf Badorekef2bf512019-08-20 11:17:15 +03001/*
2 Copyright (c) 2018-2019 Nokia.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#include "private/redis/hiredisclusterepolladapter.hpp"
18#include <sys/epoll.h>
19#include "private/engine.hpp"
20#include "private/redis/hiredisclustersystem.hpp"
21
22using namespace shareddatalayer;
23using namespace shareddatalayer::redis;
24
25namespace
26{
27 int attachFunction(redisAsyncContext* ac, void* data)
28 {
29 auto instance(static_cast<HiredisClusterEpollAdapter*>(data));
30 instance->attach(ac);
31 return REDIS_OK;
32 }
33
34 void addReadWrap(void* data)
35 {
36 auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
37 instance->addRead();
38 }
39
40 void addWriteWrap(void* data)
41 {
42 auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
43 instance->addWrite();
44 }
45
46 void delReadWrap(void* data)
47 {
48 auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
49 instance->delRead();
50 }
51
52 void delWriteWrap(void* data)
53 {
54 auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
55 instance->delWrite();
56 }
57
58 void cleanupWrap(void* data)
59 {
60 auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
61 instance->cleanup();
62 }
63}
64
65HiredisClusterEpollAdapter::HiredisClusterEpollAdapter(Engine& engine):
66 HiredisClusterEpollAdapter(engine, HiredisClusterSystem::getInstance())
67{
68}
69
70HiredisClusterEpollAdapter::HiredisClusterEpollAdapter(Engine& engine, HiredisClusterSystem& hiredisClusterSystem):
71 engine(engine),
72 hiredisClusterSystem(hiredisClusterSystem)
73{
74}
75
76void HiredisClusterEpollAdapter::setup(redisClusterAsyncContext* acc)
77{
78 acc->adapter = this;
79 acc->attach_fn = attachFunction;
80}
81
82void HiredisClusterEpollAdapter::attach(redisAsyncContext* ac)
83{
84 detach(ac);
85 nodes.insert(std::make_pair(ac->c.fd,
86 std::unique_ptr<Node>(new Node(engine,
87 ac,
88 hiredisClusterSystem))));
89}
90
91void HiredisClusterEpollAdapter::detach(const redisAsyncContext* ac)
92{
93 auto it = nodes.find(ac->c.fd);
94 if (it != nodes.end())
95 nodes.erase(it);
96}
97
98HiredisClusterEpollAdapter::Node::Node(Engine& engine,
99 redisAsyncContext* ac,
100 HiredisClusterSystem& hiredisClusterSystem):
101 engine(engine),
102 hiredisClusterSystem(hiredisClusterSystem),
103 ac(ac),
104 eventState(0),
105 reading(false),
106 writing(false)
107{
108 this->ac->ev.data = this;
109 this->ac->ev.addRead = addReadWrap;
110 this->ac->ev.addWrite = addWriteWrap;
111 this->ac->ev.delRead = delReadWrap;
112 this->ac->ev.delWrite = delWriteWrap;
113 this->ac->ev.cleanup = cleanupWrap;
114 engine.addMonitoredFD(ac->c.fd,
115 eventState,
116 std::bind(&HiredisClusterEpollAdapter::Node::eventHandler,
117 this,
118 std::placeholders::_1));
119 isMonitoring = true;
120}
121
122HiredisClusterEpollAdapter::Node::~Node()
123{
124 if (isMonitoring)
125 cleanup();
126}
127
128void HiredisClusterEpollAdapter::Node::eventHandler(unsigned int events)
129{
130 if (events & Engine::EVENT_IN)
131 if (reading && isMonitoring)
132 hiredisClusterSystem.redisAsyncHandleRead(ac);
133 if (events & Engine::EVENT_OUT)
134 if (writing && isMonitoring)
135 hiredisClusterSystem.redisAsyncHandleWrite(ac);
136}
137
138void HiredisClusterEpollAdapter::Node::addRead()
139{
140 if (reading)
141 return;
142 reading = true;
143 eventState |= Engine::EVENT_IN;
144 engine.modifyMonitoredFD(ac->c.fd, eventState);
145}
146
147void HiredisClusterEpollAdapter::Node::addWrite()
148{
149 if (writing)
150 return;
151 writing = true;
152 eventState |= Engine::EVENT_OUT;
153 engine.modifyMonitoredFD(ac->c.fd, eventState);
154}
155
156void HiredisClusterEpollAdapter::Node::delRead()
157{
158 reading = false;
159 eventState &= ~Engine::EVENT_IN;
160 engine.modifyMonitoredFD(ac->c.fd, eventState);
161}
162
163void HiredisClusterEpollAdapter::Node::delWrite()
164{
165 writing = false;
166 eventState &= ~Engine::EVENT_OUT;
167 engine.modifyMonitoredFD(ac->c.fd, eventState);
168}
169
170void HiredisClusterEpollAdapter::Node::cleanup()
171{
172 reading = false;
173 writing = false;
174 eventState = 0;
175 engine.deleteMonitoredFD(ac->c.fd);
176 isMonitoring = false;
177}