blob: 28f94350537cdd57bc9f4156849d93027eae20f5 [file] [log] [blame]
/*
 * mc.h: vlib reliable sequenced multicast distributed applications
 *
 * Copyright (c) 2010 Cisco and/or its affiliates.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17
18#ifndef included_vlib_mc_h
19#define included_vlib_mc_h
20
21#include <vppinfra/elog.h>
22#include <vppinfra/fifo.h>
23#include <vppinfra/mhash.h>
24#include <vlib/node.h>
25
/* Compile-time knob for MC event logging.  Takes the value 1 unless the
   includer already defined it before this header was pulled in. */
#if !defined (MC_EVENT_LOGGING)
#define MC_EVENT_LOGGING 1
#endif
29
30always_inline uword
31mc_need_byte_swap (void)
Dave Barach9b8ffd92016-07-08 08:13:45 -040032{
33 return CLIB_ARCH_IS_LITTLE_ENDIAN;
34}
Ed Warnickecb9cada2015-12-08 15:45:58 -070035
Dave Barach9b8ffd92016-07-08 08:13:45 -040036/*
37 * Used to uniquely identify hosts.
38 * For IP4 this would be ip4_address plus tcp/udp port.
Ed Warnickecb9cada2015-12-08 15:45:58 -070039 */
Dave Barach9b8ffd92016-07-08 08:13:45 -040040typedef union
41{
Ed Warnickecb9cada2015-12-08 15:45:58 -070042 u8 as_u8[8];
43 u64 as_u64;
44} mc_peer_id_t;
45
46always_inline mc_peer_id_t
47mc_byte_swap_peer_id (mc_peer_id_t i)
Dave Barach9b8ffd92016-07-08 08:13:45 -040048{
Ed Warnickecb9cada2015-12-08 15:45:58 -070049 /* Peer id is already in network byte order. */
50 return i;
51}
52
53always_inline int
54mc_peer_id_compare (mc_peer_id_t a, mc_peer_id_t b)
55{
56 return memcmp (a.as_u8, b.as_u8, sizeof (a.as_u8));
57}
58
59/* Assert mastership. Lowest peer_id amount all peers wins mastership.
60 Only sent/received over mastership channel (MC_TRANSPORT_MASTERSHIP).
61 So, we don't need a message opcode. */
Dave Barach9b8ffd92016-07-08 08:13:45 -040062typedef CLIB_PACKED (struct
63 {
64 /* Peer id asserting mastership. */
65 mc_peer_id_t peer_id;
66 /* Global sequence number asserted. */
67 u32 global_sequence;}) mc_msg_master_assert_t;
Ed Warnickecb9cada2015-12-08 15:45:58 -070068
69always_inline void
70mc_byte_swap_msg_master_assert (mc_msg_master_assert_t * r)
71{
72 if (mc_need_byte_swap ())
73 {
74 r->peer_id = mc_byte_swap_peer_id (r->peer_id);
75 r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
76 }
77}
78
/* X-macro naming every MC message type.  The user defines _(f) before
   expanding the list and undefines it afterwards. */
#define foreach_mc_msg_type                     \
  _ (master_assert)                             \
  _ (join_or_leave_request)                     \
  _ (join_reply)                                \
  _ (user_request)                              \
  _ (user_ack)                                  \
  _ (catchup_request)                           \
  _ (catchup_reply)

/* One MC_MSG_TYPE_<name> enumerator per entry of foreach_mc_msg_type,
   in list order. */
typedef enum
{
#define _(f) MC_MSG_TYPE_##f,
  foreach_mc_msg_type
#undef _
} mc_relay_msg_type_t;
94
95/* Request to join a given stream. Multicast over MC_TRANSPORT_JOIN. */
Dave Barach9b8ffd92016-07-08 08:13:45 -040096typedef CLIB_PACKED (struct
97 {
98mc_peer_id_t peer_id; mc_relay_msg_type_t type:32;
Ed Warnicke853e7202016-08-12 11:42:26 -070099 /* MC_MSG_TYPE_join_or_leave_request */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400100 /* Stream to join or leave. */
101 u32 stream_index;
102 /* join = 1, leave = 0 */
103 u8 is_join;}) mc_msg_join_or_leave_request_t;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700104
105always_inline void
106mc_byte_swap_msg_join_or_leave_request (mc_msg_join_or_leave_request_t * r)
107{
108 if (mc_need_byte_swap ())
109 {
110 r->peer_id = mc_byte_swap_peer_id (r->peer_id);
111 r->type = clib_byte_swap_u32 (r->type);
112 r->stream_index = clib_byte_swap_u32 (r->stream_index);
113 }
114}
115
116/* Join reply. Multicast over MC_TRANSPORT_JOIN. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400117typedef CLIB_PACKED (struct
118 {
119mc_peer_id_t peer_id; mc_relay_msg_type_t type:32;
Ed Warnicke853e7202016-08-12 11:42:26 -0700120 /* MC_MSG_TYPE_join_reply */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400121 u32 stream_index;
122 /* Peer ID to contact to catchup with this stream. */
123 mc_peer_id_t catchup_peer_id;}) mc_msg_join_reply_t;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700124
125always_inline void
126mc_byte_swap_msg_join_reply (mc_msg_join_reply_t * r)
127{
128 if (mc_need_byte_swap ())
129 {
130 r->peer_id = mc_byte_swap_peer_id (r->peer_id);
131 r->type = clib_byte_swap_u32 (r->type);
132 r->stream_index = clib_byte_swap_u32 (r->stream_index);
133 r->catchup_peer_id = mc_byte_swap_peer_id (r->catchup_peer_id);
134 }
135}
136
137/* Generic (application) request. Multicast over MC_TRANSPORT_USER_REQUEST_TO_RELAY and then
138 relayed by relay master after filling in global sequence number. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400139typedef CLIB_PACKED (struct
140 {
141 mc_peer_id_t peer_id; u32 stream_index;
142 /* Global sequence number as filled in by relay master. */
143 u32 global_sequence;
144 /* Local sequence number as filled in by peer sending message. */
145 u32 local_sequence;
146 /* Size of request data. */
147 u32 n_data_bytes;
148 /* Opaque request data. */
149 u8 data[0];}) mc_msg_user_request_t;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700150
151always_inline void
152mc_byte_swap_msg_user_request (mc_msg_user_request_t * r)
153{
154 if (mc_need_byte_swap ())
155 {
156 r->peer_id = mc_byte_swap_peer_id (r->peer_id);
157 r->stream_index = clib_byte_swap_u32 (r->stream_index);
158 r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
159 r->local_sequence = clib_byte_swap_u32 (r->local_sequence);
160 r->n_data_bytes = clib_byte_swap_u32 (r->n_data_bytes);
161 }
162}
163
164/* Sent unicast over ACK channel. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400165typedef CLIB_PACKED (struct
166 {
167 mc_peer_id_t peer_id;
Ed Warnicke853e7202016-08-12 11:42:26 -0700168 u32 global_sequence; u32 stream_index;
Dave Barach9b8ffd92016-07-08 08:13:45 -0400169 u32 local_sequence;
170 i32 seq_cmp_result;}) mc_msg_user_ack_t;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700171
172always_inline void
173mc_byte_swap_msg_user_ack (mc_msg_user_ack_t * r)
174{
175 if (mc_need_byte_swap ())
176 {
177 r->peer_id = mc_byte_swap_peer_id (r->peer_id);
178 r->stream_index = clib_byte_swap_u32 (r->stream_index);
179 r->global_sequence = clib_byte_swap_u32 (r->global_sequence);
180 r->local_sequence = clib_byte_swap_u32 (r->local_sequence);
181 r->seq_cmp_result = clib_byte_swap_i32 (r->seq_cmp_result);
182 }
183}
184
185/* Sent/received unicast over catchup channel (e.g. using TCP). */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400186typedef CLIB_PACKED (struct
187 {
188 mc_peer_id_t peer_id;
189 u32 stream_index;}) mc_msg_catchup_request_t;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700190
191always_inline void
192mc_byte_swap_msg_catchup_request (mc_msg_catchup_request_t * r)
193{
194 if (mc_need_byte_swap ())
195 {
196 r->peer_id = mc_byte_swap_peer_id (r->peer_id);
197 r->stream_index = clib_byte_swap_u32 (r->stream_index);
198 }
199}
200
201/* Sent/received unicast over catchup channel. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400202typedef CLIB_PACKED (struct
203 {
204 mc_peer_id_t peer_id; u32 stream_index;
205 /* Last global sequence number included in catchup data. */
206 u32 last_global_sequence_included;
207 /* Size of catchup data. */
208 u32 n_data_bytes;
209 /* Catchup data. */
210 u8 data[0];}) mc_msg_catchup_reply_t;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700211
212always_inline void
213mc_byte_swap_msg_catchup_reply (mc_msg_catchup_reply_t * r)
214{
215 if (mc_need_byte_swap ())
216 {
217 r->peer_id = mc_byte_swap_peer_id (r->peer_id);
218 r->stream_index = clib_byte_swap_u32 (r->stream_index);
Dave Barach9b8ffd92016-07-08 08:13:45 -0400219 r->last_global_sequence_included =
220 clib_byte_swap_u32 (r->last_global_sequence_included);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700221 r->n_data_bytes = clib_byte_swap_u32 (r->n_data_bytes);
222 }
223}
224
Dave Barach9b8ffd92016-07-08 08:13:45 -0400225typedef struct _mc_serialize_msg
226{
Ed Warnickecb9cada2015-12-08 15:45:58 -0700227 /* Name for this type. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400228 char *name;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700229
230 /* Functions to serialize/unserialize data. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400231 serialize_function_t *serialize;
232 serialize_function_t *unserialize;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700233
234 /* Maximum message size in bytes when serialized.
235 If zero then this will be set to the largest sent message. */
236 u32 max_n_bytes_serialized;
237
238 /* Opaque to use for first argument to serialize/unserialize function. */
239 u32 opaque;
240
241 /* Index in global message vector. */
242 u32 global_index;
243
244 /* Registration list */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400245 struct _mc_serialize_msg *next_registration;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700246} mc_serialize_msg_t;
247
Dave Barach9b8ffd92016-07-08 08:13:45 -0400248typedef struct
249{
Ed Warnickecb9cada2015-12-08 15:45:58 -0700250 /* Index into global message vector. */
251 u32 global_index;
252} mc_serialize_stream_msg_t;
253
/*
 * Declare the mc_serialize_msg_t named x and register it automatically:
 *  - a constructor function pushes x onto the front of
 *    vlib_get_main ()->mc_msg_registrations;
 *  - a destructor function unlinks it again via
 *    VLIB_REMOVE_FROM_LINKED_LIST.
 * Any additional macro arguments are placed in front of both
 * declarations of x.  The expansion ends with an unterminated
 * declaration of x, so the invocation can be followed by an initializer.
 */
#define MC_SERIALIZE_MSG(x,...)                                         \
  __VA_ARGS__ mc_serialize_msg_t x;                                     \
  static void __attribute__ ((__constructor__))                         \
  __mc_serialize_msg_registration_##x (void)                            \
  {                                                                     \
    vlib_main_t *vm = vlib_get_main ();                                 \
    x.next_registration = vm->mc_msg_registrations;                     \
    vm->mc_msg_registrations = &x;                                      \
  }                                                                     \
  static void __attribute__ ((__destructor__))                          \
  __mc_serialize_msg_unregistration_##x (void)                          \
  {                                                                     \
    vlib_main_t *vm = vlib_get_main ();                                 \
    VLIB_REMOVE_FROM_LINKED_LIST (vm->mc_msg_registrations, &x,         \
                                  next_registration);                   \
  }                                                                     \
  __VA_ARGS__ mc_serialize_msg_t x
Ed Warnickecb9cada2015-12-08 15:45:58 -0700273
/* Transport channels used by MC.  MC_N_TRANSPORT_TYPE is not a channel;
   as the last enumerator it equals the number of channels. */
typedef enum
{
  MC_TRANSPORT_MASTERSHIP,
  MC_TRANSPORT_JOIN,
  MC_TRANSPORT_USER_REQUEST_TO_RELAY,
  MC_TRANSPORT_USER_REQUEST_FROM_RELAY,
  MC_N_TRANSPORT_TYPE,
} mc_transport_type_t;
282
Dave Barach9b8ffd92016-07-08 08:13:45 -0400283typedef struct
284{
285 clib_error_t *(*tx_buffer) (void *opaque, mc_transport_type_t type,
286 u32 buffer_index);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700287
Dave Barach9b8ffd92016-07-08 08:13:45 -0400288 clib_error_t *(*tx_ack) (void *opaque, mc_peer_id_t peer_id,
289 u32 buffer_index);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700290
291 /* Returns catchup opaque. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400292 uword (*catchup_request_fun) (void *opaque, u32 stream_index,
293 mc_peer_id_t catchup_peer_id);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700294
Dave Barach9b8ffd92016-07-08 08:13:45 -0400295 void (*catchup_send_fun) (void *opaque, uword catchup_opaque,
296 u8 * data_vector);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700297
298 /* Opaque passed to callbacks. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400299 void *opaque;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700300
301 mc_peer_id_t our_ack_peer_id;
302 mc_peer_id_t our_catchup_peer_id;
303
304 /* Max packet size (MTU) for this transport.
305 For IP this is interface MTU less IP + UDP header size. */
306 u32 max_packet_size;
307
Dave Barach9b8ffd92016-07-08 08:13:45 -0400308 format_function_t *format_peer_id;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700309} mc_transport_t;
310
Dave Barach9b8ffd92016-07-08 08:13:45 -0400311typedef struct
312{
Ed Warnickecb9cada2015-12-08 15:45:58 -0700313 /* Count of messages received from this peer from the past/future
314 (with seq_cmp != 0). */
315 u64 n_msgs_from_past;
316 u64 n_msgs_from_future;
317} mc_stream_peer_stats_t;
318
Dave Barach9b8ffd92016-07-08 08:13:45 -0400319typedef struct
320{
Ed Warnickecb9cada2015-12-08 15:45:58 -0700321 /* ID of this peer. */
322 mc_peer_id_t id;
323
324 /* The last sequence we received from this peer. */
325 u32 last_sequence_received;
326
327 mc_stream_peer_stats_t stats, stats_last_clear;
328} mc_stream_peer_t;
329
Dave Barach9b8ffd92016-07-08 08:13:45 -0400330typedef struct
331{
Ed Warnickecb9cada2015-12-08 15:45:58 -0700332 u32 buffer_index;
333
334 /* Cached copy of local sequence number from buffer. */
335 u32 local_sequence;
336
337 /* Number of times this buffer has been sent (retried). */
338 u32 n_retries;
339
340 /* Previous/next retries in doubly-linked list. */
341 u32 prev_index, next_index;
342
343 /* Bitmap of all peers which have acked this msg */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400344 uword *unacked_by_peer_bitmap;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700345
346 /* Message send or resend time */
347 f64 sent_at;
348} mc_retry_t;
349
Dave Barach9b8ffd92016-07-08 08:13:45 -0400350typedef struct
351{
Ed Warnickecb9cada2015-12-08 15:45:58 -0700352 /* Number of retries sent for this stream. */
353 u64 n_retries;
354} mc_stream_stats_t;
355
356struct mc_main_t;
357struct mc_stream_t;
358
Dave Barach9b8ffd92016-07-08 08:13:45 -0400359typedef struct
360{
Ed Warnickecb9cada2015-12-08 15:45:58 -0700361 /* Stream name. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400362 char *name;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700363
364 /* Number of outstanding messages. */
365 u32 window_size;
366
367 /* Retry interval, in seconds */
368 f64 retry_interval;
369
370 /* Retry limit */
371 u32 retry_limit;
372
373 /* User rx buffer callback */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400374 void (*rx_buffer) (struct mc_main_t * mc_main,
375 struct mc_stream_t * stream,
376 mc_peer_id_t peer_id, u32 buffer_index);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700377
378 /* User callback to create a snapshot */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400379 u8 *(*catchup_snapshot) (struct mc_main_t * mc_main,
380 u8 * snapshot_vector,
381 u32 last_global_sequence_included);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700382
383 /* User callback to replay a snapshot */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400384 void (*catchup) (struct mc_main_t * mc_main,
385 u8 * snapshot_data, u32 n_snapshot_data_bytes);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700386
387 /* Callback to save a snapshot for offline replay */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400388 void (*save_snapshot) (struct mc_main_t * mc_main,
389 u32 is_catchup,
390 u8 * snapshot_data, u32 n_snapshot_data_bytes);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700391
392 /* Called when a peer dies */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400393 void (*peer_died) (struct mc_main_t * mc_main,
394 struct mc_stream_t * stream, mc_peer_id_t peer_id);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700395} mc_stream_config_t;
396
/* X-macro naming every stream state.  The user defines _(f) before
   expanding the list and undefines it afterwards. */
#define foreach_mc_stream_state                 \
  _ (invalid)                                   \
  _ (name_known)                                \
  _ (join_in_progress)                          \
  _ (catchup)                                   \
  _ (ready)

/* One MC_STREAM_STATE_<name> enumerator per entry of
   foreach_mc_stream_state, in list order. */
typedef enum
{
#define _(f) MC_STREAM_STATE_##f,
  foreach_mc_stream_state
#undef _
} mc_stream_state_t;
410
Dave Barach9b8ffd92016-07-08 08:13:45 -0400411typedef struct mc_stream_t
412{
Ed Warnickecb9cada2015-12-08 15:45:58 -0700413 mc_stream_config_t config;
414
415 mc_stream_state_t state;
416
417 /* Index in stream pool. */
418 u32 index;
419
420 /* Stream index 0 is always for MC internal use. */
421#define MC_STREAM_INDEX_INTERNAL 0
422
Dave Barach9b8ffd92016-07-08 08:13:45 -0400423 mc_retry_t *retry_pool;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700424
425 /* Head and tail index of retry pool. */
426 u32 retry_head_index, retry_tail_index;
427
Dave Barach9b8ffd92016-07-08 08:13:45 -0400428 /*
Ed Warnickecb9cada2015-12-08 15:45:58 -0700429 * Country club for recently retired messages
430 * If the set of peers is expanding and a new peer
431 * misses a message, we can easily retire the FIFO
432 * element before we even know about the new peer
433 */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400434 mc_retry_t *retired_fifo;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700435
436 /* Hash mapping local sequence to retry pool index. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400437 uword *retry_index_by_local_sequence;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700438
439 /* catch-up fifo of VLIB buffer indices.
440 start recording when catching up. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400441 u32 *catchup_fifo;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700442
443 mc_stream_stats_t stats, stats_last_clear;
444
445 /* Peer pool. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400446 mc_stream_peer_t *peers;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700447
448 /* Bitmap with ones for all peers in peer pool. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400449 uword *all_peer_bitmap;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700450
451 /* Map of 64 bit id to index in stream pool. */
452 mhash_t peer_index_by_id;
453
454 /* Timeout, in case we're alone in the world */
455 f64 join_timeout;
456
Dave Barach9b8ffd92016-07-08 08:13:45 -0400457 vlib_one_time_waiting_process_t *procs_waiting_for_join_done;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700458
Dave Barach9b8ffd92016-07-08 08:13:45 -0400459 vlib_one_time_waiting_process_t *procs_waiting_for_open_window;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700460
461 /* Next sequence number to use */
462 u32 our_local_sequence;
463
Dave Barach9b8ffd92016-07-08 08:13:45 -0400464 /*
Ed Warnickecb9cada2015-12-08 15:45:58 -0700465 * Last global sequence we processed.
466 * When supplying catchup data, we need to tell
467 * the client precisely where to start replaying
468 */
469 u32 last_global_sequence_processed;
470
471 /* Vector of unique messages we've sent on this stream. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400472 mc_serialize_stream_msg_t *stream_msgs;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700473
474 /* Vector global message index into per stream message index. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400475 u32 *stream_msg_index_by_global_index;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700476
477 /* Hashed by message name. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400478 uword *stream_msg_index_by_name;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700479
480 u64 user_requests_sent;
481 u64 user_requests_received;
482} mc_stream_t;
483
484always_inline void
485mc_stream_free (mc_stream_t * s)
486{
487 pool_free (s->retry_pool);
488 hash_free (s->retry_index_by_local_sequence);
489 clib_fifo_free (s->catchup_fifo);
490 pool_free (s->peers);
491 mhash_free (&s->peer_index_by_id);
492 vec_free (s->procs_waiting_for_join_done);
493 vec_free (s->procs_waiting_for_open_window);
494}
495
496always_inline void
497mc_stream_init (mc_stream_t * s)
498{
499 memset (s, 0, sizeof (s[0]));
500 s->retry_head_index = s->retry_tail_index = ~0;
501}
502
Dave Barach9b8ffd92016-07-08 08:13:45 -0400503typedef struct
504{
Ed Warnickecb9cada2015-12-08 15:45:58 -0700505 u32 stream_index;
506 u32 catchup_opaque;
507 u8 *catchup_snapshot;
508} mc_catchup_process_arg_t;
509
/* Possible values of mc_main_t.relay_state. */
typedef enum
{
  MC_RELAY_STATE_NEGOTIATE,
  MC_RELAY_STATE_MASTER,
  MC_RELAY_STATE_SLAVE,
} mc_relay_state_t;
516
Dave Barach9b8ffd92016-07-08 08:13:45 -0400517typedef struct
518{
Ed Warnickecb9cada2015-12-08 15:45:58 -0700519 mc_peer_id_t peer_id;
520
521 f64 time_last_master_assert_received;
522} mc_mastership_peer_t;
523
Dave Barach9b8ffd92016-07-08 08:13:45 -0400524typedef struct
525{
Ed Warnickecb9cada2015-12-08 15:45:58 -0700526 u32 stream_index;
527 u32 buffer_index;
528} mc_stream_and_buffer_t;
529
Dave Barach9b8ffd92016-07-08 08:13:45 -0400530typedef struct mc_main_t
531{
Ed Warnickecb9cada2015-12-08 15:45:58 -0700532 mc_relay_state_t relay_state;
533
534 /* Mastership */
535 u32 we_can_be_relay_master;
536
537 u64 relay_master_peer_id;
538
Dave Barach9b8ffd92016-07-08 08:13:45 -0400539 mc_mastership_peer_t *mastership_peers;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700540
541 /* Map of 64 bit id to index in stream pool. */
542 mhash_t mastership_peer_index_by_id;
543
544 /* The transport we're using. */
545 mc_transport_t transport;
Dave Barach9b8ffd92016-07-08 08:13:45 -0400546
Ed Warnickecb9cada2015-12-08 15:45:58 -0700547 /* Last-used global sequence number. */
548 u32 relay_global_sequence;
549
550 /* Vector of streams. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400551 mc_stream_t *stream_vector;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700552
553 /* Hash table mapping stream name to pool index. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400554 uword *stream_index_by_name;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700555
Dave Barach9b8ffd92016-07-08 08:13:45 -0400556 uword *procs_waiting_for_stream_name_by_name;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700557
Dave Barach9b8ffd92016-07-08 08:13:45 -0400558 vlib_one_time_waiting_process_t **procs_waiting_for_stream_name_pool;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700559
560 int joins_in_progress;
561
Dave Barach9b8ffd92016-07-08 08:13:45 -0400562 mc_catchup_process_arg_t *catchup_process_args;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700563
564 /* Node indices for mastership, join ager,
565 retry and catchup processes. */
566 u32 mastership_process;
567 u32 join_ager_process;
568 u32 retry_process;
569 u32 catchup_process;
570 u32 unserialize_process;
571
572 /* Global vector of messages. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400573 mc_serialize_msg_t **global_msgs;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700574
575 /* Hash table mapping message name to index. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400576 uword *global_msg_index_by_name;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700577
578 /* Shared serialize/unserialize main. */
579 serialize_main_t serialize_mains[VLIB_N_RX_TX];
580
581 vlib_serialize_buffer_main_t serialize_buffer_mains[VLIB_N_RX_TX];
582
583 /* Convenience variables */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400584 struct vlib_main_t *vlib_main;
585 elog_main_t *elog_main;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700586
587 /* Maps 64 bit peer id to elog string table offset for this formatted peer id. */
588 mhash_t elog_id_by_peer_id;
589
590 uword *elog_id_by_msg_name;
591
592 /* For mc_unserialize. */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400593 mc_stream_and_buffer_t *mc_unserialize_stream_and_buffers;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700594} mc_main_t;
595
596always_inline mc_stream_t *
Dave Barach9b8ffd92016-07-08 08:13:45 -0400597mc_stream_by_name (mc_main_t * m, char *name)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700598{
Dave Barach9b8ffd92016-07-08 08:13:45 -0400599 uword *p = hash_get (m->stream_index_by_name, name);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700600 return p ? vec_elt_at_index (m->stream_vector, p[0]) : 0;
601}
602
603always_inline mc_stream_t *
604mc_stream_by_index (mc_main_t * m, u32 i)
605{
606 return i < vec_len (m->stream_vector) ? m->stream_vector + i : 0;
607}
608
609always_inline void
610mc_clear_stream_stats (mc_main_t * m)
611{
Dave Barach9b8ffd92016-07-08 08:13:45 -0400612 mc_stream_t *s;
613 mc_stream_peer_t *p;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700614 vec_foreach (s, m->stream_vector)
Dave Barach9b8ffd92016-07-08 08:13:45 -0400615 {
616 s->stats_last_clear = s->stats;
617 /* *INDENT-OFF* */
Ed Warnickecb9cada2015-12-08 15:45:58 -0700618 pool_foreach (p, s->peers, ({
619 p->stats_last_clear = p->stats;
620 }));
Dave Barach9b8ffd92016-07-08 08:13:45 -0400621 /* *INDENT-ON* */
622 }
Ed Warnickecb9cada2015-12-08 15:45:58 -0700623}
624
625/* Declare all message handlers. */
626#define _(f) void mc_msg_##f##_handler (mc_main_t * mcm, mc_msg_##f##_t * msg, u32 buffer_index);
627foreach_mc_msg_type
628#undef _
Dave Barach9b8ffd92016-07-08 08:13:45 -0400629 u32 mc_stream_join (mc_main_t * mcm, mc_stream_config_t *);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700630
631void mc_stream_leave (mc_main_t * mcm, u32 stream_index);
632
Dave Barach9b8ffd92016-07-08 08:13:45 -0400633void mc_wait_for_stream_ready (mc_main_t * m, char *stream_name);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700634
635u32 mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index);
636
Dave Barach9b8ffd92016-07-08 08:13:45 -0400637void mc_main_init (mc_main_t * mcm, char *tag);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700638
639void mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master);
640
Dave Barach9b8ffd92016-07-08 08:13:45 -0400641void *mc_get_vlib_buffer (struct vlib_main_t *vm, u32 n_bytes,
642 u32 * bi_return);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700643
644format_function_t format_mc_main;
645
Dave Barach9b8ffd92016-07-08 08:13:45 -0400646clib_error_t *mc_serialize_internal (mc_main_t * mc,
647 u32 stream_index,
648 u32 multiple_messages_per_vlib_buffer,
649 mc_serialize_msg_t * msg, ...);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700650
Dave Barach9b8ffd92016-07-08 08:13:45 -0400651clib_error_t *mc_serialize_va (mc_main_t * mc,
652 u32 stream_index,
653 u32 multiple_messages_per_vlib_buffer,
654 mc_serialize_msg_t * msg, va_list * va);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700655
/*
 * Convenience wrappers around mc_serialize_internal ().  Each one passes
 * (msg)->serialize as the first variadic argument, followed by the
 * caller's own arguments:
 *   mc_serialize_stream: explicit stream index si, third argument 0;
 *   mc_serialize:        stream index ~0, third argument 0;
 *   mc_serialize2:       stream index ~0, third argument add.
 *
 * ", ##args" (GNU extension, like the named variadic parameter itself)
 * removes the comma when no extra arguments are given, so messages
 * whose serialize function takes no further arguments can be sent too;
 * previously such a call expanded to a dangling comma and did not
 * compile.
 */
#define mc_serialize_stream(mc,si,msg,args...)                          \
  mc_serialize_internal((mc),(si),(0),(msg),(msg)->serialize , ##args)

#define mc_serialize(mc,msg,args...)                                    \
  mc_serialize_internal((mc),(~0),(0),(msg),(msg)->serialize , ##args)

#define mc_serialize2(mc,add,msg,args...)                               \
  mc_serialize_internal((mc),(~0),(add),(msg),(msg)->serialize , ##args)
664
665void mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index);
666uword mc_unserialize_message (mc_main_t * mcm, mc_stream_t * s,
Dave Barach9b8ffd92016-07-08 08:13:45 -0400667 serialize_main_t * m);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700668
669serialize_function_t serialize_mc_main, unserialize_mc_main;
670
671always_inline uword
672mc_max_message_size_in_bytes (mc_main_t * mcm)
Dave Barach9b8ffd92016-07-08 08:13:45 -0400673{
674 return mcm->transport.max_packet_size - sizeof (mc_msg_user_request_t);
675}
Ed Warnickecb9cada2015-12-08 15:45:58 -0700676
677always_inline word
678mc_serialize_n_bytes_left (mc_main_t * mcm, serialize_main_t * m)
Dave Barach9b8ffd92016-07-08 08:13:45 -0400679{
680 return mc_max_message_size_in_bytes (mcm) -
681 serialize_vlib_buffer_n_bytes (m);
682}
Ed Warnickecb9cada2015-12-08 15:45:58 -0700683
684void unserialize_mc_stream (serialize_main_t * m, va_list * va);
685void mc_stream_join_process_hold (void);
686
687#endif /* included_vlib_mc_h */
Dave Barach9b8ffd92016-07-08 08:13:45 -0400688
689/*
690 * fd.io coding-style-patch-verification: ON
691 *
692 * Local Variables:
693 * eval: (c-set-style "gnu")
694 * End:
695 */