blob: 05e282735110bb96e91bca1ee4f733d02f61850a [file] [log] [blame]
/*
   Copyright (c) 2018-2019 Nokia.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

/*
 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
 * platform project (RICP).
*/
21
Rolf Badorekef2bf512019-08-20 11:17:15 +030022#include "private/redis/hiredisclusterepolladapter.hpp"
23#include <sys/epoll.h>
24#include "private/engine.hpp"
25#include "private/redis/hiredisclustersystem.hpp"
26
27using namespace shareddatalayer;
28using namespace shareddatalayer::redis;
29
30namespace
31{
32 int attachFunction(redisAsyncContext* ac, void* data)
33 {
34 auto instance(static_cast<HiredisClusterEpollAdapter*>(data));
35 instance->attach(ac);
36 return REDIS_OK;
37 }
38
39 void addReadWrap(void* data)
40 {
41 auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
42 instance->addRead();
43 }
44
45 void addWriteWrap(void* data)
46 {
47 auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
48 instance->addWrite();
49 }
50
51 void delReadWrap(void* data)
52 {
53 auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
54 instance->delRead();
55 }
56
57 void delWriteWrap(void* data)
58 {
59 auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
60 instance->delWrite();
61 }
62
63 void cleanupWrap(void* data)
64 {
65 auto instance(static_cast<HiredisClusterEpollAdapter::Node*>(data));
66 instance->cleanup();
67 }
68}
69
70HiredisClusterEpollAdapter::HiredisClusterEpollAdapter(Engine& engine):
71 HiredisClusterEpollAdapter(engine, HiredisClusterSystem::getInstance())
72{
73}
74
75HiredisClusterEpollAdapter::HiredisClusterEpollAdapter(Engine& engine, HiredisClusterSystem& hiredisClusterSystem):
76 engine(engine),
77 hiredisClusterSystem(hiredisClusterSystem)
78{
79}
80
81void HiredisClusterEpollAdapter::setup(redisClusterAsyncContext* acc)
82{
83 acc->adapter = this;
84 acc->attach_fn = attachFunction;
85}
86
87void HiredisClusterEpollAdapter::attach(redisAsyncContext* ac)
88{
89 detach(ac);
90 nodes.insert(std::make_pair(ac->c.fd,
91 std::unique_ptr<Node>(new Node(engine,
92 ac,
93 hiredisClusterSystem))));
94}
95
96void HiredisClusterEpollAdapter::detach(const redisAsyncContext* ac)
97{
98 auto it = nodes.find(ac->c.fd);
99 if (it != nodes.end())
100 nodes.erase(it);
101}
102
103HiredisClusterEpollAdapter::Node::Node(Engine& engine,
104 redisAsyncContext* ac,
105 HiredisClusterSystem& hiredisClusterSystem):
106 engine(engine),
107 hiredisClusterSystem(hiredisClusterSystem),
108 ac(ac),
109 eventState(0),
110 reading(false),
111 writing(false)
112{
113 this->ac->ev.data = this;
114 this->ac->ev.addRead = addReadWrap;
115 this->ac->ev.addWrite = addWriteWrap;
116 this->ac->ev.delRead = delReadWrap;
117 this->ac->ev.delWrite = delWriteWrap;
118 this->ac->ev.cleanup = cleanupWrap;
119 engine.addMonitoredFD(ac->c.fd,
120 eventState,
121 std::bind(&HiredisClusterEpollAdapter::Node::eventHandler,
122 this,
123 std::placeholders::_1));
124 isMonitoring = true;
125}
126
127HiredisClusterEpollAdapter::Node::~Node()
128{
129 if (isMonitoring)
130 cleanup();
131}
132
133void HiredisClusterEpollAdapter::Node::eventHandler(unsigned int events)
134{
135 if (events & Engine::EVENT_IN)
136 if (reading && isMonitoring)
137 hiredisClusterSystem.redisAsyncHandleRead(ac);
138 if (events & Engine::EVENT_OUT)
139 if (writing && isMonitoring)
140 hiredisClusterSystem.redisAsyncHandleWrite(ac);
141}
142
143void HiredisClusterEpollAdapter::Node::addRead()
144{
145 if (reading)
146 return;
147 reading = true;
148 eventState |= Engine::EVENT_IN;
149 engine.modifyMonitoredFD(ac->c.fd, eventState);
150}
151
152void HiredisClusterEpollAdapter::Node::addWrite()
153{
154 if (writing)
155 return;
156 writing = true;
157 eventState |= Engine::EVENT_OUT;
158 engine.modifyMonitoredFD(ac->c.fd, eventState);
159}
160
161void HiredisClusterEpollAdapter::Node::delRead()
162{
163 reading = false;
164 eventState &= ~Engine::EVENT_IN;
165 engine.modifyMonitoredFD(ac->c.fd, eventState);
166}
167
168void HiredisClusterEpollAdapter::Node::delWrite()
169{
170 writing = false;
171 eventState &= ~Engine::EVENT_OUT;
172 engine.modifyMonitoredFD(ac->c.fd, eventState);
173}
174
175void HiredisClusterEpollAdapter::Node::cleanup()
176{
177 reading = false;
178 writing = false;
179 eventState = 0;
180 engine.deleteMonitoredFD(ac->c.fd);
181 isMonitoring = false;
182}