/*
   Copyright (c) 2018-2019 Nokia.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/
16
17#include <type_traits>
18#include <memory>
19#include <cstring>
20#include <sys/epoll.h>
21#include <sys/timerfd.h>
22#include <sys/eventfd.h>
23#include <arpa/inet.h>
24#include <gtest/gtest.h>
25#include <async.h>
26#include "private/createlogger.hpp"
27#include "private/error.hpp"
28#include "private/logger.hpp"
29#include "private/redis/asynchiredisclustercommanddispatcher.hpp"
30#include "private/redis/redisgeneral.hpp"
31#include "private/redis/reply.hpp"
32#include "private/redis/contents.hpp"
33#include "private/tst/hiredisclustersystemmock.hpp"
34#include "private/timer.hpp"
35#include "private/tst/contentsbuildermock.hpp"
36#include "private/tst/enginemock.hpp"
37#include "private/tst/hiredisclusterepolladaptermock.hpp"
38#include "private/tst/redisreplybuilder.hpp"
39
40using namespace shareddatalayer;
41using namespace shareddatalayer::redis;
42using namespace shareddatalayer::tst;
43using namespace testing;
44
45namespace
46{
47 class AsyncHiredisClusterCommandDispatcherBaseTest: public testing::Test
48 {
49 public:
50 std::shared_ptr<ContentsBuilderMock> contentsBuilderMock;
51 StrictMock<EngineMock> engineMock;
52 HiredisClusterSystemMock hiredisClusterSystemMock;
53 std::shared_ptr<HiredisClusterEpollAdapterMock> adapterMock;
54 redisClusterAsyncContext acc;
55 redisAsyncContext ac;
56 int hiredisFd;
57 std::unique_ptr<AsyncHiredisClusterCommandDispatcher> dispatcher;
58 void (*connected)(const redisClusterAsyncContext*, const redisAsyncContext*, int);
59 void (*disconnected)(const redisClusterAsyncContext*, const redisAsyncContext*, int);
60 Timer::Callback savedConnectionRetryTimerCallback;
61 Timer::Duration expectedRetryTimerDuration;
62 Contents contents;
63 Contents clusterConnectionSetupContents;
64 RedisReplyBuilder redisReplyBuilder;
65 const AsyncConnection::Namespace defaultNamespace;
66 std::shared_ptr<Logger> logger;
67
68 AsyncHiredisClusterCommandDispatcherBaseTest():
69 contentsBuilderMock(std::make_shared<ContentsBuilderMock>(AsyncConnection::SEPARATOR)),
70 adapterMock(std::make_shared<HiredisClusterEpollAdapterMock>(engineMock, hiredisClusterSystemMock)),
71 acc { },
72 ac { },
73 hiredisFd(3),
74 connected(nullptr),
75 disconnected(nullptr),
76 expectedRetryTimerDuration(std::chrono::seconds(1)),
77 contents { { "CMD", "key1", "value1", "key2", "value2" },
78 { 3, 4, 6, 4, 6 } },
79 redisReplyBuilder { },
80 defaultNamespace("namespace"),
81 logger(createLogger(SDL_LOG_PREFIX))
82 {
83 }
84
85 virtual ~AsyncHiredisClusterCommandDispatcherBaseTest()
86 {
87 }
88
89 MOCK_METHOD0(connectAck, void());
90
91 MOCK_METHOD0(disconnectCallback, void());
92
93 MOCK_METHOD2(ack, void(const std::error_code&, const Reply&));
94
95 void expectationsUntilConnect()
96 {
97 expectationsUntilConnect(acc);
98 }
99
100 void expectationsUntilConnect(redisClusterAsyncContext& acc)
101 {
102 expectRedisClusterAsyncConnect(acc);
103 }
104
105 void expectRedisClusterAsyncConnect()
106 {
107 expectRedisClusterAsyncConnect(acc);
108 }
109
110 void expectRedisClusterAsyncConnect(redisClusterAsyncContext& acc)
111 {
112 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncConnect(StrEq("addr1:28416,addr2:56832"),
113 HIRCLUSTER_FLAG_ROUTE_USE_SLOTS))
114 .Times(1)
115 .WillOnce(InvokeWithoutArgs([this, &acc]()
116 {
117 return &acc;
118 }));
119 }
120
121 void expectRedisClusterAsyncConnectReturnNullptr()
122 {
123 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncConnect(StrEq("addr1:28416,addr2:56832"),
124 HIRCLUSTER_FLAG_ROUTE_USE_SLOTS))
125 .Times(1)
126 .WillOnce(InvokeWithoutArgs([this]()
127 {
128 return nullptr;
129 }));
130 }
131
132 void expectRedisClusterAsyncSetConnectCallback()
133 {
134 expectRedisClusterAsyncSetConnectCallback(acc);
135 }
136
137 void expectRedisClusterAsyncSetConnectCallback(redisClusterAsyncContext& acc)
138 {
139 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncSetConnectCallback(&acc, _))
140 .Times(1)
141 .WillOnce(Invoke([this](const redisClusterAsyncContext*, redisClusterInstanceConnectCallback* cb)
142 {
143 connected = cb;
144 return REDIS_OK;
145 }));
146 }
147
148 void expectRedisClusterAsyncSetDisconnectCallback()
149 {
150 expectRedisClusterAsyncSetDisconnectCallback(acc);
151 }
152
153 void expectRedisClusterAsyncSetDisconnectCallback(redisClusterAsyncContext& acc)
154 {
155 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncSetDisconnectCallback(&acc, _))
156 .Times(1)
157 .WillOnce(Invoke([this](const redisClusterAsyncContext*, redisClusterInstanceDisconnectCallback* cb)
158 {
159 disconnected = cb;
160 return REDIS_OK;
161 }));
162 }
163
164 void expectCommandListQuery()
165 {
166 expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildCommandListQueryReply());
167 }
168
169 void expectCommandListQueryReturnError()
170 {
171 expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildErrorReply("SomeErrorForCommandListQuery"));
172 }
173
174 void expectAdapterSetup()
175 {
176 expectAdapterSetup(acc);
177 }
178
179 void expectAdapterSetup(redisClusterAsyncContext& acc)
180 {
181 EXPECT_CALL(*adapterMock, setup(&acc))
182 .Times(1);
183 }
184
185 void expectAdapterDetach()
186 {
187 EXPECT_CALL(*adapterMock, detach(&ac))
188 .Times(1);
189 }
190
191 void expectConnectAck()
192 {
193 EXPECT_CALL(*this, connectAck())
194 .Times(1);
195 }
196
197 void expectDisconnectCallback()
198 {
199 EXPECT_CALL(*this, disconnectCallback())
200 .Times(1);
201 }
202
203 void expectRedisClusterAsyncFree()
204 {
205 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncFree(&acc))
206 .Times(1);
207 }
208
209 void expectRedisClusterAsyncDisconnect()
210 {
211 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncDisconnect(&acc))
212 .Times(1);
213 }
214
215 void verifyAckErrorReply(const Reply& reply)
216 {
217 EXPECT_EQ(Reply::Type::NIL, reply.getType());
218 EXPECT_EQ(0, reply.getInteger());
219 EXPECT_TRUE(reply.getString()->str.empty());
220 EXPECT_EQ(static_cast<ReplyStringLength>(0), reply.getString()->len);
221 EXPECT_TRUE(reply.getArray()->empty());
222 }
223
224 void expectAckError()
225 {
226 EXPECT_CALL(*this, ack(Ne(std::error_code()), _))
227 .Times(1)
228 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
229 {
230 verifyAckErrorReply(reply);
231 }));
232 }
233
234 void expectAckError(const std::error_code& ec)
235 {
236 EXPECT_CALL(*this, ack(ec, _))
237 .Times(1)
238 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
239 {
240 verifyAckErrorReply(reply);
241 }));
242 }
243
244 void expectArmConnectionRetryTimer()
245 {
246 EXPECT_CALL(engineMock, armTimer(_, expectedRetryTimerDuration, _))
247 .Times(1)
248 .WillOnce(SaveArg<2>(&savedConnectionRetryTimerCallback));
249 }
250
251 void expectDisarmConnectionRetryTimer()
252 {
253 EXPECT_CALL(engineMock, disarmTimer(_))
254 .Times(1);
255 }
256
257 void expectRedisClusterAsyncCommandArgv(redisReply& rr)
258 {
259 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
260 .Times(1)
261 .WillOnce(Invoke([&rr](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int,
262 int, const char**, const size_t*)
263 {
264 cb(acc, &rr, pd);
265 return REDIS_OK;
266 }));
267 }
268
269 void expectAck()
270 {
271 EXPECT_CALL(*this, ack(std::error_code(), _))
272 .Times(1);
273 }
274
275 void expectReplyError(const std::string& msg)
276 {
277 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
278 .Times(1)
279 .WillOnce(Invoke([this, msg](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*,
280 int, int, const char**, const size_t*)
281 {
282 cb(acc, &redisReplyBuilder.buildErrorReply(msg), pd);
283 return REDIS_OK;
284 }));
285 }
286
287 void expectContextError(int code)
288 {
289 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
290 .Times(1)
291 .WillOnce(Invoke([code](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int,
292 int, const char**, const size_t*)
293 {
294 acc->err = code;
295 cb(acc, nullptr, pd);
296 return REDIS_OK;
297 }));
298 }
299
300 void expectRedisClusterAsyncFreeCallPendingCallback(redisClusterCallbackFn* cb, void* pd)
301 {
302 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncFree(&acc))
303 .Times(1)
304 .WillOnce(Invoke([this, cb, pd](redisClusterAsyncContext* acc)
305 {
306 cb(acc, &redisReplyBuilder.buildNilReply(), pd);
307 }));
308 }
309
310 void expectAckNotCalled()
311 {
312 EXPECT_CALL(*this, ack(_,_))
313 .Times(0);
314 }
315
316 void expectionsForSuccessfullConnectionSetup()
317 {
318 expectationsUntilConnect();
319 expectAdapterSetup();
320 expectRedisClusterAsyncSetConnectCallback();
321 expectRedisClusterAsyncSetDisconnectCallback();
322 expectCommandListQuery();
323 }
324
325 void callConnectionRetryTimerCallback()
326 {
327 ASSERT_NE(savedConnectionRetryTimerCallback, nullptr);
328 savedConnectionRetryTimerCallback();
329 }
330 };
331
332 class AsyncHiredisClusterCommandDispatcherDisconnectedTest: public AsyncHiredisClusterCommandDispatcherBaseTest
333 {
334 public:
335 AsyncHiredisClusterCommandDispatcherDisconnectedTest()
336 {
337 InSequence dummy;
338 expectationsUntilConnect();
339 expectAdapterSetup();
340 expectRedisClusterAsyncSetConnectCallback();
341 expectRedisClusterAsyncSetDisconnectCallback();
342 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
343 .Times(1);
344 dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
345 defaultNamespace,
346 { { "addr1", 111 }, { "addr2", 222 } },
347 contentsBuilderMock,
348 false,
349 hiredisClusterSystemMock,
350 adapterMock,
351 logger));
352 }
353
354 ~AsyncHiredisClusterCommandDispatcherDisconnectedTest()
355 {
356 }
357 };
358
359 class AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest: public AsyncHiredisClusterCommandDispatcherBaseTest
360 {
361 public:
362 AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest()
363 {
364 InSequence dummy;
365 expectionsForSuccessfullConnectionSetup();
366 dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
367 defaultNamespace,
368 { { "addr1", 111 }, { "addr2", 222 } },
369 contentsBuilderMock,
370 true,
371 hiredisClusterSystemMock,
372 adapterMock,
373 logger));
374 }
375
376 ~AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest()
377 {
378 expectRedisClusterAsyncFree();
379 }
380 };
381
382 class AsyncHiredisClusterCommandDispatcherConnectedTest: public AsyncHiredisClusterCommandDispatcherBaseTest
383 {
384 public:
385 redisClusterCallbackFn* savedCb;
386 void* savedPd;
387
388 AsyncHiredisClusterCommandDispatcherConnectedTest():
389 savedCb(nullptr),
390 savedPd(nullptr)
391 {
392 InSequence dummy;
393 expectionsForSuccessfullConnectionSetup();
394 dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
395 defaultNamespace,
396 { { "addr1", 111 }, { "addr2", 222 } },
397 contentsBuilderMock,
398 false,
399 hiredisClusterSystemMock,
400 adapterMock,
401 logger));
402 connected(&acc, &ac, 0);
403 }
404
405 ~AsyncHiredisClusterCommandDispatcherConnectedTest()
406 {
407 expectRedisClusterAsyncFree();
408 }
409
410 void expectRedisClusterAsyncCommandArgvWithKey_SaveCb()
411 {
412 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
413 .Times(1)
414 .WillOnce(Invoke([this](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
415 const char*, int, int, const char**, const size_t*)
416 {
417 savedCb = cb;
418 savedPd = pd;
419 return REDIS_OK;
420 }));
421 }
422 };
423
424 using AsyncHiredisClusterCommandDispatcherDeathTest = AsyncHiredisClusterCommandDispatcherConnectedTest;
425}
426
427TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, IsNotCopyable)
428{
429 EXPECT_FALSE(std::is_copy_constructible<AsyncHiredisClusterCommandDispatcher>::value);
430 EXPECT_FALSE(std::is_copy_assignable<AsyncHiredisClusterCommandDispatcher>::value);
431}
432
433TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, ImplementsAsyncRedisCommandDispatcher)
434{
435 EXPECT_TRUE((std::is_base_of<AsyncCommandDispatcher, AsyncHiredisClusterCommandDispatcher>::value));
436}
437
438TEST_F(AsyncHiredisClusterCommandDispatcherDisconnectedTest, CannotDispatchCommandsIfDisconnected)
439{
440 Engine::Callback storedCallback;
441 EXPECT_CALL(engineMock, postCallback(_))
442 .Times(1)
443 .WillOnce(SaveArg<0>(&storedCallback));
444 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDisconnectedTest::ack,
445 this,
446 std::placeholders::_1,
447 std::placeholders::_2),
448 defaultNamespace,
449 { });
450 expectAckError(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED);
451 storedCallback();
452}
453
454TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, ContextErrorInConnectArmsRetryTimer)
455{
456 InSequence dummy;
457 acc.err = 123;
458 expectationsUntilConnect();
459 expectArmConnectionRetryTimer();
460 expectDisarmConnectionRetryTimer();
461
462 dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
463 defaultNamespace,
464 { { "addr1", 111 }, { "addr2", 222 } },
465 contentsBuilderMock,
466 false,
467 hiredisClusterSystemMock,
468 adapterMock,
469 logger));
470}
471
472TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, NullRedisContextInConnectArmsRetryTimer)
473{
474 InSequence dummy;
475 expectRedisClusterAsyncConnectReturnNullptr();
476 expectArmConnectionRetryTimer();
477 expectDisarmConnectionRetryTimer();
478
479 dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
480 defaultNamespace,
481 { { "addr1", 111 }, { "addr2", 222 } },
482 contentsBuilderMock,
483 false,
484 hiredisClusterSystemMock,
485 adapterMock,
486 logger));
487}
488
489TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, FailedCommandListQueryArmsRetryTimer)
490{
491 InSequence dummy;
492 Engine::Callback storedCallback;
493 expectationsUntilConnect();
494 expectAdapterSetup();
495 expectRedisClusterAsyncSetConnectCallback();
496 expectRedisClusterAsyncSetDisconnectCallback();
497 expectCommandListQueryReturnError();
498 expectArmConnectionRetryTimer();
499
500 dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
501 defaultNamespace,
502 { { "addr1", 111 }, { "addr2", 222 } },
503 contentsBuilderMock,
504 false,
505 hiredisClusterSystemMock,
506 adapterMock,
507 logger));
508
509 expectDisarmConnectionRetryTimer();
510}
511
512TEST_F(AsyncHiredisClusterCommandDispatcherBaseTest, ConnectionSucceedsWithRetryTimer)
513{
514 InSequence dummy;
515 expectRedisClusterAsyncConnectReturnNullptr();
516 expectArmConnectionRetryTimer();
517
518 dispatcher.reset(new AsyncHiredisClusterCommandDispatcher(engineMock,
519 defaultNamespace,
520 { { "addr1", 111 }, { "addr2", 222 } },
521 contentsBuilderMock,
522 false,
523 hiredisClusterSystemMock,
524 adapterMock,
525 logger));
526
527 expectionsForSuccessfullConnectionSetup();
528 expectRedisClusterAsyncFree();
529
530 callConnectionRetryTimerCallback();
531}
532
533TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ConnectAckCalledIfConnected)
534{
535 Engine::Callback storedCallback;
536 EXPECT_CALL(engineMock, postCallback(_))
537 .Times(1)
538 .WillOnce(SaveArg<0>(&storedCallback));
539 dispatcher->waitConnectedAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDisconnectedTest::connectAck,
540 this));
541 expectConnectAck();
542 storedCallback();
543}
544
545TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanDispatchCommands)
546{
547 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
548 .Times(1)
549 .WillOnce(Invoke([this](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char *key,
550 int keylen, int argc, const char** argv, const size_t* argvlen)
551 {
552 EXPECT_STREQ(defaultNamespace.c_str(), key);
553 EXPECT_EQ(9, keylen);
554 EXPECT_EQ((int)contents.stack.size(), argc);
555 EXPECT_EQ(contents.sizes[0], argvlen[0]);
556 EXPECT_EQ(contents.sizes[1], argvlen[1]);
557 EXPECT_EQ(contents.sizes[2], argvlen[2]);
558 EXPECT_EQ(contents.sizes[3], argvlen[3]);
559 EXPECT_EQ(contents.sizes[4], argvlen[4]);
560 EXPECT_FALSE(std::memcmp(argv[0], contents.stack[0].c_str(), contents.sizes[0]));
561 EXPECT_FALSE(std::memcmp(argv[1], contents.stack[1].c_str(), contents.sizes[1]));
562 EXPECT_FALSE(std::memcmp(argv[2], contents.stack[2].c_str(), contents.sizes[2]));
563 EXPECT_FALSE(std::memcmp(argv[3], contents.stack[3].c_str(), contents.sizes[3]));
564 EXPECT_FALSE(std::memcmp(argv[4], contents.stack[4].c_str(), contents.sizes[4]));
565 cb(acc, &redisReplyBuilder.buildNilReply(), pd);
566 return REDIS_OK;
567 }));
568 expectAck();
569 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
570 this,
571 std::placeholders::_1,
572 std::placeholders::_2),
573 defaultNamespace,
574 contents);
575}
576
577TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseNilReply)
578{
579 expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildNilReply());
580 EXPECT_CALL(*this, ack(std::error_code(), _))
581 .Times(1)
582 .WillOnce(Invoke([](const std::error_code&, const Reply& reply)
583 {
584 EXPECT_EQ(Reply::Type::NIL, reply.getType());
585 EXPECT_EQ(0, reply.getInteger());
586 EXPECT_TRUE(reply.getString()->str.empty());
587 EXPECT_EQ(static_cast<ReplyStringLength>(0), reply.getString()->len);
588 EXPECT_TRUE(reply.getArray()->empty());
589 }));
590 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
591 this,
592 std::placeholders::_1,
593 std::placeholders::_2),
594 defaultNamespace,
595 contents);
596}
597
598TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseIntegerReply)
599{
600 expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildIntegerReply());
601 EXPECT_CALL(*this, ack(std::error_code(), _))
602 .Times(1)
603 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
604 {
605 auto expected(redisReplyBuilder.buildIntegerReply());
606 EXPECT_EQ(Reply::Type::INTEGER, reply.getType());
607 EXPECT_EQ(expected.integer, reply.getInteger());
608 }));
609 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
610 this,
611 std::placeholders::_1,
612 std::placeholders::_2),
613 defaultNamespace,
614 contents);
615}
616
617TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseStatusReply)
618{
619 expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildStatusReply());
620 EXPECT_CALL(*this, ack(std::error_code(), _))
621 .Times(1)
622 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
623 {
624 auto expected(redisReplyBuilder.buildStatusReply());
625 EXPECT_EQ(Reply::Type::STATUS, reply.getType());
626 EXPECT_EQ(expected.len, reply.getString()->len);
627 EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len));
628 }));
629 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
630 this,
631 std::placeholders::_1,
632 std::placeholders::_2),
633 defaultNamespace,
634 contents);
635}
636
637TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseStringReply)
638{
639 expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildStringReply());
640 EXPECT_CALL(*this, ack(std::error_code(), _))
641 .Times(1)
642 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
643 {
644 auto expected(redisReplyBuilder.buildStringReply());
645 EXPECT_EQ(Reply::Type::STRING, reply.getType());
646 EXPECT_EQ(expected.len, reply.getString()->len);
647 EXPECT_FALSE(std::memcmp(reply.getString()->str.c_str(), expected.str, expected.len));
648 }));
649 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
650 this,
651 std::placeholders::_1,
652 std::placeholders::_2),
653 defaultNamespace,
654 contents);
655}
656
657TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanParseArrayReply)
658{
659 expectRedisClusterAsyncCommandArgv(redisReplyBuilder.buildArrayReply());
660 EXPECT_CALL(*this, ack(std::error_code(), _))
661 .Times(1)
662 .WillOnce(Invoke([this](const std::error_code&, const Reply& reply)
663 {
664 auto array(reply.getArray());
665 EXPECT_EQ(Reply::Type::ARRAY, reply.getType());
666 EXPECT_EQ(Reply::Type::STRING, (*array)[0]->getType());
667 EXPECT_EQ(Reply::Type::NIL, (*array)[1]->getType());
668 }));
669 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
670 this,
671 std::placeholders::_1,
672 std::placeholders::_2),
673 defaultNamespace,
674 contents);
675}
676
677TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanHandleDispatchHiredisBufferErrors)
678{
679 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
680 .Times(1)
681 .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn*, void*, const char*, int, int,
682 const char**, const size_t*)
683 {
684 acc->err = REDIS_ERR;
685 return REDIS_ERR;
686 }));
687 Engine::Callback storedCallback;
688 EXPECT_CALL(engineMock, postCallback(_))
689 .Times(1)
690 .WillOnce(SaveArg<0>(&storedCallback));
691 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
692 this,
693 std::placeholders::_1,
694 std::placeholders::_2),
695 defaultNamespace,
696 contents);
697 expectAckError();
698 storedCallback();
699}
700
701TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, CanHandleDispatchHiredisCbErrors)
702{
703 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
704 .Times(1)
705 .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int, int,
706 const char**, const size_t*)
707 {
708 cb(acc, nullptr, pd);
709 return REDIS_OK;
710 }));
711 expectAckError();
712 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
713 this,
714 std::placeholders::_1,
715 std::placeholders::_2),
716 defaultNamespace,
717 contents);
718}
719
720TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, DatasetStillBeingLoadedInMemoryIsRecognizedFromReply)
721{
722 expectReplyError("LOADING Redis is loading the dataset in memory");
723 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::DATASET_LOADING), _))
724 .Times(1);
725 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
726 this,
727 std::placeholders::_1,
728 std::placeholders::_2),
729 defaultNamespace,
730 contents);
731}
732
733TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterDownIsRecognizedFromReply)
734{
735 //SDL checks only that reply starts with CLUSTERDOWN string
736 expectReplyError("CLUSTERDOWN");
737 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
738 .Times(1);
739 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
740 this,
741 std::placeholders::_1,
742 std::placeholders::_2),
743 defaultNamespace,
744 contents);
745
746 expectReplyError("CLUSTERDOWN The cluster is down");
747 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
748 .Times(1);
749 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
750 this,
751 std::placeholders::_1,
752 std::placeholders::_2),
753 defaultNamespace,
754 contents);
755
756 expectReplyError("CLUSTERDOW");
757 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
758 .Times(1);
759 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
760 this,
761 std::placeholders::_1,
762 std::placeholders::_2),
763 defaultNamespace,
764 contents);
765}
766
767TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ProtocolErrorIsRecognizedFromReply)
768{
769 expectReplyError("ERR Protocol error: invalid bulk length");
770 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _))
771 .Times(1);
772 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
773 this,
774 std::placeholders::_1,
775 std::placeholders::_2),
776 defaultNamespace,
777 contents);
778}
779
780TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, UnrecognizedReplyErrorIsConvertedToUnknownError)
781{
782 expectReplyError("something sinister");
783 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
784 .Times(1);
785 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
786 this,
787 std::placeholders::_1,
788 std::placeholders::_2),
789 defaultNamespace,
790 contents);
791}
792
793TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, EmptyReplyErrorIsConvertedToUnknownError)
794{
795 expectReplyError("");
796 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
797 .Times(1);
798 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
799 this,
800 std::placeholders::_1,
801 std::placeholders::_2),
802 defaultNamespace,
803 contents);
804}
805
806TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, IOErrorInContext)
807{
808 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
809 .Times(1)
810 .WillOnce(Invoke([](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd, const char*, int, int,
811 const char**, const size_t*)
812 {
813 acc->err = REDIS_ERR_IO;
814 errno = EINVAL;
815 cb(acc, nullptr, pd);
816 return REDIS_OK;
817 }));
818 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::IO_ERROR), _))
819 .Times(1);
820 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
821 this,
822 std::placeholders::_1,
823 std::placeholders::_2),
824 defaultNamespace,
825 contents);
826}
827
828TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, EofErrorInContext)
829{
830 expectContextError(REDIS_ERR_EOF);
831 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _))
832 .Times(1);
833 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
834 this,
835 std::placeholders::_1,
836 std::placeholders::_2),
837 defaultNamespace,
838 contents);
839}
840
841TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ProtocolErrorInContext)
842{
843 expectContextError(REDIS_ERR_PROTOCOL);
844 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::PROTOCOL_ERROR), _))
845 .Times(1);
846 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
847 this,
848 std::placeholders::_1,
849 std::placeholders::_2),
850 defaultNamespace,
851 contents);
852}
853
854TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, OomErrorInContext)
855{
856 expectContextError(REDIS_ERR_OOM);
857 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::OUT_OF_MEMORY), _))
858 .Times(1);
859 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
860 this,
861 std::placeholders::_1,
862 std::placeholders::_2),
863 defaultNamespace,
864 contents);
865}
866
867TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterErrorNotConnectedInContext)
868{
869 expectContextError(CLUSTER_ERROR_NOT_CONNECTED);
870 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::NOT_CONNECTED), _))
871 .Times(1);
872 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
873 this,
874 std::placeholders::_1,
875 std::placeholders::_2),
876 defaultNamespace,
877 contents);
878}
879
880TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, ClusterErrorConnectionLostInContext)
881{
882 expectContextError(CLUSTER_ERROR_CONNECTION_LOST);
883 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::CONNECTION_LOST), _))
884 .Times(1);
885 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
886 this,
887 std::placeholders::_1,
888 std::placeholders::_2),
889 defaultNamespace,
890 contents);
891}
892
893TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, UnrecognizedContextErrorIsConvertedToUnknownError)
894{
895 expectContextError(REDIS_ERR_OTHER);
896 EXPECT_CALL(*this, ack(std::error_code(AsyncRedisCommandDispatcherErrorCode::UNKNOWN_ERROR), _))
897 .Times(1);
898 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::ack,
899 this,
900 std::placeholders::_1,
901 std::placeholders::_2),
902 defaultNamespace,
903 contents);
904}
905
906TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, PendingClientCallbacksAreNotCalledAfterDisabled)
907{
908 InSequence dummy;
909 expectRedisClusterAsyncCommandArgvWithKey_SaveCb();
910 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
911 this,
912 std::placeholders::_1,
913 std::placeholders::_2),
914 defaultNamespace,
915 contents);
916 expectAck();
917 savedCb(&acc, &redisReplyBuilder.buildStringReply(), savedPd);
918 dispatcher->disableCommandCallbacks();
919 expectRedisClusterAsyncCommandArgvWithKey_SaveCb();
920 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
921 this,
922 std::placeholders::_1,
923 std::placeholders::_2),
924 defaultNamespace,
925 contents);
926 expectAckNotCalled();
927 savedCb(&acc, &redisReplyBuilder.buildStringReply(), savedPd);
928}
929
930TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, DisconnectCallbackDetachesContextFromAdapter)
931{
932 InSequence dummy;
933 expectAdapterDetach();
934 disconnected(&acc, &ac, 0);
935}
936
937TEST_F(AsyncHiredisClusterCommandDispatcherConnectedTest, RegisteredClientDisconnectCallbackIsCalled)
938{
939 InSequence dummy;
940 dispatcher->registerDisconnectCb(std::bind(&AsyncHiredisClusterCommandDispatcherConnectedTest::disconnectCallback,
941 this));
942 expectAdapterDetach();
943 expectDisconnectCallback();
944 disconnected(&acc, &ac, 0);
945}
946
947TEST_F(AsyncHiredisClusterCommandDispatcherWithPermanentCommandCallbacksTest, CanHandleMultipleRepliesForSameRedisCommand)
948{
949 InSequence dummy;
950 redisClusterCallbackFn* savedCb;
951 void* savedPd;
952 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
953 .Times(1)
954 .WillOnce(Invoke([&savedCb, &savedPd](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
955 const char*, int, int, const char**, const size_t*)
956 {
957 savedCb = cb;
958 savedPd = pd;
959 return REDIS_OK;
960 }));
961 Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } });
962 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
963 this,
964 std::placeholders::_1,
965 std::placeholders::_2),
966 defaultNamespace,
967 contents);
968 EXPECT_CALL(*this, ack(std::error_code(), _))
969 .Times(3);
970 redisReply rr;
971 rr.type = REDIS_REPLY_NIL;
972 savedCb(&acc, &rr, savedPd);
973 savedCb(&acc, &rr, savedPd);
974 savedCb(&acc, &rr, savedPd);
975}
976
977TEST_F(AsyncHiredisClusterCommandDispatcherDeathTest, CbRemovedAfterHiredisCb)
978{
979 InSequence dummy;
980 redisClusterCallbackFn* savedCb;
981 void* savedPd;
982 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
983 .Times(1)
984 .WillOnce(Invoke([this, &savedCb, &savedPd](redisClusterAsyncContext* acc, redisClusterCallbackFn* cb, void* pd,
985 const char*, int, int, const char**, const size_t*)
986 {
987 savedCb = cb;
988 savedPd = pd;
989 cb(acc, &redisReplyBuilder.buildNilReply(), pd);
990 return REDIS_OK;
991 }));
992 expectAck();
993 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
994 this,
995 std::placeholders::_1,
996 std::placeholders::_2),
997 defaultNamespace,
998 contents);
999 EXPECT_EXIT(savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), "");
1000}
1001
1002TEST_F(AsyncHiredisClusterCommandDispatcherDeathTest, TooManyRepliesAborts)
1003{
1004 InSequence dummy;
1005 redisClusterCallbackFn* savedCb;
1006 void* savedPd;
1007 EXPECT_CALL(hiredisClusterSystemMock, redisClusterAsyncCommandArgvWithKey(&acc, _, _, _, _, _, _, _))
1008 .Times(1)
1009 .WillOnce(Invoke([&savedCb, &savedPd](redisClusterAsyncContext*, redisClusterCallbackFn* cb, void* pd,
1010 const char*, int, int, const char**, const size_t*)
1011 {
1012 savedCb = cb;
1013 savedPd = pd;
1014 return REDIS_OK;
1015 }));
1016 Contents contents({ { "cmd", "key", "value" }, { 3, 3, 5 } });
1017 expectAck();
1018 dispatcher->dispatchAsync(std::bind(&AsyncHiredisClusterCommandDispatcherDeathTest::ack,
1019 this,
1020 std::placeholders::_1,
1021 std::placeholders::_2),
1022 defaultNamespace,
1023 contents);
1024 savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd);
1025 EXPECT_EXIT(savedCb(&acc, &redisReplyBuilder.buildNilReply(), savedPd), KilledBySignal(SIGABRT), "");
1026}