blob: 27359f087421045a9415fd62985a31c95293f755 [file] [log] [blame]
Rolf Badorekef2bf512019-08-20 11:17:15 +03001/*
2 Copyright (c) 2018-2019 Nokia.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
Timo Tietavainena0745d22019-11-28 09:55:22 +020017/*
18 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 * platform project (RICP).
20*/
21
Rolf Badorekef2bf512019-08-20 11:17:15 +030022#include <type_traits>
23#include <memory>
24#include <cstring>
25#include <sys/epoll.h>
26#include <sys/timerfd.h>
27#include <sys/eventfd.h>
28#include <arpa/inet.h>
29#include <gtest/gtest.h>
30#include <async.h>
31#include "private/createlogger.hpp"
32#include "private/error.hpp"
33#include "private/logger.hpp"
34#include "private/redis/asynchiredisclustercommanddispatcher.hpp"
35#include "private/redis/redisgeneral.hpp"
36#include "private/redis/reply.hpp"
37#include "private/redis/contents.hpp"
38#include "private/tst/hiredisclustersystemmock.hpp"
39#include "private/timer.hpp"
40#include "private/tst/contentsbuildermock.hpp"
41#include "private/tst/enginemock.hpp"
42#include "private/tst/hiredisclusterepolladaptermock.hpp"
43#include "private/tst/redisreplybuilder.hpp"
44
45using namespace shareddatalayer;
46using namespace shareddatalayer::redis;
47using namespace shareddatalayer::tst;
48using namespace testing;
49
50namespace
51{
52 class AsyncHiredisClusterCommandDispatcherBaseTest: public testing::Test
53 {
54 public:
55 std::shared_ptr<ContentsBuilderMock> contentsBuilderMock;
56 StrictMock<EngineMock> engineMock;
57 HiredisClusterSystemMock hiredisClusterSystemMock;
58 std::shared_ptr<HiredisClusterEpollAdapterMock> adapterMock;
59 redisClusterAsyncContext acc;
60 redisAsyncContext ac;
61 int hiredisFd;
62 std::unique_ptr<AsyncHiredisClusterCommandDispatcher> dispatcher;
63 void (*connected)(const redisClusterAsyncContext*, const redisAsyncContext*, int);
64 void (*disconnected)(const redisClusterAsyncContext*, const redisAsyncContext*, int);
65 Timer::Callback savedConnectionRetryTimerCallback;
66 Timer::Duration expectedRetryTimerDuration;
67 Contents contents;
68 Contents clusterConnectionSetupContents;
69 RedisReplyBuilder redisReplyBuilder;
70 const AsyncConnection::Namespace defaultNamespace;
71 std::shared_ptr<Logger> logger;
72
73 AsyncHiredisClusterCommandDispatcherBaseTest():
74 contentsBuilderMock(std::make_shared<ContentsBuilderMock>(AsyncConnection::SEPARATOR)),
75 adapterMock(std::make_shared<HiredisClusterEpollAdapterMock>(engineMock, hiredisClusterSystemMock)),
76 acc { },
77 ac { },
78 hiredisFd(3),
79 connected(nullptr),
80 disconnected(nullptr),
81 expectedRetryTimerDuration(std::chrono::seconds(1)),
82 contents { { "CMD", "key1", "value1", "key2", "value2" },
83 { 3, 4, 6, 4, 6 } },
84 redisReplyBuilder { },
85 defaultNamespace("namespace"),
86 logger(createLogger(SDL_LOG_PREFIX))
87 {
88 }
89
90 virtual ~AsyncHiredisClusterCommandDispatcherBaseTest()
91 {
92 }
93
94 MOCK_METHOD0(connectAck, void());
95
96 MOCK_METHOD0(disconnectCallback, void());
97
98 MOCK_METHOD2(ack, void(const std::error_code&, const Reply&));
99
100 void expectationsUntilConnect()
101 {
102 expectationsUntilConnect(acc);
103 }
104
105 void expectationsUntilConnect(redisClusterAsyncContext& acc)
106 {
107 expectRedisClusterAsyncConnect(acc);
108 }
109
110 void expectRedisClusterAsyncConnect()
111 {
112 expectRedisClusterAsyncConnect(acc);
113 }
114
115 void expectRedisClusterAsyncConnect(redisClusterAsyncContext& acc)
116 {
117 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncConnect(StrEq("addr1:28416,addr2:56832"),
118 HIRCLUSTER_FLAG_ROUTE_USE_SLOTS))
119 .Times(1)
120 .WillOnce(InvokeWithoutArgs([this, &acc]()
121 {
122 return &acc;
123 }));
124 }
125
126 void expectRedisClusterAsyncConnectReturnNullptr()
127 {
128 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncConnect(StrEq("addr1:28416,addr2:56832"),
129 HIRCLUSTER_FLAG_ROUTE_USE_SLOTS))
130 .Times(1)
131 .WillOnce(InvokeWithoutArgs([this]()
132 {
133 return nullptr;
134 }));
135 }
136
137 void expectRedisClusterAsyncSetConnectCallback()
138 {
139 expectRedisClusterAsyncSetConnectCallback(acc);
140 }
141
142 void expectRedisClusterAsyncSetConnectCallback(redisClusterAsyncContext& acc)
143 {
144 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncSetConnectCallback(&acc, _))
145 .Times(1)
146 .WillOnce(Invoke([this](const redisClusterAsyncContext*, redisClusterInstanceConnectCallback* cb)
147 {
148 connected = cb;
149 return REDIS_OK;
150 }));
151 }
152
153 void expectRedisClusterAsyncSetDisconnectCallback()
154 {
155 expectRedisClusterAsyncSetDisconnectCallback(acc);
156 }
157
158 void expectRedisClusterAsyncSetDisconnectCallback(redisClusterAsyncContext& acc)
159 {
160 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncSetDisconnectCallback(&acc, _))
161 .Times(1)
162 .WillOnce(Invoke([this](const redisClusterAsyncContext*, redisClusterInstanceDisconnectCallback* cb)
163 {
164 disconnected = cb;
165 return REDIS_OK;
166 }));
167 }
168
169 void expectCommandListQuery()
170 {
171 expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildCommandListQueryReply());
172 }
173
174 void expectCommandListQueryReturnError()
175 {
176 expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildErrorReply("SomeErrorForCommandListQuery"));
177 }
178
179 void expectAdapterSetup()
180 {
181 expectAdapterSetup(acc);
182 }
183
184 void expectAdapterSetup(redisClusterAsyncContext& acc)
185 {
186 EXPECT_CALL(*adapterMock, setup(&acc))
187 .Times(1);
188 }
189
190 void expectAdapterDetach()
191 {
192 EXPECT_CALL(*adapterMock, detach(&ac))
193 .Times(1);
194 }
195
196 void expectConnectAck()
197 {
198 EXPECT_CALL(*this, connectAck())
199 .Times(1);
200 }
201
202 void expectDisconnectCallback()
203 {
204 EXPECT_CALL(*this, disconnectCallback())
205 .Times(1);
206 }
207
208 void expectRedisClusterAsyncFree()
209 {
210 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncFree(&acc))
211 .Times(1);
212 }
213
214 void expectRedisClusterAsyncDisconnect()
215 {
216 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncDisconnect(&acc))
217 .Times(1);
218 }
219
220 void verifyAckErrorReply(const Reply& reply)
221 {
222 EXPECT_EQ(Reply::Type::NIL, reply.getType());
223 EXPECT_EQ(0, reply.getInteger());
224 EXPECT_TRUE(reply.getString()->str.empty());
225 EXPECT_EQ(static_cast<ReplyStringLength>(0), reply.getString()->len);
226 EXPECT_TRUE(reply.getArray()->empty());
227 }
228
229 void expectAckError()
230 {
231 EXPECT_CALL(*this, ack(Ne(std::error_code()), _))
232 .Times(1)
233 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
234 {
235 verifyAckErrorReply(reply);
236 }));
237 }
238
239 void expectAckError(const std::error_code& ec)
240 {
241 EXPECT_CALL(*this, ack(ec, _))
242 .Times(1)
243 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
244 {
245 verifyAckErrorReply(reply);
246 }));
247 }
248
249 void expectArmConnectionRetryTimer()
250 {
251 EXPECT_CALL(engineMock, armTimer(_, expectedRetryTimerDuration, _))
252 .Times(1)
253 .WillOnce(SaveArg<2>(&savedConnectionRetryTimerCallback));
254 }
255
256 void expectDisarmConnectionRetryTimer()
257 {
258 EXPECT_CALL(engineMock, disarmTimer(_))
259 .Times(1);
260 }
261
262 void expectRedisClusterAsyncCommandArgv(redisReply& rr)
263 {
264 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
265 .Times(1)
266 .WillOnce(Invoke([&rr](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int,
267 int, const char**, const size_t*)
268 {
269 cb(acc, &rr, pd);
270 return REDIS_OK;
271 }));
272 }
273
274 void expectAck()
275 {
276 EXPECT_CALL(*this, ack(std::error_code(), _))
277 .Times(1);
278 }
279
280 void expectReplyError(const std::string& msg)
281 {
282 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
283 .Times(1)
284 .WillOnce(Invoke([this, msg](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*,
285 int, int, const char**, const size_t*)
286 {
287 cb(acc, &redisReplyBuilder.buildErrorReply(msg), pd);
288 return REDIS_OK;
289 }));
290 }
291
292 void expectContextError(int code)
293 {
294 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
295 .Times(1)
296 .WillOnce(Invoke([code](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int,
297 int, const char**, const size_t*)
298 {
299 acc->err = code;
300 cb(acc, nullptr, pd);
301 return REDIS_OK;
302 }));
303 }
304
305 void expectRedisClusterAsyncFreeCallPendingCallback(redisClusterCallbackFn* cb, void* pd)
306 {
307 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncFree(&acc))
308 .Times(1)
309 .WillOnce(Invoke([this, cb, pd](redisClusterAsyncContext* acc)
310 {
311 cb(acc, &redisReplyBuilder.buildNilReply(), pd);
312 }));
313 }
314
315 void expectAckNotCalled()
316 {
317 EXPECT_CALL(*this, ack(_,_))
318 .Times(0);
319 }
320
321 void expectionsForSuccessfullConnectionSetup()
322 {
323 expectationsUntilConnect();
324 expectAdapterSetup();
325 expectRedisClusterAsyncSetConnectCallback();
326 expectRedisClusterAsyncSetDisconnectCallback();
327 expectCommandListQuery();
328 }
329
330 void callConnectionRetryTimerCallback()
331 {
332 ASSERT_NE(savedConnectionRetryTimerCallback, nullptr);
333 savedConnectionRetryTimerCallback();
334 }
335 };
336
337 class AsyncHiredisClusterCommandDispatcherDisconnectedTest: public AsyncHiredisClusterCommandDispatcherBaseTest
338 {
339 public:
340 AsyncHiredisClusterCommandDispatcherDisconnectedTest()
341 {
342 InSequence dummy;
343 expectationsUntilConnect();
344 expectAdapterSetup();
345 expectRedisClusterAsyncSetConnectCallback();
346 expectRedisClusterAsyncSetDisconnectCallback();
347 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
348 .Times(1);
349 dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
350 defaultNamespace,
351 { { "addr1", 111 }, { "addr2", 222 } },
352 contentsBuilderMock,
353 false,
354 hiredisClusterSystemMock,
355 adapterMock,
356 logger));
357 }
358
359 ~AsyncHiredisClusterCommandDispatcherDisconnectedTest()
360 {
361 }
362 };
363
364 class AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest: public AsyncHiredisClusterCommandDispatcherBaseTest
365 {
366 public:
367 AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest()
368 {
369 InSequence dummy;
370 expectionsForSuccessfullConnectionSetup();
371 dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
372 defaultNamespace,
373 { { "addr1", 111 }, { "addr2", 222 } },
374 contentsBuilderMock,
375 true,
376 hiredisClusterSystemMock,
377 adapterMock,
378 logger));
379 }
380
381 ~AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest()
382 {
383 expectRedisClusterAsyncFree();
384 }
385 };
386
387 class AsyncHiredisClusterCommandDispatcherConnectedTest: public AsyncHiredisClusterCommandDispatcherBaseTest
388 {
389 public:
390 redisClusterCallbackFn* savedCb;
391 void* savedPd;
392
393 AsyncHiredisClusterCommandDispatcherConnectedTest():
394 savedCb(nullptr),
395 savedPd(nullptr)
396 {
397 InSequence dummy;
398 expectionsForSuccessfullConnectionSetup();
399 dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
400 defaultNamespace,
401 { { "addr1", 111 }, { "addr2", 222 } },
402 contentsBuilderMock,
403 false,
404 hiredisClusterSystemMock,
405 adapterMock,
406 logger));
407 connected(&acc, &ac, 0);
408 }
409
410 ~AsyncHiredisClusterCommandDispatcherConnectedTest()
411 {
412 expectRedisClusterAsyncFree();
413 }
414
415 void expectRedisClusterAsyncCommandArgvWithKey_SaveCb()
416 {
417 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
418 .Times(1)
419 .WillOnce(Invoke([this](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
420 const char*, int, int, const char**, const size_t*)
421 {
422 savedCb = cb;
423 savedPd = pd;
424 return REDIS_OK;
425 }));
426 }
427 };
428
429 using AsyncHiredisClusterCommandDispatcherDeathTest = AsyncHiredisClusterCommandDispatcherConnectedTest;
430}
431
432TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, IsNotCopyable)
433{
434 EXPECT_FALSE(std::is_copy_constructible<AsyncHiredisClusterCommandDispatcher>::value);
435 EXPECT_FALSE(std::is_copy_assignable<AsyncHiredisClusterCommandDispatcher>::value);
436}
437
438TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, ImplementsAsyncRedisCommandDispatcher)
439{
440 EXPECT_TRUE((std::is_base_of<AsyncCommandDispatcher, AsyncHiredisClusterCommandDispatcher>::value));
441}
442
443TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, CannotDispatchCommandsIfDisconnected)
444{
445 Engine::Callback storedCallback;
446 EXPECT_CALL(engineMock, postCallback(_))
447 .Times(1)
448 .WillOnce(SaveArg<0>(&storedCallback));
449 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDisconnectedTest::ack,
450 this,
451 std::placeholders::_1,
452 std::placeholders::_2),
453 defaultNamespace,
454 { });
455 expectAckError(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED);
456 storedCallback();
457}
458
459TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, ContextErrorInConnectArmsRetryTimer)
460{
461 InSequence dummy;
462 acc.err = 123;
463 expectationsUntilConnect();
464 expectArmConnectionRetryTimer();
465 expectDisarmConnectionRetryTimer();
466
467 dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
468 defaultNamespace,
469 { { "addr1", 111 }, { "addr2", 222 } },
470 contentsBuilderMock,
471 false,
472 hiredisClusterSystemMock,
473 adapterMock,
474 logger));
475}
476
477TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, NullRedisContextInConnectArmsRetryTimer)
478{
479 InSequence dummy;
480 expectRedisClusterAsyncConnectReturnNullptr();
481 expectArmConnectionRetryTimer();
482 expectDisarmConnectionRetryTimer();
483
484 dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
485 defaultNamespace,
486 { { "addr1", 111 }, { "addr2", 222 } },
487 contentsBuilderMock,
488 false,
489 hiredisClusterSystemMock,
490 adapterMock,
491 logger));
492}
493
494TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, FailedCommandListQueryArmsRetryTimer)
495{
496 InSequence dummy;
497 Engine::Callback storedCallback;
498 expectationsUntilConnect();
499 expectAdapterSetup();
500 expectRedisClusterAsyncSetConnectCallback();
501 expectRedisClusterAsyncSetDisconnectCallback();
502 expectCommandListQueryReturnError();
503 expectArmConnectionRetryTimer();
504
505 dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
506 defaultNamespace,
507 { { "addr1", 111 }, { "addr2", 222 } },
508 contentsBuilderMock,
509 false,
510 hiredisClusterSystemMock,
511 adapterMock,
512 logger));
513
514 expectDisarmConnectionRetryTimer();
515}
516
517TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, ConnectionSucceedsWithRetryTimer)
518{
519 InSequence dummy;
520 expectRedisClusterAsyncConnectReturnNullptr();
521 expectArmConnectionRetryTimer();
522
523 dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
524 defaultNamespace,
525 { { "addr1", 111 }, { "addr2", 222 } },
526 contentsBuilderMock,
527 false,
528 hiredisClusterSystemMock,
529 adapterMock,
530 logger));
531
532 expectionsForSuccessfullConnectionSetup();
533 expectRedisClusterAsyncFree();
534
535 callConnectionRetryTimerCallback();
536}
537
538TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ConnectAckCalledIfConnected)
539{
540 Engine::Callback storedCallback;
541 EXPECT_CALL(engineMock, postCallback(_))
542 .Times(1)
543 .WillOnce(SaveArg<0>(&storedCallback));
544 dispatcher->waitConnectedAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDisconnectedTest::connectAck,
545 this));
546 expectConnectAck();
547 storedCallback();
548}
549
550TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanDispatchCommands)
551{
552 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
553 .Times(1)
554 .WillOnce(Invoke([this](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char *key,
555 int keylen, int argc, const char** argv, const size_t* argvlen)
556 {
557 EXPECT_STREQ(defaultNamespace.c_str(), key);
558 EXPECT_EQ(9, keylen);
559 EXPECT_EQ((int)contents.stack.size(), argc);
560 EXPECT_EQ(contents.sizes[0], argvlen[0]);
561 EXPECT_EQ(contents.sizes[1], argvlen[1]);
562 EXPECT_EQ(contents.sizes[2], argvlen[2]);
563 EXPECT_EQ(contents.sizes[3], argvlen[3]);
564 EXPECT_EQ(contents.sizes[4], argvlen[4]);
565 EXPECT_FALSE(std::memcmp(argv[0], contents.stack[0].c_str(), contents.sizes[0]));
566 EXPECT_FALSE(std::memcmp(argv[1], contents.stack[1].c_str(), contents.sizes[1]));
567 EXPECT_FALSE(std::memcmp(argv[2], contents.stack[2].c_str(), contents.sizes[2]));
568 EXPECT_FALSE(std::memcmp(argv[3], contents.stack[3].c_str(), contents.sizes[3]));
569 EXPECT_FALSE(std::memcmp(argv[4], contents.stack[4].c_str(), contents.sizes[4]));
570 cb(acc, &redisReplyBuilder.buildNilReply(), pd);
571 return REDIS_OK;
572 }));
573 expectAck();
574 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
575 this,
576 std::placeholders::_1,
577 std::placeholders::_2),
578 defaultNamespace,
579 contents);
580}
581
582TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseNilReply)
583{
584 expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildNilReply());
585 EXPECT_CALL(*this, ack(std::error_code(), _))
586 .Times(1)
587 .WillOnce(Invoke([](const std::error_code&, const Reply& reply)
588 {
589 EXPECT_EQ(Reply::Type::NIL, reply.getType());
590 EXPECT_EQ(0, reply.getInteger());
591 EXPECT_TRUE(reply.getString()->str.empty());
592 EXPECT_EQ(static_cast<ReplyStringLength>(0), reply.getString()->len);
593 EXPECT_TRUE(reply.getArray()->empty());
594 }));
595 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
596 this,
597 std::placeholders::_1,
598 std::placeholders::_2),
599 defaultNamespace,
600 contents);
601}
602
603TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseIntegerReply)
604{
605 expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildIntegerReply());
606 EXPECT_CALL(*this, ack(std::error_code(), _))
607 .Times(1)
608 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
609 {
610 auto expected(redisReplyBuilder.buildIntegerReply());
611 EXPECT_EQ(Reply::Type::INTEGER, reply.getType());
612 EXPECT_EQ(expected.integer, reply.getInteger());
613 }));
614 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
615 this,
616 std::placeholders::_1,
617 std::placeholders::_2),
618 defaultNamespace,
619 contents);
620}
621
622TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseStatusReply)
623{
624 expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildStatusReply());
625 EXPECT_CALL(*this, ack(std::error_code(), _))
626 .Times(1)
627 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
628 {
629 auto expected(redisReplyBuilder.buildStatusReply());
630 EXPECT_EQ(Reply::Type::STATUS, reply.getType());
631 EXPECT_EQ(expected.len, reply.getString()->len);
632 EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len));
633 }));
634 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
635 this,
636 std::placeholders::_1,
637 std::placeholders::_2),
638 defaultNamespace,
639 contents);
640}
641
642TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseStringReply)
643{
644 expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildStringReply());
645 EXPECT_CALL(*this, ack(std::error_code(), _))
646 .Times(1)
647 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
648 {
649 auto expected(redisReplyBuilder.buildStringReply());
650 EXPECT_EQ(Reply::Type::STRING, reply.getType());
651 EXPECT_EQ(expected.len, reply.getString()->len);
652 EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len));
653 }));
654 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
655 this,
656 std::placeholders::_1,
657 std::placeholders::_2),
658 defaultNamespace,
659 contents);
660}
661
662TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseArrayReply)
663{
664 expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildArrayReply());
665 EXPECT_CALL(*this, ack(std::error_code(), _))
666 .Times(1)
667 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
668 {
669 auto array(reply.getArray());
670 EXPECT_EQ(Reply::Type::ARRAY, reply.getType());
671 EXPECT_EQ(Reply::Type::STRING, (*array)[0]->getType());
672 EXPECT_EQ(Reply::Type::NIL, (*array)[1]->getType());
673 }));
674 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
675 this,
676 std::placeholders::_1,
677 std::placeholders::_2),
678 defaultNamespace,
679 contents);
680}
681
682TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanHandleDispatchHiredisBufferErrors)
683{
684 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
685 .Times(1)
686 .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn*, void*, const char*, int, int,
687 const char**, const size_t*)
688 {
689 acc->err = REDIS_ERR;
690 return REDIS_ERR;
691 }));
692 Engine::Callback storedCallback;
693 EXPECT_CALL(engineMock, postCallback(_))
694 .Times(1)
695 .WillOnce(SaveArg<0>(&storedCallback));
696 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
697 this,
698 std::placeholders::_1,
699 std::placeholders::_2),
700 defaultNamespace,
701 contents);
702 expectAckError();
703 storedCallback();
704}
705
706TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanHandleDispatchHiredisCbErrors)
707{
708 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
709 .Times(1)
710 .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int, int,
711 const char**, const size_t*)
712 {
713 cb(acc, nullptr, pd);
714 return REDIS_OK;
715 }));
716 expectAckError();
717 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
718 this,
719 std::placeholders::_1,
720 std::placeholders::_2),
721 defaultNamespace,
722 contents);
723}
724
725TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, DatasetStillBeingLoadedInMemoryIsRecognizedFromReply)
726{
727 expectReplyError("LOADING Redis is loading the dataset in memory");
728 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::DATASET_LOADING), _))
729 .Times(1);
730 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
731 this,
732 std::placeholders::_1,
733 std::placeholders::_2),
734 defaultNamespace,
735 contents);
736}
737
738TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterDownIsRecognizedFromReply)
739{
740 //SDL checks only that reply starts with CLUSTERDOWN string
741 expectReplyError("CLUSTERDOWN");
742 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
743 .Times(1);
744 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
745 this,
746 std::placeholders::_1,
747 std::placeholders::_2),
748 defaultNamespace,
749 contents);
750
751 expectReplyError("CLUSTERDOWN The cluster is down");
752 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
753 .Times(1);
754 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
755 this,
756 std::placeholders::_1,
757 std::placeholders::_2),
758 defaultNamespace,
759 contents);
760
761 expectReplyError("CLUSTERDOW");
762 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
763 .Times(1);
764 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
765 this,
766 std::placeholders::_1,
767 std::placeholders::_2),
768 defaultNamespace,
769 contents);
770}
771
772TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ProtocolErrorIsRecognizedFromReply)
773{
774 expectReplyError("ERR Protocol error: invalid bulk length");
775 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _))
776 .Times(1);
777 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
778 this,
779 std::placeholders::_1,
780 std::placeholders::_2),
781 defaultNamespace,
782 contents);
783}
784
785TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, UnrecognizedReplyErrorIsConvertedToUnknownError)
786{
787 expectReplyError("something sinister");
788 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
789 .Times(1);
790 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
791 this,
792 std::placeholders::_1,
793 std::placeholders::_2),
794 defaultNamespace,
795 contents);
796}
797
798TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, EmptyReplyErrorIsConvertedToUnknownError)
799{
800 expectReplyError("");
801 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
802 .Times(1);
803 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
804 this,
805 std::placeholders::_1,
806 std::placeholders::_2),
807 defaultNamespace,
808 contents);
809}
810
811TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, IOErrorInContext)
812{
813 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
814 .Times(1)
815 .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int, int,
816 const char**, const size_t*)
817 {
818 acc->err = REDIS_ERR_IO;
819 errno = EINVAL;
820 cb(acc, nullptr, pd);
821 return REDIS_OK;
822 }));
823 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::IO_ERROR), _))
824 .Times(1);
825 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
826 this,
827 std::placeholders::_1,
828 std::placeholders::_2),
829 defaultNamespace,
830 contents);
831}
832
833TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, EofErrorInContext)
834{
835 expectContextError(REDIS_ERR_EOF);
836 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _))
837 .Times(1);
838 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
839 this,
840 std::placeholders::_1,
841 std::placeholders::_2),
842 defaultNamespace,
843 contents);
844}
845
846TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ProtocolErrorInContext)
847{
848 expectContextError(REDIS_ERR_PROTOCOL);
849 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _))
850 .Times(1);
851 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
852 this,
853 std::placeholders::_1,
854 std::placeholders::_2),
855 defaultNamespace,
856 contents);
857}
858
859TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, OomErrorInContext)
860{
861 expectContextError(REDIS_ERR_OOM);
862 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::OUT_OF_MEMORY), _))
863 .Times(1);
864 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
865 this,
866 std::placeholders::_1,
867 std::placeholders::_2),
868 defaultNamespace,
869 contents);
870}
871
872TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterErrorNotConnectedInContext)
873{
874 expectContextError(CLUSTER_ERROR_NOT_CONNECTED);
875 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
876 .Times(1);
877 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
878 this,
879 std::placeholders::_1,
880 std::placeholders::_2),
881 defaultNamespace,
882 contents);
883}
884
885TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterErrorConnectionLostInContext)
886{
887 expectContextError(CLUSTER_ERROR_CONNECTION_LOST);
888 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _))
889 .Times(1);
890 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
891 this,
892 std::placeholders::_1,
893 std::placeholders::_2),
894 defaultNamespace,
895 contents);
896}
897
898TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, UnrecognizedContextErrorIsConvertedToUnknownError)
899{
900 expectContextError(REDIS_ERR_OTHER);
901 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
902 .Times(1);
903 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
904 this,
905 std::placeholders::_1,
906 std::placeholders::_2),
907 defaultNamespace,
908 contents);
909}
910
911TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, PendingClientCallbacksAreNotCalledAfterDisabled)
912{
913 InSequence dummy;
914 expectRedisClusterAsyncCommandArgvWithKey_SaveCb();
915 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
916 this,
917 std::placeholders::_1,
918 std::placeholders::_2),
919 defaultNamespace,
920 contents);
921 expectAck();
922 savedCb(&acc, &redisReplyBuilder.buildStringReply(), savedPd);
923 dispatcher->disableCommandCallbacks();
924 expectRedisClusterAsyncCommandArgvWithKey_SaveCb();
925 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
926 this,
927 std::placeholders::_1,
928 std::placeholders::_2),
929 defaultNamespace,
930 contents);
931 expectAckNotCalled();
932 savedCb(&acc, &redisReplyBuilder.buildStringReply(), savedPd);
933}
934
935TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, DisconnectCallbackDetachesContextFromAdapter)
936{
937 InSequence dummy;
938 expectAdapterDetach();
939 disconnected(&acc, &ac, 0);
940}
941
942TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, RegisteredClientDisconnectCallbackIsCalled)
943{
944 InSequence dummy;
945 dispatcher->registerDisconnectCb(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::disconnectCallback,
946 this));
947 expectAdapterDetach();
948 expectDisconnectCallback();
949 disconnected(&acc, &ac, 0);
950}
951
952TEST_F(AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest, CanHandleMultipleRepliesForSameRedisCommand)
953{
954 InSequence dummy;
955 redisClusterCallbackFn* savedCb;
956 void* savedPd;
957 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
958 .Times(1)
959 .WillOnce(Invoke([&savedCb, &savedPd](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
960 const char*, int, int, const char**, const size_t*)
961 {
962 savedCb = cb;
963 savedPd = pd;
964 return REDIS_OK;
965 }));
966 Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } });
967 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
968 this,
969 std::placeholders::_1,
970 std::placeholders::_2),
971 defaultNamespace,
972 contents);
973 EXPECT_CALL(*this, ack(std::error_code(), _))
974 .Times(3);
975 redisReply rr;
976 rr.type = REDIS_REPLY_NIL;
977 savedCb(&acc, &rr, savedPd);
978 savedCb(&acc, &rr, savedPd);
979 savedCb(&acc, &rr, savedPd);
980}
981
982TEST_F(AsyncHiredisClusterCommandDispatcherDeathTest, CbRemovedAfterHiredisCb)
983{
984 InSequence dummy;
985 redisClusterCallbackFn* savedCb;
986 void* savedPd;
987 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
988 .Times(1)
989 .WillOnce(Invoke([this, &savedCb, &savedPd](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd,
990 const char*, int, int, const char**, const size_t*)
991 {
992 savedCb = cb;
993 savedPd = pd;
994 cb(acc, &redisReplyBuilder.buildNilReply(), pd);
995 return REDIS_OK;
996 }));
997 expectAck();
998 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
999 this,
1000 std::placeholders::_1,
1001 std::placeholders::_2),
1002 defaultNamespace,
1003 contents);
1004 EXPECT_EXIT(savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), "");
1005}
1006
1007TEST_F(AsyncHiredisClusterCommandDispatcherDeathTest, TooManyRepliesAborts)
1008{
1009 InSequence dummy;
1010 redisClusterCallbackFn* savedCb;
1011 void* savedPd;
1012 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
1013 .Times(1)
1014 .WillOnce(Invoke([&savedCb, &savedPd](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
1015 const char*, int, int, const char**, const size_t*)
1016 {
1017 savedCb = cb;
1018 savedPd = pd;
1019 return REDIS_OK;
1020 }));
1021 Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } });
1022 expectAck();
1023 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
1024 this,
1025 std::placeholders::_1,
1026 std::placeholders::_2),
1027 defaultNamespace,
1028 contents);
1029 savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd);
1030 EXPECT_EXIT(savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), "");
1031}