blob: e57962c94de9659b673840973c22aa9034bd92cc [file] [log] [blame]
Ed Warnickecb9cada2015-12-08 15:45:58 -07001/*
2 * mc.c: vlib reliable sequenced multicast distributed applications
3 *
4 * Copyright (c) 2010 Cisco and/or its affiliates.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#include <vlib/vlib.h>
19
Dave Barach9b8ffd92016-07-08 08:13:45 -040020/*
Ed Warnickecb9cada2015-12-08 15:45:58 -070021 * 1 to enable msg id training wheels, which are useful for tracking
22 * down catchup and/or partitioned network problems
23 */
24#define MSG_ID_DEBUG 0
25
26static format_function_t format_mc_stream_state;
27
Dave Barach9b8ffd92016-07-08 08:13:45 -040028static u32
29elog_id_for_peer_id (mc_main_t * m, u64 peer_id)
Ed Warnickecb9cada2015-12-08 15:45:58 -070030{
Dave Barach9b8ffd92016-07-08 08:13:45 -040031 uword *p, r;
32 mhash_t *h = &m->elog_id_by_peer_id;
Ed Warnickecb9cada2015-12-08 15:45:58 -070033
Dave Barach9b8ffd92016-07-08 08:13:45 -040034 if (!m->elog_id_by_peer_id.hash)
Ed Warnickecb9cada2015-12-08 15:45:58 -070035 mhash_init (h, sizeof (uword), sizeof (mc_peer_id_t));
Dave Barach9b8ffd92016-07-08 08:13:45 -040036
Ed Warnickecb9cada2015-12-08 15:45:58 -070037 p = mhash_get (h, &peer_id);
38 if (p)
39 return p[0];
Dave Barach9b8ffd92016-07-08 08:13:45 -040040 r = elog_string (m->elog_main, "%U", m->transport.format_peer_id, peer_id);
Ed Warnickecb9cada2015-12-08 15:45:58 -070041 mhash_set (h, &peer_id, r, /* old_value */ 0);
42 return r;
43}
44
Dave Barach9b8ffd92016-07-08 08:13:45 -040045static u32
46elog_id_for_msg_name (mc_main_t * m, char *msg_name)
Ed Warnickecb9cada2015-12-08 15:45:58 -070047{
Dave Barach9b8ffd92016-07-08 08:13:45 -040048 uword *p, r;
49 uword *h = m->elog_id_by_msg_name;
Ed Warnickecb9cada2015-12-08 15:45:58 -070050 u8 *name_copy;
51
Dave Barach9b8ffd92016-07-08 08:13:45 -040052 if (!h)
53 h = m->elog_id_by_msg_name = hash_create_string (0, sizeof (uword));
Ed Warnickecb9cada2015-12-08 15:45:58 -070054
55 p = hash_get_mem (h, msg_name);
56 if (p)
57 return p[0];
58 r = elog_string (m->elog_main, "%s", msg_name);
59
60 name_copy = format (0, "%s%c", msg_name, 0);
61
62 hash_set_mem (h, name_copy, r);
63 m->elog_id_by_msg_name = h;
64
65 return r;
66}
67
Dave Barach9b8ffd92016-07-08 08:13:45 -040068static void
69elog_tx_msg (mc_main_t * m, u32 stream_id, u32 local_sequence,
70 u32 retry_count)
Ed Warnickecb9cada2015-12-08 15:45:58 -070071{
72 if (MC_EVENT_LOGGING > 0)
73 {
Dave Barach9b8ffd92016-07-08 08:13:45 -040074 /* *INDENT-OFF* */
75 ELOG_TYPE_DECLARE (e) =
76 {
77 .format = "tx-msg: stream %d local seq %d attempt %d",
78 .format_args = "i4i4i4",
79 };
80 /* *INDENT-ON* */
81 struct
82 {
83 u32 stream_id, local_sequence, retry_count;
84 } *ed;
Ed Warnickecb9cada2015-12-08 15:45:58 -070085 ed = ELOG_DATA (m->elog_main, e);
86 ed->stream_id = stream_id;
87 ed->local_sequence = local_sequence;
88 ed->retry_count = retry_count;
89 }
90}
91
92/*
93 * seq_cmp
Dave Barach9b8ffd92016-07-08 08:13:45 -040094 * correctly compare two unsigned sequence numbers.
Ed Warnickecb9cada2015-12-08 15:45:58 -070095 * This function works so long as x and y are within 2**(n-1) of each
96 * other, where n = bits(x, y).
97 *
98 * Magic decoder ring:
99 * seq_cmp == 0 => x and y are equal
100 * seq_cmp < 0 => x is "in the past" with respect to y
101 * seq_cmp > 0 => x is "in the future" with respect to y
102 */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400103always_inline i32
104mc_seq_cmp (u32 x, u32 y)
105{
106 return (i32) x - (i32) y;
107}
Ed Warnickecb9cada2015-12-08 15:45:58 -0700108
Dave Barach9b8ffd92016-07-08 08:13:45 -0400109void *
110mc_get_vlib_buffer (vlib_main_t * vm, u32 n_bytes, u32 * bi_return)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700111{
112 u32 n_alloc, bi;
Dave Barach9b8ffd92016-07-08 08:13:45 -0400113 vlib_buffer_t *b;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700114
115 n_alloc = vlib_buffer_alloc (vm, &bi, 1);
116 ASSERT (n_alloc == 1);
117
118 b = vlib_get_buffer (vm, bi);
119 b->current_length = n_bytes;
120 *bi_return = bi;
121 return (void *) b->data;
122}
123
124static void
125delete_peer_with_index (mc_main_t * mcm, mc_stream_t * s,
Dave Barach9b8ffd92016-07-08 08:13:45 -0400126 uword index, int notify_application)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700127{
Dave Barach9b8ffd92016-07-08 08:13:45 -0400128 mc_stream_peer_t *p = pool_elt_at_index (s->peers, index);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700129 ASSERT (p != 0);
130 if (s->config.peer_died && notify_application)
131 s->config.peer_died (mcm, s, p->id);
132
133 s->all_peer_bitmap = clib_bitmap_andnoti (s->all_peer_bitmap, p - s->peers);
134
135 if (MC_EVENT_LOGGING > 0)
136 {
Dave Barach9b8ffd92016-07-08 08:13:45 -0400137 /* *INDENT-OFF* */
138 ELOG_TYPE_DECLARE (e) =
139 {
140 .format = "delete peer %s from all_peer_bitmap",
141 .format_args = "T4",
142 };
143 /* *INDENT-ON* */
144 struct
145 {
146 u32 peer;
147 } *ed = 0;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700148
149 ed = ELOG_DATA (mcm->elog_main, e);
150 ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
151 }
152 /* Do not delete the pool / hash table entries, or we lose sequence number state */
153}
154
155static mc_stream_peer_t *
156get_or_create_peer_with_id (mc_main_t * mcm,
Dave Barach9b8ffd92016-07-08 08:13:45 -0400157 mc_stream_t * s, mc_peer_id_t id, int *created)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700158{
Dave Barach9b8ffd92016-07-08 08:13:45 -0400159 uword *q = mhash_get (&s->peer_index_by_id, &id);
160 mc_stream_peer_t *p;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700161
162 if (q)
163 {
164 p = pool_elt_at_index (s->peers, q[0]);
165 goto done;
166 }
167
168 pool_get (s->peers, p);
169 memset (p, 0, sizeof (p[0]));
170 p->id = id;
171 p->last_sequence_received = ~0;
172 mhash_set (&s->peer_index_by_id, &id, p - s->peers, /* old_value */ 0);
173 if (created)
174 *created = 1;
175
Dave Barach9b8ffd92016-07-08 08:13:45 -0400176done:
Ed Warnickecb9cada2015-12-08 15:45:58 -0700177 if (MC_EVENT_LOGGING > 0)
178 {
Dave Barach9b8ffd92016-07-08 08:13:45 -0400179 /* *INDENT-OFF* */
180 ELOG_TYPE_DECLARE (e) =
181 {
182 .format = "get_or_create %s peer %s stream %d seq %d",
183 .format_args = "t4T4i4i4",
184 .n_enum_strings = 2,
185 .enum_strings = {
186 "old", "new",
187 },
188 };
189 /* *INDENT-ON* */
190 struct
191 {
192 u32 is_new, peer, stream_index, rx_sequence;
193 } *ed = 0;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700194
195 ed = ELOG_DATA (mcm->elog_main, e);
196 ed->is_new = q ? 0 : 1;
197 ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
198 ed->stream_index = s->index;
199 ed->rx_sequence = p->last_sequence_received;
200 }
201 /* $$$$ Enable or reenable this peer */
202 s->all_peer_bitmap = clib_bitmap_ori (s->all_peer_bitmap, p - s->peers);
203 return p;
204}
205
Dave Barach9b8ffd92016-07-08 08:13:45 -0400206static void
207maybe_send_window_open_event (vlib_main_t * vm, mc_stream_t * stream)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700208{
Dave Barach9b8ffd92016-07-08 08:13:45 -0400209 vlib_one_time_waiting_process_t *p;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700210
211 if (pool_elts (stream->retry_pool) >= stream->config.window_size)
212 return;
213
214 vec_foreach (p, stream->procs_waiting_for_open_window)
215 vlib_signal_one_time_waiting_process (vm, p);
216
217 if (stream->procs_waiting_for_open_window)
218 _vec_len (stream->procs_waiting_for_open_window) = 0;
219}
220
Dave Barach9b8ffd92016-07-08 08:13:45 -0400221static void
222mc_retry_free (mc_main_t * mcm, mc_stream_t * s, mc_retry_t * r)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700223{
224 mc_retry_t record, *retp;
Dave Barach9b8ffd92016-07-08 08:13:45 -0400225
Ed Warnickecb9cada2015-12-08 15:45:58 -0700226 if (r->unacked_by_peer_bitmap)
227 _vec_len (r->unacked_by_peer_bitmap) = 0;
228
Dave Barach9b8ffd92016-07-08 08:13:45 -0400229 if (clib_fifo_elts (s->retired_fifo) >= 2 * s->config.window_size)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700230 {
231 clib_fifo_sub1 (s->retired_fifo, record);
232 vlib_buffer_free_one (mcm->vlib_main, record.buffer_index);
233 }
Dave Barach9b8ffd92016-07-08 08:13:45 -0400234
Ed Warnickecb9cada2015-12-08 15:45:58 -0700235 clib_fifo_add2 (s->retired_fifo, retp);
Dave Barach9b8ffd92016-07-08 08:13:45 -0400236
Ed Warnickecb9cada2015-12-08 15:45:58 -0700237 retp->buffer_index = r->buffer_index;
238 retp->local_sequence = r->local_sequence;
239
240 r->buffer_index = ~0; /* poison buffer index in this retry */
241}
242
Dave Barach9b8ffd92016-07-08 08:13:45 -0400243static void
244mc_resend_retired (mc_main_t * mcm, mc_stream_t * s, u32 local_sequence)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700245{
246 mc_retry_t *retry;
247
248 if (MC_EVENT_LOGGING > 0)
249 {
Dave Barach9b8ffd92016-07-08 08:13:45 -0400250 /* *INDENT-OFF* */
251 ELOG_TYPE_DECLARE (e) =
252 {
253 .format = "resend-retired: search for local seq %d",
254 .format_args = "i4",
255 };
256 /* *INDENT-ON* */
257 struct
258 {
259 u32 local_sequence;
260 } *ed;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700261 ed = ELOG_DATA (mcm->elog_main, e);
262 ed->local_sequence = local_sequence;
263 }
264
Dave Barach9b8ffd92016-07-08 08:13:45 -0400265 /* *INDENT-OFF* */
266 clib_fifo_foreach (retry, s->retired_fifo,
267 ({
268 if (retry->local_sequence == local_sequence)
269 {
270 elog_tx_msg (mcm, s->index, retry-> local_sequence, -13);
271 mcm->transport.tx_buffer (mcm->transport.opaque,
272 MC_TRANSPORT_USER_REQUEST_TO_RELAY,
273 retry->buffer_index);
274 return;
275 }
276 }));
277 /* *INDENT-ON* */
Ed Warnickecb9cada2015-12-08 15:45:58 -0700278
279 if (MC_EVENT_LOGGING > 0)
280 {
Dave Barach9b8ffd92016-07-08 08:13:45 -0400281 /* *INDENT-OFF* */
282 ELOG_TYPE_DECLARE (e) =
283 {
284 .format = "resend-retired: FAILED search for local seq %d",
285 .format_args = "i4",
286 };
287 /* *INDENT-ON* */
288 struct
289 {
290 u32 local_sequence;
291 } *ed;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700292 ed = ELOG_DATA (mcm->elog_main, e);
293 ed->local_sequence = local_sequence;
294 }
295}
296
297static uword *
298delete_retry_fifo_elt (mc_main_t * mcm,
299 mc_stream_t * stream,
Dave Barach9b8ffd92016-07-08 08:13:45 -0400300 mc_retry_t * r, uword * dead_peer_bitmap)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700301{
Dave Barach9b8ffd92016-07-08 08:13:45 -0400302 mc_stream_peer_t *p;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700303
Dave Barach9b8ffd92016-07-08 08:13:45 -0400304 /* *INDENT-OFF* */
Ed Warnickecb9cada2015-12-08 15:45:58 -0700305 pool_foreach (p, stream->peers, ({
306 uword pi = p - stream->peers;
307 uword is_alive = 0 == clib_bitmap_get (r->unacked_by_peer_bitmap, pi);
308
309 if (! is_alive)
310 dead_peer_bitmap = clib_bitmap_ori (dead_peer_bitmap, pi);
311
312 if (MC_EVENT_LOGGING > 0)
313 {
314 ELOG_TYPE_DECLARE (e) = {
315 .format = "delete_retry_fifo_elt: peer %s is %s",
316 .format_args = "T4t4",
317 .n_enum_strings = 2,
318 .enum_strings = { "alive", "dead", },
319 };
320 struct { u32 peer, is_alive; } * ed;
321 ed = ELOG_DATA (mcm->elog_main, e);
322 ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
323 ed->is_alive = is_alive;
324 }
325 }));
Dave Barach9b8ffd92016-07-08 08:13:45 -0400326 /* *INDENT-ON* */
327
Ed Warnickecb9cada2015-12-08 15:45:58 -0700328 hash_unset (stream->retry_index_by_local_sequence, r->local_sequence);
329 mc_retry_free (mcm, stream, r);
330
331 return dead_peer_bitmap;
332}
333
334always_inline mc_retry_t *
335prev_retry (mc_stream_t * s, mc_retry_t * r)
336{
337 return (r->prev_index != ~0
Dave Barach9b8ffd92016-07-08 08:13:45 -0400338 ? pool_elt_at_index (s->retry_pool, r->prev_index) : 0);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700339}
340
341always_inline mc_retry_t *
342next_retry (mc_stream_t * s, mc_retry_t * r)
343{
344 return (r->next_index != ~0
Dave Barach9b8ffd92016-07-08 08:13:45 -0400345 ? pool_elt_at_index (s->retry_pool, r->next_index) : 0);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700346}
347
348always_inline void
349remove_retry_from_pool (mc_stream_t * s, mc_retry_t * r)
350{
Dave Barach9b8ffd92016-07-08 08:13:45 -0400351 mc_retry_t *p = prev_retry (s, r);
352 mc_retry_t *n = next_retry (s, r);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700353
354 if (p)
355 p->next_index = r->next_index;
356 else
357 s->retry_head_index = r->next_index;
358 if (n)
359 n->prev_index = r->prev_index;
360 else
361 s->retry_tail_index = r->prev_index;
362
363 pool_put_index (s->retry_pool, r - s->retry_pool);
364}
365
Dave Barach9b8ffd92016-07-08 08:13:45 -0400366static void
367check_retry (mc_main_t * mcm, mc_stream_t * s)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700368{
Dave Barach9b8ffd92016-07-08 08:13:45 -0400369 mc_retry_t *r;
370 vlib_main_t *vm = mcm->vlib_main;
371 f64 now = vlib_time_now (vm);
372 uword *dead_peer_bitmap = 0;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700373 u32 ri, ri_next;
374
375 for (ri = s->retry_head_index; ri != ~0; ri = ri_next)
376 {
377 r = pool_elt_at_index (s->retry_pool, ri);
378 ri_next = r->next_index;
379
380 if (now < r->sent_at + s->config.retry_interval)
381 continue;
382
383 r->n_retries += 1;
384 if (r->n_retries > s->config.retry_limit)
385 {
386 dead_peer_bitmap =
387 delete_retry_fifo_elt (mcm, s, r, dead_peer_bitmap);
388 remove_retry_from_pool (s, r);
389 }
390 else
391 {
392 if (MC_EVENT_LOGGING > 0)
393 {
Dave Barach9b8ffd92016-07-08 08:13:45 -0400394 mc_stream_peer_t *p;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700395
Dave Barach9b8ffd92016-07-08 08:13:45 -0400396 /* *INDENT-OFF* */
397 ELOG_TYPE_DECLARE (t) =
398 {
399 .format = "resend local seq %d attempt %d",
400 .format_args = "i4i4",
401 };
402 /* *INDENT-ON* */
403
404 /* *INDENT-OFF* */
Ed Warnickecb9cada2015-12-08 15:45:58 -0700405 pool_foreach (p, s->peers, ({
406 if (clib_bitmap_get (r->unacked_by_peer_bitmap, p - s->peers))
407 {
408 ELOG_TYPE_DECLARE (ev) = {
409 .format = "resend: needed by peer %s local seq %d",
410 .format_args = "T4i4",
411 };
412 struct { u32 peer, rx_sequence; } * ed;
413 ed = ELOG_DATA (mcm->elog_main, ev);
414 ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
415 ed->rx_sequence = r->local_sequence;
416 }
417 }));
Dave Barach9b8ffd92016-07-08 08:13:45 -0400418 /* *INDENT-ON* */
Ed Warnickecb9cada2015-12-08 15:45:58 -0700419
Dave Barach9b8ffd92016-07-08 08:13:45 -0400420 struct
421 {
422 u32 sequence;
423 u32 trail;
424 } *ed;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700425 ed = ELOG_DATA (mcm->elog_main, t);
426 ed->sequence = r->local_sequence;
427 ed->trail = r->n_retries;
428 }
429
430 r->sent_at = vlib_time_now (vm);
431 s->stats.n_retries += 1;
432
433 elog_tx_msg (mcm, s->index, r->local_sequence, r->n_retries);
434
435 mcm->transport.tx_buffer
Dave Barach9b8ffd92016-07-08 08:13:45 -0400436 (mcm->transport.opaque,
437 MC_TRANSPORT_USER_REQUEST_TO_RELAY, r->buffer_index);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700438 }
439 }
440
441 maybe_send_window_open_event (mcm->vlib_main, s);
442
443 /* Delete any dead peers we've found. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400444 if (!clib_bitmap_is_zero (dead_peer_bitmap))
Ed Warnickecb9cada2015-12-08 15:45:58 -0700445 {
446 uword i;
447
Dave Barach9b8ffd92016-07-08 08:13:45 -0400448 /* *INDENT-OFF* */
Ed Warnickecb9cada2015-12-08 15:45:58 -0700449 clib_bitmap_foreach (i, dead_peer_bitmap, ({
450 delete_peer_with_index (mcm, s, i, /* notify_application */ 1);
451
452 /* Delete any references to just deleted peer in retry pool. */
453 pool_foreach (r, s->retry_pool, ({
454 r->unacked_by_peer_bitmap =
455 clib_bitmap_andnoti (r->unacked_by_peer_bitmap, i);
456 }));
457 }));
Dave Barach9b8ffd92016-07-08 08:13:45 -0400458/* *INDENT-ON* */
Ed Warnickecb9cada2015-12-08 15:45:58 -0700459 clib_bitmap_free (dead_peer_bitmap);
460 }
461}
462
463always_inline mc_main_t *
464mc_node_get_main (vlib_node_runtime_t * node)
465{
Dave Barach9b8ffd92016-07-08 08:13:45 -0400466 mc_main_t **p = (void *) node->runtime_data;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700467 return p[0];
468}
469
470static uword
471mc_retry_process (vlib_main_t * vm,
Dave Barach9b8ffd92016-07-08 08:13:45 -0400472 vlib_node_runtime_t * node, vlib_frame_t * f)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700473{
Dave Barach9b8ffd92016-07-08 08:13:45 -0400474 mc_main_t *mcm = mc_node_get_main (node);
475 mc_stream_t *s;
476
Ed Warnickecb9cada2015-12-08 15:45:58 -0700477 while (1)
478 {
479 vlib_process_suspend (vm, 1.0);
480 vec_foreach (s, mcm->stream_vector)
Dave Barach9b8ffd92016-07-08 08:13:45 -0400481 {
482 if (s->state != MC_STREAM_STATE_invalid)
483 check_retry (mcm, s);
484 }
Ed Warnickecb9cada2015-12-08 15:45:58 -0700485 }
Dave Barach9b8ffd92016-07-08 08:13:45 -0400486 return 0; /* not likely */
Ed Warnickecb9cada2015-12-08 15:45:58 -0700487}
488
Dave Barach9b8ffd92016-07-08 08:13:45 -0400489static void
490send_join_or_leave_request (mc_main_t * mcm, u32 stream_index, u32 is_join)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700491{
Dave Barach9b8ffd92016-07-08 08:13:45 -0400492 vlib_main_t *vm = mcm->vlib_main;
493 mc_msg_join_or_leave_request_t *mp;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700494 u32 bi;
495
496 mp = mc_get_vlib_buffer (vm, sizeof (mp[0]), &bi);
Dave Barach9b8ffd92016-07-08 08:13:45 -0400497 memset (mp, 0, sizeof (*mp));
Ed Warnickecb9cada2015-12-08 15:45:58 -0700498 mp->type = MC_MSG_TYPE_join_or_leave_request;
499 mp->peer_id = mcm->transport.our_ack_peer_id;
500 mp->stream_index = stream_index;
501 mp->is_join = is_join;
502
503 mc_byte_swap_msg_join_or_leave_request (mp);
504
Dave Barach9b8ffd92016-07-08 08:13:45 -0400505 /*
Ed Warnickecb9cada2015-12-08 15:45:58 -0700506 * These msgs are unnumbered, unordered so send on the from-relay
Dave Barach9b8ffd92016-07-08 08:13:45 -0400507 * channel.
Ed Warnickecb9cada2015-12-08 15:45:58 -0700508 */
509 mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi);
Dave Barach9b8ffd92016-07-08 08:13:45 -0400510}
Ed Warnickecb9cada2015-12-08 15:45:58 -0700511
512static uword
513mc_join_ager_process (vlib_main_t * vm,
Dave Barach9b8ffd92016-07-08 08:13:45 -0400514 vlib_node_runtime_t * node, vlib_frame_t * f)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700515{
Dave Barach9b8ffd92016-07-08 08:13:45 -0400516 mc_main_t *mcm = mc_node_get_main (node);
517
Ed Warnickecb9cada2015-12-08 15:45:58 -0700518 while (1)
519 {
520 if (mcm->joins_in_progress)
521 {
Dave Barach9b8ffd92016-07-08 08:13:45 -0400522 mc_stream_t *s;
523 vlib_one_time_waiting_process_t *p;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700524 f64 now = vlib_time_now (vm);
525
526 vec_foreach (s, mcm->stream_vector)
Dave Barach9b8ffd92016-07-08 08:13:45 -0400527 {
528 if (s->state != MC_STREAM_STATE_join_in_progress)
529 continue;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700530
Dave Barach9b8ffd92016-07-08 08:13:45 -0400531 if (now > s->join_timeout)
532 {
533 s->state = MC_STREAM_STATE_ready;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700534
Dave Barach9b8ffd92016-07-08 08:13:45 -0400535 if (MC_EVENT_LOGGING > 0)
536 {
537 /* *INDENT-OFF* */
538 ELOG_TYPE_DECLARE (e) =
539 {
540 .format = "stream %d join timeout",
541 };
542 /* *INDENT-ON* */
543 ELOG (mcm->elog_main, e, s->index);
544 }
545 /* Make sure that this app instance exists as a stream peer,
546 or we may answer a catchup request with a NULL
547 all_peer_bitmap... */
548 (void) get_or_create_peer_with_id
549 (mcm, s, mcm->transport.our_ack_peer_id, /* created */ 0);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700550
Dave Barach9b8ffd92016-07-08 08:13:45 -0400551 vec_foreach (p, s->procs_waiting_for_join_done)
552 vlib_signal_one_time_waiting_process (vm, p);
553 if (s->procs_waiting_for_join_done)
554 _vec_len (s->procs_waiting_for_join_done) = 0;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700555
Dave Barach9b8ffd92016-07-08 08:13:45 -0400556 mcm->joins_in_progress--;
557 ASSERT (mcm->joins_in_progress >= 0);
558 }
559 else
560 {
561 /* Resent join request which may have been lost. */
562 send_join_or_leave_request (mcm, s->index, 1 /* is_join */ );
563
564 /* We're *not* alone, retry for as long as it takes */
565 if (mcm->relay_state == MC_RELAY_STATE_SLAVE)
566 s->join_timeout = vlib_time_now (vm) + 2.0;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700567
568
Dave Barach9b8ffd92016-07-08 08:13:45 -0400569 if (MC_EVENT_LOGGING > 0)
570 {
571 /* *INDENT-OFF* */
572 ELOG_TYPE_DECLARE (e) =
573 {
574 .format = "stream %d resend join request",
575 };
576 /* *INDENT-ON* */
577 ELOG (mcm->elog_main, e, s->index);
578 }
579 }
580 }
Ed Warnickecb9cada2015-12-08 15:45:58 -0700581 }
582
583 vlib_process_suspend (vm, .5);
584 }
585
Dave Barach9b8ffd92016-07-08 08:13:45 -0400586 return 0; /* not likely */
Ed Warnickecb9cada2015-12-08 15:45:58 -0700587}
588
Dave Barach9b8ffd92016-07-08 08:13:45 -0400589static void
590serialize_mc_register_stream_name (serialize_main_t * m, va_list * va)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700591{
Dave Barach9b8ffd92016-07-08 08:13:45 -0400592 char *name = va_arg (*va, char *);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700593 serialize_cstring (m, name);
594}
595
/*
 * Copy a stream name into a fixed-size event-log field.
 *
 * buf          destination field
 * n_buf_bytes  size of the destination field in bytes
 * v            NUL-terminated name, or NULL for an empty name
 *
 * At most n_buf_bytes - 1 characters are copied and the remainder of
 * the field is zero-filled, so the result is always NUL-terminated and
 * never carries stale bytes after a short name.
 *
 * The name is treated as a C string rather than a vector: callers in
 * this file also pass names that are plain C strings, for which
 * vec_len () is not valid.
 */
static void
elog_stream_name (char *buf, int n_buf_bytes, char *v)
{
  int i = 0;

  if (n_buf_bytes <= 0)
    return;

  if (v)
    {
      for (; i < n_buf_bytes - 1 && v[i] != 0; i++)
        buf[i] = v[i];
    }

  /* Terminate and clear the rest of the field. */
  for (; i < n_buf_bytes; i++)
    buf[i] = 0;
}
602
Dave Barach9b8ffd92016-07-08 08:13:45 -0400603static void
604unserialize_mc_register_stream_name (serialize_main_t * m, va_list * va)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700605{
Dave Barach9b8ffd92016-07-08 08:13:45 -0400606 mc_main_t *mcm = va_arg (*va, mc_main_t *);
607 char *name;
608 mc_stream_t *s;
609 uword *p;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700610
611 unserialize_cstring (m, &name);
612
613 if ((p = hash_get_mem (mcm->stream_index_by_name, name)))
614 {
615 if (MC_EVENT_LOGGING > 0)
616 {
Dave Barach9b8ffd92016-07-08 08:13:45 -0400617 /* *INDENT-OFF* */
618 ELOG_TYPE_DECLARE (e) =
619 {
620 .format = "stream index %d already named %s",
621 .format_args = "i4s16",
622 };
623 /* *INDENT-ON* */
624 struct
625 {
626 u32 stream_index;
627 char name[16];
628 } *ed;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700629 ed = ELOG_DATA (mcm->elog_main, e);
630 ed->stream_index = p[0];
631 elog_stream_name (ed->name, sizeof (ed->name), name);
632 }
633
634 vec_free (name);
635 return;
636 }
637
638 vec_add2 (mcm->stream_vector, s, 1);
639 mc_stream_init (s);
640 s->state = MC_STREAM_STATE_name_known;
641 s->index = s - mcm->stream_vector;
642 s->config.name = name;
643
644 if (MC_EVENT_LOGGING > 0)
645 {
Dave Barach9b8ffd92016-07-08 08:13:45 -0400646 /* *INDENT-OFF* */
647 ELOG_TYPE_DECLARE (e) =
648 {
649 .format = "stream index %d named %s",
650 .format_args = "i4s16",
651 };
652 /* *INDENT-ON* */
653 struct
654 {
655 u32 stream_index;
656 char name[16];
657 } *ed;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700658 ed = ELOG_DATA (mcm->elog_main, e);
659 ed->stream_index = s->index;
660 elog_stream_name (ed->name, sizeof (ed->name), name);
661 }
662
663 hash_set_mem (mcm->stream_index_by_name, name, s->index);
664
665 p = hash_get (mcm->procs_waiting_for_stream_name_by_name, name);
666 if (p)
667 {
Dave Barach9b8ffd92016-07-08 08:13:45 -0400668 vlib_one_time_waiting_process_t *wp, **w;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700669 w = pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool, p[0]);
670 vec_foreach (wp, w[0])
671 vlib_signal_one_time_waiting_process (mcm->vlib_main, wp);
672 pool_put (mcm->procs_waiting_for_stream_name_pool, w);
673 hash_unset_mem (mcm->procs_waiting_for_stream_name_by_name, name);
674 }
675}
676
Dave Barach9b8ffd92016-07-08 08:13:45 -0400677/* *INDENT-OFF* */
678MC_SERIALIZE_MSG (mc_register_stream_name_msg, static) =
679{
Ed Warnickecb9cada2015-12-08 15:45:58 -0700680 .name = "mc_register_stream_name",
681 .serialize = serialize_mc_register_stream_name,
682 .unserialize = unserialize_mc_register_stream_name,
683};
Dave Barach9b8ffd92016-07-08 08:13:45 -0400684/* *INDENT-ON* */
Ed Warnickecb9cada2015-12-08 15:45:58 -0700685
686void
687mc_rx_buffer_unserialize (mc_main_t * mcm,
688 mc_stream_t * stream,
Dave Barach9b8ffd92016-07-08 08:13:45 -0400689 mc_peer_id_t peer_id, u32 buffer_index)
690{
691 return mc_unserialize (mcm, stream, buffer_index);
692}
Ed Warnickecb9cada2015-12-08 15:45:58 -0700693
694static u8 *
695mc_internal_catchup_snapshot (mc_main_t * mcm,
696 u8 * data_vector,
697 u32 last_global_sequence_processed)
698{
699 serialize_main_t m;
700
701 /* Append serialized data to data vector. */
702 serialize_open_vector (&m, data_vector);
703 m.stream.current_buffer_index = vec_len (data_vector);
704
705 serialize (&m, serialize_mc_main, mcm);
706 return serialize_close_vector (&m);
707}
708
709static void
Dave Barach9b8ffd92016-07-08 08:13:45 -0400710mc_internal_catchup (mc_main_t * mcm, u8 * data, u32 n_data_bytes)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700711{
712 serialize_main_t s;
713
714 unserialize_open_data (&s, data, n_data_bytes);
715
716 unserialize (&s, unserialize_mc_main, mcm);
717}
718
/*
 * Weak, empty default; meant to be overridden from the application
 * layer.  Not actually used here.
 */
__attribute__ ((weak)) void mc_stream_join_process_hold (void);

void
mc_stream_join_process_hold (void)
{
  /* Intentionally empty. */
}
Ed Warnickecb9cada2015-12-08 15:45:58 -0700725
726static u32
727mc_stream_join_helper (mc_main_t * mcm,
Dave Barach9b8ffd92016-07-08 08:13:45 -0400728 mc_stream_config_t * config, u32 is_internal)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700729{
Dave Barach9b8ffd92016-07-08 08:13:45 -0400730 mc_stream_t *s;
731 vlib_main_t *vm = mcm->vlib_main;
732
Ed Warnickecb9cada2015-12-08 15:45:58 -0700733 s = 0;
Dave Barach9b8ffd92016-07-08 08:13:45 -0400734 if (!is_internal)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700735 {
Dave Barach9b8ffd92016-07-08 08:13:45 -0400736 uword *p;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700737
738 /* Already have a stream with given name? */
739 if ((s = mc_stream_by_name (mcm, config->name)))
740 {
741 /* Already joined and ready? */
742 if (s->state == MC_STREAM_STATE_ready)
743 return s->index;
744 }
745
746 /* First join MC internal stream. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400747 if (!mcm->stream_vector
748 || (mcm->stream_vector[MC_STREAM_INDEX_INTERNAL].state
749 == MC_STREAM_STATE_invalid))
750 {
Ed Warnickecb9cada2015-12-08 15:45:58 -0700751 static mc_stream_config_t c = {
752 .name = "mc-internal",
753 .rx_buffer = mc_rx_buffer_unserialize,
754 .catchup = mc_internal_catchup,
755 .catchup_snapshot = mc_internal_catchup_snapshot,
756 };
757
Dave Barach9b8ffd92016-07-08 08:13:45 -0400758 c.save_snapshot = config->save_snapshot;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700759
760 mc_stream_join_helper (mcm, &c, /* is_internal */ 1);
761 }
762
763 /* If stream is still unknown register this name and wait for
Dave Barach9b8ffd92016-07-08 08:13:45 -0400764 sequenced message to name stream. This way all peers agree
765 on stream name to index mappings. */
Ed Warnickecb9cada2015-12-08 15:45:58 -0700766 s = mc_stream_by_name (mcm, config->name);
Dave Barach9b8ffd92016-07-08 08:13:45 -0400767 if (!s)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700768 {
Dave Barach9b8ffd92016-07-08 08:13:45 -0400769 vlib_one_time_waiting_process_t *wp, **w;
770 u8 *name_copy = format (0, "%s", config->name);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700771
772 mc_serialize_stream (mcm,
773 MC_STREAM_INDEX_INTERNAL,
Dave Barach9b8ffd92016-07-08 08:13:45 -0400774 &mc_register_stream_name_msg, config->name);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700775
776 /* Wait for this stream to be named. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400777 p =
778 hash_get_mem (mcm->procs_waiting_for_stream_name_by_name,
779 name_copy);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700780 if (p)
Dave Barach9b8ffd92016-07-08 08:13:45 -0400781 w =
782 pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool,
783 p[0]);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700784 else
785 {
786 pool_get (mcm->procs_waiting_for_stream_name_pool, w);
Dave Barach9b8ffd92016-07-08 08:13:45 -0400787 if (!mcm->procs_waiting_for_stream_name_by_name)
788 mcm->procs_waiting_for_stream_name_by_name = hash_create_string ( /* elts */ 0, /* value size */
789 sizeof
790 (uword));
Ed Warnickecb9cada2015-12-08 15:45:58 -0700791 hash_set_mem (mcm->procs_waiting_for_stream_name_by_name,
792 name_copy,
793 w - mcm->procs_waiting_for_stream_name_pool);
794 w[0] = 0;
795 }
796
797 vec_add2 (w[0], wp, 1);
798 vlib_current_process_wait_for_one_time_event (vm, wp);
799 vec_free (name_copy);
800 }
801
802 /* Name should be known now. */
803 s = mc_stream_by_name (mcm, config->name);
804 ASSERT (s != 0);
805 ASSERT (s->state == MC_STREAM_STATE_name_known);
806 }
807
Dave Barach9b8ffd92016-07-08 08:13:45 -0400808 if (!s)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700809 {
810 vec_add2 (mcm->stream_vector, s, 1);
811 mc_stream_init (s);
812 s->index = s - mcm->stream_vector;
813 }
814
815 {
816 /* Save name since we could have already used it as hash key. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400817 char *name_save = s->config.name;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700818
819 s->config = config[0];
820
821 if (name_save)
822 s->config.name = name_save;
823 }
824
825 if (s->config.window_size == 0)
826 s->config.window_size = 8;
827
828 if (s->config.retry_interval == 0.0)
Dave Barach9b8ffd92016-07-08 08:13:45 -0400829 s->config.retry_interval = 1.0;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700830
831 /* Sanity. */
832 ASSERT (s->config.retry_interval < 30);
833
834 if (s->config.retry_limit == 0)
Dave Barach9b8ffd92016-07-08 08:13:45 -0400835 s->config.retry_limit = 7;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700836
837 s->state = MC_STREAM_STATE_join_in_progress;
Dave Barach9b8ffd92016-07-08 08:13:45 -0400838 if (!s->peer_index_by_id.hash)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700839 mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
840
841 /* If we don't hear from someone in 5 seconds, we're alone */
842 s->join_timeout = vlib_time_now (vm) + 5.0;
843 mcm->joins_in_progress++;
844
845 if (MC_EVENT_LOGGING > 0)
846 {
Dave Barach9b8ffd92016-07-08 08:13:45 -0400847 /* *INDENT-OFF* */
848 ELOG_TYPE_DECLARE (e) =
849 {
850 .format = "stream index %d join request %s",
851 .format_args = "i4s16",
Ed Warnickecb9cada2015-12-08 15:45:58 -0700852 };
Dave Barach9b8ffd92016-07-08 08:13:45 -0400853 /* *INDENT-ON* */
854 struct
855 {
856 u32 stream_index;
857 char name[16];
858 } *ed;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700859 ed = ELOG_DATA (mcm->elog_main, e);
860 ed->stream_index = s->index;
861 elog_stream_name (ed->name, sizeof (ed->name), s->config.name);
862 }
863
Dave Barach9b8ffd92016-07-08 08:13:45 -0400864 send_join_or_leave_request (mcm, s->index, 1 /* join */ );
865
Ed Warnickecb9cada2015-12-08 15:45:58 -0700866 vlib_current_process_wait_for_one_time_event_vector
867 (vm, &s->procs_waiting_for_join_done);
Dave Barach9b8ffd92016-07-08 08:13:45 -0400868
Ed Warnickecb9cada2015-12-08 15:45:58 -0700869 if (MC_EVENT_LOGGING)
870 {
871 ELOG_TYPE (e, "join complete stream %d");
872 ELOG (mcm->elog_main, e, s->index);
873 }
874
875 return s->index;
876}
877
Dave Barach9b8ffd92016-07-08 08:13:45 -0400878u32
879mc_stream_join (mc_main_t * mcm, mc_stream_config_t * config)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700880{
Dave Barach9b8ffd92016-07-08 08:13:45 -0400881 return mc_stream_join_helper (mcm, config, /* is_internal */ 0);
882}
883
884void
885mc_stream_leave (mc_main_t * mcm, u32 stream_index)
886{
887 mc_stream_t *s = mc_stream_by_index (mcm, stream_index);
888
889 if (!s)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700890 return;
891
892 if (MC_EVENT_LOGGING)
893 {
Dave Barach9b8ffd92016-07-08 08:13:45 -0400894 /* *INDENT-OFF* */
895 ELOG_TYPE_DECLARE (t) =
896 {
897 .format = "leave-stream: %d",.format_args = "i4",
898 };
899 /* *INDENT-ON* */
900 struct
901 {
902 u32 index;
903 } *ed;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700904 ed = ELOG_DATA (mcm->elog_main, t);
905 ed->index = stream_index;
906 }
907
Dave Barach9b8ffd92016-07-08 08:13:45 -0400908 send_join_or_leave_request (mcm, stream_index, 0 /* is_join */ );
Ed Warnickecb9cada2015-12-08 15:45:58 -0700909 mc_stream_free (s);
910 s->state = MC_STREAM_STATE_name_known;
911}
912
Dave Barach9b8ffd92016-07-08 08:13:45 -0400913void
914mc_msg_join_or_leave_request_handler (mc_main_t * mcm,
915 mc_msg_join_or_leave_request_t * req,
916 u32 buffer_index)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700917{
Dave Barach9b8ffd92016-07-08 08:13:45 -0400918 mc_stream_t *s;
919 mc_msg_join_reply_t *rep;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700920 u32 bi;
921
922 mc_byte_swap_msg_join_or_leave_request (req);
923
924 s = mc_stream_by_index (mcm, req->stream_index);
Dave Barach9b8ffd92016-07-08 08:13:45 -0400925 if (!s || s->state != MC_STREAM_STATE_ready)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700926 return;
927
928 /* If the peer is joining, create it */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400929 if (req->is_join)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700930 {
Dave Barach9b8ffd92016-07-08 08:13:45 -0400931 mc_stream_t *this_s;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700932
933 /* We're not in a position to catch up a peer until all
934 stream joins are complete. */
935 if (0)
Dave Barach9b8ffd92016-07-08 08:13:45 -0400936 {
937 /* XXX This is hard to test so we've. */
938 vec_foreach (this_s, mcm->stream_vector)
939 {
940 if (this_s->state != MC_STREAM_STATE_ready
941 && this_s->state != MC_STREAM_STATE_name_known)
942 return;
943 }
944 }
945 else if (mcm->joins_in_progress > 0)
946 return;
947
948 (void) get_or_create_peer_with_id (mcm, s, req->peer_id,
949 /* created */ 0);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700950
951 rep = mc_get_vlib_buffer (mcm->vlib_main, sizeof (rep[0]), &bi);
952 memset (rep, 0, sizeof (rep[0]));
953 rep->type = MC_MSG_TYPE_join_reply;
954 rep->stream_index = req->stream_index;
Dave Barach9b8ffd92016-07-08 08:13:45 -0400955
Ed Warnickecb9cada2015-12-08 15:45:58 -0700956 mc_byte_swap_msg_join_reply (rep);
957 /* These two are already in network byte order... */
958 rep->peer_id = mcm->transport.our_ack_peer_id;
959 rep->catchup_peer_id = mcm->transport.our_catchup_peer_id;
960
961 mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi);
962 }
963 else
964 {
965 if (s->config.peer_died)
966 s->config.peer_died (mcm, s, req->peer_id);
967 }
968}
969
Dave Barach9b8ffd92016-07-08 08:13:45 -0400970void
971mc_msg_join_reply_handler (mc_main_t * mcm,
972 mc_msg_join_reply_t * mp, u32 buffer_index)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700973{
Dave Barach9b8ffd92016-07-08 08:13:45 -0400974 mc_stream_t *s;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700975
976 mc_byte_swap_msg_join_reply (mp);
977
978 s = mc_stream_by_index (mcm, mp->stream_index);
979
Dave Barach9b8ffd92016-07-08 08:13:45 -0400980 if (!s || s->state != MC_STREAM_STATE_join_in_progress)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700981 return;
982
Dave Barach9b8ffd92016-07-08 08:13:45 -0400983 /* Switch to catchup state; next join reply
Ed Warnickecb9cada2015-12-08 15:45:58 -0700984 for this stream will be ignored. */
985 s->state = MC_STREAM_STATE_catchup;
986
987 mcm->joins_in_progress--;
988 mcm->transport.catchup_request_fun (mcm->transport.opaque,
Dave Barach9b8ffd92016-07-08 08:13:45 -0400989 mp->stream_index, mp->catchup_peer_id);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700990}
991
Dave Barach9b8ffd92016-07-08 08:13:45 -0400992void
993mc_wait_for_stream_ready (mc_main_t * m, char *stream_name)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700994{
Dave Barach9b8ffd92016-07-08 08:13:45 -0400995 mc_stream_t *s;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700996
997 while (1)
998 {
999 s = mc_stream_by_name (m, stream_name);
1000 if (s)
1001 break;
1002 vlib_process_suspend (m->vlib_main, .1);
1003 }
1004
1005 /* It's OK to send a message in catchup and ready states. */
1006 if (s->state == MC_STREAM_STATE_catchup
1007 || s->state == MC_STREAM_STATE_ready)
1008 return;
1009
1010 /* Otherwise we are waiting for a join to finish. */
1011 vlib_current_process_wait_for_one_time_event_vector
1012 (m->vlib_main, &s->procs_waiting_for_join_done);
1013}
1014
Dave Barach9b8ffd92016-07-08 08:13:45 -04001015u32
1016mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001017{
Dave Barach9b8ffd92016-07-08 08:13:45 -04001018 mc_stream_t *s = mc_stream_by_index (mcm, stream_index);
1019 vlib_main_t *vm = mcm->vlib_main;
1020 mc_retry_t *r;
1021 mc_msg_user_request_t *mp;
1022 vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index);
Ed Warnickecb9cada2015-12-08 15:45:58 -07001023 u32 ri;
1024
Dave Barach9b8ffd92016-07-08 08:13:45 -04001025 if (!s)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001026 return 0;
1027
1028 if (s->state != MC_STREAM_STATE_ready)
1029 vlib_current_process_wait_for_one_time_event_vector
1030 (vm, &s->procs_waiting_for_join_done);
1031
1032 while (pool_elts (s->retry_pool) >= s->config.window_size)
1033 {
1034 vlib_current_process_wait_for_one_time_event_vector
1035 (vm, &s->procs_waiting_for_open_window);
1036 }
1037
1038 pool_get (s->retry_pool, r);
1039 ri = r - s->retry_pool;
1040
1041 r->prev_index = s->retry_tail_index;
1042 r->next_index = ~0;
1043 s->retry_tail_index = ri;
1044
1045 if (r->prev_index == ~0)
1046 s->retry_head_index = ri;
1047 else
1048 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04001049 mc_retry_t *p = pool_elt_at_index (s->retry_pool, r->prev_index);
Ed Warnickecb9cada2015-12-08 15:45:58 -07001050 p->next_index = ri;
1051 }
1052
1053 vlib_buffer_advance (b, -sizeof (mp[0]));
1054 mp = vlib_buffer_get_current (b);
1055
1056 mp->peer_id = mcm->transport.our_ack_peer_id;
1057 /* mp->transport.global_sequence set by relay agent. */
1058 mp->global_sequence = 0xdeadbeef;
1059 mp->stream_index = s->index;
1060 mp->local_sequence = s->our_local_sequence++;
Dave Barach9b8ffd92016-07-08 08:13:45 -04001061 mp->n_data_bytes =
1062 vlib_buffer_index_length_in_chain (vm, buffer_index) - sizeof (mp[0]);
Ed Warnickecb9cada2015-12-08 15:45:58 -07001063
1064 r->buffer_index = buffer_index;
1065 r->local_sequence = mp->local_sequence;
Dave Barach9b8ffd92016-07-08 08:13:45 -04001066 r->sent_at = vlib_time_now (vm);
Ed Warnickecb9cada2015-12-08 15:45:58 -07001067 r->n_retries = 0;
1068
1069 /* Retry will be freed when all currently known peers have acked. */
1070 vec_validate (r->unacked_by_peer_bitmap, vec_len (s->all_peer_bitmap) - 1);
1071 vec_copy (r->unacked_by_peer_bitmap, s->all_peer_bitmap);
1072
Dave Barach9b8ffd92016-07-08 08:13:45 -04001073 hash_set (s->retry_index_by_local_sequence, r->local_sequence,
1074 r - s->retry_pool);
Ed Warnickecb9cada2015-12-08 15:45:58 -07001075
1076 elog_tx_msg (mcm, s->index, mp->local_sequence, r->n_retries);
1077
1078 mc_byte_swap_msg_user_request (mp);
1079
Dave Barach9b8ffd92016-07-08 08:13:45 -04001080 mcm->transport.tx_buffer (mcm->transport.opaque,
1081 MC_TRANSPORT_USER_REQUEST_TO_RELAY, buffer_index);
Ed Warnickecb9cada2015-12-08 15:45:58 -07001082
1083 s->user_requests_sent++;
1084
1085 /* return amount of window remaining */
1086 return s->config.window_size - pool_elts (s->retry_pool);
1087}
1088
Dave Barach9b8ffd92016-07-08 08:13:45 -04001089void
1090mc_msg_user_request_handler (mc_main_t * mcm, mc_msg_user_request_t * mp,
1091 u32 buffer_index)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001092{
Dave Barach9b8ffd92016-07-08 08:13:45 -04001093 vlib_main_t *vm = mcm->vlib_main;
1094 mc_stream_t *s;
1095 mc_stream_peer_t *peer;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001096 i32 seq_cmp_result;
Dave Barach9b8ffd92016-07-08 08:13:45 -04001097 static int once = 0;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001098
1099 mc_byte_swap_msg_user_request (mp);
1100
1101 s = mc_stream_by_index (mcm, mp->stream_index);
1102
1103 /* Not signed up for this stream? Turf-o-matic */
Dave Barach9b8ffd92016-07-08 08:13:45 -04001104 if (!s || s->state != MC_STREAM_STATE_ready)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001105 {
1106 vlib_buffer_free_one (vm, buffer_index);
1107 return;
1108 }
1109
1110 /* Find peer, including ourselves. */
Dave Barach9b8ffd92016-07-08 08:13:45 -04001111 peer = get_or_create_peer_with_id (mcm, s, mp->peer_id,
Ed Warnickecb9cada2015-12-08 15:45:58 -07001112 /* created */ 0);
1113
Dave Barach9b8ffd92016-07-08 08:13:45 -04001114 seq_cmp_result = mc_seq_cmp (mp->local_sequence,
Ed Warnickecb9cada2015-12-08 15:45:58 -07001115 peer->last_sequence_received + 1);
1116
1117 if (MC_EVENT_LOGGING > 0)
1118 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04001119 /* *INDENT-OFF* */
1120 ELOG_TYPE_DECLARE (e) =
1121 {
1122 .format = "rx-msg: peer %s stream %d rx seq %d seq_cmp %d",
1123 .format_args = "T4i4i4i4",
1124 };
1125 /* *INDENT-ON* */
1126 struct
1127 {
1128 u32 peer, stream_index, rx_sequence;
1129 i32 seq_cmp_result;
1130 } *ed;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001131 ed = ELOG_DATA (mcm->elog_main, e);
1132 ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
1133 ed->stream_index = mp->stream_index;
1134 ed->rx_sequence = mp->local_sequence;
1135 ed->seq_cmp_result = seq_cmp_result;
1136 }
1137
Dave Barach9b8ffd92016-07-08 08:13:45 -04001138 if (0 && mp->stream_index == 1 && once == 0)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001139 {
1140 once = 1;
1141 ELOG_TYPE (e, "FAKE lost msg on stream 1");
Dave Barach9b8ffd92016-07-08 08:13:45 -04001142 ELOG (mcm->elog_main, e, 0);
Ed Warnickecb9cada2015-12-08 15:45:58 -07001143 return;
1144 }
1145
1146 peer->last_sequence_received += seq_cmp_result == 0;
1147 s->user_requests_received++;
1148
1149 if (seq_cmp_result > 0)
Dave Barach9b8ffd92016-07-08 08:13:45 -04001150 peer->stats.n_msgs_from_future += 1;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001151
1152 /* Send ack even if msg from future */
1153 if (1)
1154 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04001155 mc_msg_user_ack_t *rp;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001156 u32 bi;
1157
1158 rp = mc_get_vlib_buffer (vm, sizeof (rp[0]), &bi);
1159 rp->peer_id = mcm->transport.our_ack_peer_id;
1160 rp->stream_index = s->index;
1161 rp->local_sequence = mp->local_sequence;
1162 rp->seq_cmp_result = seq_cmp_result;
1163
1164 if (MC_EVENT_LOGGING > 0)
1165 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04001166 /* *INDENT-OFF* */
1167 ELOG_TYPE_DECLARE (e) =
1168 {
1169 .format = "tx-ack: stream %d local seq %d",
1170 .format_args = "i4i4",
1171 };
1172 /* *INDENT-ON* */
1173 struct
1174 {
1175 u32 stream_index;
1176 u32 local_sequence;
1177 } *ed;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001178 ed = ELOG_DATA (mcm->elog_main, e);
1179 ed->stream_index = rp->stream_index;
1180 ed->local_sequence = rp->local_sequence;
1181 }
1182
1183 mc_byte_swap_msg_user_ack (rp);
1184
1185 mcm->transport.tx_ack (mcm->transport.opaque, mp->peer_id, bi);
1186 /* Msg from past? If so, free the buffer... */
Dave Barach9b8ffd92016-07-08 08:13:45 -04001187 if (seq_cmp_result < 0)
1188 {
1189 vlib_buffer_free_one (vm, buffer_index);
1190 peer->stats.n_msgs_from_past += 1;
1191 }
Ed Warnickecb9cada2015-12-08 15:45:58 -07001192 }
Dave Barach9b8ffd92016-07-08 08:13:45 -04001193
Ed Warnickecb9cada2015-12-08 15:45:58 -07001194 if (seq_cmp_result == 0)
1195 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04001196 vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index);
Ed Warnickecb9cada2015-12-08 15:45:58 -07001197 switch (s->state)
1198 {
1199 case MC_STREAM_STATE_ready:
1200 vlib_buffer_advance (b, sizeof (mp[0]));
Dave Barach9b8ffd92016-07-08 08:13:45 -04001201 s->config.rx_buffer (mcm, s, mp->peer_id, buffer_index);
Ed Warnickecb9cada2015-12-08 15:45:58 -07001202
1203 /* Stream vector can change address via rx callback for mc-internal
1204 stream. */
1205 s = mc_stream_by_index (mcm, mp->stream_index);
1206 ASSERT (s != 0);
1207 s->last_global_sequence_processed = mp->global_sequence;
1208 break;
1209
1210 case MC_STREAM_STATE_catchup:
1211 clib_fifo_add1 (s->catchup_fifo, buffer_index);
1212 break;
1213
1214 default:
1215 clib_warning ("stream in unknown state %U",
1216 format_mc_stream_state, s->state);
1217 break;
1218 }
1219 }
1220}
1221
Dave Barach9b8ffd92016-07-08 08:13:45 -04001222void
1223mc_msg_user_ack_handler (mc_main_t * mcm, mc_msg_user_ack_t * mp,
1224 u32 buffer_index)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001225{
Dave Barach9b8ffd92016-07-08 08:13:45 -04001226 vlib_main_t *vm = mcm->vlib_main;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001227 uword *p;
Dave Barach9b8ffd92016-07-08 08:13:45 -04001228 mc_stream_t *s;
1229 mc_stream_peer_t *peer;
1230 mc_retry_t *r;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001231 int peer_created = 0;
1232
1233 mc_byte_swap_msg_user_ack (mp);
1234
1235 s = mc_stream_by_index (mcm, mp->stream_index);
1236
1237 if (MC_EVENT_LOGGING > 0)
1238 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04001239 /* *INDENT-OFF* */
1240 ELOG_TYPE_DECLARE (t) =
1241 {
1242 .format = "rx-ack: local seq %d peer %s seq_cmp_result %d",
1243 .format_args = "i4T4i4",
1244 };
1245 /* *INDENT-ON* */
1246
1247 struct
1248 {
1249 u32 local_sequence;
1250 u32 peer;
1251 i32 seq_cmp_result;
1252 } *ed;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001253 ed = ELOG_DATA (mcm->elog_main, t);
1254 ed->local_sequence = mp->local_sequence;
1255 ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
1256 ed->seq_cmp_result = mp->seq_cmp_result;
1257 }
1258
1259 /* Unknown stream? */
Dave Barach9b8ffd92016-07-08 08:13:45 -04001260 if (!s)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001261 return;
1262
1263 /* Find the peer which just ack'ed. */
1264 peer = get_or_create_peer_with_id (mcm, s, mp->peer_id,
1265 /* created */ &peer_created);
1266
Dave Barach9b8ffd92016-07-08 08:13:45 -04001267 /*
Ed Warnickecb9cada2015-12-08 15:45:58 -07001268 * Peer reports message from the future. If it's not in the retry
Dave Barach9b8ffd92016-07-08 08:13:45 -04001269 * fifo, look for a retired message.
Ed Warnickecb9cada2015-12-08 15:45:58 -07001270 */
1271 if (mp->seq_cmp_result > 0)
1272 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04001273 p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence -
1274 mp->seq_cmp_result);
Ed Warnickecb9cada2015-12-08 15:45:58 -07001275 if (p == 0)
Dave Barach9b8ffd92016-07-08 08:13:45 -04001276 mc_resend_retired (mcm, s, mp->local_sequence - mp->seq_cmp_result);
Ed Warnickecb9cada2015-12-08 15:45:58 -07001277
1278 /* Normal retry should fix it... */
1279 return;
1280 }
1281
Dave Barach9b8ffd92016-07-08 08:13:45 -04001282 /*
Ed Warnickecb9cada2015-12-08 15:45:58 -07001283 * Pointer to the indicated retry fifo entry.
1284 * Worth hashing because we could use a window size of 100 or 1000.
1285 */
1286 p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence);
1287
Dave Barach9b8ffd92016-07-08 08:13:45 -04001288 /*
Ed Warnickecb9cada2015-12-08 15:45:58 -07001289 * Is this a duplicate ACK, received after we've retired the
1290 * fifo entry. This can happen when learning about new
1291 * peers.
1292 */
1293 if (p == 0)
1294 {
1295 if (MC_EVENT_LOGGING > 0)
Dave Barach9b8ffd92016-07-08 08:13:45 -04001296 {
1297 /* *INDENT-OFF* */
1298 ELOG_TYPE_DECLARE (t) =
Ed Warnickecb9cada2015-12-08 15:45:58 -07001299 {
1300 .format = "ack: for seq %d from peer %s no fifo elt",
1301 .format_args = "i4T4",
Dave Barach9b8ffd92016-07-08 08:13:45 -04001302 };
1303 /* *INDENT-ON* */
1304
1305 struct
1306 {
1307 u32 seq;
1308 u32 peer;
1309 } *ed;
1310 ed = ELOG_DATA (mcm->elog_main, t);
1311 ed->seq = mp->local_sequence;
1312 ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
1313 }
Ed Warnickecb9cada2015-12-08 15:45:58 -07001314
1315 return;
1316 }
Dave Barach9b8ffd92016-07-08 08:13:45 -04001317
Ed Warnickecb9cada2015-12-08 15:45:58 -07001318 r = pool_elt_at_index (s->retry_pool, p[0]);
1319
1320 /* Make sure that this new peer ACKs our msgs from now on */
1321 if (peer_created)
1322 {
1323 mc_retry_t *later_retry = next_retry (s, r);
1324
1325 while (later_retry)
1326 {
1327 later_retry->unacked_by_peer_bitmap =
1328 clib_bitmap_ori (later_retry->unacked_by_peer_bitmap,
1329 peer - s->peers);
1330 later_retry = next_retry (s, later_retry);
1331 }
1332 }
1333
1334 ASSERT (mp->local_sequence == r->local_sequence);
Dave Barach9b8ffd92016-07-08 08:13:45 -04001335
Ed Warnickecb9cada2015-12-08 15:45:58 -07001336 /* If we weren't expecting to hear from this peer */
Dave Barach9b8ffd92016-07-08 08:13:45 -04001337 if (!peer_created &&
1338 !clib_bitmap_get (r->unacked_by_peer_bitmap, peer - s->peers))
Ed Warnickecb9cada2015-12-08 15:45:58 -07001339 {
1340 if (MC_EVENT_LOGGING > 0)
Dave Barach9b8ffd92016-07-08 08:13:45 -04001341 {
1342 /* *INDENT-OFF* */
1343 ELOG_TYPE_DECLARE (t) =
Ed Warnickecb9cada2015-12-08 15:45:58 -07001344 {
1345 .format = "dup-ack: for seq %d from peer %s",
1346 .format_args = "i4T4",
Dave Barach9b8ffd92016-07-08 08:13:45 -04001347 };
1348 /* *INDENT-ON* */
1349 struct
1350 {
1351 u32 seq;
1352 u32 peer;
1353 } *ed;
1354 ed = ELOG_DATA (mcm->elog_main, t);
1355 ed->seq = r->local_sequence;
1356 ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
1357 }
1358 if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap))
Ed Warnickecb9cada2015-12-08 15:45:58 -07001359 return;
1360 }
1361
1362 if (MC_EVENT_LOGGING > 0)
1363 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04001364 /* *INDENT-OFF* */
1365 ELOG_TYPE_DECLARE (t) =
Ed Warnickecb9cada2015-12-08 15:45:58 -07001366 {
1367 .format = "ack: for seq %d from peer %s",
1368 .format_args = "i4T4",
1369 };
Dave Barach9b8ffd92016-07-08 08:13:45 -04001370 /* *INDENT-ON* */
1371 struct
1372 {
1373 u32 seq;
1374 u32 peer;
1375 } *ed;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001376 ed = ELOG_DATA (mcm->elog_main, t);
1377 ed->seq = mp->local_sequence;
1378 ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
1379 }
1380
Dave Barach9b8ffd92016-07-08 08:13:45 -04001381 r->unacked_by_peer_bitmap =
Ed Warnickecb9cada2015-12-08 15:45:58 -07001382 clib_bitmap_andnoti (r->unacked_by_peer_bitmap, peer - s->peers);
Dave Barach9b8ffd92016-07-08 08:13:45 -04001383
Ed Warnickecb9cada2015-12-08 15:45:58 -07001384 /* Not all clients have ack'ed */
Dave Barach9b8ffd92016-07-08 08:13:45 -04001385 if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap))
Ed Warnickecb9cada2015-12-08 15:45:58 -07001386 {
1387 return;
1388 }
1389 if (MC_EVENT_LOGGING > 0)
1390 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04001391 /* *INDENT-OFF* */
1392 ELOG_TYPE_DECLARE (t) =
Ed Warnickecb9cada2015-12-08 15:45:58 -07001393 {
1394 .format = "ack: retire fifo elt loc seq %d after %d acks",
1395 .format_args = "i4i4",
1396 };
Dave Barach9b8ffd92016-07-08 08:13:45 -04001397 /* *INDENT-ON* */
1398 struct
1399 {
1400 u32 seq;
1401 u32 npeers;
1402 } *ed;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001403 ed = ELOG_DATA (mcm->elog_main, t);
1404 ed->seq = r->local_sequence;
1405 ed->npeers = pool_elts (s->peers);
1406 }
Dave Barach9b8ffd92016-07-08 08:13:45 -04001407
Ed Warnickecb9cada2015-12-08 15:45:58 -07001408 hash_unset (s->retry_index_by_local_sequence, mp->local_sequence);
1409 mc_retry_free (mcm, s, r);
1410 remove_retry_from_pool (s, r);
1411 maybe_send_window_open_event (vm, s);
1412}
1413
1414#define EVENT_MC_SEND_CATCHUP_DATA 0
1415
1416static uword
1417mc_catchup_process (vlib_main_t * vm,
Dave Barach9b8ffd92016-07-08 08:13:45 -04001418 vlib_node_runtime_t * node, vlib_frame_t * f)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001419{
Dave Barach9b8ffd92016-07-08 08:13:45 -04001420 mc_main_t *mcm = mc_node_get_main (node);
Ed Warnickecb9cada2015-12-08 15:45:58 -07001421 uword *event_data = 0;
Dave Barach9b8ffd92016-07-08 08:13:45 -04001422 mc_catchup_process_arg_t *args;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001423 int i;
Dave Barach9b8ffd92016-07-08 08:13:45 -04001424
Ed Warnickecb9cada2015-12-08 15:45:58 -07001425 while (1)
1426 {
1427 if (event_data)
Dave Barach9b8ffd92016-07-08 08:13:45 -04001428 _vec_len (event_data) = 0;
1429 vlib_process_wait_for_event_with_type (vm, &event_data,
1430 EVENT_MC_SEND_CATCHUP_DATA);
Ed Warnickecb9cada2015-12-08 15:45:58 -07001431
Dave Barach9b8ffd92016-07-08 08:13:45 -04001432 for (i = 0; i < vec_len (event_data); i++)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001433 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04001434 args = pool_elt_at_index (mcm->catchup_process_args, event_data[i]);
1435
Ed Warnickecb9cada2015-12-08 15:45:58 -07001436 mcm->transport.catchup_send_fun (mcm->transport.opaque,
1437 args->catchup_opaque,
1438 args->catchup_snapshot);
1439
1440 /* Send function will free snapshot data vector. */
1441 pool_put (mcm->catchup_process_args, args);
1442 }
1443 }
1444
Dave Barach9b8ffd92016-07-08 08:13:45 -04001445 return 0; /* not likely */
Ed Warnickecb9cada2015-12-08 15:45:58 -07001446}
1447
Dave Barach9b8ffd92016-07-08 08:13:45 -04001448static void
1449serialize_mc_stream (serialize_main_t * m, va_list * va)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001450{
Dave Barach9b8ffd92016-07-08 08:13:45 -04001451 mc_stream_t *s = va_arg (*va, mc_stream_t *);
1452 mc_stream_peer_t *p;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001453
1454 serialize_integer (m, pool_elts (s->peers), sizeof (u32));
Dave Barach9b8ffd92016-07-08 08:13:45 -04001455 /* *INDENT-OFF* */
Ed Warnickecb9cada2015-12-08 15:45:58 -07001456 pool_foreach (p, s->peers, ({
1457 u8 * x = serialize_get (m, sizeof (p->id));
Damjan Marionf1213b82016-03-13 02:22:06 +01001458 clib_memcpy (x, p->id.as_u8, sizeof (p->id));
Dave Barach9b8ffd92016-07-08 08:13:45 -04001459 serialize_integer (m, p->last_sequence_received,
Ed Warnickecb9cada2015-12-08 15:45:58 -07001460 sizeof (p->last_sequence_received));
1461 }));
Dave Barach9b8ffd92016-07-08 08:13:45 -04001462/* *INDENT-ON* */
Ed Warnickecb9cada2015-12-08 15:45:58 -07001463 serialize_bitmap (m, s->all_peer_bitmap);
1464}
1465
Dave Barach9b8ffd92016-07-08 08:13:45 -04001466void
1467unserialize_mc_stream (serialize_main_t * m, va_list * va)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001468{
Dave Barach9b8ffd92016-07-08 08:13:45 -04001469 mc_stream_t *s = va_arg (*va, mc_stream_t *);
Ed Warnickecb9cada2015-12-08 15:45:58 -07001470 u32 i, n_peers;
Dave Barach9b8ffd92016-07-08 08:13:45 -04001471 mc_stream_peer_t *p;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001472
1473 unserialize_integer (m, &n_peers, sizeof (u32));
1474 mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
1475 for (i = 0; i < n_peers; i++)
1476 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04001477 u8 *x;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001478 pool_get (s->peers, p);
1479 x = unserialize_get (m, sizeof (p->id));
Damjan Marionf1213b82016-03-13 02:22:06 +01001480 clib_memcpy (p->id.as_u8, x, sizeof (p->id));
Dave Barach9b8ffd92016-07-08 08:13:45 -04001481 unserialize_integer (m, &p->last_sequence_received,
1482 sizeof (p->last_sequence_received));
1483 mhash_set (&s->peer_index_by_id, &p->id, p - s->peers, /* old_value */
1484 0);
Ed Warnickecb9cada2015-12-08 15:45:58 -07001485 }
1486 s->all_peer_bitmap = unserialize_bitmap (m);
1487
1488 /* This is really bad. */
1489 if (!s->all_peer_bitmap)
1490 clib_warning ("BUG: stream %s all_peer_bitmap NULL", s->config.name);
1491}
1492
Dave Barach9b8ffd92016-07-08 08:13:45 -04001493void
1494mc_msg_catchup_request_handler (mc_main_t * mcm,
1495 mc_msg_catchup_request_t * req,
1496 u32 catchup_opaque)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001497{
Dave Barach9b8ffd92016-07-08 08:13:45 -04001498 vlib_main_t *vm = mcm->vlib_main;
1499 mc_stream_t *s;
1500 mc_catchup_process_arg_t *args;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001501
1502 mc_byte_swap_msg_catchup_request (req);
1503
1504 s = mc_stream_by_index (mcm, req->stream_index);
Dave Barach9b8ffd92016-07-08 08:13:45 -04001505 if (!s || s->state != MC_STREAM_STATE_ready)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001506 return;
Dave Barach9b8ffd92016-07-08 08:13:45 -04001507
Ed Warnickecb9cada2015-12-08 15:45:58 -07001508 if (MC_EVENT_LOGGING > 0)
1509 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04001510 /* *INDENT-OFF* */
1511 ELOG_TYPE_DECLARE (t) =
1512 {
1513 .format = "catchup-request: from %s stream %d",
1514 .format_args = "T4i4",
1515 };
1516 /* *INDENT-ON* */
1517 struct
1518 {
1519 u32 peer, stream;
1520 } *ed;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001521 ed = ELOG_DATA (mcm->elog_main, t);
1522 ed->peer = elog_id_for_peer_id (mcm, req->peer_id.as_u64);
1523 ed->stream = req->stream_index;
1524 }
1525
Dave Barach9b8ffd92016-07-08 08:13:45 -04001526 /*
1527 * The application has to snapshoot its data structures right
1528 * here, right now. If we process any messages after
1529 * noting the last global sequence we've processed, the client
Ed Warnickecb9cada2015-12-08 15:45:58 -07001530 * won't be able to accurately reconstruct our data structures.
1531 *
Dave Barach9b8ffd92016-07-08 08:13:45 -04001532 * Once the data structures are e.g. vec_dup()'ed, we
Ed Warnickecb9cada2015-12-08 15:45:58 -07001533 * send the resulting messages from a separate process, to
1534 * make sure that we don't cause a bunch of message retransmissions
1535 */
1536 pool_get (mcm->catchup_process_args, args);
1537
1538 args->stream_index = s - mcm->stream_vector;
1539 args->catchup_opaque = catchup_opaque;
1540 args->catchup_snapshot = 0;
1541
1542 /* Construct catchup reply and snapshot state for stream to send as
1543 catchup reply payload. */
1544 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04001545 mc_msg_catchup_reply_t *rep;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001546 serialize_main_t m;
1547
1548 vec_resize (args->catchup_snapshot, sizeof (rep[0]));
1549
1550 rep = (void *) args->catchup_snapshot;
1551
1552 rep->peer_id = req->peer_id;
1553 rep->stream_index = req->stream_index;
1554 rep->last_global_sequence_included = s->last_global_sequence_processed;
1555
1556 /* Setup for serialize to append to catchup snapshot. */
1557 serialize_open_vector (&m, args->catchup_snapshot);
1558 m.stream.current_buffer_index = vec_len (m.stream.buffer);
1559
1560 serialize (&m, serialize_mc_stream, s);
1561
1562 args->catchup_snapshot = serialize_close_vector (&m);
1563
1564 /* Actually copy internal state */
1565 args->catchup_snapshot = s->config.catchup_snapshot
Dave Barach9b8ffd92016-07-08 08:13:45 -04001566 (mcm, args->catchup_snapshot, rep->last_global_sequence_included);
Ed Warnickecb9cada2015-12-08 15:45:58 -07001567
1568 rep = (void *) args->catchup_snapshot;
1569 rep->n_data_bytes = vec_len (args->catchup_snapshot) - sizeof (rep[0]);
1570
1571 mc_byte_swap_msg_catchup_reply (rep);
1572 }
1573
1574 /* now go send it... */
1575 vlib_process_signal_event (vm, mcm->catchup_process,
Dave Barach9b8ffd92016-07-08 08:13:45 -04001576 EVENT_MC_SEND_CATCHUP_DATA,
1577 args - mcm->catchup_process_args);
Ed Warnickecb9cada2015-12-08 15:45:58 -07001578}
1579
1580#define EVENT_MC_UNSERIALIZE_BUFFER 0
1581#define EVENT_MC_UNSERIALIZE_CATCHUP 1
1582
Dave Barach9b8ffd92016-07-08 08:13:45 -04001583void
1584mc_msg_catchup_reply_handler (mc_main_t * mcm, mc_msg_catchup_reply_t * mp,
1585 u32 catchup_opaque)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001586{
1587 vlib_process_signal_event (mcm->vlib_main,
1588 mcm->unserialize_process,
1589 EVENT_MC_UNSERIALIZE_CATCHUP,
1590 pointer_to_uword (mp));
1591}
1592
Dave Barach9b8ffd92016-07-08 08:13:45 -04001593static void
1594perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001595{
Dave Barach9b8ffd92016-07-08 08:13:45 -04001596 mc_stream_t *s;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001597 i32 seq_cmp_result;
Dave Barach9b8ffd92016-07-08 08:13:45 -04001598
Ed Warnickecb9cada2015-12-08 15:45:58 -07001599 mc_byte_swap_msg_catchup_reply (mp);
1600
1601 s = mc_stream_by_index (mcm, mp->stream_index);
1602
1603 /* Never heard of this stream or already caught up. */
Dave Barach9b8ffd92016-07-08 08:13:45 -04001604 if (!s || s->state == MC_STREAM_STATE_ready)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001605 return;
Dave Barach9b8ffd92016-07-08 08:13:45 -04001606
Ed Warnickecb9cada2015-12-08 15:45:58 -07001607 {
1608 serialize_main_t m;
Dave Barach9b8ffd92016-07-08 08:13:45 -04001609 mc_stream_peer_t *p;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001610 u32 n_stream_bytes;
1611
1612 /* For offline sim replay: save the entire catchup snapshot... */
1613 if (s->config.save_snapshot)
Dave Barach9b8ffd92016-07-08 08:13:45 -04001614 s->config.save_snapshot (mcm, /* is_catchup */ 1, mp->data,
1615 mp->n_data_bytes);
Ed Warnickecb9cada2015-12-08 15:45:58 -07001616
1617 unserialize_open_data (&m, mp->data, mp->n_data_bytes);
1618 unserialize (&m, unserialize_mc_stream, s);
1619
1620 /* Make sure we start numbering our messages as expected */
Dave Barach9b8ffd92016-07-08 08:13:45 -04001621 /* *INDENT-OFF* */
Ed Warnickecb9cada2015-12-08 15:45:58 -07001622 pool_foreach (p, s->peers, ({
1623 if (p->id.as_u64 == mcm->transport.our_ack_peer_id.as_u64)
1624 s->our_local_sequence = p->last_sequence_received + 1;
1625 }));
Dave Barach9b8ffd92016-07-08 08:13:45 -04001626/* *INDENT-ON* */
Ed Warnickecb9cada2015-12-08 15:45:58 -07001627
1628 n_stream_bytes = m.stream.current_buffer_index;
1629
1630 /* No need to unserialize close; nothing to free. */
1631
1632 /* After serialized stream is user's catchup data. */
1633 s->config.catchup (mcm, mp->data + n_stream_bytes,
Dave Barach9b8ffd92016-07-08 08:13:45 -04001634 mp->n_data_bytes - n_stream_bytes);
Ed Warnickecb9cada2015-12-08 15:45:58 -07001635 }
1636
1637 /* Vector could have been moved by catchup.
1638 This can only happen for mc-internal stream. */
1639 s = mc_stream_by_index (mcm, mp->stream_index);
1640
1641 s->last_global_sequence_processed = mp->last_global_sequence_included;
1642
1643 while (clib_fifo_elts (s->catchup_fifo))
1644 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04001645 mc_msg_user_request_t *gp;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001646 u32 bi;
Dave Barach9b8ffd92016-07-08 08:13:45 -04001647 vlib_buffer_t *b;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001648
Dave Barach9b8ffd92016-07-08 08:13:45 -04001649 clib_fifo_sub1 (s->catchup_fifo, bi);
1650
Ed Warnickecb9cada2015-12-08 15:45:58 -07001651 b = vlib_get_buffer (mcm->vlib_main, bi);
1652 gp = vlib_buffer_get_current (b);
1653
1654 /* Make sure we're replaying "new" news */
1655 seq_cmp_result = mc_seq_cmp (gp->global_sequence,
1656 mp->last_global_sequence_included);
1657
1658 if (seq_cmp_result > 0)
1659 {
1660 vlib_buffer_advance (b, sizeof (gp[0]));
1661 s->config.rx_buffer (mcm, s, gp->peer_id, bi);
1662 s->last_global_sequence_processed = gp->global_sequence;
1663
1664 if (MC_EVENT_LOGGING)
1665 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04001666 /* *INDENT-OFF* */
1667 ELOG_TYPE_DECLARE (t) =
1668 {
1669 .format = "catchup replay local sequence 0x%x",
1670 .format_args = "i4",
1671 };
1672 /* *INDENT-ON* */
1673 struct
1674 {
1675 u32 local_sequence;
1676 } *ed;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001677 ed = ELOG_DATA (mcm->elog_main, t);
1678 ed->local_sequence = gp->local_sequence;
1679 }
1680 }
1681 else
1682 {
1683 if (MC_EVENT_LOGGING)
1684 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04001685 /* *INDENT-OFF* */
1686 ELOG_TYPE_DECLARE (t) =
1687 {
1688 .format = "catchup discard local sequence 0x%x",
1689 .format_args = "i4",
1690 };
1691 /* *INDENT-ON* */
1692 struct
1693 {
1694 u32 local_sequence;
1695 } *ed;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001696 ed = ELOG_DATA (mcm->elog_main, t);
1697 ed->local_sequence = gp->local_sequence;
1698 }
1699
1700 vlib_buffer_free_one (mcm->vlib_main, bi);
1701 }
1702 }
1703
1704 s->state = MC_STREAM_STATE_ready;
1705
1706 /* Now that we are caught up wake up joining process. */
1707 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04001708 vlib_one_time_waiting_process_t *wp;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001709 vec_foreach (wp, s->procs_waiting_for_join_done)
1710 vlib_signal_one_time_waiting_process (mcm->vlib_main, wp);
1711 if (s->procs_waiting_for_join_done)
1712 _vec_len (s->procs_waiting_for_join_done) = 0;
1713 }
1714}
1715
Dave Barach9b8ffd92016-07-08 08:13:45 -04001716static void
1717this_node_maybe_master (mc_main_t * mcm)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001718{
Dave Barach9b8ffd92016-07-08 08:13:45 -04001719 vlib_main_t *vm = mcm->vlib_main;
1720 mc_msg_master_assert_t *mp;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001721 uword event_type;
1722 int timeouts = 0;
1723 int is_master = mcm->relay_state == MC_RELAY_STATE_MASTER;
Dave Barach9b8ffd92016-07-08 08:13:45 -04001724 clib_error_t *error;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001725 f64 now, time_last_master_assert = -1;
1726 u32 bi;
1727
1728 while (1)
1729 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04001730 if (!mcm->we_can_be_relay_master)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001731 {
1732 mcm->relay_state = MC_RELAY_STATE_SLAVE;
1733 if (MC_EVENT_LOGGING)
1734 {
1735 ELOG_TYPE (e, "become slave (config)");
1736 ELOG (mcm->elog_main, e, 0);
1737 }
1738 return;
1739 }
1740
1741 now = vlib_time_now (vm);
1742 if (now >= time_last_master_assert + 1)
1743 {
1744 time_last_master_assert = now;
1745 mp = mc_get_vlib_buffer (mcm->vlib_main, sizeof (mp[0]), &bi);
1746
1747 mp->peer_id = mcm->transport.our_ack_peer_id;
1748 mp->global_sequence = mcm->relay_global_sequence;
1749
Dave Barach9b8ffd92016-07-08 08:13:45 -04001750 /*
1751 * these messages clog the event log, set MC_EVENT_LOGGING higher
1752 * if you want them
1753 */
Ed Warnickecb9cada2015-12-08 15:45:58 -07001754 if (MC_EVENT_LOGGING > 1)
1755 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04001756 /* *INDENT-OFF* */
1757 ELOG_TYPE_DECLARE (e) =
1758 {
1759 .format = "tx-massert: peer %s global seq %u",
1760 .format_args = "T4i4",
1761 };
1762 /* *INDENT-ON* */
1763 struct
1764 {
1765 u32 peer, global_sequence;
1766 } *ed;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001767 ed = ELOG_DATA (mcm->elog_main, e);
1768 ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
1769 ed->global_sequence = mp->global_sequence;
1770 }
1771
Dave Barach9b8ffd92016-07-08 08:13:45 -04001772 mc_byte_swap_msg_master_assert (mp);
Ed Warnickecb9cada2015-12-08 15:45:58 -07001773
Dave Barach9b8ffd92016-07-08 08:13:45 -04001774 error =
1775 mcm->transport.tx_buffer (mcm->transport.opaque,
1776 MC_TRANSPORT_MASTERSHIP, bi);
Ed Warnickecb9cada2015-12-08 15:45:58 -07001777 if (error)
1778 clib_error_report (error);
1779 }
1780
1781 vlib_process_wait_for_event_or_clock (vm, 1.0);
1782 event_type = vlib_process_get_events (vm, /* no event data */ 0);
1783
1784 switch (event_type)
1785 {
1786 case ~0:
Dave Barach9b8ffd92016-07-08 08:13:45 -04001787 if (!is_master && timeouts++ > 2)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001788 {
1789 mcm->relay_state = MC_RELAY_STATE_MASTER;
Dave Barach9b8ffd92016-07-08 08:13:45 -04001790 mcm->relay_master_peer_id =
1791 mcm->transport.our_ack_peer_id.as_u64;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001792 if (MC_EVENT_LOGGING)
1793 {
1794 ELOG_TYPE (e, "become master (was maybe_master)");
1795 ELOG (mcm->elog_main, e, 0);
1796 }
1797 return;
1798 }
1799 break;
Dave Barach9b8ffd92016-07-08 08:13:45 -04001800
Ed Warnickecb9cada2015-12-08 15:45:58 -07001801 case MC_RELAY_STATE_SLAVE:
1802 mcm->relay_state = MC_RELAY_STATE_SLAVE;
1803 if (MC_EVENT_LOGGING && mcm->relay_state != MC_RELAY_STATE_SLAVE)
1804 {
1805 ELOG_TYPE (e, "become slave (was maybe_master)");
1806 ELOG (mcm->elog_main, e, 0);
1807 }
1808 return;
1809 }
1810 }
1811}
1812
Dave Barach9b8ffd92016-07-08 08:13:45 -04001813static void
1814this_node_slave (mc_main_t * mcm)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001815{
Dave Barach9b8ffd92016-07-08 08:13:45 -04001816 vlib_main_t *vm = mcm->vlib_main;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001817 uword event_type;
1818 int timeouts = 0;
1819
1820 if (MC_EVENT_LOGGING)
1821 {
1822 ELOG_TYPE (e, "become slave");
1823 ELOG (mcm->elog_main, e, 0);
1824 }
1825
1826 while (1)
1827 {
1828 vlib_process_wait_for_event_or_clock (vm, 1.0);
1829 event_type = vlib_process_get_events (vm, /* no event data */ 0);
1830
1831 switch (event_type)
1832 {
1833 case ~0:
1834 if (timeouts++ > 2)
1835 {
1836 mcm->relay_state = MC_RELAY_STATE_NEGOTIATE;
1837 mcm->relay_master_peer_id = ~0ULL;
1838 if (MC_EVENT_LOGGING)
1839 {
1840 ELOG_TYPE (e, "timeouts; negoitate mastership");
1841 ELOG (mcm->elog_main, e, 0);
1842 }
1843 return;
1844 }
1845 break;
1846
1847 case MC_RELAY_STATE_SLAVE:
1848 mcm->relay_state = MC_RELAY_STATE_SLAVE;
1849 timeouts = 0;
1850 break;
1851 }
1852 }
1853}
1854
1855static uword
1856mc_mastership_process (vlib_main_t * vm,
Dave Barach9b8ffd92016-07-08 08:13:45 -04001857 vlib_node_runtime_t * node, vlib_frame_t * f)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001858{
Dave Barach9b8ffd92016-07-08 08:13:45 -04001859 mc_main_t *mcm = mc_node_get_main (node);
1860
Ed Warnickecb9cada2015-12-08 15:45:58 -07001861 while (1)
1862 {
1863 switch (mcm->relay_state)
1864 {
1865 case MC_RELAY_STATE_NEGOTIATE:
1866 case MC_RELAY_STATE_MASTER:
Dave Barach9b8ffd92016-07-08 08:13:45 -04001867 this_node_maybe_master (mcm);
Ed Warnickecb9cada2015-12-08 15:45:58 -07001868 break;
1869
1870 case MC_RELAY_STATE_SLAVE:
1871 this_node_slave (mcm);
1872 break;
1873 }
1874 }
Dave Barach9b8ffd92016-07-08 08:13:45 -04001875 return 0; /* not likely */
Ed Warnickecb9cada2015-12-08 15:45:58 -07001876}
1877
Dave Barach9b8ffd92016-07-08 08:13:45 -04001878void
1879mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001880{
1881 if (we_can_be_master != mcm->we_can_be_relay_master)
1882 {
1883 mcm->we_can_be_relay_master = we_can_be_master;
Dave Barach9b8ffd92016-07-08 08:13:45 -04001884 vlib_process_signal_event (mcm->vlib_main,
Ed Warnickecb9cada2015-12-08 15:45:58 -07001885 mcm->mastership_process,
1886 MC_RELAY_STATE_NEGOTIATE, 0);
1887 }
1888}
1889
Dave Barach9b8ffd92016-07-08 08:13:45 -04001890void
1891mc_msg_master_assert_handler (mc_main_t * mcm, mc_msg_master_assert_t * mp,
1892 u32 buffer_index)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001893{
1894 mc_peer_id_t his_peer_id, our_peer_id;
1895 i32 seq_cmp_result;
1896 u8 signal_slave = 0;
1897 u8 update_global_sequence = 0;
1898
1899 mc_byte_swap_msg_master_assert (mp);
1900
1901 his_peer_id = mp->peer_id;
1902 our_peer_id = mcm->transport.our_ack_peer_id;
1903
1904 /* compare the incoming global sequence with ours */
1905 seq_cmp_result = mc_seq_cmp (mp->global_sequence,
1906 mcm->relay_global_sequence);
1907
1908 /* If the sender has a lower peer id and the sender's sequence >=
1909 our global sequence, we become a slave. Otherwise we are master. */
Dave Barach9b8ffd92016-07-08 08:13:45 -04001910 if (mc_peer_id_compare (his_peer_id, our_peer_id) < 0
1911 && seq_cmp_result >= 0)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001912 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04001913 vlib_process_signal_event (mcm->vlib_main,
Ed Warnickecb9cada2015-12-08 15:45:58 -07001914 mcm->mastership_process,
1915 MC_RELAY_STATE_SLAVE, 0);
1916 signal_slave = 1;
1917 }
1918
1919 /* Update our global sequence. */
1920 if (seq_cmp_result > 0)
1921 {
1922 mcm->relay_global_sequence = mp->global_sequence;
1923 update_global_sequence = 1;
1924 }
1925
1926 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04001927 uword *q = mhash_get (&mcm->mastership_peer_index_by_id, &his_peer_id);
1928 mc_mastership_peer_t *p;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001929
1930 if (q)
1931 p = vec_elt_at_index (mcm->mastership_peers, q[0]);
1932 else
1933 {
1934 vec_add2 (mcm->mastership_peers, p, 1);
1935 p->peer_id = his_peer_id;
Dave Barach9b8ffd92016-07-08 08:13:45 -04001936 mhash_set (&mcm->mastership_peer_index_by_id, &p->peer_id,
1937 p - mcm->mastership_peers,
Ed Warnickecb9cada2015-12-08 15:45:58 -07001938 /* old_value */ 0);
1939 }
1940 p->time_last_master_assert_received = vlib_time_now (mcm->vlib_main);
1941 }
1942
Dave Barach9b8ffd92016-07-08 08:13:45 -04001943 /*
Ed Warnickecb9cada2015-12-08 15:45:58 -07001944 * these messages clog the event log, set MC_EVENT_LOGGING higher
1945 * if you want them.
1946 */
1947 if (MC_EVENT_LOGGING > 1)
1948 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04001949 /* *INDENT-OFF* */
1950 ELOG_TYPE_DECLARE (e) =
1951 {
1952 .format = "rx-massert: peer %s global seq %u upd %d slave %d",
1953 .format_args = "T4i4i1i1",
1954 };
1955 /* *INDENT-ON* */
1956
1957 struct
1958 {
Ed Warnickecb9cada2015-12-08 15:45:58 -07001959 u32 peer;
Dave Barach9b8ffd92016-07-08 08:13:45 -04001960 u32 global_sequence;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001961 u8 update_sequence;
1962 u8 slave;
Dave Barach9b8ffd92016-07-08 08:13:45 -04001963 } *ed;
Ed Warnickecb9cada2015-12-08 15:45:58 -07001964 ed = ELOG_DATA (mcm->elog_main, e);
1965 ed->peer = elog_id_for_peer_id (mcm, his_peer_id.as_u64);
1966 ed->global_sequence = mp->global_sequence;
1967 ed->update_sequence = update_global_sequence;
1968 ed->slave = signal_slave;
1969 }
1970}
1971
1972static void
1973mc_serialize_init (mc_main_t * mcm)
1974{
Dave Barach9b8ffd92016-07-08 08:13:45 -04001975 mc_serialize_msg_t *m;
1976 vlib_main_t *vm = vlib_get_main ();
Ed Warnickecb9cada2015-12-08 15:45:58 -07001977
1978 mcm->global_msg_index_by_name
Dave Barach9b8ffd92016-07-08 08:13:45 -04001979 = hash_create_string ( /* elts */ 0, sizeof (uword));
Ed Warnickecb9cada2015-12-08 15:45:58 -07001980
1981 m = vm->mc_msg_registrations;
Dave Barach9b8ffd92016-07-08 08:13:45 -04001982
Ed Warnickecb9cada2015-12-08 15:45:58 -07001983 while (m)
1984 {
1985 m->global_index = vec_len (mcm->global_msgs);
Dave Barach9b8ffd92016-07-08 08:13:45 -04001986 hash_set_mem (mcm->global_msg_index_by_name, m->name, m->global_index);
Ed Warnickecb9cada2015-12-08 15:45:58 -07001987 vec_add1 (mcm->global_msgs, m);
1988 m = m->next_registration;
1989 }
1990}
1991
1992clib_error_t *
1993mc_serialize_va (mc_main_t * mc,
Dave Barach9b8ffd92016-07-08 08:13:45 -04001994 u32 stream_index,
Ed Warnickecb9cada2015-12-08 15:45:58 -07001995 u32 multiple_messages_per_vlib_buffer,
Dave Barach9b8ffd92016-07-08 08:13:45 -04001996 mc_serialize_msg_t * msg, va_list * va)
Ed Warnickecb9cada2015-12-08 15:45:58 -07001997{
Dave Barach9b8ffd92016-07-08 08:13:45 -04001998 mc_stream_t *s;
1999 clib_error_t *error;
2000 serialize_main_t *m = &mc->serialize_mains[VLIB_TX];
2001 vlib_serialize_buffer_main_t *sbm = &mc->serialize_buffer_mains[VLIB_TX];
Ed Warnickecb9cada2015-12-08 15:45:58 -07002002 u32 bi, n_before, n_after, n_total, n_this_msg;
2003 u32 si, gi;
2004
Dave Barach9b8ffd92016-07-08 08:13:45 -04002005 if (!sbm->vlib_main)
Ed Warnickecb9cada2015-12-08 15:45:58 -07002006 {
2007 sbm->tx.max_n_data_bytes_per_chain = 4096;
2008 sbm->tx.free_list_index = VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX;
2009 }
2010
2011 if (sbm->first_buffer == 0)
2012 serialize_open_vlib_buffer (m, mc->vlib_main, sbm);
2013
2014 n_before = serialize_vlib_buffer_n_bytes (m);
2015
2016 s = mc_stream_by_index (mc, stream_index);
2017 gi = msg->global_index;
2018 ASSERT (msg == vec_elt (mc->global_msgs, gi));
2019
2020 si = ~0;
2021 if (gi < vec_len (s->stream_msg_index_by_global_index))
2022 si = s->stream_msg_index_by_global_index[gi];
2023
2024 serialize_likely_small_unsigned_integer (m, si);
2025
2026 /* For first time message is sent, use name to identify message. */
Dave Barach9b8ffd92016-07-08 08:13:45 -04002027 if (si == ~0 || MSG_ID_DEBUG)
Ed Warnickecb9cada2015-12-08 15:45:58 -07002028 serialize_cstring (m, msg->name);
2029
2030 if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
2031 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04002032 /* *INDENT-OFF* */
2033 ELOG_TYPE_DECLARE (e) =
2034 {
2035 .format = "serialize-msg: %s index %d",
2036 .format_args = "T4i4",
2037 };
2038 /* *INDENT-ON* */
2039 struct
2040 {
2041 u32 c[2];
2042 } *ed;
Ed Warnickecb9cada2015-12-08 15:45:58 -07002043 ed = ELOG_DATA (mc->elog_main, e);
2044 ed->c[0] = elog_id_for_msg_name (mc, msg->name);
2045 ed->c[1] = si;
2046 }
2047
2048 error = va_serialize (m, va);
2049
2050 n_after = serialize_vlib_buffer_n_bytes (m);
2051 n_this_msg = n_after - n_before;
2052 n_total = n_after + sizeof (mc_msg_user_request_t);
2053
2054 /* For max message size ignore first message where string name is sent. */
2055 if (si != ~0)
Dave Barach9b8ffd92016-07-08 08:13:45 -04002056 msg->max_n_bytes_serialized =
2057 clib_max (msg->max_n_bytes_serialized, n_this_msg);
Ed Warnickecb9cada2015-12-08 15:45:58 -07002058
Dave Barach9b8ffd92016-07-08 08:13:45 -04002059 if (!multiple_messages_per_vlib_buffer
Ed Warnickecb9cada2015-12-08 15:45:58 -07002060 || si == ~0
Dave Barach9b8ffd92016-07-08 08:13:45 -04002061 || n_total + msg->max_n_bytes_serialized >
2062 mc->transport.max_packet_size)
Ed Warnickecb9cada2015-12-08 15:45:58 -07002063 {
2064 bi = serialize_close_vlib_buffer (m);
2065 sbm->first_buffer = 0;
Dave Barach9b8ffd92016-07-08 08:13:45 -04002066 if (!error)
Ed Warnickecb9cada2015-12-08 15:45:58 -07002067 mc_stream_send (mc, stream_index, bi);
2068 else if (bi != ~0)
2069 vlib_buffer_free_one (mc->vlib_main, bi);
2070 }
2071
2072 return error;
2073}
2074
2075clib_error_t *
2076mc_serialize_internal (mc_main_t * mc,
2077 u32 stream_index,
2078 u32 multiple_messages_per_vlib_buffer,
Dave Barach9b8ffd92016-07-08 08:13:45 -04002079 mc_serialize_msg_t * msg, ...)
Ed Warnickecb9cada2015-12-08 15:45:58 -07002080{
Dave Barach9b8ffd92016-07-08 08:13:45 -04002081 vlib_main_t *vm = mc->vlib_main;
Ed Warnickecb9cada2015-12-08 15:45:58 -07002082 va_list va;
Dave Barach9b8ffd92016-07-08 08:13:45 -04002083 clib_error_t *error;
Ed Warnickecb9cada2015-12-08 15:45:58 -07002084
2085 if (stream_index == ~0)
2086 {
2087 if (vm->mc_main && vm->mc_stream_index == ~0)
2088 vlib_current_process_wait_for_one_time_event_vector
2089 (vm, &vm->procs_waiting_for_mc_stream_join);
2090 stream_index = vm->mc_stream_index;
2091 }
2092
2093 va_start (va, msg);
2094 error = mc_serialize_va (mc, stream_index,
Dave Barach9b8ffd92016-07-08 08:13:45 -04002095 multiple_messages_per_vlib_buffer, msg, &va);
Ed Warnickecb9cada2015-12-08 15:45:58 -07002096 va_end (va);
2097 return error;
2098}
2099
Dave Barach9b8ffd92016-07-08 08:13:45 -04002100uword
2101mc_unserialize_message (mc_main_t * mcm,
2102 mc_stream_t * s, serialize_main_t * m)
Ed Warnickecb9cada2015-12-08 15:45:58 -07002103{
Dave Barach9b8ffd92016-07-08 08:13:45 -04002104 mc_serialize_stream_msg_t *sm;
Ed Warnickecb9cada2015-12-08 15:45:58 -07002105 u32 gi, si;
2106
2107 si = unserialize_likely_small_unsigned_integer (m);
2108
Dave Barach9b8ffd92016-07-08 08:13:45 -04002109 if (!(si == ~0 || MSG_ID_DEBUG))
Ed Warnickecb9cada2015-12-08 15:45:58 -07002110 {
2111 sm = vec_elt_at_index (s->stream_msgs, si);
2112 gi = sm->global_index;
2113 }
2114 else
2115 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04002116 char *name;
Ed Warnickecb9cada2015-12-08 15:45:58 -07002117
2118 unserialize_cstring (m, &name);
2119
2120 if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
Dave Barach9b8ffd92016-07-08 08:13:45 -04002121 {
2122 /* *INDENT-OFF* */
2123 ELOG_TYPE_DECLARE (e) =
2124 {
2125 .format = "unserialize-msg: %s rx index %d",
2126 .format_args = "T4i4",
2127 };
2128 /* *INDENT-ON* */
2129 struct
2130 {
2131 u32 c[2];
2132 } *ed;
2133 ed = ELOG_DATA (mcm->elog_main, e);
2134 ed->c[0] = elog_id_for_msg_name (mcm, name);
2135 ed->c[1] = si;
2136 }
Ed Warnickecb9cada2015-12-08 15:45:58 -07002137
2138 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04002139 uword *p = hash_get_mem (mcm->global_msg_index_by_name, name);
2140 gi = p ? p[0] : ~0;
Ed Warnickecb9cada2015-12-08 15:45:58 -07002141 }
2142
2143 /* Unknown message? */
2144 if (gi == ~0)
Dave Barach9b8ffd92016-07-08 08:13:45 -04002145 {
2146 vec_free (name);
2147 goto done;
2148 }
Ed Warnickecb9cada2015-12-08 15:45:58 -07002149
2150 vec_validate_init_empty (s->stream_msg_index_by_global_index, gi, ~0);
2151 si = s->stream_msg_index_by_global_index[gi];
2152
2153 /* Stream local index unknown? Create it. */
2154 if (si == ~0)
Dave Barach9b8ffd92016-07-08 08:13:45 -04002155 {
2156 vec_add2 (s->stream_msgs, sm, 1);
Ed Warnickecb9cada2015-12-08 15:45:58 -07002157
Dave Barach9b8ffd92016-07-08 08:13:45 -04002158 si = sm - s->stream_msgs;
2159 sm->global_index = gi;
2160 s->stream_msg_index_by_global_index[gi] = si;
Ed Warnickecb9cada2015-12-08 15:45:58 -07002161
Dave Barach9b8ffd92016-07-08 08:13:45 -04002162 if (MC_EVENT_LOGGING > 0)
2163 {
2164 /* *INDENT-OFF* */
2165 ELOG_TYPE_DECLARE (e) =
2166 {
2167 .format = "msg-bind: stream %d %s to index %d",
2168 .format_args = "i4T4i4",
2169 };
2170 /* *INDENT-ON* */
2171 struct
2172 {
2173 u32 c[3];
2174 } *ed;
2175 ed = ELOG_DATA (mcm->elog_main, e);
2176 ed->c[0] = s->index;
2177 ed->c[1] = elog_id_for_msg_name (mcm, name);
2178 ed->c[2] = si;
2179 }
2180 }
Ed Warnickecb9cada2015-12-08 15:45:58 -07002181 else
Dave Barach9b8ffd92016-07-08 08:13:45 -04002182 {
2183 sm = vec_elt_at_index (s->stream_msgs, si);
2184 if (gi != sm->global_index && MC_EVENT_LOGGING > 0)
2185 {
2186 /* *INDENT-OFF* */
2187 ELOG_TYPE_DECLARE (e) =
2188 {
2189 .format = "msg-id-ERROR: %s index %d expected %d",
2190 .format_args = "T4i4i4",
2191 };
2192 /* *INDENT-ON* */
2193 struct
2194 {
2195 u32 c[3];
2196 } *ed;
2197 ed = ELOG_DATA (mcm->elog_main, e);
2198 ed->c[0] = elog_id_for_msg_name (mcm, name);
2199 ed->c[1] = si;
2200 ed->c[2] = ~0;
2201 if (sm->global_index <
2202 vec_len (s->stream_msg_index_by_global_index))
2203 ed->c[2] =
2204 s->stream_msg_index_by_global_index[sm->global_index];
2205 }
2206 }
Ed Warnickecb9cada2015-12-08 15:45:58 -07002207
2208 vec_free (name);
2209 }
2210
2211 if (gi != ~0)
2212 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04002213 mc_serialize_msg_t *msg;
Ed Warnickecb9cada2015-12-08 15:45:58 -07002214 msg = vec_elt (mcm->global_msgs, gi);
2215 unserialize (m, msg->unserialize, mcm);
2216 }
2217
Dave Barach9b8ffd92016-07-08 08:13:45 -04002218done:
Ed Warnickecb9cada2015-12-08 15:45:58 -07002219 return gi != ~0;
2220}
2221
2222void
2223mc_unserialize_internal (mc_main_t * mcm, u32 stream_and_buffer_index)
2224{
Dave Barach9b8ffd92016-07-08 08:13:45 -04002225 vlib_main_t *vm = mcm->vlib_main;
2226 serialize_main_t *m = &mcm->serialize_mains[VLIB_RX];
2227 vlib_serialize_buffer_main_t *sbm = &mcm->serialize_buffer_mains[VLIB_RX];
2228 mc_stream_and_buffer_t *sb;
2229 mc_stream_t *stream;
Ed Warnickecb9cada2015-12-08 15:45:58 -07002230 u32 buffer_index;
2231
Dave Barach9b8ffd92016-07-08 08:13:45 -04002232 sb =
2233 pool_elt_at_index (mcm->mc_unserialize_stream_and_buffers,
2234 stream_and_buffer_index);
Ed Warnickecb9cada2015-12-08 15:45:58 -07002235 buffer_index = sb->buffer_index;
2236 stream = vec_elt_at_index (mcm->stream_vector, sb->stream_index);
2237 pool_put (mcm->mc_unserialize_stream_and_buffers, sb);
2238
2239 if (stream->config.save_snapshot)
2240 {
2241 u32 n_bytes = vlib_buffer_index_length_in_chain (vm, buffer_index);
Dave Barach9b8ffd92016-07-08 08:13:45 -04002242 static u8 *contents;
Ed Warnickecb9cada2015-12-08 15:45:58 -07002243 vec_reset_length (contents);
2244 vec_validate (contents, n_bytes - 1);
2245 vlib_buffer_contents (vm, buffer_index, contents);
Dave Barach9b8ffd92016-07-08 08:13:45 -04002246 stream->config.save_snapshot (mcm, /* is_catchup */ 0, contents,
2247 n_bytes);
Ed Warnickecb9cada2015-12-08 15:45:58 -07002248 }
2249
2250 ASSERT (vlib_in_process_context (vm));
2251
2252 unserialize_open_vlib_buffer (m, vm, sbm);
2253
2254 clib_fifo_add1 (sbm->rx.buffer_fifo, buffer_index);
2255
2256 while (unserialize_vlib_buffer_n_bytes (m) > 0)
2257 mc_unserialize_message (mcm, stream, m);
2258
2259 /* Frees buffer. */
2260 unserialize_close_vlib_buffer (m);
2261}
2262
2263void
2264mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index)
2265{
Dave Barach9b8ffd92016-07-08 08:13:45 -04002266 vlib_main_t *vm = mcm->vlib_main;
2267 mc_stream_and_buffer_t *sb;
Ed Warnickecb9cada2015-12-08 15:45:58 -07002268 pool_get (mcm->mc_unserialize_stream_and_buffers, sb);
2269 sb->stream_index = s->index;
2270 sb->buffer_index = buffer_index;
Dave Barach9b8ffd92016-07-08 08:13:45 -04002271 vlib_process_signal_event (vm, mcm->unserialize_process,
2272 EVENT_MC_UNSERIALIZE_BUFFER,
2273 sb - mcm->mc_unserialize_stream_and_buffers);
Ed Warnickecb9cada2015-12-08 15:45:58 -07002274}
2275
2276static uword
2277mc_unserialize_process (vlib_main_t * vm,
Dave Barach9b8ffd92016-07-08 08:13:45 -04002278 vlib_node_runtime_t * node, vlib_frame_t * f)
Ed Warnickecb9cada2015-12-08 15:45:58 -07002279{
Dave Barach9b8ffd92016-07-08 08:13:45 -04002280 mc_main_t *mcm = mc_node_get_main (node);
2281 uword event_type, *event_data = 0;
Ed Warnickecb9cada2015-12-08 15:45:58 -07002282 int i;
Dave Barach9b8ffd92016-07-08 08:13:45 -04002283
Ed Warnickecb9cada2015-12-08 15:45:58 -07002284 while (1)
2285 {
2286 if (event_data)
Dave Barach9b8ffd92016-07-08 08:13:45 -04002287 _vec_len (event_data) = 0;
Ed Warnickecb9cada2015-12-08 15:45:58 -07002288
2289 vlib_process_wait_for_event (vm);
2290 event_type = vlib_process_get_events (vm, &event_data);
2291 switch (event_type)
2292 {
2293 case EVENT_MC_UNSERIALIZE_BUFFER:
2294 for (i = 0; i < vec_len (event_data); i++)
2295 mc_unserialize_internal (mcm, event_data[i]);
2296 break;
2297
2298 case EVENT_MC_UNSERIALIZE_CATCHUP:
2299 for (i = 0; i < vec_len (event_data); i++)
2300 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04002301 u8 *mp = uword_to_pointer (event_data[i], u8 *);
Ed Warnickecb9cada2015-12-08 15:45:58 -07002302 perform_catchup (mcm, (void *) mp);
2303 vec_free (mp);
2304 }
2305 break;
2306
2307 default:
2308 break;
2309 }
2310 }
2311
Dave Barach9b8ffd92016-07-08 08:13:45 -04002312 return 0; /* not likely */
Ed Warnickecb9cada2015-12-08 15:45:58 -07002313}
2314
Dave Barach9b8ffd92016-07-08 08:13:45 -04002315void
2316serialize_mc_main (serialize_main_t * m, va_list * va)
Ed Warnickecb9cada2015-12-08 15:45:58 -07002317{
Dave Barach9b8ffd92016-07-08 08:13:45 -04002318 mc_main_t *mcm = va_arg (*va, mc_main_t *);
2319 mc_stream_t *s;
2320 mc_serialize_stream_msg_t *sm;
2321 mc_serialize_msg_t *msg;
Ed Warnickecb9cada2015-12-08 15:45:58 -07002322
2323 serialize_integer (m, vec_len (mcm->stream_vector), sizeof (u32));
2324 vec_foreach (s, mcm->stream_vector)
Dave Barach9b8ffd92016-07-08 08:13:45 -04002325 {
2326 /* Stream name. */
2327 serialize_cstring (m, s->config.name);
Ed Warnickecb9cada2015-12-08 15:45:58 -07002328
Dave Barach9b8ffd92016-07-08 08:13:45 -04002329 /* Serialize global names for all sent messages. */
2330 serialize_integer (m, vec_len (s->stream_msgs), sizeof (u32));
2331 vec_foreach (sm, s->stream_msgs)
2332 {
2333 msg = vec_elt (mcm->global_msgs, sm->global_index);
2334 serialize_cstring (m, msg->name);
Ed Warnickecb9cada2015-12-08 15:45:58 -07002335 }
Dave Barach9b8ffd92016-07-08 08:13:45 -04002336 }
Ed Warnickecb9cada2015-12-08 15:45:58 -07002337}
2338
Dave Barach9b8ffd92016-07-08 08:13:45 -04002339void
2340unserialize_mc_main (serialize_main_t * m, va_list * va)
Ed Warnickecb9cada2015-12-08 15:45:58 -07002341{
Dave Barach9b8ffd92016-07-08 08:13:45 -04002342 mc_main_t *mcm = va_arg (*va, mc_main_t *);
Ed Warnickecb9cada2015-12-08 15:45:58 -07002343 u32 i, n_streams, n_stream_msgs;
Dave Barach9b8ffd92016-07-08 08:13:45 -04002344 char *name;
2345 mc_stream_t *s;
2346 mc_serialize_stream_msg_t *sm;
Ed Warnickecb9cada2015-12-08 15:45:58 -07002347
2348 unserialize_integer (m, &n_streams, sizeof (u32));
2349 for (i = 0; i < n_streams; i++)
2350 {
2351 unserialize_cstring (m, &name);
Dave Barach9b8ffd92016-07-08 08:13:45 -04002352 if (i != MC_STREAM_INDEX_INTERNAL && !mc_stream_by_name (mcm, name))
2353 {
2354 vec_validate (mcm->stream_vector, i);
2355 s = vec_elt_at_index (mcm->stream_vector, i);
2356 mc_stream_init (s);
2357 s->index = s - mcm->stream_vector;
2358 s->config.name = name;
2359 s->state = MC_STREAM_STATE_name_known;
2360 hash_set_mem (mcm->stream_index_by_name, s->config.name, s->index);
2361 }
Ed Warnickecb9cada2015-12-08 15:45:58 -07002362 else
Dave Barach9b8ffd92016-07-08 08:13:45 -04002363 vec_free (name);
Ed Warnickecb9cada2015-12-08 15:45:58 -07002364
2365 s = vec_elt_at_index (mcm->stream_vector, i);
2366
2367 vec_free (s->stream_msgs);
2368 vec_free (s->stream_msg_index_by_global_index);
2369
2370 unserialize_integer (m, &n_stream_msgs, sizeof (u32));
2371 vec_resize (s->stream_msgs, n_stream_msgs);
2372 vec_foreach (sm, s->stream_msgs)
Dave Barach9b8ffd92016-07-08 08:13:45 -04002373 {
2374 uword *p;
2375 u32 si, gi;
Ed Warnickecb9cada2015-12-08 15:45:58 -07002376
Dave Barach9b8ffd92016-07-08 08:13:45 -04002377 unserialize_cstring (m, &name);
2378 p = hash_get (mcm->global_msg_index_by_name, name);
2379 gi = p ? p[0] : ~0;
2380 si = sm - s->stream_msgs;
Ed Warnickecb9cada2015-12-08 15:45:58 -07002381
Dave Barach9b8ffd92016-07-08 08:13:45 -04002382 if (MC_EVENT_LOGGING > 0)
2383 {
2384 /* *INDENT-OFF* */
2385 ELOG_TYPE_DECLARE (e) =
2386 {
Ed Warnickecb9cada2015-12-08 15:45:58 -07002387 .format = "catchup-bind: %s to %d global index %d stream %d",
2388 .format_args = "T4i4i4i4",
2389 };
Dave Barach9b8ffd92016-07-08 08:13:45 -04002390 /* *INDENT-ON* */
Ed Warnickecb9cada2015-12-08 15:45:58 -07002391
Dave Barach9b8ffd92016-07-08 08:13:45 -04002392 struct
Ed Warnickecb9cada2015-12-08 15:45:58 -07002393 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04002394 u32 c[4];
2395 } *ed;
2396 ed = ELOG_DATA (mcm->elog_main, e);
2397 ed->c[0] = elog_id_for_msg_name (mcm, name);
2398 ed->c[1] = si;
2399 ed->c[2] = gi;
2400 ed->c[3] = s->index;
2401 }
2402
2403 vec_free (name);
2404
2405 sm->global_index = gi;
2406 if (gi != ~0)
2407 {
2408 vec_validate_init_empty (s->stream_msg_index_by_global_index,
2409 gi, ~0);
2410 s->stream_msg_index_by_global_index[gi] = si;
2411 }
2412 }
Ed Warnickecb9cada2015-12-08 15:45:58 -07002413 }
2414}
2415
Dave Barach9b8ffd92016-07-08 08:13:45 -04002416void
2417mc_main_init (mc_main_t * mcm, char *tag)
Ed Warnickecb9cada2015-12-08 15:45:58 -07002418{
Dave Barach9b8ffd92016-07-08 08:13:45 -04002419 vlib_main_t *vm = vlib_get_main ();
Ed Warnickecb9cada2015-12-08 15:45:58 -07002420
2421 mcm->vlib_main = vm;
2422 mcm->elog_main = &vm->elog_main;
2423
2424 mcm->relay_master_peer_id = ~0ULL;
2425 mcm->relay_state = MC_RELAY_STATE_NEGOTIATE;
2426
2427 mcm->stream_index_by_name
Dave Barach9b8ffd92016-07-08 08:13:45 -04002428 = hash_create_string ( /* elts */ 0, /* value size */ sizeof (uword));
Ed Warnickecb9cada2015-12-08 15:45:58 -07002429
2430 {
2431 vlib_node_registration_t r;
2432
2433 memset (&r, 0, sizeof (r));
2434
2435 r.type = VLIB_NODE_TYPE_PROCESS;
2436
2437 /* Point runtime data to main instance. */
2438 r.runtime_data = &mcm;
2439 r.runtime_data_bytes = sizeof (&mcm);
2440
2441 r.name = (char *) format (0, "mc-mastership-%s", tag);
2442 r.function = mc_mastership_process;
2443 mcm->mastership_process = vlib_register_node (vm, &r);
2444
2445 r.name = (char *) format (0, "mc-join-ager-%s", tag);
2446 r.function = mc_join_ager_process;
2447 mcm->join_ager_process = vlib_register_node (vm, &r);
2448
2449 r.name = (char *) format (0, "mc-retry-%s", tag);
2450 r.function = mc_retry_process;
2451 mcm->retry_process = vlib_register_node (vm, &r);
2452
2453 r.name = (char *) format (0, "mc-catchup-%s", tag);
2454 r.function = mc_catchup_process;
2455 mcm->catchup_process = vlib_register_node (vm, &r);
2456
2457 r.name = (char *) format (0, "mc-unserialize-%s", tag);
2458 r.function = mc_unserialize_process;
2459 mcm->unserialize_process = vlib_register_node (vm, &r);
2460 }
2461
2462 if (MC_EVENT_LOGGING > 0)
Dave Barach9b8ffd92016-07-08 08:13:45 -04002463 mhash_init (&mcm->elog_id_by_peer_id, sizeof (uword),
2464 sizeof (mc_peer_id_t));
Ed Warnickecb9cada2015-12-08 15:45:58 -07002465
Dave Barach9b8ffd92016-07-08 08:13:45 -04002466 mhash_init (&mcm->mastership_peer_index_by_id, sizeof (uword),
2467 sizeof (mc_peer_id_t));
Ed Warnickecb9cada2015-12-08 15:45:58 -07002468 mc_serialize_init (mcm);
2469}
2470
Dave Barach9b8ffd92016-07-08 08:13:45 -04002471static u8 *
2472format_mc_relay_state (u8 * s, va_list * args)
Ed Warnickecb9cada2015-12-08 15:45:58 -07002473{
2474 mc_relay_state_t state = va_arg (*args, mc_relay_state_t);
Dave Barach9b8ffd92016-07-08 08:13:45 -04002475 char *t = 0;
Ed Warnickecb9cada2015-12-08 15:45:58 -07002476 switch (state)
2477 {
2478 case MC_RELAY_STATE_NEGOTIATE:
2479 t = "negotiate";
2480 break;
2481 case MC_RELAY_STATE_MASTER:
2482 t = "master";
2483 break;
2484 case MC_RELAY_STATE_SLAVE:
2485 t = "slave";
2486 break;
2487 default:
2488 return format (s, "unknown 0x%x", state);
2489 }
2490
2491 return format (s, "%s", t);
2492}
2493
Dave Barach9b8ffd92016-07-08 08:13:45 -04002494static u8 *
2495format_mc_stream_state (u8 * s, va_list * args)
Ed Warnickecb9cada2015-12-08 15:45:58 -07002496{
2497 mc_stream_state_t state = va_arg (*args, mc_stream_state_t);
Dave Barach9b8ffd92016-07-08 08:13:45 -04002498 char *t = 0;
Ed Warnickecb9cada2015-12-08 15:45:58 -07002499 switch (state)
2500 {
2501#define _(f) case MC_STREAM_STATE_##f: t = #f; break;
2502 foreach_mc_stream_state
2503#undef _
2504 default:
2505 return format (s, "unknown 0x%x", state);
2506 }
2507
2508 return format (s, "%s", t);
2509}
2510
Matus Fabiand2dc3df2015-12-14 10:31:33 -05002511static int
Dave Barach9b8ffd92016-07-08 08:13:45 -04002512mc_peer_comp (void *a1, void *a2)
Matus Fabiand2dc3df2015-12-14 10:31:33 -05002513{
Dave Barach9b8ffd92016-07-08 08:13:45 -04002514 mc_stream_peer_t *p1 = a1;
2515 mc_stream_peer_t *p2 = a2;
Matus Fabiand2dc3df2015-12-14 10:31:33 -05002516
2517 return mc_peer_id_compare (p1->id, p2->id);
2518}
2519
Dave Barach9b8ffd92016-07-08 08:13:45 -04002520u8 *
2521format_mc_main (u8 * s, va_list * args)
Ed Warnickecb9cada2015-12-08 15:45:58 -07002522{
Dave Barach9b8ffd92016-07-08 08:13:45 -04002523 mc_main_t *mcm = va_arg (*args, mc_main_t *);
2524 mc_stream_t *t;
2525 mc_stream_peer_t *p, *ps;
Christophe Fontained3c008d2017-10-02 18:10:54 +02002526 u32 indent = format_get_indent (s);
Ed Warnickecb9cada2015-12-08 15:45:58 -07002527
Dave Barach9b8ffd92016-07-08 08:13:45 -04002528 s = format (s, "MC state %U, %d streams joined, global sequence 0x%x",
2529 format_mc_relay_state, mcm->relay_state,
2530 vec_len (mcm->stream_vector), mcm->relay_global_sequence);
Ed Warnickecb9cada2015-12-08 15:45:58 -07002531
2532 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04002533 mc_mastership_peer_t *mp;
Ed Warnickecb9cada2015-12-08 15:45:58 -07002534 f64 now = vlib_time_now (mcm->vlib_main);
2535 s = format (s, "\n%UMost recent mastership peers:",
2536 format_white_space, indent + 2);
2537 vec_foreach (mp, mcm->mastership_peers)
Dave Barach9b8ffd92016-07-08 08:13:45 -04002538 {
2539 s = format (s, "\n%U%-30U%.4e",
2540 format_white_space, indent + 4,
2541 mcm->transport.format_peer_id, mp->peer_id,
2542 now - mp->time_last_master_assert_received);
2543 }
Ed Warnickecb9cada2015-12-08 15:45:58 -07002544 }
2545
2546 vec_foreach (t, mcm->stream_vector)
Dave Barach9b8ffd92016-07-08 08:13:45 -04002547 {
2548 s = format (s, "\n%Ustream `%s' index %d",
2549 format_white_space, indent + 2, t->config.name, t->index);
2550
2551 s = format (s, "\n%Ustate %U",
2552 format_white_space, indent + 4,
2553 format_mc_stream_state, t->state);
2554
2555 s =
2556 format (s,
2557 "\n%Uretries: interval %.0f sec, limit %d, pool elts %d, %Ld sent",
2558 format_white_space, indent + 4, t->config.retry_interval,
2559 t->config.retry_limit, pool_elts (t->retry_pool),
2560 t->stats.n_retries - t->stats_last_clear.n_retries);
2561
2562 s = format (s, "\n%U%Ld/%Ld user requests sent/received",
2563 format_white_space, indent + 4,
2564 t->user_requests_sent, t->user_requests_received);
2565
2566 s = format (s, "\n%U%d peers, local/global sequence 0x%x/0x%x",
2567 format_white_space, indent + 4,
2568 pool_elts (t->peers),
2569 t->our_local_sequence, t->last_global_sequence_processed);
2570
2571 ps = 0;
2572 /* *INDENT-OFF* */
2573 pool_foreach (p, t->peers,
2574 ({
2575 if (clib_bitmap_get (t->all_peer_bitmap, p - t->peers))
2576 vec_add1 (ps, p[0]);
2577 }));
2578 /* *INDENT-ON* */
2579 vec_sort_with_function (ps, mc_peer_comp);
2580 s = format (s, "\n%U%=30s%10s%16s%16s",
2581 format_white_space, indent + 6,
2582 "Peer", "Last seq", "Retries", "Future");
2583
2584 vec_foreach (p, ps)
Ed Warnickecb9cada2015-12-08 15:45:58 -07002585 {
Dave Barach9b8ffd92016-07-08 08:13:45 -04002586 s = format (s, "\n%U%-30U0x%08x%16Ld%16Ld%s",
2587 format_white_space, indent + 6,
2588 mcm->transport.format_peer_id, p->id.as_u64,
2589 p->last_sequence_received,
2590 p->stats.n_msgs_from_past -
2591 p->stats_last_clear.n_msgs_from_past,
2592 p->stats.n_msgs_from_future -
2593 p->stats_last_clear.n_msgs_from_future,
2594 (mcm->transport.our_ack_peer_id.as_u64 ==
2595 p->id.as_u64 ? " (self)" : ""));
Ed Warnickecb9cada2015-12-08 15:45:58 -07002596 }
Dave Barach9b8ffd92016-07-08 08:13:45 -04002597 vec_free (ps);
2598 }
Ed Warnickecb9cada2015-12-08 15:45:58 -07002599
2600 return s;
2601}
Dave Barach9b8ffd92016-07-08 08:13:45 -04002602
2603/*
2604 * fd.io coding-style-patch-verification: ON
2605 *
2606 * Local Variables:
2607 * eval: (c-set-style "gnu")
2608 * End:
2609 */