| /* |
| * mc.c: vlib reliable sequenced multicast distributed applications |
| * |
| * Copyright (c) 2010 Cisco and/or its affiliates. |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at: |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <vlib/vlib.h> |
| |
| /* |
| * 1 to enable msg id training wheels, which are useful for tracking |
| * down catchup and/or partitioned network problems |
| */ |
| #define MSG_ID_DEBUG 0 |
| |
| static format_function_t format_mc_stream_state; |
| |
| static u32 |
| elog_id_for_peer_id (mc_main_t * m, u64 peer_id) |
| { |
| uword *p, r; |
| mhash_t *h = &m->elog_id_by_peer_id; |
| |
| if (!m->elog_id_by_peer_id.hash) |
| mhash_init (h, sizeof (uword), sizeof (mc_peer_id_t)); |
| |
| p = mhash_get (h, &peer_id); |
| if (p) |
| return p[0]; |
| r = elog_string (m->elog_main, "%U", m->transport.format_peer_id, peer_id); |
| mhash_set (h, &peer_id, r, /* old_value */ 0); |
| return r; |
| } |
| |
| static u32 |
| elog_id_for_msg_name (mc_main_t * m, char *msg_name) |
| { |
| uword *p, r; |
| uword *h = m->elog_id_by_msg_name; |
| u8 *name_copy; |
| |
| if (!h) |
| h = m->elog_id_by_msg_name = hash_create_string (0, sizeof (uword)); |
| |
| p = hash_get_mem (h, msg_name); |
| if (p) |
| return p[0]; |
| r = elog_string (m->elog_main, "%s", msg_name); |
| |
| name_copy = format (0, "%s%c", msg_name, 0); |
| |
| hash_set_mem (h, name_copy, r); |
| m->elog_id_by_msg_name = h; |
| |
| return r; |
| } |
| |
| static void |
| elog_tx_msg (mc_main_t * m, u32 stream_id, u32 local_sequence, |
| u32 retry_count) |
| { |
| if (MC_EVENT_LOGGING > 0) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (e) = |
| { |
| .format = "tx-msg: stream %d local seq %d attempt %d", |
| .format_args = "i4i4i4", |
| }; |
| /* *INDENT-ON* */ |
| struct |
| { |
| u32 stream_id, local_sequence, retry_count; |
| } *ed; |
| ed = ELOG_DATA (m->elog_main, e); |
| ed->stream_id = stream_id; |
| ed->local_sequence = local_sequence; |
| ed->retry_count = retry_count; |
| } |
| } |
| |
| /* |
| * seq_cmp |
| * correctly compare two unsigned sequence numbers. |
| * This function works so long as x and y are within 2**(n-1) of each |
| * other, where n = bits(x, y). |
| * |
| * Magic decoder ring: |
| * seq_cmp == 0 => x and y are equal |
| * seq_cmp < 0 => x is "in the past" with respect to y |
| * seq_cmp > 0 => x is "in the future" with respect to y |
| */ |
| always_inline i32 |
| mc_seq_cmp (u32 x, u32 y) |
| { |
| return (i32) x - (i32) y; |
| } |
| |
| void * |
| mc_get_vlib_buffer (vlib_main_t * vm, u32 n_bytes, u32 * bi_return) |
| { |
| u32 n_alloc, bi; |
| vlib_buffer_t *b; |
| |
| n_alloc = vlib_buffer_alloc (vm, &bi, 1); |
| ASSERT (n_alloc == 1); |
| |
| b = vlib_get_buffer (vm, bi); |
| b->current_length = n_bytes; |
| *bi_return = bi; |
| return (void *) b->data; |
| } |
| |
| static void |
| delete_peer_with_index (mc_main_t * mcm, mc_stream_t * s, |
| uword index, int notify_application) |
| { |
| mc_stream_peer_t *p = pool_elt_at_index (s->peers, index); |
| ASSERT (p != 0); |
| if (s->config.peer_died && notify_application) |
| s->config.peer_died (mcm, s, p->id); |
| |
| s->all_peer_bitmap = clib_bitmap_andnoti (s->all_peer_bitmap, p - s->peers); |
| |
| if (MC_EVENT_LOGGING > 0) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (e) = |
| { |
| .format = "delete peer %s from all_peer_bitmap", |
| .format_args = "T4", |
| }; |
| /* *INDENT-ON* */ |
| struct |
| { |
| u32 peer; |
| } *ed = 0; |
| |
| ed = ELOG_DATA (mcm->elog_main, e); |
| ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64); |
| } |
| /* Do not delete the pool / hash table entries, or we lose sequence number state */ |
| } |
| |
| static mc_stream_peer_t * |
| get_or_create_peer_with_id (mc_main_t * mcm, |
| mc_stream_t * s, mc_peer_id_t id, int *created) |
| { |
| uword *q = mhash_get (&s->peer_index_by_id, &id); |
| mc_stream_peer_t *p; |
| |
| if (q) |
| { |
| p = pool_elt_at_index (s->peers, q[0]); |
| goto done; |
| } |
| |
| pool_get (s->peers, p); |
| memset (p, 0, sizeof (p[0])); |
| p->id = id; |
| p->last_sequence_received = ~0; |
| mhash_set (&s->peer_index_by_id, &id, p - s->peers, /* old_value */ 0); |
| if (created) |
| *created = 1; |
| |
| done: |
| if (MC_EVENT_LOGGING > 0) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (e) = |
| { |
| .format = "get_or_create %s peer %s stream %d seq %d", |
| .format_args = "t4T4i4i4", |
| .n_enum_strings = 2, |
| .enum_strings = { |
| "old", "new", |
| }, |
| }; |
| /* *INDENT-ON* */ |
| struct |
| { |
| u32 is_new, peer, stream_index, rx_sequence; |
| } *ed = 0; |
| |
| ed = ELOG_DATA (mcm->elog_main, e); |
| ed->is_new = q ? 0 : 1; |
| ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64); |
| ed->stream_index = s->index; |
| ed->rx_sequence = p->last_sequence_received; |
| } |
| /* $$$$ Enable or reenable this peer */ |
| s->all_peer_bitmap = clib_bitmap_ori (s->all_peer_bitmap, p - s->peers); |
| return p; |
| } |
| |
| static void |
| maybe_send_window_open_event (vlib_main_t * vm, mc_stream_t * stream) |
| { |
| vlib_one_time_waiting_process_t *p; |
| |
| if (pool_elts (stream->retry_pool) >= stream->config.window_size) |
| return; |
| |
| vec_foreach (p, stream->procs_waiting_for_open_window) |
| vlib_signal_one_time_waiting_process (vm, p); |
| |
| if (stream->procs_waiting_for_open_window) |
| _vec_len (stream->procs_waiting_for_open_window) = 0; |
| } |
| |
| static void |
| mc_retry_free (mc_main_t * mcm, mc_stream_t * s, mc_retry_t * r) |
| { |
| mc_retry_t record, *retp; |
| |
| if (r->unacked_by_peer_bitmap) |
| _vec_len (r->unacked_by_peer_bitmap) = 0; |
| |
| if (clib_fifo_elts (s->retired_fifo) >= 2 * s->config.window_size) |
| { |
| clib_fifo_sub1 (s->retired_fifo, record); |
| vlib_buffer_free_one (mcm->vlib_main, record.buffer_index); |
| } |
| |
| clib_fifo_add2 (s->retired_fifo, retp); |
| |
| retp->buffer_index = r->buffer_index; |
| retp->local_sequence = r->local_sequence; |
| |
| r->buffer_index = ~0; /* poison buffer index in this retry */ |
| } |
| |
| static void |
| mc_resend_retired (mc_main_t * mcm, mc_stream_t * s, u32 local_sequence) |
| { |
| mc_retry_t *retry; |
| |
| if (MC_EVENT_LOGGING > 0) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (e) = |
| { |
| .format = "resend-retired: search for local seq %d", |
| .format_args = "i4", |
| }; |
| /* *INDENT-ON* */ |
| struct |
| { |
| u32 local_sequence; |
| } *ed; |
| ed = ELOG_DATA (mcm->elog_main, e); |
| ed->local_sequence = local_sequence; |
| } |
| |
| /* *INDENT-OFF* */ |
| clib_fifo_foreach (retry, s->retired_fifo, |
| ({ |
| if (retry->local_sequence == local_sequence) |
| { |
| elog_tx_msg (mcm, s->index, retry-> local_sequence, -13); |
| mcm->transport.tx_buffer (mcm->transport.opaque, |
| MC_TRANSPORT_USER_REQUEST_TO_RELAY, |
| retry->buffer_index); |
| return; |
| } |
| })); |
| /* *INDENT-ON* */ |
| |
| if (MC_EVENT_LOGGING > 0) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (e) = |
| { |
| .format = "resend-retired: FAILED search for local seq %d", |
| .format_args = "i4", |
| }; |
| /* *INDENT-ON* */ |
| struct |
| { |
| u32 local_sequence; |
| } *ed; |
| ed = ELOG_DATA (mcm->elog_main, e); |
| ed->local_sequence = local_sequence; |
| } |
| } |
| |
| static uword * |
| delete_retry_fifo_elt (mc_main_t * mcm, |
| mc_stream_t * stream, |
| mc_retry_t * r, uword * dead_peer_bitmap) |
| { |
| mc_stream_peer_t *p; |
| |
| /* *INDENT-OFF* */ |
| pool_foreach (p, stream->peers, ({ |
| uword pi = p - stream->peers; |
| uword is_alive = 0 == clib_bitmap_get (r->unacked_by_peer_bitmap, pi); |
| |
| if (! is_alive) |
| dead_peer_bitmap = clib_bitmap_ori (dead_peer_bitmap, pi); |
| |
| if (MC_EVENT_LOGGING > 0) |
| { |
| ELOG_TYPE_DECLARE (e) = { |
| .format = "delete_retry_fifo_elt: peer %s is %s", |
| .format_args = "T4t4", |
| .n_enum_strings = 2, |
| .enum_strings = { "alive", "dead", }, |
| }; |
| struct { u32 peer, is_alive; } * ed; |
| ed = ELOG_DATA (mcm->elog_main, e); |
| ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64); |
| ed->is_alive = is_alive; |
| } |
| })); |
| /* *INDENT-ON* */ |
| |
| hash_unset (stream->retry_index_by_local_sequence, r->local_sequence); |
| mc_retry_free (mcm, stream, r); |
| |
| return dead_peer_bitmap; |
| } |
| |
| always_inline mc_retry_t * |
| prev_retry (mc_stream_t * s, mc_retry_t * r) |
| { |
| return (r->prev_index != ~0 |
| ? pool_elt_at_index (s->retry_pool, r->prev_index) : 0); |
| } |
| |
| always_inline mc_retry_t * |
| next_retry (mc_stream_t * s, mc_retry_t * r) |
| { |
| return (r->next_index != ~0 |
| ? pool_elt_at_index (s->retry_pool, r->next_index) : 0); |
| } |
| |
| always_inline void |
| remove_retry_from_pool (mc_stream_t * s, mc_retry_t * r) |
| { |
| mc_retry_t *p = prev_retry (s, r); |
| mc_retry_t *n = next_retry (s, r); |
| |
| if (p) |
| p->next_index = r->next_index; |
| else |
| s->retry_head_index = r->next_index; |
| if (n) |
| n->prev_index = r->prev_index; |
| else |
| s->retry_tail_index = r->prev_index; |
| |
| pool_put_index (s->retry_pool, r - s->retry_pool); |
| } |
| |
| static void |
| check_retry (mc_main_t * mcm, mc_stream_t * s) |
| { |
| mc_retry_t *r; |
| vlib_main_t *vm = mcm->vlib_main; |
| f64 now = vlib_time_now (vm); |
| uword *dead_peer_bitmap = 0; |
| u32 ri, ri_next; |
| |
| for (ri = s->retry_head_index; ri != ~0; ri = ri_next) |
| { |
| r = pool_elt_at_index (s->retry_pool, ri); |
| ri_next = r->next_index; |
| |
| if (now < r->sent_at + s->config.retry_interval) |
| continue; |
| |
| r->n_retries += 1; |
| if (r->n_retries > s->config.retry_limit) |
| { |
| dead_peer_bitmap = |
| delete_retry_fifo_elt (mcm, s, r, dead_peer_bitmap); |
| remove_retry_from_pool (s, r); |
| } |
| else |
| { |
| if (MC_EVENT_LOGGING > 0) |
| { |
| mc_stream_peer_t *p; |
| |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (t) = |
| { |
| .format = "resend local seq %d attempt %d", |
| .format_args = "i4i4", |
| }; |
| /* *INDENT-ON* */ |
| |
| /* *INDENT-OFF* */ |
| pool_foreach (p, s->peers, ({ |
| if (clib_bitmap_get (r->unacked_by_peer_bitmap, p - s->peers)) |
| { |
| ELOG_TYPE_DECLARE (ev) = { |
| .format = "resend: needed by peer %s local seq %d", |
| .format_args = "T4i4", |
| }; |
| struct { u32 peer, rx_sequence; } * ed; |
| ed = ELOG_DATA (mcm->elog_main, ev); |
| ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64); |
| ed->rx_sequence = r->local_sequence; |
| } |
| })); |
| /* *INDENT-ON* */ |
| |
| struct |
| { |
| u32 sequence; |
| u32 trail; |
| } *ed; |
| ed = ELOG_DATA (mcm->elog_main, t); |
| ed->sequence = r->local_sequence; |
| ed->trail = r->n_retries; |
| } |
| |
| r->sent_at = vlib_time_now (vm); |
| s->stats.n_retries += 1; |
| |
| elog_tx_msg (mcm, s->index, r->local_sequence, r->n_retries); |
| |
| mcm->transport.tx_buffer |
| (mcm->transport.opaque, |
| MC_TRANSPORT_USER_REQUEST_TO_RELAY, r->buffer_index); |
| } |
| } |
| |
| maybe_send_window_open_event (mcm->vlib_main, s); |
| |
| /* Delete any dead peers we've found. */ |
| if (!clib_bitmap_is_zero (dead_peer_bitmap)) |
| { |
| uword i; |
| |
| /* *INDENT-OFF* */ |
| clib_bitmap_foreach (i, dead_peer_bitmap, ({ |
| delete_peer_with_index (mcm, s, i, /* notify_application */ 1); |
| |
| /* Delete any references to just deleted peer in retry pool. */ |
| pool_foreach (r, s->retry_pool, ({ |
| r->unacked_by_peer_bitmap = |
| clib_bitmap_andnoti (r->unacked_by_peer_bitmap, i); |
| })); |
| })); |
| /* *INDENT-ON* */ |
| clib_bitmap_free (dead_peer_bitmap); |
| } |
| } |
| |
| always_inline mc_main_t * |
| mc_node_get_main (vlib_node_runtime_t * node) |
| { |
| mc_main_t **p = (void *) node->runtime_data; |
| return p[0]; |
| } |
| |
| static uword |
| mc_retry_process (vlib_main_t * vm, |
| vlib_node_runtime_t * node, vlib_frame_t * f) |
| { |
| mc_main_t *mcm = mc_node_get_main (node); |
| mc_stream_t *s; |
| |
| while (1) |
| { |
| vlib_process_suspend (vm, 1.0); |
| vec_foreach (s, mcm->stream_vector) |
| { |
| if (s->state != MC_STREAM_STATE_invalid) |
| check_retry (mcm, s); |
| } |
| } |
| return 0; /* not likely */ |
| } |
| |
| static void |
| send_join_or_leave_request (mc_main_t * mcm, u32 stream_index, u32 is_join) |
| { |
| vlib_main_t *vm = mcm->vlib_main; |
| mc_msg_join_or_leave_request_t *mp; |
| u32 bi; |
| |
| mp = mc_get_vlib_buffer (vm, sizeof (mp[0]), &bi); |
| memset (mp, 0, sizeof (*mp)); |
| mp->type = MC_MSG_TYPE_join_or_leave_request; |
| mp->peer_id = mcm->transport.our_ack_peer_id; |
| mp->stream_index = stream_index; |
| mp->is_join = is_join; |
| |
| mc_byte_swap_msg_join_or_leave_request (mp); |
| |
| /* |
| * These msgs are unnumbered, unordered so send on the from-relay |
| * channel. |
| */ |
| mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi); |
| } |
| |
| static uword |
| mc_join_ager_process (vlib_main_t * vm, |
| vlib_node_runtime_t * node, vlib_frame_t * f) |
| { |
| mc_main_t *mcm = mc_node_get_main (node); |
| |
| while (1) |
| { |
| if (mcm->joins_in_progress) |
| { |
| mc_stream_t *s; |
| vlib_one_time_waiting_process_t *p; |
| f64 now = vlib_time_now (vm); |
| |
| vec_foreach (s, mcm->stream_vector) |
| { |
| if (s->state != MC_STREAM_STATE_join_in_progress) |
| continue; |
| |
| if (now > s->join_timeout) |
| { |
| s->state = MC_STREAM_STATE_ready; |
| |
| if (MC_EVENT_LOGGING > 0) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (e) = |
| { |
| .format = "stream %d join timeout", |
| }; |
| /* *INDENT-ON* */ |
| ELOG (mcm->elog_main, e, s->index); |
| } |
| /* Make sure that this app instance exists as a stream peer, |
| or we may answer a catchup request with a NULL |
| all_peer_bitmap... */ |
| (void) get_or_create_peer_with_id |
| (mcm, s, mcm->transport.our_ack_peer_id, /* created */ 0); |
| |
| vec_foreach (p, s->procs_waiting_for_join_done) |
| vlib_signal_one_time_waiting_process (vm, p); |
| if (s->procs_waiting_for_join_done) |
| _vec_len (s->procs_waiting_for_join_done) = 0; |
| |
| mcm->joins_in_progress--; |
| ASSERT (mcm->joins_in_progress >= 0); |
| } |
| else |
| { |
| /* Resent join request which may have been lost. */ |
| send_join_or_leave_request (mcm, s->index, 1 /* is_join */ ); |
| |
| /* We're *not* alone, retry for as long as it takes */ |
| if (mcm->relay_state == MC_RELAY_STATE_SLAVE) |
| s->join_timeout = vlib_time_now (vm) + 2.0; |
| |
| |
| if (MC_EVENT_LOGGING > 0) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (e) = |
| { |
| .format = "stream %d resend join request", |
| }; |
| /* *INDENT-ON* */ |
| ELOG (mcm->elog_main, e, s->index); |
| } |
| } |
| } |
| } |
| |
| vlib_process_suspend (vm, .5); |
| } |
| |
| return 0; /* not likely */ |
| } |
| |
| static void |
| serialize_mc_register_stream_name (serialize_main_t * m, va_list * va) |
| { |
| char *name = va_arg (*va, char *); |
| serialize_cstring (m, name); |
| } |
| |
/*
 * Copy a stream name into a fixed-size event-log field.
 *
 * buf          destination field of n_buf_bytes bytes
 * n_buf_bytes  size of buf; nothing is written when it is <= 0
 * v            NUL-terminated stream name, or NULL for an empty name
 *
 * At most n_buf_bytes - 1 characters are copied and the rest of the field
 * is zero-filled, so the result is always NUL-terminated directly after
 * the copied text.
 *
 * The name is treated as a C string rather than a vppinfra vector: callers
 * in this file pass both unserialized names and plain literals such as
 * "mc-internal", and taking vec_len() of a literal is invalid.
 */
static void
elog_stream_name (char *buf, int n_buf_bytes, char *v)
{
  int i = 0;

  if (n_buf_bytes <= 0)
    return;

  if (v)
    {
      while (i < n_buf_bytes - 1 && v[i] != 0)
        {
          buf[i] = v[i];
          i++;
        }
    }

  /* Zero the remainder, including the final byte. */
  while (i < n_buf_bytes)
    buf[i++] = 0;
}
| |
| static void |
| unserialize_mc_register_stream_name (serialize_main_t * m, va_list * va) |
| { |
| mc_main_t *mcm = va_arg (*va, mc_main_t *); |
| char *name; |
| mc_stream_t *s; |
| uword *p; |
| |
| unserialize_cstring (m, &name); |
| |
| if ((p = hash_get_mem (mcm->stream_index_by_name, name))) |
| { |
| if (MC_EVENT_LOGGING > 0) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (e) = |
| { |
| .format = "stream index %d already named %s", |
| .format_args = "i4s16", |
| }; |
| /* *INDENT-ON* */ |
| struct |
| { |
| u32 stream_index; |
| char name[16]; |
| } *ed; |
| ed = ELOG_DATA (mcm->elog_main, e); |
| ed->stream_index = p[0]; |
| elog_stream_name (ed->name, sizeof (ed->name), name); |
| } |
| |
| vec_free (name); |
| return; |
| } |
| |
| vec_add2 (mcm->stream_vector, s, 1); |
| mc_stream_init (s); |
| s->state = MC_STREAM_STATE_name_known; |
| s->index = s - mcm->stream_vector; |
| s->config.name = name; |
| |
| if (MC_EVENT_LOGGING > 0) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (e) = |
| { |
| .format = "stream index %d named %s", |
| .format_args = "i4s16", |
| }; |
| /* *INDENT-ON* */ |
| struct |
| { |
| u32 stream_index; |
| char name[16]; |
| } *ed; |
| ed = ELOG_DATA (mcm->elog_main, e); |
| ed->stream_index = s->index; |
| elog_stream_name (ed->name, sizeof (ed->name), name); |
| } |
| |
| hash_set_mem (mcm->stream_index_by_name, name, s->index); |
| |
| p = hash_get (mcm->procs_waiting_for_stream_name_by_name, name); |
| if (p) |
| { |
| vlib_one_time_waiting_process_t *wp, **w; |
| w = pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool, p[0]); |
| vec_foreach (wp, w[0]) |
| vlib_signal_one_time_waiting_process (mcm->vlib_main, wp); |
| pool_put (mcm->procs_waiting_for_stream_name_pool, w); |
| hash_unset_mem (mcm->procs_waiting_for_stream_name_by_name, name); |
| } |
| } |
| |
| /* *INDENT-OFF* */ |
| MC_SERIALIZE_MSG (mc_register_stream_name_msg, static) = |
| { |
| .name = "mc_register_stream_name", |
| .serialize = serialize_mc_register_stream_name, |
| .unserialize = unserialize_mc_register_stream_name, |
| }; |
| /* *INDENT-ON* */ |
| |
| void |
| mc_rx_buffer_unserialize (mc_main_t * mcm, |
| mc_stream_t * stream, |
| mc_peer_id_t peer_id, u32 buffer_index) |
| { |
| return mc_unserialize (mcm, stream, buffer_index); |
| } |
| |
| static u8 * |
| mc_internal_catchup_snapshot (mc_main_t * mcm, |
| u8 * data_vector, |
| u32 last_global_sequence_processed) |
| { |
| serialize_main_t m; |
| |
| /* Append serialized data to data vector. */ |
| serialize_open_vector (&m, data_vector); |
| m.stream.current_buffer_index = vec_len (data_vector); |
| |
| serialize (&m, serialize_mc_main, mcm); |
| return serialize_close_vector (&m); |
| } |
| |
| static void |
| mc_internal_catchup (mc_main_t * mcm, u8 * data, u32 n_data_bytes) |
| { |
| serialize_main_t s; |
| |
| unserialize_open_data (&s, data, n_data_bytes); |
| |
| unserialize (&s, unserialize_mc_main, mcm); |
| } |
| |
/*
 * Weak, empty default.  Overridden from the application layer, not
 * actually used here.
 */
void mc_stream_join_process_hold (void) __attribute__ ((weak));

void
mc_stream_join_process_hold (void)
{
  /* Intentionally empty. */
}
| |
| static u32 |
| mc_stream_join_helper (mc_main_t * mcm, |
| mc_stream_config_t * config, u32 is_internal) |
| { |
| mc_stream_t *s; |
| vlib_main_t *vm = mcm->vlib_main; |
| |
| s = 0; |
| if (!is_internal) |
| { |
| uword *p; |
| |
| /* Already have a stream with given name? */ |
| if ((s = mc_stream_by_name (mcm, config->name))) |
| { |
| /* Already joined and ready? */ |
| if (s->state == MC_STREAM_STATE_ready) |
| return s->index; |
| } |
| |
| /* First join MC internal stream. */ |
| if (!mcm->stream_vector |
| || (mcm->stream_vector[MC_STREAM_INDEX_INTERNAL].state |
| == MC_STREAM_STATE_invalid)) |
| { |
| static mc_stream_config_t c = { |
| .name = "mc-internal", |
| .rx_buffer = mc_rx_buffer_unserialize, |
| .catchup = mc_internal_catchup, |
| .catchup_snapshot = mc_internal_catchup_snapshot, |
| }; |
| |
| c.save_snapshot = config->save_snapshot; |
| |
| mc_stream_join_helper (mcm, &c, /* is_internal */ 1); |
| } |
| |
| /* If stream is still unknown register this name and wait for |
| sequenced message to name stream. This way all peers agree |
| on stream name to index mappings. */ |
| s = mc_stream_by_name (mcm, config->name); |
| if (!s) |
| { |
| vlib_one_time_waiting_process_t *wp, **w; |
| u8 *name_copy = format (0, "%s", config->name); |
| |
| mc_serialize_stream (mcm, |
| MC_STREAM_INDEX_INTERNAL, |
| &mc_register_stream_name_msg, config->name); |
| |
| /* Wait for this stream to be named. */ |
| p = |
| hash_get_mem (mcm->procs_waiting_for_stream_name_by_name, |
| name_copy); |
| if (p) |
| w = |
| pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool, |
| p[0]); |
| else |
| { |
| pool_get (mcm->procs_waiting_for_stream_name_pool, w); |
| if (!mcm->procs_waiting_for_stream_name_by_name) |
| mcm->procs_waiting_for_stream_name_by_name = hash_create_string ( /* elts */ 0, /* value size */ |
| sizeof |
| (uword)); |
| hash_set_mem (mcm->procs_waiting_for_stream_name_by_name, |
| name_copy, |
| w - mcm->procs_waiting_for_stream_name_pool); |
| w[0] = 0; |
| } |
| |
| vec_add2 (w[0], wp, 1); |
| vlib_current_process_wait_for_one_time_event (vm, wp); |
| vec_free (name_copy); |
| } |
| |
| /* Name should be known now. */ |
| s = mc_stream_by_name (mcm, config->name); |
| ASSERT (s != 0); |
| ASSERT (s->state == MC_STREAM_STATE_name_known); |
| } |
| |
| if (!s) |
| { |
| vec_add2 (mcm->stream_vector, s, 1); |
| mc_stream_init (s); |
| s->index = s - mcm->stream_vector; |
| } |
| |
| { |
| /* Save name since we could have already used it as hash key. */ |
| char *name_save = s->config.name; |
| |
| s->config = config[0]; |
| |
| if (name_save) |
| s->config.name = name_save; |
| } |
| |
| if (s->config.window_size == 0) |
| s->config.window_size = 8; |
| |
| if (s->config.retry_interval == 0.0) |
| s->config.retry_interval = 1.0; |
| |
| /* Sanity. */ |
| ASSERT (s->config.retry_interval < 30); |
| |
| if (s->config.retry_limit == 0) |
| s->config.retry_limit = 7; |
| |
| s->state = MC_STREAM_STATE_join_in_progress; |
| if (!s->peer_index_by_id.hash) |
| mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t)); |
| |
| /* If we don't hear from someone in 5 seconds, we're alone */ |
| s->join_timeout = vlib_time_now (vm) + 5.0; |
| mcm->joins_in_progress++; |
| |
| if (MC_EVENT_LOGGING > 0) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (e) = |
| { |
| .format = "stream index %d join request %s", |
| .format_args = "i4s16", |
| }; |
| /* *INDENT-ON* */ |
| struct |
| { |
| u32 stream_index; |
| char name[16]; |
| } *ed; |
| ed = ELOG_DATA (mcm->elog_main, e); |
| ed->stream_index = s->index; |
| elog_stream_name (ed->name, sizeof (ed->name), s->config.name); |
| } |
| |
| send_join_or_leave_request (mcm, s->index, 1 /* join */ ); |
| |
| vlib_current_process_wait_for_one_time_event_vector |
| (vm, &s->procs_waiting_for_join_done); |
| |
| if (MC_EVENT_LOGGING) |
| { |
| ELOG_TYPE (e, "join complete stream %d"); |
| ELOG (mcm->elog_main, e, s->index); |
| } |
| |
| return s->index; |
| } |
| |
| u32 |
| mc_stream_join (mc_main_t * mcm, mc_stream_config_t * config) |
| { |
| return mc_stream_join_helper (mcm, config, /* is_internal */ 0); |
| } |
| |
| void |
| mc_stream_leave (mc_main_t * mcm, u32 stream_index) |
| { |
| mc_stream_t *s = mc_stream_by_index (mcm, stream_index); |
| |
| if (!s) |
| return; |
| |
| if (MC_EVENT_LOGGING) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (t) = |
| { |
| .format = "leave-stream: %d",.format_args = "i4", |
| }; |
| /* *INDENT-ON* */ |
| struct |
| { |
| u32 index; |
| } *ed; |
| ed = ELOG_DATA (mcm->elog_main, t); |
| ed->index = stream_index; |
| } |
| |
| send_join_or_leave_request (mcm, stream_index, 0 /* is_join */ ); |
| mc_stream_free (s); |
| s->state = MC_STREAM_STATE_name_known; |
| } |
| |
| void |
| mc_msg_join_or_leave_request_handler (mc_main_t * mcm, |
| mc_msg_join_or_leave_request_t * req, |
| u32 buffer_index) |
| { |
| mc_stream_t *s; |
| mc_msg_join_reply_t *rep; |
| u32 bi; |
| |
| mc_byte_swap_msg_join_or_leave_request (req); |
| |
| s = mc_stream_by_index (mcm, req->stream_index); |
| if (!s || s->state != MC_STREAM_STATE_ready) |
| return; |
| |
| /* If the peer is joining, create it */ |
| if (req->is_join) |
| { |
| mc_stream_t *this_s; |
| |
| /* We're not in a position to catch up a peer until all |
| stream joins are complete. */ |
| if (0) |
| { |
| /* XXX This is hard to test so we've. */ |
| vec_foreach (this_s, mcm->stream_vector) |
| { |
| if (this_s->state != MC_STREAM_STATE_ready |
| && this_s->state != MC_STREAM_STATE_name_known) |
| return; |
| } |
| } |
| else if (mcm->joins_in_progress > 0) |
| return; |
| |
| (void) get_or_create_peer_with_id (mcm, s, req->peer_id, |
| /* created */ 0); |
| |
| rep = mc_get_vlib_buffer (mcm->vlib_main, sizeof (rep[0]), &bi); |
| memset (rep, 0, sizeof (rep[0])); |
| rep->type = MC_MSG_TYPE_join_reply; |
| rep->stream_index = req->stream_index; |
| |
| mc_byte_swap_msg_join_reply (rep); |
| /* These two are already in network byte order... */ |
| rep->peer_id = mcm->transport.our_ack_peer_id; |
| rep->catchup_peer_id = mcm->transport.our_catchup_peer_id; |
| |
| mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi); |
| } |
| else |
| { |
| if (s->config.peer_died) |
| s->config.peer_died (mcm, s, req->peer_id); |
| } |
| } |
| |
| void |
| mc_msg_join_reply_handler (mc_main_t * mcm, |
| mc_msg_join_reply_t * mp, u32 buffer_index) |
| { |
| mc_stream_t *s; |
| |
| mc_byte_swap_msg_join_reply (mp); |
| |
| s = mc_stream_by_index (mcm, mp->stream_index); |
| |
| if (!s || s->state != MC_STREAM_STATE_join_in_progress) |
| return; |
| |
| /* Switch to catchup state; next join reply |
| for this stream will be ignored. */ |
| s->state = MC_STREAM_STATE_catchup; |
| |
| mcm->joins_in_progress--; |
| mcm->transport.catchup_request_fun (mcm->transport.opaque, |
| mp->stream_index, mp->catchup_peer_id); |
| } |
| |
| void |
| mc_wait_for_stream_ready (mc_main_t * m, char *stream_name) |
| { |
| mc_stream_t *s; |
| |
| while (1) |
| { |
| s = mc_stream_by_name (m, stream_name); |
| if (s) |
| break; |
| vlib_process_suspend (m->vlib_main, .1); |
| } |
| |
| /* It's OK to send a message in catchup and ready states. */ |
| if (s->state == MC_STREAM_STATE_catchup |
| || s->state == MC_STREAM_STATE_ready) |
| return; |
| |
| /* Otherwise we are waiting for a join to finish. */ |
| vlib_current_process_wait_for_one_time_event_vector |
| (m->vlib_main, &s->procs_waiting_for_join_done); |
| } |
| |
| u32 |
| mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index) |
| { |
| mc_stream_t *s = mc_stream_by_index (mcm, stream_index); |
| vlib_main_t *vm = mcm->vlib_main; |
| mc_retry_t *r; |
| mc_msg_user_request_t *mp; |
| vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index); |
| u32 ri; |
| |
| if (!s) |
| return 0; |
| |
| if (s->state != MC_STREAM_STATE_ready) |
| vlib_current_process_wait_for_one_time_event_vector |
| (vm, &s->procs_waiting_for_join_done); |
| |
| while (pool_elts (s->retry_pool) >= s->config.window_size) |
| { |
| vlib_current_process_wait_for_one_time_event_vector |
| (vm, &s->procs_waiting_for_open_window); |
| } |
| |
| pool_get (s->retry_pool, r); |
| ri = r - s->retry_pool; |
| |
| r->prev_index = s->retry_tail_index; |
| r->next_index = ~0; |
| s->retry_tail_index = ri; |
| |
| if (r->prev_index == ~0) |
| s->retry_head_index = ri; |
| else |
| { |
| mc_retry_t *p = pool_elt_at_index (s->retry_pool, r->prev_index); |
| p->next_index = ri; |
| } |
| |
| vlib_buffer_advance (b, -sizeof (mp[0])); |
| mp = vlib_buffer_get_current (b); |
| |
| mp->peer_id = mcm->transport.our_ack_peer_id; |
| /* mp->transport.global_sequence set by relay agent. */ |
| mp->global_sequence = 0xdeadbeef; |
| mp->stream_index = s->index; |
| mp->local_sequence = s->our_local_sequence++; |
| mp->n_data_bytes = |
| vlib_buffer_index_length_in_chain (vm, buffer_index) - sizeof (mp[0]); |
| |
| r->buffer_index = buffer_index; |
| r->local_sequence = mp->local_sequence; |
| r->sent_at = vlib_time_now (vm); |
| r->n_retries = 0; |
| |
| /* Retry will be freed when all currently known peers have acked. */ |
| vec_validate (r->unacked_by_peer_bitmap, vec_len (s->all_peer_bitmap) - 1); |
| vec_copy (r->unacked_by_peer_bitmap, s->all_peer_bitmap); |
| |
| hash_set (s->retry_index_by_local_sequence, r->local_sequence, |
| r - s->retry_pool); |
| |
| elog_tx_msg (mcm, s->index, mp->local_sequence, r->n_retries); |
| |
| mc_byte_swap_msg_user_request (mp); |
| |
| mcm->transport.tx_buffer (mcm->transport.opaque, |
| MC_TRANSPORT_USER_REQUEST_TO_RELAY, buffer_index); |
| |
| s->user_requests_sent++; |
| |
| /* return amount of window remaining */ |
| return s->config.window_size - pool_elts (s->retry_pool); |
| } |
| |
| void |
| mc_msg_user_request_handler (mc_main_t * mcm, mc_msg_user_request_t * mp, |
| u32 buffer_index) |
| { |
| vlib_main_t *vm = mcm->vlib_main; |
| mc_stream_t *s; |
| mc_stream_peer_t *peer; |
| i32 seq_cmp_result; |
| static int once = 0; |
| |
| mc_byte_swap_msg_user_request (mp); |
| |
| s = mc_stream_by_index (mcm, mp->stream_index); |
| |
| /* Not signed up for this stream? Turf-o-matic */ |
| if (!s || s->state != MC_STREAM_STATE_ready) |
| { |
| vlib_buffer_free_one (vm, buffer_index); |
| return; |
| } |
| |
| /* Find peer, including ourselves. */ |
| peer = get_or_create_peer_with_id (mcm, s, mp->peer_id, |
| /* created */ 0); |
| |
| seq_cmp_result = mc_seq_cmp (mp->local_sequence, |
| peer->last_sequence_received + 1); |
| |
| if (MC_EVENT_LOGGING > 0) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (e) = |
| { |
| .format = "rx-msg: peer %s stream %d rx seq %d seq_cmp %d", |
| .format_args = "T4i4i4i4", |
| }; |
| /* *INDENT-ON* */ |
| struct |
| { |
| u32 peer, stream_index, rx_sequence; |
| i32 seq_cmp_result; |
| } *ed; |
| ed = ELOG_DATA (mcm->elog_main, e); |
| ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64); |
| ed->stream_index = mp->stream_index; |
| ed->rx_sequence = mp->local_sequence; |
| ed->seq_cmp_result = seq_cmp_result; |
| } |
| |
| if (0 && mp->stream_index == 1 && once == 0) |
| { |
| once = 1; |
| ELOG_TYPE (e, "FAKE lost msg on stream 1"); |
| ELOG (mcm->elog_main, e, 0); |
| return; |
| } |
| |
| peer->last_sequence_received += seq_cmp_result == 0; |
| s->user_requests_received++; |
| |
| if (seq_cmp_result > 0) |
| peer->stats.n_msgs_from_future += 1; |
| |
| /* Send ack even if msg from future */ |
| if (1) |
| { |
| mc_msg_user_ack_t *rp; |
| u32 bi; |
| |
| rp = mc_get_vlib_buffer (vm, sizeof (rp[0]), &bi); |
| rp->peer_id = mcm->transport.our_ack_peer_id; |
| rp->stream_index = s->index; |
| rp->local_sequence = mp->local_sequence; |
| rp->seq_cmp_result = seq_cmp_result; |
| |
| if (MC_EVENT_LOGGING > 0) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (e) = |
| { |
| .format = "tx-ack: stream %d local seq %d", |
| .format_args = "i4i4", |
| }; |
| /* *INDENT-ON* */ |
| struct |
| { |
| u32 stream_index; |
| u32 local_sequence; |
| } *ed; |
| ed = ELOG_DATA (mcm->elog_main, e); |
| ed->stream_index = rp->stream_index; |
| ed->local_sequence = rp->local_sequence; |
| } |
| |
| mc_byte_swap_msg_user_ack (rp); |
| |
| mcm->transport.tx_ack (mcm->transport.opaque, mp->peer_id, bi); |
| /* Msg from past? If so, free the buffer... */ |
| if (seq_cmp_result < 0) |
| { |
| vlib_buffer_free_one (vm, buffer_index); |
| peer->stats.n_msgs_from_past += 1; |
| } |
| } |
| |
| if (seq_cmp_result == 0) |
| { |
| vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index); |
| switch (s->state) |
| { |
| case MC_STREAM_STATE_ready: |
| vlib_buffer_advance (b, sizeof (mp[0])); |
| s->config.rx_buffer (mcm, s, mp->peer_id, buffer_index); |
| |
| /* Stream vector can change address via rx callback for mc-internal |
| stream. */ |
| s = mc_stream_by_index (mcm, mp->stream_index); |
| ASSERT (s != 0); |
| s->last_global_sequence_processed = mp->global_sequence; |
| break; |
| |
| case MC_STREAM_STATE_catchup: |
| clib_fifo_add1 (s->catchup_fifo, buffer_index); |
| break; |
| |
| default: |
| clib_warning ("stream in unknown state %U", |
| format_mc_stream_state, s->state); |
| break; |
| } |
| } |
| } |
| |
/*
 * Handle an ack message (mp) received for a message sent on a stream.
 *
 * mc_byte_swap_msg_user_ack() is applied to mp first, then the stream is
 * looked up by mp->stream_index.  Acks for unknown streams are dropped
 * (after the optional rx-ack event log entry).
 *
 * If the peer reports seq_cmp_result > 0, the sequence number
 * (local_sequence - seq_cmp_result) is looked up in the retry hash and,
 * when it is not there, mc_resend_retired() is called for it; the ack is
 * not processed further.
 *
 * Otherwise the acking peer's bit is cleared in the retry entry's
 * unacked_by_peer_bitmap.  When the bitmap becomes zero the retry entry
 * is unhashed, freed, removed from the pool, and
 * maybe_send_window_open_event() is called.
 *
 * buffer_index is not referenced by this function.
 */
| void |
| mc_msg_user_ack_handler (mc_main_t * mcm, mc_msg_user_ack_t * mp, |
| u32 buffer_index) |
| { |
| vlib_main_t *vm = mcm->vlib_main; |
| uword *p; |
| mc_stream_t *s; |
| mc_stream_peer_t *peer; |
| mc_retry_t *r; |
| int peer_created = 0; |
| |
| mc_byte_swap_msg_user_ack (mp); |
| |
| s = mc_stream_by_index (mcm, mp->stream_index); |
| |
| if (MC_EVENT_LOGGING > 0) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (t) = |
| { |
| .format = "rx-ack: local seq %d peer %s seq_cmp_result %d", |
| .format_args = "i4T4i4", |
| }; |
| /* *INDENT-ON* */ |
| |
| struct |
| { |
| u32 local_sequence; |
| u32 peer; |
| i32 seq_cmp_result; |
| } *ed; |
| ed = ELOG_DATA (mcm->elog_main, t); |
| ed->local_sequence = mp->local_sequence; |
| ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64); |
| ed->seq_cmp_result = mp->seq_cmp_result; |
| } |
| |
| /* Unknown stream? */ |
| if (!s) |
| return; |
| |
| /* Find the peer which just ack'ed. */ |
| peer = get_or_create_peer_with_id (mcm, s, mp->peer_id, |
| /* created */ &peer_created); |
| |
| /* |
| * Peer reports message from the future. If it's not in the retry |
| * fifo, look for a retired message. |
| */ |
| if (mp->seq_cmp_result > 0) |
| { |
/* The sequence number looked up is the acked one minus the reported gap. */
| p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence - |
| mp->seq_cmp_result); |
| if (p == 0) |
| mc_resend_retired (mcm, s, mp->local_sequence - mp->seq_cmp_result); |
| |
| /* Normal retry should fix it... */ |
| return; |
| } |
| |
| /* |
| * Pointer to the indicated retry fifo entry. |
| * Worth hashing because we could use a window size of 100 or 1000. |
| */ |
| p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence); |
| |
| /* |
| * Is this a duplicate ACK, received after we've retired the |
| * fifo entry. This can happen when learning about new |
| * peers. |
| */ |
| if (p == 0) |
| { |
| if (MC_EVENT_LOGGING > 0) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (t) = |
| { |
| .format = "ack: for seq %d from peer %s no fifo elt", |
| .format_args = "i4T4", |
| }; |
| /* *INDENT-ON* */ |
| |
| struct |
| { |
| u32 seq; |
| u32 peer; |
| } *ed; |
| ed = ELOG_DATA (mcm->elog_main, t); |
| ed->seq = mp->local_sequence; |
| ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64); |
| } |
| |
| return; |
| } |
| |
| r = pool_elt_at_index (s->retry_pool, p[0]); |
| |
| /* Make sure that this new peer ACKs our msgs from now on */ |
| if (peer_created) |
| { |
/* Walk every retry entry that next_retry() yields after r and set the
   new peer's bit (its index in s->peers) in each unacked bitmap. */
| mc_retry_t *later_retry = next_retry (s, r); |
| |
| while (later_retry) |
| { |
| later_retry->unacked_by_peer_bitmap = |
| clib_bitmap_ori (later_retry->unacked_by_peer_bitmap, |
| peer - s->peers); |
| later_retry = next_retry (s, later_retry); |
| } |
| } |
| |
| ASSERT (mp->local_sequence == r->local_sequence); |
| |
| /* If we weren't expecting to hear from this peer */ |
| if (!peer_created && |
| !clib_bitmap_get (r->unacked_by_peer_bitmap, peer - s->peers)) |
| { |
| if (MC_EVENT_LOGGING > 0) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (t) = |
| { |
| .format = "dup-ack: for seq %d from peer %s", |
| .format_args = "i4T4", |
| }; |
| /* *INDENT-ON* */ |
| struct |
| { |
| u32 seq; |
| u32 peer; |
| } *ed; |
| ed = ELOG_DATA (mcm->elog_main, t); |
| ed->seq = r->local_sequence; |
| ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64); |
| } |
/* Duplicate ack: stop here while other peers are still unacked.  When
   the bitmap is already zero, fall through so the entry gets retired. */
| if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap)) |
| return; |
| } |
| |
| if (MC_EVENT_LOGGING > 0) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (t) = |
| { |
| .format = "ack: for seq %d from peer %s", |
| .format_args = "i4T4", |
| }; |
| /* *INDENT-ON* */ |
| struct |
| { |
| u32 seq; |
| u32 peer; |
| } *ed; |
| ed = ELOG_DATA (mcm->elog_main, t); |
| ed->seq = mp->local_sequence; |
| ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64); |
| } |
| |
/* Clear this peer's bit: it has now acked the message. */
| r->unacked_by_peer_bitmap = |
| clib_bitmap_andnoti (r->unacked_by_peer_bitmap, peer - s->peers); |
| |
| /* Not all clients have ack'ed */ |
| if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap)) |
| { |
| return; |
| } |
| if (MC_EVENT_LOGGING > 0) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (t) = |
| { |
| .format = "ack: retire fifo elt loc seq %d after %d acks", |
| .format_args = "i4i4", |
| }; |
| /* *INDENT-ON* */ |
| struct |
| { |
| u32 seq; |
| u32 npeers; |
| } *ed; |
| ed = ELOG_DATA (mcm->elog_main, t); |
| ed->seq = r->local_sequence; |
| ed->npeers = pool_elts (s->peers); |
| } |
| |
/* Everyone has acked: retire the retry entry. */
| hash_unset (s->retry_index_by_local_sequence, mp->local_sequence); |
| mc_retry_free (mcm, s, r); |
| remove_retry_from_pool (s, r); |
| maybe_send_window_open_event (vm, s); |
| } |
| |
| #define EVENT_MC_SEND_CATCHUP_DATA 0 |
| |
| static uword |
| mc_catchup_process (vlib_main_t * vm, |
| vlib_node_runtime_t * node, vlib_frame_t * f) |
| { |
| mc_main_t *mcm = mc_node_get_main (node); |
| uword *event_data = 0; |
| mc_catchup_process_arg_t *args; |
| int i; |
| |
| while (1) |
| { |
| if (event_data) |
| _vec_len (event_data) = 0; |
| vlib_process_wait_for_event_with_type (vm, &event_data, |
| EVENT_MC_SEND_CATCHUP_DATA); |
| |
| for (i = 0; i < vec_len (event_data); i++) |
| { |
| args = pool_elt_at_index (mcm->catchup_process_args, event_data[i]); |
| |
| mcm->transport.catchup_send_fun (mcm->transport.opaque, |
| args->catchup_opaque, |
| args->catchup_snapshot); |
| |
| /* Send function will free snapshot data vector. */ |
| pool_put (mcm->catchup_process_args, args); |
| } |
| } |
| |
| return 0; /* not likely */ |
| } |
| |
| static void |
| serialize_mc_stream (serialize_main_t * m, va_list * va) |
| { |
| mc_stream_t *s = va_arg (*va, mc_stream_t *); |
| mc_stream_peer_t *p; |
| |
| serialize_integer (m, pool_elts (s->peers), sizeof (u32)); |
| /* *INDENT-OFF* */ |
| pool_foreach (p, s->peers, ({ |
| u8 * x = serialize_get (m, sizeof (p->id)); |
| clib_memcpy (x, p->id.as_u8, sizeof (p->id)); |
| serialize_integer (m, p->last_sequence_received, |
| sizeof (p->last_sequence_received)); |
| })); |
| /* *INDENT-ON* */ |
| serialize_bitmap (m, s->all_peer_bitmap); |
| } |
| |
| void |
| unserialize_mc_stream (serialize_main_t * m, va_list * va) |
| { |
| mc_stream_t *s = va_arg (*va, mc_stream_t *); |
| u32 i, n_peers; |
| mc_stream_peer_t *p; |
| |
| unserialize_integer (m, &n_peers, sizeof (u32)); |
| mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t)); |
| for (i = 0; i < n_peers; i++) |
| { |
| u8 *x; |
| pool_get (s->peers, p); |
| x = unserialize_get (m, sizeof (p->id)); |
| clib_memcpy (p->id.as_u8, x, sizeof (p->id)); |
| unserialize_integer (m, &p->last_sequence_received, |
| sizeof (p->last_sequence_received)); |
| mhash_set (&s->peer_index_by_id, &p->id, p - s->peers, /* old_value */ |
| 0); |
| } |
| s->all_peer_bitmap = unserialize_bitmap (m); |
| |
| /* This is really bad. */ |
| if (!s->all_peer_bitmap) |
| clib_warning ("BUG: stream %s all_peer_bitmap NULL", s->config.name); |
| } |
| |
| void |
| mc_msg_catchup_request_handler (mc_main_t * mcm, |
| mc_msg_catchup_request_t * req, |
| u32 catchup_opaque) |
| { |
| vlib_main_t *vm = mcm->vlib_main; |
| mc_stream_t *s; |
| mc_catchup_process_arg_t *args; |
| |
| mc_byte_swap_msg_catchup_request (req); |
| |
| s = mc_stream_by_index (mcm, req->stream_index); |
| if (!s || s->state != MC_STREAM_STATE_ready) |
| return; |
| |
| if (MC_EVENT_LOGGING > 0) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (t) = |
| { |
| .format = "catchup-request: from %s stream %d", |
| .format_args = "T4i4", |
| }; |
| /* *INDENT-ON* */ |
| struct |
| { |
| u32 peer, stream; |
| } *ed; |
| ed = ELOG_DATA (mcm->elog_main, t); |
| ed->peer = elog_id_for_peer_id (mcm, req->peer_id.as_u64); |
| ed->stream = req->stream_index; |
| } |
| |
| /* |
| * The application has to snapshoot its data structures right |
| * here, right now. If we process any messages after |
| * noting the last global sequence we've processed, the client |
| * won't be able to accurately reconstruct our data structures. |
| * |
| * Once the data structures are e.g. vec_dup()'ed, we |
| * send the resulting messages from a separate process, to |
| * make sure that we don't cause a bunch of message retransmissions |
| */ |
| pool_get (mcm->catchup_process_args, args); |
| |
| args->stream_index = s - mcm->stream_vector; |
| args->catchup_opaque = catchup_opaque; |
| args->catchup_snapshot = 0; |
| |
| /* Construct catchup reply and snapshot state for stream to send as |
| catchup reply payload. */ |
| { |
| mc_msg_catchup_reply_t *rep; |
| serialize_main_t m; |
| |
| vec_resize (args->catchup_snapshot, sizeof (rep[0])); |
| |
| rep = (void *) args->catchup_snapshot; |
| |
| rep->peer_id = req->peer_id; |
| rep->stream_index = req->stream_index; |
| rep->last_global_sequence_included = s->last_global_sequence_processed; |
| |
| /* Setup for serialize to append to catchup snapshot. */ |
| serialize_open_vector (&m, args->catchup_snapshot); |
| m.stream.current_buffer_index = vec_len (m.stream.buffer); |
| |
| serialize (&m, serialize_mc_stream, s); |
| |
| args->catchup_snapshot = serialize_close_vector (&m); |
| |
| /* Actually copy internal state */ |
| args->catchup_snapshot = s->config.catchup_snapshot |
| (mcm, args->catchup_snapshot, rep->last_global_sequence_included); |
| |
| rep = (void *) args->catchup_snapshot; |
| rep->n_data_bytes = vec_len (args->catchup_snapshot) - sizeof (rep[0]); |
| |
| mc_byte_swap_msg_catchup_reply (rep); |
| } |
| |
| /* now go send it... */ |
| vlib_process_signal_event (vm, mcm->catchup_process, |
| EVENT_MC_SEND_CATCHUP_DATA, |
| args - mcm->catchup_process_args); |
| } |
| |
| #define EVENT_MC_UNSERIALIZE_BUFFER 0 |
| #define EVENT_MC_UNSERIALIZE_CATCHUP 1 |
| |
| void |
| mc_msg_catchup_reply_handler (mc_main_t * mcm, mc_msg_catchup_reply_t * mp, |
| u32 catchup_opaque) |
| { |
| vlib_process_signal_event (mcm->vlib_main, |
| mcm->unserialize_process, |
| EVENT_MC_UNSERIALIZE_CATCHUP, |
| pointer_to_uword (mp)); |
| } |
| |
/*
 * Apply a catchup reply (mp) to its stream.
 *
 * mc_byte_swap_msg_catchup_reply() is applied to mp first.  Replies for
 * unknown streams, or for streams already in the ready state, are
 * ignored.  Otherwise:
 *   - the optional save_snapshot callback is given the whole payload;
 *   - the stream peer state is unserialized from the front of mp->data;
 *   - our_local_sequence is set from our own peer entry, if present;
 *   - the rest of the payload goes to the stream's catchup callback;
 *   - buffers queued in catchup_fifo are replayed through rx_buffer when
 *     their global sequence compares greater than
 *     last_global_sequence_included, and freed otherwise;
 *   - the stream is marked ready and processes waiting for join
 *     completion are signaled.
 */
| static void |
| perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp) |
| { |
| mc_stream_t *s; |
| i32 seq_cmp_result; |
| |
| mc_byte_swap_msg_catchup_reply (mp); |
| |
| s = mc_stream_by_index (mcm, mp->stream_index); |
| |
| /* Never heard of this stream or already caught up. */ |
| if (!s || s->state == MC_STREAM_STATE_ready) |
| return; |
| |
| { |
| serialize_main_t m; |
| mc_stream_peer_t *p; |
| u32 n_stream_bytes; |
| |
| /* For offline sim replay: save the entire catchup snapshot... */ |
| if (s->config.save_snapshot) |
| s->config.save_snapshot (mcm, /* is_catchup */ 1, mp->data, |
| mp->n_data_bytes); |
| |
| unserialize_open_data (&m, mp->data, mp->n_data_bytes); |
| unserialize (&m, unserialize_mc_stream, s); |
| |
| /* Make sure we start numbering our messages as expected */ |
| /* *INDENT-OFF* */ |
| pool_foreach (p, s->peers, ({ |
| if (p->id.as_u64 == mcm->transport.our_ack_peer_id.as_u64) |
| s->our_local_sequence = p->last_sequence_received + 1; |
| })); |
| /* *INDENT-ON* */ |
| |
/* Number of payload bytes the stream unserializer consumed. */
| n_stream_bytes = m.stream.current_buffer_index; |
| |
| /* No need to unserialize close; nothing to free. */ |
| |
| /* After serialized stream is user's catchup data. */ |
| s->config.catchup (mcm, mp->data + n_stream_bytes, |
| mp->n_data_bytes - n_stream_bytes); |
| } |
| |
| /* Vector could have been moved by catchup. |
| This can only happen for mc-internal stream. */ |
| s = mc_stream_by_index (mcm, mp->stream_index); |
| |
| s->last_global_sequence_processed = mp->last_global_sequence_included; |
| |
| while (clib_fifo_elts (s->catchup_fifo)) |
| { |
| mc_msg_user_request_t *gp; |
| u32 bi; |
| vlib_buffer_t *b; |
| |
| clib_fifo_sub1 (s->catchup_fifo, bi); |
| |
| b = vlib_get_buffer (mcm->vlib_main, bi); |
| gp = vlib_buffer_get_current (b); |
| |
| /* Make sure we're replaying "new" news */ |
| seq_cmp_result = mc_seq_cmp (gp->global_sequence, |
| mp->last_global_sequence_included); |
| |
| if (seq_cmp_result > 0) |
| { |
/* NOTE(review): s and gp are used again after the rx_buffer callback
   without a fresh mc_stream_by_index lookup - confirm that the stream
   vector and the buffer cannot be moved or freed by the callback. */
| vlib_buffer_advance (b, sizeof (gp[0])); |
| s->config.rx_buffer (mcm, s, gp->peer_id, bi); |
| s->last_global_sequence_processed = gp->global_sequence; |
| |
| if (MC_EVENT_LOGGING) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (t) = |
| { |
| .format = "catchup replay local sequence 0x%x", |
| .format_args = "i4", |
| }; |
| /* *INDENT-ON* */ |
| struct |
| { |
| u32 local_sequence; |
| } *ed; |
| ed = ELOG_DATA (mcm->elog_main, t); |
| ed->local_sequence = gp->local_sequence; |
| } |
| } |
| else |
| { |
| if (MC_EVENT_LOGGING) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (t) = |
| { |
| .format = "catchup discard local sequence 0x%x", |
| .format_args = "i4", |
| }; |
| /* *INDENT-ON* */ |
| struct |
| { |
| u32 local_sequence; |
| } *ed; |
| ed = ELOG_DATA (mcm->elog_main, t); |
| ed->local_sequence = gp->local_sequence; |
| } |
| |
/* Already covered by the snapshot: drop the queued buffer. */
| vlib_buffer_free_one (mcm->vlib_main, bi); |
| } |
| } |
| |
| s->state = MC_STREAM_STATE_ready; |
| |
| /* Now that we are caught up wake up joining process. */ |
| { |
| vlib_one_time_waiting_process_t *wp; |
| vec_foreach (wp, s->procs_waiting_for_join_done) |
| vlib_signal_one_time_waiting_process (mcm->vlib_main, wp); |
| if (s->procs_waiting_for_join_done) |
| _vec_len (s->procs_waiting_for_join_done) = 0; |
| } |
| } |
| |
| static void |
| this_node_maybe_master (mc_main_t * mcm) |
| { |
| vlib_main_t *vm = mcm->vlib_main; |
| mc_msg_master_assert_t *mp; |
| uword event_type; |
| int timeouts = 0; |
| int is_master = mcm->relay_state == MC_RELAY_STATE_MASTER; |
| clib_error_t *error; |
| f64 now, time_last_master_assert = -1; |
| u32 bi; |
| |
| while (1) |
| { |
| if (!mcm->we_can_be_relay_master) |
| { |
| mcm->relay_state = MC_RELAY_STATE_SLAVE; |
| if (MC_EVENT_LOGGING) |
| { |
| ELOG_TYPE (e, "become slave (config)"); |
| ELOG (mcm->elog_main, e, 0); |
| } |
| return; |
| } |
| |
| now = vlib_time_now (vm); |
| if (now >= time_last_master_assert + 1) |
| { |
| time_last_master_assert = now; |
| mp = mc_get_vlib_buffer (mcm->vlib_main, sizeof (mp[0]), &bi); |
| |
| mp->peer_id = mcm->transport.our_ack_peer_id; |
| mp->global_sequence = mcm->relay_global_sequence; |
| |
| /* |
| * these messages clog the event log, set MC_EVENT_LOGGING higher |
| * if you want them |
| */ |
| if (MC_EVENT_LOGGING > 1) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (e) = |
| { |
| .format = "tx-massert: peer %s global seq %u", |
| .format_args = "T4i4", |
| }; |
| /* *INDENT-ON* */ |
| struct |
| { |
| u32 peer, global_sequence; |
| } *ed; |
| ed = ELOG_DATA (mcm->elog_main, e); |
| ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64); |
| ed->global_sequence = mp->global_sequence; |
| } |
| |
| mc_byte_swap_msg_master_assert (mp); |
| |
| error = |
| mcm->transport.tx_buffer (mcm->transport.opaque, |
| MC_TRANSPORT_MASTERSHIP, bi); |
| if (error) |
| clib_error_report (error); |
| } |
| |
| vlib_process_wait_for_event_or_clock (vm, 1.0); |
| event_type = vlib_process_get_events (vm, /* no event data */ 0); |
| |
| switch (event_type) |
| { |
| case ~0: |
| if (!is_master && timeouts++ > 2) |
| { |
| mcm->relay_state = MC_RELAY_STATE_MASTER; |
| mcm->relay_master_peer_id = |
| mcm->transport.our_ack_peer_id.as_u64; |
| if (MC_EVENT_LOGGING) |
| { |
| ELOG_TYPE (e, "become master (was maybe_master)"); |
| ELOG (mcm->elog_main, e, 0); |
| } |
| return; |
| } |
| break; |
| |
| case MC_RELAY_STATE_SLAVE: |
| mcm->relay_state = MC_RELAY_STATE_SLAVE; |
| if (MC_EVENT_LOGGING && mcm->relay_state != MC_RELAY_STATE_SLAVE) |
| { |
| ELOG_TYPE (e, "become slave (was maybe_master)"); |
| ELOG (mcm->elog_main, e, 0); |
| } |
| return; |
| } |
| } |
| } |
| |
| static void |
| this_node_slave (mc_main_t * mcm) |
| { |
| vlib_main_t *vm = mcm->vlib_main; |
| uword event_type; |
| int timeouts = 0; |
| |
| if (MC_EVENT_LOGGING) |
| { |
| ELOG_TYPE (e, "become slave"); |
| ELOG (mcm->elog_main, e, 0); |
| } |
| |
| while (1) |
| { |
| vlib_process_wait_for_event_or_clock (vm, 1.0); |
| event_type = vlib_process_get_events (vm, /* no event data */ 0); |
| |
| switch (event_type) |
| { |
| case ~0: |
| if (timeouts++ > 2) |
| { |
| mcm->relay_state = MC_RELAY_STATE_NEGOTIATE; |
| mcm->relay_master_peer_id = ~0ULL; |
| if (MC_EVENT_LOGGING) |
| { |
| ELOG_TYPE (e, "timeouts; negoitate mastership"); |
| ELOG (mcm->elog_main, e, 0); |
| } |
| return; |
| } |
| break; |
| |
| case MC_RELAY_STATE_SLAVE: |
| mcm->relay_state = MC_RELAY_STATE_SLAVE; |
| timeouts = 0; |
| break; |
| } |
| } |
| } |
| |
| static uword |
| mc_mastership_process (vlib_main_t * vm, |
| vlib_node_runtime_t * node, vlib_frame_t * f) |
| { |
| mc_main_t *mcm = mc_node_get_main (node); |
| |
| while (1) |
| { |
| switch (mcm->relay_state) |
| { |
| case MC_RELAY_STATE_NEGOTIATE: |
| case MC_RELAY_STATE_MASTER: |
| this_node_maybe_master (mcm); |
| break; |
| |
| case MC_RELAY_STATE_SLAVE: |
| this_node_slave (mcm); |
| break; |
| } |
| } |
| return 0; /* not likely */ |
| } |
| |
| void |
| mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master) |
| { |
| if (we_can_be_master != mcm->we_can_be_relay_master) |
| { |
| mcm->we_can_be_relay_master = we_can_be_master; |
| vlib_process_signal_event (mcm->vlib_main, |
| mcm->mastership_process, |
| MC_RELAY_STATE_NEGOTIATE, 0); |
| } |
| } |
| |
| void |
| mc_msg_master_assert_handler (mc_main_t * mcm, mc_msg_master_assert_t * mp, |
| u32 buffer_index) |
| { |
| mc_peer_id_t his_peer_id, our_peer_id; |
| i32 seq_cmp_result; |
| u8 signal_slave = 0; |
| u8 update_global_sequence = 0; |
| |
| mc_byte_swap_msg_master_assert (mp); |
| |
| his_peer_id = mp->peer_id; |
| our_peer_id = mcm->transport.our_ack_peer_id; |
| |
| /* compare the incoming global sequence with ours */ |
| seq_cmp_result = mc_seq_cmp (mp->global_sequence, |
| mcm->relay_global_sequence); |
| |
| /* If the sender has a lower peer id and the sender's sequence >= |
| our global sequence, we become a slave. Otherwise we are master. */ |
| if (mc_peer_id_compare (his_peer_id, our_peer_id) < 0 |
| && seq_cmp_result >= 0) |
| { |
| vlib_process_signal_event (mcm->vlib_main, |
| mcm->mastership_process, |
| MC_RELAY_STATE_SLAVE, 0); |
| signal_slave = 1; |
| } |
| |
| /* Update our global sequence. */ |
| if (seq_cmp_result > 0) |
| { |
| mcm->relay_global_sequence = mp->global_sequence; |
| update_global_sequence = 1; |
| } |
| |
| { |
| uword *q = mhash_get (&mcm->mastership_peer_index_by_id, &his_peer_id); |
| mc_mastership_peer_t *p; |
| |
| if (q) |
| p = vec_elt_at_index (mcm->mastership_peers, q[0]); |
| else |
| { |
| vec_add2 (mcm->mastership_peers, p, 1); |
| p->peer_id = his_peer_id; |
| mhash_set (&mcm->mastership_peer_index_by_id, &p->peer_id, |
| p - mcm->mastership_peers, |
| /* old_value */ 0); |
| } |
| p->time_last_master_assert_received = vlib_time_now (mcm->vlib_main); |
| } |
| |
| /* |
| * these messages clog the event log, set MC_EVENT_LOGGING higher |
| * if you want them. |
| */ |
| if (MC_EVENT_LOGGING > 1) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (e) = |
| { |
| .format = "rx-massert: peer %s global seq %u upd %d slave %d", |
| .format_args = "T4i4i1i1", |
| }; |
| /* *INDENT-ON* */ |
| |
| struct |
| { |
| u32 peer; |
| u32 global_sequence; |
| u8 update_sequence; |
| u8 slave; |
| } *ed; |
| ed = ELOG_DATA (mcm->elog_main, e); |
| ed->peer = elog_id_for_peer_id (mcm, his_peer_id.as_u64); |
| ed->global_sequence = mp->global_sequence; |
| ed->update_sequence = update_global_sequence; |
| ed->slave = signal_slave; |
| } |
| } |
| |
| static void |
| mc_serialize_init (mc_main_t * mcm) |
| { |
| mc_serialize_msg_t *m; |
| vlib_main_t *vm = vlib_get_main (); |
| |
| mcm->global_msg_index_by_name |
| = hash_create_string ( /* elts */ 0, sizeof (uword)); |
| |
| m = vm->mc_msg_registrations; |
| |
| while (m) |
| { |
| m->global_index = vec_len (mcm->global_msgs); |
| hash_set_mem (mcm->global_msg_index_by_name, m->name, m->global_index); |
| vec_add1 (mcm->global_msgs, m); |
| m = m->next_registration; |
| } |
| } |
| |
/*
 * Serialize one message for a stream into the TX serialize buffer.
 *
 * The message is prefixed with its stream-local message index.  When no
 * stream-local index is known yet (si == ~0), or MSG_ID_DEBUG is set,
 * the message name is written as well.  The message body is produced by
 * va_serialize() from va.
 *
 * The buffer is closed when batching is disabled
 * (!multiple_messages_per_vlib_buffer), when this was a first-time
 * message, or when one more message of the largest size seen so far
 * would exceed the transport's max_packet_size.  A closed buffer is
 * passed to mc_stream_send() when there was no error; on error it is
 * freed instead (if a buffer index was returned).
 *
 * Returns the error from va_serialize().
 */
| clib_error_t * |
| mc_serialize_va (mc_main_t * mc, |
| u32 stream_index, |
| u32 multiple_messages_per_vlib_buffer, |
| mc_serialize_msg_t * msg, va_list * va) |
| { |
| mc_stream_t *s; |
| clib_error_t *error; |
| serialize_main_t *m = &mc->serialize_mains[VLIB_TX]; |
| vlib_serialize_buffer_main_t *sbm = &mc->serialize_buffer_mains[VLIB_TX]; |
| u32 bi, n_before, n_after, n_total, n_this_msg; |
| u32 si, gi; |
| |
/* TX parameters are filled in while sbm->vlib_main is still unset. */
| if (!sbm->vlib_main) |
| { |
| sbm->tx.max_n_data_bytes_per_chain = 4096; |
| sbm->tx.free_list_index = VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX; |
| } |
| |
/* No buffer open (nothing batched so far): open one. */
| if (sbm->first_buffer == 0) |
| serialize_open_vlib_buffer (m, mc->vlib_main, sbm); |
| |
| n_before = serialize_vlib_buffer_n_bytes (m); |
| |
/* NOTE(review): s is dereferenced below without a NULL check - confirm
   that callers always pass a valid stream_index. */
| s = mc_stream_by_index (mc, stream_index); |
| gi = msg->global_index; |
| ASSERT (msg == vec_elt (mc->global_msgs, gi)); |
| |
/* Translate global index to stream-local index; ~0 means not bound yet. */
| si = ~0; |
| if (gi < vec_len (s->stream_msg_index_by_global_index)) |
| si = s->stream_msg_index_by_global_index[gi]; |
| |
| serialize_likely_small_unsigned_integer (m, si); |
| |
| /* For first time message is sent, use name to identify message. */ |
| if (si == ~0 || MSG_ID_DEBUG) |
| serialize_cstring (m, msg->name); |
| |
| if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (e) = |
| { |
| .format = "serialize-msg: %s index %d", |
| .format_args = "T4i4", |
| }; |
| /* *INDENT-ON* */ |
| struct |
| { |
| u32 c[2]; |
| } *ed; |
| ed = ELOG_DATA (mc->elog_main, e); |
| ed->c[0] = elog_id_for_msg_name (mc, msg->name); |
| ed->c[1] = si; |
| } |
| |
| error = va_serialize (m, va); |
| |
/* n_this_msg: bytes added by this message; n_total: buffer contents
   plus the user request header size. */
| n_after = serialize_vlib_buffer_n_bytes (m); |
| n_this_msg = n_after - n_before; |
| n_total = n_after + sizeof (mc_msg_user_request_t); |
| |
| /* For max message size ignore first message where string name is sent. */ |
| if (si != ~0) |
| msg->max_n_bytes_serialized = |
| clib_max (msg->max_n_bytes_serialized, n_this_msg); |
| |
| if (!multiple_messages_per_vlib_buffer |
| || si == ~0 |
| || n_total + msg->max_n_bytes_serialized > |
| mc->transport.max_packet_size) |
| { |
| bi = serialize_close_vlib_buffer (m); |
| sbm->first_buffer = 0; |
| if (!error) |
| mc_stream_send (mc, stream_index, bi); |
| else if (bi != ~0) |
| vlib_buffer_free_one (mc->vlib_main, bi); |
| } |
| |
| return error; |
| } |
| |
| clib_error_t * |
| mc_serialize_internal (mc_main_t * mc, |
| u32 stream_index, |
| u32 multiple_messages_per_vlib_buffer, |
| mc_serialize_msg_t * msg, ...) |
| { |
| vlib_main_t *vm = mc->vlib_main; |
| va_list va; |
| clib_error_t *error; |
| |
| if (stream_index == ~0) |
| { |
| if (vm->mc_main && vm->mc_stream_index == ~0) |
| vlib_current_process_wait_for_one_time_event_vector |
| (vm, &vm->procs_waiting_for_mc_stream_join); |
| stream_index = vm->mc_stream_index; |
| } |
| |
| va_start (va, msg); |
| error = mc_serialize_va (mc, stream_index, |
| multiple_messages_per_vlib_buffer, msg, &va); |
| va_end (va); |
| return error; |
| } |
| |
/*
 * Unserialize one message from m for stream s.
 *
 * The stream-local message index is read first.  When it is not ~0 (and
 * MSG_ID_DEBUG is off) it is mapped to a global index through
 * s->stream_msgs.  Otherwise the message name follows in the stream: it
 * is looked up in global_msg_index_by_name and, if the stream has no
 * local index for that message yet, one is created.
 *
 * For a known message its registered unserialize function is invoked
 * with mcm as argument.
 *
 * Returns nonzero when the message was known, 0 when the name was not
 * found in the global table.
 */
| uword |
| mc_unserialize_message (mc_main_t * mcm, |
| mc_stream_t * s, serialize_main_t * m) |
| { |
| mc_serialize_stream_msg_t *sm; |
| u32 gi, si; |
| |
| si = unserialize_likely_small_unsigned_integer (m); |
| |
| if (!(si == ~0 || MSG_ID_DEBUG)) |
| { |
/* Index already bound on this stream: no name follows. */
| sm = vec_elt_at_index (s->stream_msgs, si); |
| gi = sm->global_index; |
| } |
| else |
| { |
| char *name; |
| |
| unserialize_cstring (m, &name); |
| |
| if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (e) = |
| { |
| .format = "unserialize-msg: %s rx index %d", |
| .format_args = "T4i4", |
| }; |
| /* *INDENT-ON* */ |
| struct |
| { |
| u32 c[2]; |
| } *ed; |
| ed = ELOG_DATA (mcm->elog_main, e); |
| ed->c[0] = elog_id_for_msg_name (mcm, name); |
| ed->c[1] = si; |
| } |
| |
| { |
| uword *p = hash_get_mem (mcm->global_msg_index_by_name, name); |
| gi = p ? p[0] : ~0; |
| } |
| |
| /* Unknown message? */ |
| if (gi == ~0) |
| { |
| vec_free (name); |
| goto done; |
| } |
| |
/* Grow the global-to-local map as needed; new slots start out as ~0. */
| vec_validate_init_empty (s->stream_msg_index_by_global_index, gi, ~0); |
| si = s->stream_msg_index_by_global_index[gi]; |
| |
| /* Stream local index unknown? Create it. */ |
| if (si == ~0) |
| { |
| vec_add2 (s->stream_msgs, sm, 1); |
| |
| si = sm - s->stream_msgs; |
| sm->global_index = gi; |
| s->stream_msg_index_by_global_index[gi] = si; |
| |
| if (MC_EVENT_LOGGING > 0) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (e) = |
| { |
| .format = "msg-bind: stream %d %s to index %d", |
| .format_args = "i4T4i4", |
| }; |
| /* *INDENT-ON* */ |
| struct |
| { |
| u32 c[3]; |
| } *ed; |
| ed = ELOG_DATA (mcm->elog_main, e); |
| ed->c[0] = s->index; |
| ed->c[1] = elog_id_for_msg_name (mcm, name); |
| ed->c[2] = si; |
| } |
| } |
| else |
| { |
/* Already bound: a mismatch between the two maps is only logged. */
| sm = vec_elt_at_index (s->stream_msgs, si); |
| if (gi != sm->global_index && MC_EVENT_LOGGING > 0) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (e) = |
| { |
| .format = "msg-id-ERROR: %s index %d expected %d", |
| .format_args = "T4i4i4", |
| }; |
| /* *INDENT-ON* */ |
| struct |
| { |
| u32 c[3]; |
| } *ed; |
| ed = ELOG_DATA (mcm->elog_main, e); |
| ed->c[0] = elog_id_for_msg_name (mcm, name); |
| ed->c[1] = si; |
| ed->c[2] = ~0; |
| if (sm->global_index < |
| vec_len (s->stream_msg_index_by_global_index)) |
| ed->c[2] = |
| s->stream_msg_index_by_global_index[sm->global_index]; |
| } |
| } |
| |
| vec_free (name); |
| } |
| |
| if (gi != ~0) |
| { |
| mc_serialize_msg_t *msg; |
| msg = vec_elt (mcm->global_msgs, gi); |
| unserialize (m, msg->unserialize, mcm); |
| } |
| |
| done: |
| return gi != ~0; |
| } |
| |
| void |
| mc_unserialize_internal (mc_main_t * mcm, u32 stream_and_buffer_index) |
| { |
| vlib_main_t *vm = mcm->vlib_main; |
| serialize_main_t *m = &mcm->serialize_mains[VLIB_RX]; |
| vlib_serialize_buffer_main_t *sbm = &mcm->serialize_buffer_mains[VLIB_RX]; |
| mc_stream_and_buffer_t *sb; |
| mc_stream_t *stream; |
| u32 buffer_index; |
| |
| sb = |
| pool_elt_at_index (mcm->mc_unserialize_stream_and_buffers, |
| stream_and_buffer_index); |
| buffer_index = sb->buffer_index; |
| stream = vec_elt_at_index (mcm->stream_vector, sb->stream_index); |
| pool_put (mcm->mc_unserialize_stream_and_buffers, sb); |
| |
| if (stream->config.save_snapshot) |
| { |
| u32 n_bytes = vlib_buffer_index_length_in_chain (vm, buffer_index); |
| static u8 *contents; |
| vec_reset_length (contents); |
| vec_validate (contents, n_bytes - 1); |
| vlib_buffer_contents (vm, buffer_index, contents); |
| stream->config.save_snapshot (mcm, /* is_catchup */ 0, contents, |
| n_bytes); |
| } |
| |
| ASSERT (vlib_in_process_context (vm)); |
| |
| unserialize_open_vlib_buffer (m, vm, sbm); |
| |
| clib_fifo_add1 (sbm->rx.buffer_fifo, buffer_index); |
| |
| while (unserialize_vlib_buffer_n_bytes (m) > 0) |
| mc_unserialize_message (mcm, stream, m); |
| |
| /* Frees buffer. */ |
| unserialize_close_vlib_buffer (m); |
| } |
| |
| void |
| mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index) |
| { |
| vlib_main_t *vm = mcm->vlib_main; |
| mc_stream_and_buffer_t *sb; |
| pool_get (mcm->mc_unserialize_stream_and_buffers, sb); |
| sb->stream_index = s->index; |
| sb->buffer_index = buffer_index; |
| vlib_process_signal_event (vm, mcm->unserialize_process, |
| EVENT_MC_UNSERIALIZE_BUFFER, |
| sb - mcm->mc_unserialize_stream_and_buffers); |
| } |
| |
| static uword |
| mc_unserialize_process (vlib_main_t * vm, |
| vlib_node_runtime_t * node, vlib_frame_t * f) |
| { |
| mc_main_t *mcm = mc_node_get_main (node); |
| uword event_type, *event_data = 0; |
| int i; |
| |
| while (1) |
| { |
| if (event_data) |
| _vec_len (event_data) = 0; |
| |
| vlib_process_wait_for_event (vm); |
| event_type = vlib_process_get_events (vm, &event_data); |
| switch (event_type) |
| { |
| case EVENT_MC_UNSERIALIZE_BUFFER: |
| for (i = 0; i < vec_len (event_data); i++) |
| mc_unserialize_internal (mcm, event_data[i]); |
| break; |
| |
| case EVENT_MC_UNSERIALIZE_CATCHUP: |
| for (i = 0; i < vec_len (event_data); i++) |
| { |
| u8 *mp = uword_to_pointer (event_data[i], u8 *); |
| perform_catchup (mcm, (void *) mp); |
| vec_free (mp); |
| } |
| break; |
| |
| default: |
| break; |
| } |
| } |
| |
| return 0; /* not likely */ |
| } |
| |
| void |
| serialize_mc_main (serialize_main_t * m, va_list * va) |
| { |
| mc_main_t *mcm = va_arg (*va, mc_main_t *); |
| mc_stream_t *s; |
| mc_serialize_stream_msg_t *sm; |
| mc_serialize_msg_t *msg; |
| |
| serialize_integer (m, vec_len (mcm->stream_vector), sizeof (u32)); |
| vec_foreach (s, mcm->stream_vector) |
| { |
| /* Stream name. */ |
| serialize_cstring (m, s->config.name); |
| |
| /* Serialize global names for all sent messages. */ |
| serialize_integer (m, vec_len (s->stream_msgs), sizeof (u32)); |
| vec_foreach (sm, s->stream_msgs) |
| { |
| msg = vec_elt (mcm->global_msgs, sm->global_index); |
| serialize_cstring (m, msg->name); |
| } |
| } |
| } |
| |
| void |
| unserialize_mc_main (serialize_main_t * m, va_list * va) |
| { |
| mc_main_t *mcm = va_arg (*va, mc_main_t *); |
| u32 i, n_streams, n_stream_msgs; |
| char *name; |
| mc_stream_t *s; |
| mc_serialize_stream_msg_t *sm; |
| |
| unserialize_integer (m, &n_streams, sizeof (u32)); |
| for (i = 0; i < n_streams; i++) |
| { |
| unserialize_cstring (m, &name); |
| if (i != MC_STREAM_INDEX_INTERNAL && !mc_stream_by_name (mcm, name)) |
| { |
| vec_validate (mcm->stream_vector, i); |
| s = vec_elt_at_index (mcm->stream_vector, i); |
| mc_stream_init (s); |
| s->index = s - mcm->stream_vector; |
| s->config.name = name; |
| s->state = MC_STREAM_STATE_name_known; |
| hash_set_mem (mcm->stream_index_by_name, s->config.name, s->index); |
| } |
| else |
| vec_free (name); |
| |
| s = vec_elt_at_index (mcm->stream_vector, i); |
| |
| vec_free (s->stream_msgs); |
| vec_free (s->stream_msg_index_by_global_index); |
| |
| unserialize_integer (m, &n_stream_msgs, sizeof (u32)); |
| vec_resize (s->stream_msgs, n_stream_msgs); |
| vec_foreach (sm, s->stream_msgs) |
| { |
| uword *p; |
| u32 si, gi; |
| |
| unserialize_cstring (m, &name); |
| p = hash_get (mcm->global_msg_index_by_name, name); |
| gi = p ? p[0] : ~0; |
| si = sm - s->stream_msgs; |
| |
| if (MC_EVENT_LOGGING > 0) |
| { |
| /* *INDENT-OFF* */ |
| ELOG_TYPE_DECLARE (e) = |
| { |
| .format = "catchup-bind: %s to %d global index %d stream %d", |
| .format_args = "T4i4i4i4", |
| }; |
| /* *INDENT-ON* */ |
| |
| struct |
| { |
| u32 c[4]; |
| } *ed; |
| ed = ELOG_DATA (mcm->elog_main, e); |
| ed->c[0] = elog_id_for_msg_name (mcm, name); |
| ed->c[1] = si; |
| ed->c[2] = gi; |
| ed->c[3] = s->index; |
| } |
| |
| vec_free (name); |
| |
| sm->global_index = gi; |
| if (gi != ~0) |
| { |
| vec_validate_init_empty (s->stream_msg_index_by_global_index, |
| gi, ~0); |
| s->stream_msg_index_by_global_index[gi] = si; |
| } |
| } |
| } |
| } |
| |
| void |
| mc_main_init (mc_main_t * mcm, char *tag) |
| { |
| vlib_main_t *vm = vlib_get_main (); |
| |
| mcm->vlib_main = vm; |
| mcm->elog_main = &vm->elog_main; |
| |
| mcm->relay_master_peer_id = ~0ULL; |
| mcm->relay_state = MC_RELAY_STATE_NEGOTIATE; |
| |
| mcm->stream_index_by_name |
| = hash_create_string ( /* elts */ 0, /* value size */ sizeof (uword)); |
| |
| { |
| vlib_node_registration_t r; |
| |
| memset (&r, 0, sizeof (r)); |
| |
| r.type = VLIB_NODE_TYPE_PROCESS; |
| |
| /* Point runtime data to main instance. */ |
| r.runtime_data = &mcm; |
| r.runtime_data_bytes = sizeof (&mcm); |
| |
| r.name = (char *) format (0, "mc-mastership-%s", tag); |
| r.function = mc_mastership_process; |
| mcm->mastership_process = vlib_register_node (vm, &r); |
| |
| r.name = (char *) format (0, "mc-join-ager-%s", tag); |
| r.function = mc_join_ager_process; |
| mcm->join_ager_process = vlib_register_node (vm, &r); |
| |
| r.name = (char *) format (0, "mc-retry-%s", tag); |
| r.function = mc_retry_process; |
| mcm->retry_process = vlib_register_node (vm, &r); |
| |
| r.name = (char *) format (0, "mc-catchup-%s", tag); |
| r.function = mc_catchup_process; |
| mcm->catchup_process = vlib_register_node (vm, &r); |
| |
| r.name = (char *) format (0, "mc-unserialize-%s", tag); |
| r.function = mc_unserialize_process; |
| mcm->unserialize_process = vlib_register_node (vm, &r); |
| } |
| |
| if (MC_EVENT_LOGGING > 0) |
| mhash_init (&mcm->elog_id_by_peer_id, sizeof (uword), |
| sizeof (mc_peer_id_t)); |
| |
| mhash_init (&mcm->mastership_peer_index_by_id, sizeof (uword), |
| sizeof (mc_peer_id_t)); |
| mc_serialize_init (mcm); |
| } |
| |
| static u8 * |
| format_mc_relay_state (u8 * s, va_list * args) |
| { |
| mc_relay_state_t state = va_arg (*args, mc_relay_state_t); |
| char *t = 0; |
| switch (state) |
| { |
| case MC_RELAY_STATE_NEGOTIATE: |
| t = "negotiate"; |
| break; |
| case MC_RELAY_STATE_MASTER: |
| t = "master"; |
| break; |
| case MC_RELAY_STATE_SLAVE: |
| t = "slave"; |
| break; |
| default: |
| return format (s, "unknown 0x%x", state); |
| } |
| |
| return format (s, "%s", t); |
| } |
| |
| static u8 * |
| format_mc_stream_state (u8 * s, va_list * args) |
| { |
| mc_stream_state_t state = va_arg (*args, mc_stream_state_t); |
| char *t = 0; |
| switch (state) |
| { |
| #define _(f) case MC_STREAM_STATE_##f: t = #f; break; |
| foreach_mc_stream_state |
| #undef _ |
| default: |
| return format (s, "unknown 0x%x", state); |
| } |
| |
| return format (s, "%s", t); |
| } |
| |
| static int |
| mc_peer_comp (void *a1, void *a2) |
| { |
| mc_stream_peer_t *p1 = a1; |
| mc_stream_peer_t *p2 = a2; |
| |
| return mc_peer_id_compare (p1->id, p2->id); |
| } |
| |
| u8 * |
| format_mc_main (u8 * s, va_list * args) |
| { |
| mc_main_t *mcm = va_arg (*args, mc_main_t *); |
| mc_stream_t *t; |
| mc_stream_peer_t *p, *ps; |
| u32 indent = format_get_indent (s); |
| |
| s = format (s, "MC state %U, %d streams joined, global sequence 0x%x", |
| format_mc_relay_state, mcm->relay_state, |
| vec_len (mcm->stream_vector), mcm->relay_global_sequence); |
| |
| { |
| mc_mastership_peer_t *mp; |
| f64 now = vlib_time_now (mcm->vlib_main); |
| s = format (s, "\n%UMost recent mastership peers:", |
| format_white_space, indent + 2); |
| vec_foreach (mp, mcm->mastership_peers) |
| { |
| s = format (s, "\n%U%-30U%.4e", |
| format_white_space, indent + 4, |
| mcm->transport.format_peer_id, mp->peer_id, |
| now - mp->time_last_master_assert_received); |
| } |
| } |
| |
| vec_foreach (t, mcm->stream_vector) |
| { |
| s = format (s, "\n%Ustream `%s' index %d", |
| format_white_space, indent + 2, t->config.name, t->index); |
| |
| s = format (s, "\n%Ustate %U", |
| format_white_space, indent + 4, |
| format_mc_stream_state, t->state); |
| |
| s = |
| format (s, |
| "\n%Uretries: interval %.0f sec, limit %d, pool elts %d, %Ld sent", |
| format_white_space, indent + 4, t->config.retry_interval, |
| t->config.retry_limit, pool_elts (t->retry_pool), |
| t->stats.n_retries - t->stats_last_clear.n_retries); |
| |
| s = format (s, "\n%U%Ld/%Ld user requests sent/received", |
| format_white_space, indent + 4, |
| t->user_requests_sent, t->user_requests_received); |
| |
| s = format (s, "\n%U%d peers, local/global sequence 0x%x/0x%x", |
| format_white_space, indent + 4, |
| pool_elts (t->peers), |
| t->our_local_sequence, t->last_global_sequence_processed); |
| |
| ps = 0; |
| /* *INDENT-OFF* */ |
| pool_foreach (p, t->peers, |
| ({ |
| if (clib_bitmap_get (t->all_peer_bitmap, p - t->peers)) |
| vec_add1 (ps, p[0]); |
| })); |
| /* *INDENT-ON* */ |
| vec_sort_with_function (ps, mc_peer_comp); |
| s = format (s, "\n%U%=30s%10s%16s%16s", |
| format_white_space, indent + 6, |
| "Peer", "Last seq", "Retries", "Future"); |
| |
| vec_foreach (p, ps) |
| { |
| s = format (s, "\n%U%-30U0x%08x%16Ld%16Ld%s", |
| format_white_space, indent + 6, |
| mcm->transport.format_peer_id, p->id.as_u64, |
| p->last_sequence_received, |
| p->stats.n_msgs_from_past - |
| p->stats_last_clear.n_msgs_from_past, |
| p->stats.n_msgs_from_future - |
| p->stats_last_clear.n_msgs_from_future, |
| (mcm->transport.our_ack_peer_id.as_u64 == |
| p->id.as_u64 ? " (self)" : "")); |
| } |
| vec_free (ps); |
| } |
| |
| return s; |
| } |
| |
| /* |
| * fd.io coding-style-patch-verification: ON |
| * |
| * Local Variables: |
| * eval: (c-set-style "gnu") |
| * End: |
| */ |