blob: e57962c94de9659b673840973c22aa9034bd92cc [file] [log] [blame]
/*
* mc.c: vlib reliable sequenced multicast distributed applications
*
* Copyright (c) 2010 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vlib/vlib.h>
/*
* 1 to enable msg id training wheels, which are useful for tracking
* down catchup and/or partitioned network problems
*/
#define MSG_ID_DEBUG 0
static format_function_t format_mc_stream_state;
static u32
elog_id_for_peer_id (mc_main_t * m, u64 peer_id)
{
uword *p, r;
mhash_t *h = &m->elog_id_by_peer_id;
if (!m->elog_id_by_peer_id.hash)
mhash_init (h, sizeof (uword), sizeof (mc_peer_id_t));
p = mhash_get (h, &peer_id);
if (p)
return p[0];
r = elog_string (m->elog_main, "%U", m->transport.format_peer_id, peer_id);
mhash_set (h, &peer_id, r, /* old_value */ 0);
return r;
}
static u32
elog_id_for_msg_name (mc_main_t * m, char *msg_name)
{
uword *p, r;
uword *h = m->elog_id_by_msg_name;
u8 *name_copy;
if (!h)
h = m->elog_id_by_msg_name = hash_create_string (0, sizeof (uword));
p = hash_get_mem (h, msg_name);
if (p)
return p[0];
r = elog_string (m->elog_main, "%s", msg_name);
name_copy = format (0, "%s%c", msg_name, 0);
hash_set_mem (h, name_copy, r);
m->elog_id_by_msg_name = h;
return r;
}
static void
elog_tx_msg (mc_main_t * m, u32 stream_id, u32 local_sequence,
u32 retry_count)
{
if (MC_EVENT_LOGGING > 0)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (e) =
{
.format = "tx-msg: stream %d local seq %d attempt %d",
.format_args = "i4i4i4",
};
/* *INDENT-ON* */
struct
{
u32 stream_id, local_sequence, retry_count;
} *ed;
ed = ELOG_DATA (m->elog_main, e);
ed->stream_id = stream_id;
ed->local_sequence = local_sequence;
ed->retry_count = retry_count;
}
}
/*
* seq_cmp
* correctly compare two unsigned sequence numbers.
* This function works so long as x and y are within 2**(n-1) of each
* other, where n = bits(x, y).
*
* Magic decoder ring:
* seq_cmp == 0 => x and y are equal
* seq_cmp < 0 => x is "in the past" with respect to y
* seq_cmp > 0 => x is "in the future" with respect to y
*/
always_inline i32
mc_seq_cmp (u32 x, u32 y)
{
return (i32) x - (i32) y;
}
void *
mc_get_vlib_buffer (vlib_main_t * vm, u32 n_bytes, u32 * bi_return)
{
u32 n_alloc, bi;
vlib_buffer_t *b;
n_alloc = vlib_buffer_alloc (vm, &bi, 1);
ASSERT (n_alloc == 1);
b = vlib_get_buffer (vm, bi);
b->current_length = n_bytes;
*bi_return = bi;
return (void *) b->data;
}
static void
delete_peer_with_index (mc_main_t * mcm, mc_stream_t * s,
uword index, int notify_application)
{
mc_stream_peer_t *p = pool_elt_at_index (s->peers, index);
ASSERT (p != 0);
if (s->config.peer_died && notify_application)
s->config.peer_died (mcm, s, p->id);
s->all_peer_bitmap = clib_bitmap_andnoti (s->all_peer_bitmap, p - s->peers);
if (MC_EVENT_LOGGING > 0)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (e) =
{
.format = "delete peer %s from all_peer_bitmap",
.format_args = "T4",
};
/* *INDENT-ON* */
struct
{
u32 peer;
} *ed = 0;
ed = ELOG_DATA (mcm->elog_main, e);
ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
}
/* Do not delete the pool / hash table entries, or we lose sequence number state */
}
static mc_stream_peer_t *
get_or_create_peer_with_id (mc_main_t * mcm,
mc_stream_t * s, mc_peer_id_t id, int *created)
{
uword *q = mhash_get (&s->peer_index_by_id, &id);
mc_stream_peer_t *p;
if (q)
{
p = pool_elt_at_index (s->peers, q[0]);
goto done;
}
pool_get (s->peers, p);
memset (p, 0, sizeof (p[0]));
p->id = id;
p->last_sequence_received = ~0;
mhash_set (&s->peer_index_by_id, &id, p - s->peers, /* old_value */ 0);
if (created)
*created = 1;
done:
if (MC_EVENT_LOGGING > 0)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (e) =
{
.format = "get_or_create %s peer %s stream %d seq %d",
.format_args = "t4T4i4i4",
.n_enum_strings = 2,
.enum_strings = {
"old", "new",
},
};
/* *INDENT-ON* */
struct
{
u32 is_new, peer, stream_index, rx_sequence;
} *ed = 0;
ed = ELOG_DATA (mcm->elog_main, e);
ed->is_new = q ? 0 : 1;
ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
ed->stream_index = s->index;
ed->rx_sequence = p->last_sequence_received;
}
/* $$$$ Enable or reenable this peer */
s->all_peer_bitmap = clib_bitmap_ori (s->all_peer_bitmap, p - s->peers);
return p;
}
static void
maybe_send_window_open_event (vlib_main_t * vm, mc_stream_t * stream)
{
vlib_one_time_waiting_process_t *p;
if (pool_elts (stream->retry_pool) >= stream->config.window_size)
return;
vec_foreach (p, stream->procs_waiting_for_open_window)
vlib_signal_one_time_waiting_process (vm, p);
if (stream->procs_waiting_for_open_window)
_vec_len (stream->procs_waiting_for_open_window) = 0;
}
static void
mc_retry_free (mc_main_t * mcm, mc_stream_t * s, mc_retry_t * r)
{
mc_retry_t record, *retp;
if (r->unacked_by_peer_bitmap)
_vec_len (r->unacked_by_peer_bitmap) = 0;
if (clib_fifo_elts (s->retired_fifo) >= 2 * s->config.window_size)
{
clib_fifo_sub1 (s->retired_fifo, record);
vlib_buffer_free_one (mcm->vlib_main, record.buffer_index);
}
clib_fifo_add2 (s->retired_fifo, retp);
retp->buffer_index = r->buffer_index;
retp->local_sequence = r->local_sequence;
r->buffer_index = ~0; /* poison buffer index in this retry */
}
static void
mc_resend_retired (mc_main_t * mcm, mc_stream_t * s, u32 local_sequence)
{
mc_retry_t *retry;
if (MC_EVENT_LOGGING > 0)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (e) =
{
.format = "resend-retired: search for local seq %d",
.format_args = "i4",
};
/* *INDENT-ON* */
struct
{
u32 local_sequence;
} *ed;
ed = ELOG_DATA (mcm->elog_main, e);
ed->local_sequence = local_sequence;
}
/* *INDENT-OFF* */
clib_fifo_foreach (retry, s->retired_fifo,
({
if (retry->local_sequence == local_sequence)
{
elog_tx_msg (mcm, s->index, retry-> local_sequence, -13);
mcm->transport.tx_buffer (mcm->transport.opaque,
MC_TRANSPORT_USER_REQUEST_TO_RELAY,
retry->buffer_index);
return;
}
}));
/* *INDENT-ON* */
if (MC_EVENT_LOGGING > 0)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (e) =
{
.format = "resend-retired: FAILED search for local seq %d",
.format_args = "i4",
};
/* *INDENT-ON* */
struct
{
u32 local_sequence;
} *ed;
ed = ELOG_DATA (mcm->elog_main, e);
ed->local_sequence = local_sequence;
}
}
static uword *
delete_retry_fifo_elt (mc_main_t * mcm,
mc_stream_t * stream,
mc_retry_t * r, uword * dead_peer_bitmap)
{
mc_stream_peer_t *p;
/* *INDENT-OFF* */
pool_foreach (p, stream->peers, ({
uword pi = p - stream->peers;
uword is_alive = 0 == clib_bitmap_get (r->unacked_by_peer_bitmap, pi);
if (! is_alive)
dead_peer_bitmap = clib_bitmap_ori (dead_peer_bitmap, pi);
if (MC_EVENT_LOGGING > 0)
{
ELOG_TYPE_DECLARE (e) = {
.format = "delete_retry_fifo_elt: peer %s is %s",
.format_args = "T4t4",
.n_enum_strings = 2,
.enum_strings = { "alive", "dead", },
};
struct { u32 peer, is_alive; } * ed;
ed = ELOG_DATA (mcm->elog_main, e);
ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
ed->is_alive = is_alive;
}
}));
/* *INDENT-ON* */
hash_unset (stream->retry_index_by_local_sequence, r->local_sequence);
mc_retry_free (mcm, stream, r);
return dead_peer_bitmap;
}
always_inline mc_retry_t *
prev_retry (mc_stream_t * s, mc_retry_t * r)
{
return (r->prev_index != ~0
? pool_elt_at_index (s->retry_pool, r->prev_index) : 0);
}
always_inline mc_retry_t *
next_retry (mc_stream_t * s, mc_retry_t * r)
{
return (r->next_index != ~0
? pool_elt_at_index (s->retry_pool, r->next_index) : 0);
}
always_inline void
remove_retry_from_pool (mc_stream_t * s, mc_retry_t * r)
{
mc_retry_t *p = prev_retry (s, r);
mc_retry_t *n = next_retry (s, r);
if (p)
p->next_index = r->next_index;
else
s->retry_head_index = r->next_index;
if (n)
n->prev_index = r->prev_index;
else
s->retry_tail_index = r->prev_index;
pool_put_index (s->retry_pool, r - s->retry_pool);
}
static void
check_retry (mc_main_t * mcm, mc_stream_t * s)
{
mc_retry_t *r;
vlib_main_t *vm = mcm->vlib_main;
f64 now = vlib_time_now (vm);
uword *dead_peer_bitmap = 0;
u32 ri, ri_next;
for (ri = s->retry_head_index; ri != ~0; ri = ri_next)
{
r = pool_elt_at_index (s->retry_pool, ri);
ri_next = r->next_index;
if (now < r->sent_at + s->config.retry_interval)
continue;
r->n_retries += 1;
if (r->n_retries > s->config.retry_limit)
{
dead_peer_bitmap =
delete_retry_fifo_elt (mcm, s, r, dead_peer_bitmap);
remove_retry_from_pool (s, r);
}
else
{
if (MC_EVENT_LOGGING > 0)
{
mc_stream_peer_t *p;
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (t) =
{
.format = "resend local seq %d attempt %d",
.format_args = "i4i4",
};
/* *INDENT-ON* */
/* *INDENT-OFF* */
pool_foreach (p, s->peers, ({
if (clib_bitmap_get (r->unacked_by_peer_bitmap, p - s->peers))
{
ELOG_TYPE_DECLARE (ev) = {
.format = "resend: needed by peer %s local seq %d",
.format_args = "T4i4",
};
struct { u32 peer, rx_sequence; } * ed;
ed = ELOG_DATA (mcm->elog_main, ev);
ed->peer = elog_id_for_peer_id (mcm, p->id.as_u64);
ed->rx_sequence = r->local_sequence;
}
}));
/* *INDENT-ON* */
struct
{
u32 sequence;
u32 trail;
} *ed;
ed = ELOG_DATA (mcm->elog_main, t);
ed->sequence = r->local_sequence;
ed->trail = r->n_retries;
}
r->sent_at = vlib_time_now (vm);
s->stats.n_retries += 1;
elog_tx_msg (mcm, s->index, r->local_sequence, r->n_retries);
mcm->transport.tx_buffer
(mcm->transport.opaque,
MC_TRANSPORT_USER_REQUEST_TO_RELAY, r->buffer_index);
}
}
maybe_send_window_open_event (mcm->vlib_main, s);
/* Delete any dead peers we've found. */
if (!clib_bitmap_is_zero (dead_peer_bitmap))
{
uword i;
/* *INDENT-OFF* */
clib_bitmap_foreach (i, dead_peer_bitmap, ({
delete_peer_with_index (mcm, s, i, /* notify_application */ 1);
/* Delete any references to just deleted peer in retry pool. */
pool_foreach (r, s->retry_pool, ({
r->unacked_by_peer_bitmap =
clib_bitmap_andnoti (r->unacked_by_peer_bitmap, i);
}));
}));
/* *INDENT-ON* */
clib_bitmap_free (dead_peer_bitmap);
}
}
always_inline mc_main_t *
mc_node_get_main (vlib_node_runtime_t * node)
{
mc_main_t **p = (void *) node->runtime_data;
return p[0];
}
static uword
mc_retry_process (vlib_main_t * vm,
vlib_node_runtime_t * node, vlib_frame_t * f)
{
mc_main_t *mcm = mc_node_get_main (node);
mc_stream_t *s;
while (1)
{
vlib_process_suspend (vm, 1.0);
vec_foreach (s, mcm->stream_vector)
{
if (s->state != MC_STREAM_STATE_invalid)
check_retry (mcm, s);
}
}
return 0; /* not likely */
}
static void
send_join_or_leave_request (mc_main_t * mcm, u32 stream_index, u32 is_join)
{
vlib_main_t *vm = mcm->vlib_main;
mc_msg_join_or_leave_request_t *mp;
u32 bi;
mp = mc_get_vlib_buffer (vm, sizeof (mp[0]), &bi);
memset (mp, 0, sizeof (*mp));
mp->type = MC_MSG_TYPE_join_or_leave_request;
mp->peer_id = mcm->transport.our_ack_peer_id;
mp->stream_index = stream_index;
mp->is_join = is_join;
mc_byte_swap_msg_join_or_leave_request (mp);
/*
* These msgs are unnumbered, unordered so send on the from-relay
* channel.
*/
mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi);
}
static uword
mc_join_ager_process (vlib_main_t * vm,
vlib_node_runtime_t * node, vlib_frame_t * f)
{
mc_main_t *mcm = mc_node_get_main (node);
while (1)
{
if (mcm->joins_in_progress)
{
mc_stream_t *s;
vlib_one_time_waiting_process_t *p;
f64 now = vlib_time_now (vm);
vec_foreach (s, mcm->stream_vector)
{
if (s->state != MC_STREAM_STATE_join_in_progress)
continue;
if (now > s->join_timeout)
{
s->state = MC_STREAM_STATE_ready;
if (MC_EVENT_LOGGING > 0)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (e) =
{
.format = "stream %d join timeout",
};
/* *INDENT-ON* */
ELOG (mcm->elog_main, e, s->index);
}
/* Make sure that this app instance exists as a stream peer,
or we may answer a catchup request with a NULL
all_peer_bitmap... */
(void) get_or_create_peer_with_id
(mcm, s, mcm->transport.our_ack_peer_id, /* created */ 0);
vec_foreach (p, s->procs_waiting_for_join_done)
vlib_signal_one_time_waiting_process (vm, p);
if (s->procs_waiting_for_join_done)
_vec_len (s->procs_waiting_for_join_done) = 0;
mcm->joins_in_progress--;
ASSERT (mcm->joins_in_progress >= 0);
}
else
{
/* Resent join request which may have been lost. */
send_join_or_leave_request (mcm, s->index, 1 /* is_join */ );
/* We're *not* alone, retry for as long as it takes */
if (mcm->relay_state == MC_RELAY_STATE_SLAVE)
s->join_timeout = vlib_time_now (vm) + 2.0;
if (MC_EVENT_LOGGING > 0)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (e) =
{
.format = "stream %d resend join request",
};
/* *INDENT-ON* */
ELOG (mcm->elog_main, e, s->index);
}
}
}
}
vlib_process_suspend (vm, .5);
}
return 0; /* not likely */
}
static void
serialize_mc_register_stream_name (serialize_main_t * m, va_list * va)
{
char *name = va_arg (*va, char *);
serialize_cstring (m, name);
}
static void
elog_stream_name (char *buf, int n_buf_bytes, char *v)
{
clib_memcpy (buf, v, clib_min (n_buf_bytes - 1, vec_len (v)));
buf[n_buf_bytes - 1] = 0;
}
static void
unserialize_mc_register_stream_name (serialize_main_t * m, va_list * va)
{
mc_main_t *mcm = va_arg (*va, mc_main_t *);
char *name;
mc_stream_t *s;
uword *p;
unserialize_cstring (m, &name);
if ((p = hash_get_mem (mcm->stream_index_by_name, name)))
{
if (MC_EVENT_LOGGING > 0)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (e) =
{
.format = "stream index %d already named %s",
.format_args = "i4s16",
};
/* *INDENT-ON* */
struct
{
u32 stream_index;
char name[16];
} *ed;
ed = ELOG_DATA (mcm->elog_main, e);
ed->stream_index = p[0];
elog_stream_name (ed->name, sizeof (ed->name), name);
}
vec_free (name);
return;
}
vec_add2 (mcm->stream_vector, s, 1);
mc_stream_init (s);
s->state = MC_STREAM_STATE_name_known;
s->index = s - mcm->stream_vector;
s->config.name = name;
if (MC_EVENT_LOGGING > 0)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (e) =
{
.format = "stream index %d named %s",
.format_args = "i4s16",
};
/* *INDENT-ON* */
struct
{
u32 stream_index;
char name[16];
} *ed;
ed = ELOG_DATA (mcm->elog_main, e);
ed->stream_index = s->index;
elog_stream_name (ed->name, sizeof (ed->name), name);
}
hash_set_mem (mcm->stream_index_by_name, name, s->index);
p = hash_get (mcm->procs_waiting_for_stream_name_by_name, name);
if (p)
{
vlib_one_time_waiting_process_t *wp, **w;
w = pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool, p[0]);
vec_foreach (wp, w[0])
vlib_signal_one_time_waiting_process (mcm->vlib_main, wp);
pool_put (mcm->procs_waiting_for_stream_name_pool, w);
hash_unset_mem (mcm->procs_waiting_for_stream_name_by_name, name);
}
}
/* *INDENT-OFF* */
MC_SERIALIZE_MSG (mc_register_stream_name_msg, static) =
{
.name = "mc_register_stream_name",
.serialize = serialize_mc_register_stream_name,
.unserialize = unserialize_mc_register_stream_name,
};
/* *INDENT-ON* */
void
mc_rx_buffer_unserialize (mc_main_t * mcm,
mc_stream_t * stream,
mc_peer_id_t peer_id, u32 buffer_index)
{
return mc_unserialize (mcm, stream, buffer_index);
}
static u8 *
mc_internal_catchup_snapshot (mc_main_t * mcm,
u8 * data_vector,
u32 last_global_sequence_processed)
{
serialize_main_t m;
/* Append serialized data to data vector. */
serialize_open_vector (&m, data_vector);
m.stream.current_buffer_index = vec_len (data_vector);
serialize (&m, serialize_mc_main, mcm);
return serialize_close_vector (&m);
}
static void
mc_internal_catchup (mc_main_t * mcm, u8 * data, u32 n_data_bytes)
{
serialize_main_t s;
unserialize_open_data (&s, data, n_data_bytes);
unserialize (&s, unserialize_mc_main, mcm);
}
/* Overridden from the application layer, not actually used here */
void mc_stream_join_process_hold (void) __attribute__ ((weak));
void
mc_stream_join_process_hold (void)
{
}
static u32
mc_stream_join_helper (mc_main_t * mcm,
mc_stream_config_t * config, u32 is_internal)
{
mc_stream_t *s;
vlib_main_t *vm = mcm->vlib_main;
s = 0;
if (!is_internal)
{
uword *p;
/* Already have a stream with given name? */
if ((s = mc_stream_by_name (mcm, config->name)))
{
/* Already joined and ready? */
if (s->state == MC_STREAM_STATE_ready)
return s->index;
}
/* First join MC internal stream. */
if (!mcm->stream_vector
|| (mcm->stream_vector[MC_STREAM_INDEX_INTERNAL].state
== MC_STREAM_STATE_invalid))
{
static mc_stream_config_t c = {
.name = "mc-internal",
.rx_buffer = mc_rx_buffer_unserialize,
.catchup = mc_internal_catchup,
.catchup_snapshot = mc_internal_catchup_snapshot,
};
c.save_snapshot = config->save_snapshot;
mc_stream_join_helper (mcm, &c, /* is_internal */ 1);
}
/* If stream is still unknown register this name and wait for
sequenced message to name stream. This way all peers agree
on stream name to index mappings. */
s = mc_stream_by_name (mcm, config->name);
if (!s)
{
vlib_one_time_waiting_process_t *wp, **w;
u8 *name_copy = format (0, "%s", config->name);
mc_serialize_stream (mcm,
MC_STREAM_INDEX_INTERNAL,
&mc_register_stream_name_msg, config->name);
/* Wait for this stream to be named. */
p =
hash_get_mem (mcm->procs_waiting_for_stream_name_by_name,
name_copy);
if (p)
w =
pool_elt_at_index (mcm->procs_waiting_for_stream_name_pool,
p[0]);
else
{
pool_get (mcm->procs_waiting_for_stream_name_pool, w);
if (!mcm->procs_waiting_for_stream_name_by_name)
mcm->procs_waiting_for_stream_name_by_name = hash_create_string ( /* elts */ 0, /* value size */
sizeof
(uword));
hash_set_mem (mcm->procs_waiting_for_stream_name_by_name,
name_copy,
w - mcm->procs_waiting_for_stream_name_pool);
w[0] = 0;
}
vec_add2 (w[0], wp, 1);
vlib_current_process_wait_for_one_time_event (vm, wp);
vec_free (name_copy);
}
/* Name should be known now. */
s = mc_stream_by_name (mcm, config->name);
ASSERT (s != 0);
ASSERT (s->state == MC_STREAM_STATE_name_known);
}
if (!s)
{
vec_add2 (mcm->stream_vector, s, 1);
mc_stream_init (s);
s->index = s - mcm->stream_vector;
}
{
/* Save name since we could have already used it as hash key. */
char *name_save = s->config.name;
s->config = config[0];
if (name_save)
s->config.name = name_save;
}
if (s->config.window_size == 0)
s->config.window_size = 8;
if (s->config.retry_interval == 0.0)
s->config.retry_interval = 1.0;
/* Sanity. */
ASSERT (s->config.retry_interval < 30);
if (s->config.retry_limit == 0)
s->config.retry_limit = 7;
s->state = MC_STREAM_STATE_join_in_progress;
if (!s->peer_index_by_id.hash)
mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
/* If we don't hear from someone in 5 seconds, we're alone */
s->join_timeout = vlib_time_now (vm) + 5.0;
mcm->joins_in_progress++;
if (MC_EVENT_LOGGING > 0)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (e) =
{
.format = "stream index %d join request %s",
.format_args = "i4s16",
};
/* *INDENT-ON* */
struct
{
u32 stream_index;
char name[16];
} *ed;
ed = ELOG_DATA (mcm->elog_main, e);
ed->stream_index = s->index;
elog_stream_name (ed->name, sizeof (ed->name), s->config.name);
}
send_join_or_leave_request (mcm, s->index, 1 /* join */ );
vlib_current_process_wait_for_one_time_event_vector
(vm, &s->procs_waiting_for_join_done);
if (MC_EVENT_LOGGING)
{
ELOG_TYPE (e, "join complete stream %d");
ELOG (mcm->elog_main, e, s->index);
}
return s->index;
}
u32
mc_stream_join (mc_main_t * mcm, mc_stream_config_t * config)
{
return mc_stream_join_helper (mcm, config, /* is_internal */ 0);
}
void
mc_stream_leave (mc_main_t * mcm, u32 stream_index)
{
mc_stream_t *s = mc_stream_by_index (mcm, stream_index);
if (!s)
return;
if (MC_EVENT_LOGGING)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (t) =
{
.format = "leave-stream: %d",.format_args = "i4",
};
/* *INDENT-ON* */
struct
{
u32 index;
} *ed;
ed = ELOG_DATA (mcm->elog_main, t);
ed->index = stream_index;
}
send_join_or_leave_request (mcm, stream_index, 0 /* is_join */ );
mc_stream_free (s);
s->state = MC_STREAM_STATE_name_known;
}
void
mc_msg_join_or_leave_request_handler (mc_main_t * mcm,
mc_msg_join_or_leave_request_t * req,
u32 buffer_index)
{
mc_stream_t *s;
mc_msg_join_reply_t *rep;
u32 bi;
mc_byte_swap_msg_join_or_leave_request (req);
s = mc_stream_by_index (mcm, req->stream_index);
if (!s || s->state != MC_STREAM_STATE_ready)
return;
/* If the peer is joining, create it */
if (req->is_join)
{
mc_stream_t *this_s;
/* We're not in a position to catch up a peer until all
stream joins are complete. */
if (0)
{
/* XXX This is hard to test so we've. */
vec_foreach (this_s, mcm->stream_vector)
{
if (this_s->state != MC_STREAM_STATE_ready
&& this_s->state != MC_STREAM_STATE_name_known)
return;
}
}
else if (mcm->joins_in_progress > 0)
return;
(void) get_or_create_peer_with_id (mcm, s, req->peer_id,
/* created */ 0);
rep = mc_get_vlib_buffer (mcm->vlib_main, sizeof (rep[0]), &bi);
memset (rep, 0, sizeof (rep[0]));
rep->type = MC_MSG_TYPE_join_reply;
rep->stream_index = req->stream_index;
mc_byte_swap_msg_join_reply (rep);
/* These two are already in network byte order... */
rep->peer_id = mcm->transport.our_ack_peer_id;
rep->catchup_peer_id = mcm->transport.our_catchup_peer_id;
mcm->transport.tx_buffer (mcm->transport.opaque, MC_TRANSPORT_JOIN, bi);
}
else
{
if (s->config.peer_died)
s->config.peer_died (mcm, s, req->peer_id);
}
}
void
mc_msg_join_reply_handler (mc_main_t * mcm,
mc_msg_join_reply_t * mp, u32 buffer_index)
{
mc_stream_t *s;
mc_byte_swap_msg_join_reply (mp);
s = mc_stream_by_index (mcm, mp->stream_index);
if (!s || s->state != MC_STREAM_STATE_join_in_progress)
return;
/* Switch to catchup state; next join reply
for this stream will be ignored. */
s->state = MC_STREAM_STATE_catchup;
mcm->joins_in_progress--;
mcm->transport.catchup_request_fun (mcm->transport.opaque,
mp->stream_index, mp->catchup_peer_id);
}
void
mc_wait_for_stream_ready (mc_main_t * m, char *stream_name)
{
mc_stream_t *s;
while (1)
{
s = mc_stream_by_name (m, stream_name);
if (s)
break;
vlib_process_suspend (m->vlib_main, .1);
}
/* It's OK to send a message in catchup and ready states. */
if (s->state == MC_STREAM_STATE_catchup
|| s->state == MC_STREAM_STATE_ready)
return;
/* Otherwise we are waiting for a join to finish. */
vlib_current_process_wait_for_one_time_event_vector
(m->vlib_main, &s->procs_waiting_for_join_done);
}
u32
mc_stream_send (mc_main_t * mcm, u32 stream_index, u32 buffer_index)
{
mc_stream_t *s = mc_stream_by_index (mcm, stream_index);
vlib_main_t *vm = mcm->vlib_main;
mc_retry_t *r;
mc_msg_user_request_t *mp;
vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index);
u32 ri;
if (!s)
return 0;
if (s->state != MC_STREAM_STATE_ready)
vlib_current_process_wait_for_one_time_event_vector
(vm, &s->procs_waiting_for_join_done);
while (pool_elts (s->retry_pool) >= s->config.window_size)
{
vlib_current_process_wait_for_one_time_event_vector
(vm, &s->procs_waiting_for_open_window);
}
pool_get (s->retry_pool, r);
ri = r - s->retry_pool;
r->prev_index = s->retry_tail_index;
r->next_index = ~0;
s->retry_tail_index = ri;
if (r->prev_index == ~0)
s->retry_head_index = ri;
else
{
mc_retry_t *p = pool_elt_at_index (s->retry_pool, r->prev_index);
p->next_index = ri;
}
vlib_buffer_advance (b, -sizeof (mp[0]));
mp = vlib_buffer_get_current (b);
mp->peer_id = mcm->transport.our_ack_peer_id;
/* mp->transport.global_sequence set by relay agent. */
mp->global_sequence = 0xdeadbeef;
mp->stream_index = s->index;
mp->local_sequence = s->our_local_sequence++;
mp->n_data_bytes =
vlib_buffer_index_length_in_chain (vm, buffer_index) - sizeof (mp[0]);
r->buffer_index = buffer_index;
r->local_sequence = mp->local_sequence;
r->sent_at = vlib_time_now (vm);
r->n_retries = 0;
/* Retry will be freed when all currently known peers have acked. */
vec_validate (r->unacked_by_peer_bitmap, vec_len (s->all_peer_bitmap) - 1);
vec_copy (r->unacked_by_peer_bitmap, s->all_peer_bitmap);
hash_set (s->retry_index_by_local_sequence, r->local_sequence,
r - s->retry_pool);
elog_tx_msg (mcm, s->index, mp->local_sequence, r->n_retries);
mc_byte_swap_msg_user_request (mp);
mcm->transport.tx_buffer (mcm->transport.opaque,
MC_TRANSPORT_USER_REQUEST_TO_RELAY, buffer_index);
s->user_requests_sent++;
/* return amount of window remaining */
return s->config.window_size - pool_elts (s->retry_pool);
}
void
mc_msg_user_request_handler (mc_main_t * mcm, mc_msg_user_request_t * mp,
u32 buffer_index)
{
vlib_main_t *vm = mcm->vlib_main;
mc_stream_t *s;
mc_stream_peer_t *peer;
i32 seq_cmp_result;
static int once = 0;
mc_byte_swap_msg_user_request (mp);
s = mc_stream_by_index (mcm, mp->stream_index);
/* Not signed up for this stream? Turf-o-matic */
if (!s || s->state != MC_STREAM_STATE_ready)
{
vlib_buffer_free_one (vm, buffer_index);
return;
}
/* Find peer, including ourselves. */
peer = get_or_create_peer_with_id (mcm, s, mp->peer_id,
/* created */ 0);
seq_cmp_result = mc_seq_cmp (mp->local_sequence,
peer->last_sequence_received + 1);
if (MC_EVENT_LOGGING > 0)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (e) =
{
.format = "rx-msg: peer %s stream %d rx seq %d seq_cmp %d",
.format_args = "T4i4i4i4",
};
/* *INDENT-ON* */
struct
{
u32 peer, stream_index, rx_sequence;
i32 seq_cmp_result;
} *ed;
ed = ELOG_DATA (mcm->elog_main, e);
ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
ed->stream_index = mp->stream_index;
ed->rx_sequence = mp->local_sequence;
ed->seq_cmp_result = seq_cmp_result;
}
if (0 && mp->stream_index == 1 && once == 0)
{
once = 1;
ELOG_TYPE (e, "FAKE lost msg on stream 1");
ELOG (mcm->elog_main, e, 0);
return;
}
peer->last_sequence_received += seq_cmp_result == 0;
s->user_requests_received++;
if (seq_cmp_result > 0)
peer->stats.n_msgs_from_future += 1;
/* Send ack even if msg from future */
if (1)
{
mc_msg_user_ack_t *rp;
u32 bi;
rp = mc_get_vlib_buffer (vm, sizeof (rp[0]), &bi);
rp->peer_id = mcm->transport.our_ack_peer_id;
rp->stream_index = s->index;
rp->local_sequence = mp->local_sequence;
rp->seq_cmp_result = seq_cmp_result;
if (MC_EVENT_LOGGING > 0)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (e) =
{
.format = "tx-ack: stream %d local seq %d",
.format_args = "i4i4",
};
/* *INDENT-ON* */
struct
{
u32 stream_index;
u32 local_sequence;
} *ed;
ed = ELOG_DATA (mcm->elog_main, e);
ed->stream_index = rp->stream_index;
ed->local_sequence = rp->local_sequence;
}
mc_byte_swap_msg_user_ack (rp);
mcm->transport.tx_ack (mcm->transport.opaque, mp->peer_id, bi);
/* Msg from past? If so, free the buffer... */
if (seq_cmp_result < 0)
{
vlib_buffer_free_one (vm, buffer_index);
peer->stats.n_msgs_from_past += 1;
}
}
if (seq_cmp_result == 0)
{
vlib_buffer_t *b = vlib_get_buffer (vm, buffer_index);
switch (s->state)
{
case MC_STREAM_STATE_ready:
vlib_buffer_advance (b, sizeof (mp[0]));
s->config.rx_buffer (mcm, s, mp->peer_id, buffer_index);
/* Stream vector can change address via rx callback for mc-internal
stream. */
s = mc_stream_by_index (mcm, mp->stream_index);
ASSERT (s != 0);
s->last_global_sequence_processed = mp->global_sequence;
break;
case MC_STREAM_STATE_catchup:
clib_fifo_add1 (s->catchup_fifo, buffer_index);
break;
default:
clib_warning ("stream in unknown state %U",
format_mc_stream_state, s->state);
break;
}
}
}
void
mc_msg_user_ack_handler (mc_main_t * mcm, mc_msg_user_ack_t * mp,
u32 buffer_index)
{
vlib_main_t *vm = mcm->vlib_main;
uword *p;
mc_stream_t *s;
mc_stream_peer_t *peer;
mc_retry_t *r;
int peer_created = 0;
mc_byte_swap_msg_user_ack (mp);
s = mc_stream_by_index (mcm, mp->stream_index);
if (MC_EVENT_LOGGING > 0)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (t) =
{
.format = "rx-ack: local seq %d peer %s seq_cmp_result %d",
.format_args = "i4T4i4",
};
/* *INDENT-ON* */
struct
{
u32 local_sequence;
u32 peer;
i32 seq_cmp_result;
} *ed;
ed = ELOG_DATA (mcm->elog_main, t);
ed->local_sequence = mp->local_sequence;
ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
ed->seq_cmp_result = mp->seq_cmp_result;
}
/* Unknown stream? */
if (!s)
return;
/* Find the peer which just ack'ed. */
peer = get_or_create_peer_with_id (mcm, s, mp->peer_id,
/* created */ &peer_created);
/*
* Peer reports message from the future. If it's not in the retry
* fifo, look for a retired message.
*/
if (mp->seq_cmp_result > 0)
{
p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence -
mp->seq_cmp_result);
if (p == 0)
mc_resend_retired (mcm, s, mp->local_sequence - mp->seq_cmp_result);
/* Normal retry should fix it... */
return;
}
/*
* Pointer to the indicated retry fifo entry.
* Worth hashing because we could use a window size of 100 or 1000.
*/
p = hash_get (s->retry_index_by_local_sequence, mp->local_sequence);
/*
* Is this a duplicate ACK, received after we've retired the
* fifo entry. This can happen when learning about new
* peers.
*/
if (p == 0)
{
if (MC_EVENT_LOGGING > 0)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (t) =
{
.format = "ack: for seq %d from peer %s no fifo elt",
.format_args = "i4T4",
};
/* *INDENT-ON* */
struct
{
u32 seq;
u32 peer;
} *ed;
ed = ELOG_DATA (mcm->elog_main, t);
ed->seq = mp->local_sequence;
ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
}
return;
}
r = pool_elt_at_index (s->retry_pool, p[0]);
/* Make sure that this new peer ACKs our msgs from now on */
if (peer_created)
{
mc_retry_t *later_retry = next_retry (s, r);
while (later_retry)
{
later_retry->unacked_by_peer_bitmap =
clib_bitmap_ori (later_retry->unacked_by_peer_bitmap,
peer - s->peers);
later_retry = next_retry (s, later_retry);
}
}
ASSERT (mp->local_sequence == r->local_sequence);
/* If we weren't expecting to hear from this peer */
if (!peer_created &&
!clib_bitmap_get (r->unacked_by_peer_bitmap, peer - s->peers))
{
if (MC_EVENT_LOGGING > 0)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (t) =
{
.format = "dup-ack: for seq %d from peer %s",
.format_args = "i4T4",
};
/* *INDENT-ON* */
struct
{
u32 seq;
u32 peer;
} *ed;
ed = ELOG_DATA (mcm->elog_main, t);
ed->seq = r->local_sequence;
ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
}
if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap))
return;
}
if (MC_EVENT_LOGGING > 0)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (t) =
{
.format = "ack: for seq %d from peer %s",
.format_args = "i4T4",
};
/* *INDENT-ON* */
struct
{
u32 seq;
u32 peer;
} *ed;
ed = ELOG_DATA (mcm->elog_main, t);
ed->seq = mp->local_sequence;
ed->peer = elog_id_for_peer_id (mcm, peer->id.as_u64);
}
r->unacked_by_peer_bitmap =
clib_bitmap_andnoti (r->unacked_by_peer_bitmap, peer - s->peers);
/* Not all clients have ack'ed */
if (!clib_bitmap_is_zero (r->unacked_by_peer_bitmap))
{
return;
}
if (MC_EVENT_LOGGING > 0)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (t) =
{
.format = "ack: retire fifo elt loc seq %d after %d acks",
.format_args = "i4i4",
};
/* *INDENT-ON* */
struct
{
u32 seq;
u32 npeers;
} *ed;
ed = ELOG_DATA (mcm->elog_main, t);
ed->seq = r->local_sequence;
ed->npeers = pool_elts (s->peers);
}
hash_unset (s->retry_index_by_local_sequence, mp->local_sequence);
mc_retry_free (mcm, s, r);
remove_retry_from_pool (s, r);
maybe_send_window_open_event (vm, s);
}
#define EVENT_MC_SEND_CATCHUP_DATA 0
static uword
mc_catchup_process (vlib_main_t * vm,
vlib_node_runtime_t * node, vlib_frame_t * f)
{
mc_main_t *mcm = mc_node_get_main (node);
uword *event_data = 0;
mc_catchup_process_arg_t *args;
int i;
while (1)
{
if (event_data)
_vec_len (event_data) = 0;
vlib_process_wait_for_event_with_type (vm, &event_data,
EVENT_MC_SEND_CATCHUP_DATA);
for (i = 0; i < vec_len (event_data); i++)
{
args = pool_elt_at_index (mcm->catchup_process_args, event_data[i]);
mcm->transport.catchup_send_fun (mcm->transport.opaque,
args->catchup_opaque,
args->catchup_snapshot);
/* Send function will free snapshot data vector. */
pool_put (mcm->catchup_process_args, args);
}
}
return 0; /* not likely */
}
static void
serialize_mc_stream (serialize_main_t * m, va_list * va)
{
mc_stream_t *s = va_arg (*va, mc_stream_t *);
mc_stream_peer_t *p;
serialize_integer (m, pool_elts (s->peers), sizeof (u32));
/* *INDENT-OFF* */
pool_foreach (p, s->peers, ({
u8 * x = serialize_get (m, sizeof (p->id));
clib_memcpy (x, p->id.as_u8, sizeof (p->id));
serialize_integer (m, p->last_sequence_received,
sizeof (p->last_sequence_received));
}));
/* *INDENT-ON* */
serialize_bitmap (m, s->all_peer_bitmap);
}
void
unserialize_mc_stream (serialize_main_t * m, va_list * va)
{
mc_stream_t *s = va_arg (*va, mc_stream_t *);
u32 i, n_peers;
mc_stream_peer_t *p;
unserialize_integer (m, &n_peers, sizeof (u32));
mhash_init (&s->peer_index_by_id, sizeof (uword), sizeof (mc_peer_id_t));
for (i = 0; i < n_peers; i++)
{
u8 *x;
pool_get (s->peers, p);
x = unserialize_get (m, sizeof (p->id));
clib_memcpy (p->id.as_u8, x, sizeof (p->id));
unserialize_integer (m, &p->last_sequence_received,
sizeof (p->last_sequence_received));
mhash_set (&s->peer_index_by_id, &p->id, p - s->peers, /* old_value */
0);
}
s->all_peer_bitmap = unserialize_bitmap (m);
/* This is really bad. */
if (!s->all_peer_bitmap)
clib_warning ("BUG: stream %s all_peer_bitmap NULL", s->config.name);
}
void
mc_msg_catchup_request_handler (mc_main_t * mcm,
mc_msg_catchup_request_t * req,
u32 catchup_opaque)
{
vlib_main_t *vm = mcm->vlib_main;
mc_stream_t *s;
mc_catchup_process_arg_t *args;
mc_byte_swap_msg_catchup_request (req);
s = mc_stream_by_index (mcm, req->stream_index);
if (!s || s->state != MC_STREAM_STATE_ready)
return;
if (MC_EVENT_LOGGING > 0)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (t) =
{
.format = "catchup-request: from %s stream %d",
.format_args = "T4i4",
};
/* *INDENT-ON* */
struct
{
u32 peer, stream;
} *ed;
ed = ELOG_DATA (mcm->elog_main, t);
ed->peer = elog_id_for_peer_id (mcm, req->peer_id.as_u64);
ed->stream = req->stream_index;
}
/*
* The application has to snapshoot its data structures right
* here, right now. If we process any messages after
* noting the last global sequence we've processed, the client
* won't be able to accurately reconstruct our data structures.
*
* Once the data structures are e.g. vec_dup()'ed, we
* send the resulting messages from a separate process, to
* make sure that we don't cause a bunch of message retransmissions
*/
pool_get (mcm->catchup_process_args, args);
args->stream_index = s - mcm->stream_vector;
args->catchup_opaque = catchup_opaque;
args->catchup_snapshot = 0;
/* Construct catchup reply and snapshot state for stream to send as
catchup reply payload. */
{
mc_msg_catchup_reply_t *rep;
serialize_main_t m;
vec_resize (args->catchup_snapshot, sizeof (rep[0]));
rep = (void *) args->catchup_snapshot;
rep->peer_id = req->peer_id;
rep->stream_index = req->stream_index;
rep->last_global_sequence_included = s->last_global_sequence_processed;
/* Setup for serialize to append to catchup snapshot. */
serialize_open_vector (&m, args->catchup_snapshot);
m.stream.current_buffer_index = vec_len (m.stream.buffer);
serialize (&m, serialize_mc_stream, s);
args->catchup_snapshot = serialize_close_vector (&m);
/* Actually copy internal state */
args->catchup_snapshot = s->config.catchup_snapshot
(mcm, args->catchup_snapshot, rep->last_global_sequence_included);
rep = (void *) args->catchup_snapshot;
rep->n_data_bytes = vec_len (args->catchup_snapshot) - sizeof (rep[0]);
mc_byte_swap_msg_catchup_reply (rep);
}
/* now go send it... */
vlib_process_signal_event (vm, mcm->catchup_process,
EVENT_MC_SEND_CATCHUP_DATA,
args - mcm->catchup_process_args);
}
#define EVENT_MC_UNSERIALIZE_BUFFER 0
#define EVENT_MC_UNSERIALIZE_CATCHUP 1
void
mc_msg_catchup_reply_handler (mc_main_t * mcm, mc_msg_catchup_reply_t * mp,
u32 catchup_opaque)
{
vlib_process_signal_event (mcm->vlib_main,
mcm->unserialize_process,
EVENT_MC_UNSERIALIZE_CATCHUP,
pointer_to_uword (mp));
}
static void
perform_catchup (mc_main_t * mcm, mc_msg_catchup_reply_t * mp)
{
mc_stream_t *s;
i32 seq_cmp_result;
mc_byte_swap_msg_catchup_reply (mp);
s = mc_stream_by_index (mcm, mp->stream_index);
/* Never heard of this stream or already caught up. */
if (!s || s->state == MC_STREAM_STATE_ready)
return;
{
serialize_main_t m;
mc_stream_peer_t *p;
u32 n_stream_bytes;
/* For offline sim replay: save the entire catchup snapshot... */
if (s->config.save_snapshot)
s->config.save_snapshot (mcm, /* is_catchup */ 1, mp->data,
mp->n_data_bytes);
unserialize_open_data (&m, mp->data, mp->n_data_bytes);
unserialize (&m, unserialize_mc_stream, s);
/* Make sure we start numbering our messages as expected */
/* *INDENT-OFF* */
pool_foreach (p, s->peers, ({
if (p->id.as_u64 == mcm->transport.our_ack_peer_id.as_u64)
s->our_local_sequence = p->last_sequence_received + 1;
}));
/* *INDENT-ON* */
n_stream_bytes = m.stream.current_buffer_index;
/* No need to unserialize close; nothing to free. */
/* After serialized stream is user's catchup data. */
s->config.catchup (mcm, mp->data + n_stream_bytes,
mp->n_data_bytes - n_stream_bytes);
}
/* Vector could have been moved by catchup.
This can only happen for mc-internal stream. */
s = mc_stream_by_index (mcm, mp->stream_index);
s->last_global_sequence_processed = mp->last_global_sequence_included;
while (clib_fifo_elts (s->catchup_fifo))
{
mc_msg_user_request_t *gp;
u32 bi;
vlib_buffer_t *b;
clib_fifo_sub1 (s->catchup_fifo, bi);
b = vlib_get_buffer (mcm->vlib_main, bi);
gp = vlib_buffer_get_current (b);
/* Make sure we're replaying "new" news */
seq_cmp_result = mc_seq_cmp (gp->global_sequence,
mp->last_global_sequence_included);
if (seq_cmp_result > 0)
{
vlib_buffer_advance (b, sizeof (gp[0]));
s->config.rx_buffer (mcm, s, gp->peer_id, bi);
s->last_global_sequence_processed = gp->global_sequence;
if (MC_EVENT_LOGGING)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (t) =
{
.format = "catchup replay local sequence 0x%x",
.format_args = "i4",
};
/* *INDENT-ON* */
struct
{
u32 local_sequence;
} *ed;
ed = ELOG_DATA (mcm->elog_main, t);
ed->local_sequence = gp->local_sequence;
}
}
else
{
if (MC_EVENT_LOGGING)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (t) =
{
.format = "catchup discard local sequence 0x%x",
.format_args = "i4",
};
/* *INDENT-ON* */
struct
{
u32 local_sequence;
} *ed;
ed = ELOG_DATA (mcm->elog_main, t);
ed->local_sequence = gp->local_sequence;
}
vlib_buffer_free_one (mcm->vlib_main, bi);
}
}
s->state = MC_STREAM_STATE_ready;
/* Now that we are caught up wake up joining process. */
{
vlib_one_time_waiting_process_t *wp;
vec_foreach (wp, s->procs_waiting_for_join_done)
vlib_signal_one_time_waiting_process (mcm->vlib_main, wp);
if (s->procs_waiting_for_join_done)
_vec_len (s->procs_waiting_for_join_done) = 0;
}
}
static void
this_node_maybe_master (mc_main_t * mcm)
{
vlib_main_t *vm = mcm->vlib_main;
mc_msg_master_assert_t *mp;
uword event_type;
int timeouts = 0;
int is_master = mcm->relay_state == MC_RELAY_STATE_MASTER;
clib_error_t *error;
f64 now, time_last_master_assert = -1;
u32 bi;
while (1)
{
if (!mcm->we_can_be_relay_master)
{
mcm->relay_state = MC_RELAY_STATE_SLAVE;
if (MC_EVENT_LOGGING)
{
ELOG_TYPE (e, "become slave (config)");
ELOG (mcm->elog_main, e, 0);
}
return;
}
now = vlib_time_now (vm);
if (now >= time_last_master_assert + 1)
{
time_last_master_assert = now;
mp = mc_get_vlib_buffer (mcm->vlib_main, sizeof (mp[0]), &bi);
mp->peer_id = mcm->transport.our_ack_peer_id;
mp->global_sequence = mcm->relay_global_sequence;
/*
* these messages clog the event log, set MC_EVENT_LOGGING higher
* if you want them
*/
if (MC_EVENT_LOGGING > 1)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (e) =
{
.format = "tx-massert: peer %s global seq %u",
.format_args = "T4i4",
};
/* *INDENT-ON* */
struct
{
u32 peer, global_sequence;
} *ed;
ed = ELOG_DATA (mcm->elog_main, e);
ed->peer = elog_id_for_peer_id (mcm, mp->peer_id.as_u64);
ed->global_sequence = mp->global_sequence;
}
mc_byte_swap_msg_master_assert (mp);
error =
mcm->transport.tx_buffer (mcm->transport.opaque,
MC_TRANSPORT_MASTERSHIP, bi);
if (error)
clib_error_report (error);
}
vlib_process_wait_for_event_or_clock (vm, 1.0);
event_type = vlib_process_get_events (vm, /* no event data */ 0);
switch (event_type)
{
case ~0:
if (!is_master && timeouts++ > 2)
{
mcm->relay_state = MC_RELAY_STATE_MASTER;
mcm->relay_master_peer_id =
mcm->transport.our_ack_peer_id.as_u64;
if (MC_EVENT_LOGGING)
{
ELOG_TYPE (e, "become master (was maybe_master)");
ELOG (mcm->elog_main, e, 0);
}
return;
}
break;
case MC_RELAY_STATE_SLAVE:
mcm->relay_state = MC_RELAY_STATE_SLAVE;
if (MC_EVENT_LOGGING && mcm->relay_state != MC_RELAY_STATE_SLAVE)
{
ELOG_TYPE (e, "become slave (was maybe_master)");
ELOG (mcm->elog_main, e, 0);
}
return;
}
}
}
static void
this_node_slave (mc_main_t * mcm)
{
vlib_main_t *vm = mcm->vlib_main;
uword event_type;
int timeouts = 0;
if (MC_EVENT_LOGGING)
{
ELOG_TYPE (e, "become slave");
ELOG (mcm->elog_main, e, 0);
}
while (1)
{
vlib_process_wait_for_event_or_clock (vm, 1.0);
event_type = vlib_process_get_events (vm, /* no event data */ 0);
switch (event_type)
{
case ~0:
if (timeouts++ > 2)
{
mcm->relay_state = MC_RELAY_STATE_NEGOTIATE;
mcm->relay_master_peer_id = ~0ULL;
if (MC_EVENT_LOGGING)
{
ELOG_TYPE (e, "timeouts; negoitate mastership");
ELOG (mcm->elog_main, e, 0);
}
return;
}
break;
case MC_RELAY_STATE_SLAVE:
mcm->relay_state = MC_RELAY_STATE_SLAVE;
timeouts = 0;
break;
}
}
}
static uword
mc_mastership_process (vlib_main_t * vm,
vlib_node_runtime_t * node, vlib_frame_t * f)
{
mc_main_t *mcm = mc_node_get_main (node);
while (1)
{
switch (mcm->relay_state)
{
case MC_RELAY_STATE_NEGOTIATE:
case MC_RELAY_STATE_MASTER:
this_node_maybe_master (mcm);
break;
case MC_RELAY_STATE_SLAVE:
this_node_slave (mcm);
break;
}
}
return 0; /* not likely */
}
void
mc_enable_disable_mastership (mc_main_t * mcm, int we_can_be_master)
{
if (we_can_be_master != mcm->we_can_be_relay_master)
{
mcm->we_can_be_relay_master = we_can_be_master;
vlib_process_signal_event (mcm->vlib_main,
mcm->mastership_process,
MC_RELAY_STATE_NEGOTIATE, 0);
}
}
void
mc_msg_master_assert_handler (mc_main_t * mcm, mc_msg_master_assert_t * mp,
u32 buffer_index)
{
mc_peer_id_t his_peer_id, our_peer_id;
i32 seq_cmp_result;
u8 signal_slave = 0;
u8 update_global_sequence = 0;
mc_byte_swap_msg_master_assert (mp);
his_peer_id = mp->peer_id;
our_peer_id = mcm->transport.our_ack_peer_id;
/* compare the incoming global sequence with ours */
seq_cmp_result = mc_seq_cmp (mp->global_sequence,
mcm->relay_global_sequence);
/* If the sender has a lower peer id and the sender's sequence >=
our global sequence, we become a slave. Otherwise we are master. */
if (mc_peer_id_compare (his_peer_id, our_peer_id) < 0
&& seq_cmp_result >= 0)
{
vlib_process_signal_event (mcm->vlib_main,
mcm->mastership_process,
MC_RELAY_STATE_SLAVE, 0);
signal_slave = 1;
}
/* Update our global sequence. */
if (seq_cmp_result > 0)
{
mcm->relay_global_sequence = mp->global_sequence;
update_global_sequence = 1;
}
{
uword *q = mhash_get (&mcm->mastership_peer_index_by_id, &his_peer_id);
mc_mastership_peer_t *p;
if (q)
p = vec_elt_at_index (mcm->mastership_peers, q[0]);
else
{
vec_add2 (mcm->mastership_peers, p, 1);
p->peer_id = his_peer_id;
mhash_set (&mcm->mastership_peer_index_by_id, &p->peer_id,
p - mcm->mastership_peers,
/* old_value */ 0);
}
p->time_last_master_assert_received = vlib_time_now (mcm->vlib_main);
}
/*
* these messages clog the event log, set MC_EVENT_LOGGING higher
* if you want them.
*/
if (MC_EVENT_LOGGING > 1)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (e) =
{
.format = "rx-massert: peer %s global seq %u upd %d slave %d",
.format_args = "T4i4i1i1",
};
/* *INDENT-ON* */
struct
{
u32 peer;
u32 global_sequence;
u8 update_sequence;
u8 slave;
} *ed;
ed = ELOG_DATA (mcm->elog_main, e);
ed->peer = elog_id_for_peer_id (mcm, his_peer_id.as_u64);
ed->global_sequence = mp->global_sequence;
ed->update_sequence = update_global_sequence;
ed->slave = signal_slave;
}
}
static void
mc_serialize_init (mc_main_t * mcm)
{
mc_serialize_msg_t *m;
vlib_main_t *vm = vlib_get_main ();
mcm->global_msg_index_by_name
= hash_create_string ( /* elts */ 0, sizeof (uword));
m = vm->mc_msg_registrations;
while (m)
{
m->global_index = vec_len (mcm->global_msgs);
hash_set_mem (mcm->global_msg_index_by_name, m->name, m->global_index);
vec_add1 (mcm->global_msgs, m);
m = m->next_registration;
}
}
clib_error_t *
mc_serialize_va (mc_main_t * mc,
u32 stream_index,
u32 multiple_messages_per_vlib_buffer,
mc_serialize_msg_t * msg, va_list * va)
{
mc_stream_t *s;
clib_error_t *error;
serialize_main_t *m = &mc->serialize_mains[VLIB_TX];
vlib_serialize_buffer_main_t *sbm = &mc->serialize_buffer_mains[VLIB_TX];
u32 bi, n_before, n_after, n_total, n_this_msg;
u32 si, gi;
if (!sbm->vlib_main)
{
sbm->tx.max_n_data_bytes_per_chain = 4096;
sbm->tx.free_list_index = VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX;
}
if (sbm->first_buffer == 0)
serialize_open_vlib_buffer (m, mc->vlib_main, sbm);
n_before = serialize_vlib_buffer_n_bytes (m);
s = mc_stream_by_index (mc, stream_index);
gi = msg->global_index;
ASSERT (msg == vec_elt (mc->global_msgs, gi));
si = ~0;
if (gi < vec_len (s->stream_msg_index_by_global_index))
si = s->stream_msg_index_by_global_index[gi];
serialize_likely_small_unsigned_integer (m, si);
/* For first time message is sent, use name to identify message. */
if (si == ~0 || MSG_ID_DEBUG)
serialize_cstring (m, msg->name);
if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (e) =
{
.format = "serialize-msg: %s index %d",
.format_args = "T4i4",
};
/* *INDENT-ON* */
struct
{
u32 c[2];
} *ed;
ed = ELOG_DATA (mc->elog_main, e);
ed->c[0] = elog_id_for_msg_name (mc, msg->name);
ed->c[1] = si;
}
error = va_serialize (m, va);
n_after = serialize_vlib_buffer_n_bytes (m);
n_this_msg = n_after - n_before;
n_total = n_after + sizeof (mc_msg_user_request_t);
/* For max message size ignore first message where string name is sent. */
if (si != ~0)
msg->max_n_bytes_serialized =
clib_max (msg->max_n_bytes_serialized, n_this_msg);
if (!multiple_messages_per_vlib_buffer
|| si == ~0
|| n_total + msg->max_n_bytes_serialized >
mc->transport.max_packet_size)
{
bi = serialize_close_vlib_buffer (m);
sbm->first_buffer = 0;
if (!error)
mc_stream_send (mc, stream_index, bi);
else if (bi != ~0)
vlib_buffer_free_one (mc->vlib_main, bi);
}
return error;
}
clib_error_t *
mc_serialize_internal (mc_main_t * mc,
u32 stream_index,
u32 multiple_messages_per_vlib_buffer,
mc_serialize_msg_t * msg, ...)
{
vlib_main_t *vm = mc->vlib_main;
va_list va;
clib_error_t *error;
if (stream_index == ~0)
{
if (vm->mc_main && vm->mc_stream_index == ~0)
vlib_current_process_wait_for_one_time_event_vector
(vm, &vm->procs_waiting_for_mc_stream_join);
stream_index = vm->mc_stream_index;
}
va_start (va, msg);
error = mc_serialize_va (mc, stream_index,
multiple_messages_per_vlib_buffer, msg, &va);
va_end (va);
return error;
}
uword
mc_unserialize_message (mc_main_t * mcm,
mc_stream_t * s, serialize_main_t * m)
{
mc_serialize_stream_msg_t *sm;
u32 gi, si;
si = unserialize_likely_small_unsigned_integer (m);
if (!(si == ~0 || MSG_ID_DEBUG))
{
sm = vec_elt_at_index (s->stream_msgs, si);
gi = sm->global_index;
}
else
{
char *name;
unserialize_cstring (m, &name);
if (MSG_ID_DEBUG && MC_EVENT_LOGGING > 0)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (e) =
{
.format = "unserialize-msg: %s rx index %d",
.format_args = "T4i4",
};
/* *INDENT-ON* */
struct
{
u32 c[2];
} *ed;
ed = ELOG_DATA (mcm->elog_main, e);
ed->c[0] = elog_id_for_msg_name (mcm, name);
ed->c[1] = si;
}
{
uword *p = hash_get_mem (mcm->global_msg_index_by_name, name);
gi = p ? p[0] : ~0;
}
/* Unknown message? */
if (gi == ~0)
{
vec_free (name);
goto done;
}
vec_validate_init_empty (s->stream_msg_index_by_global_index, gi, ~0);
si = s->stream_msg_index_by_global_index[gi];
/* Stream local index unknown? Create it. */
if (si == ~0)
{
vec_add2 (s->stream_msgs, sm, 1);
si = sm - s->stream_msgs;
sm->global_index = gi;
s->stream_msg_index_by_global_index[gi] = si;
if (MC_EVENT_LOGGING > 0)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (e) =
{
.format = "msg-bind: stream %d %s to index %d",
.format_args = "i4T4i4",
};
/* *INDENT-ON* */
struct
{
u32 c[3];
} *ed;
ed = ELOG_DATA (mcm->elog_main, e);
ed->c[0] = s->index;
ed->c[1] = elog_id_for_msg_name (mcm, name);
ed->c[2] = si;
}
}
else
{
sm = vec_elt_at_index (s->stream_msgs, si);
if (gi != sm->global_index && MC_EVENT_LOGGING > 0)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (e) =
{
.format = "msg-id-ERROR: %s index %d expected %d",
.format_args = "T4i4i4",
};
/* *INDENT-ON* */
struct
{
u32 c[3];
} *ed;
ed = ELOG_DATA (mcm->elog_main, e);
ed->c[0] = elog_id_for_msg_name (mcm, name);
ed->c[1] = si;
ed->c[2] = ~0;
if (sm->global_index <
vec_len (s->stream_msg_index_by_global_index))
ed->c[2] =
s->stream_msg_index_by_global_index[sm->global_index];
}
}
vec_free (name);
}
if (gi != ~0)
{
mc_serialize_msg_t *msg;
msg = vec_elt (mcm->global_msgs, gi);
unserialize (m, msg->unserialize, mcm);
}
done:
return gi != ~0;
}
void
mc_unserialize_internal (mc_main_t * mcm, u32 stream_and_buffer_index)
{
vlib_main_t *vm = mcm->vlib_main;
serialize_main_t *m = &mcm->serialize_mains[VLIB_RX];
vlib_serialize_buffer_main_t *sbm = &mcm->serialize_buffer_mains[VLIB_RX];
mc_stream_and_buffer_t *sb;
mc_stream_t *stream;
u32 buffer_index;
sb =
pool_elt_at_index (mcm->mc_unserialize_stream_and_buffers,
stream_and_buffer_index);
buffer_index = sb->buffer_index;
stream = vec_elt_at_index (mcm->stream_vector, sb->stream_index);
pool_put (mcm->mc_unserialize_stream_and_buffers, sb);
if (stream->config.save_snapshot)
{
u32 n_bytes = vlib_buffer_index_length_in_chain (vm, buffer_index);
static u8 *contents;
vec_reset_length (contents);
vec_validate (contents, n_bytes - 1);
vlib_buffer_contents (vm, buffer_index, contents);
stream->config.save_snapshot (mcm, /* is_catchup */ 0, contents,
n_bytes);
}
ASSERT (vlib_in_process_context (vm));
unserialize_open_vlib_buffer (m, vm, sbm);
clib_fifo_add1 (sbm->rx.buffer_fifo, buffer_index);
while (unserialize_vlib_buffer_n_bytes (m) > 0)
mc_unserialize_message (mcm, stream, m);
/* Frees buffer. */
unserialize_close_vlib_buffer (m);
}
void
mc_unserialize (mc_main_t * mcm, mc_stream_t * s, u32 buffer_index)
{
vlib_main_t *vm = mcm->vlib_main;
mc_stream_and_buffer_t *sb;
pool_get (mcm->mc_unserialize_stream_and_buffers, sb);
sb->stream_index = s->index;
sb->buffer_index = buffer_index;
vlib_process_signal_event (vm, mcm->unserialize_process,
EVENT_MC_UNSERIALIZE_BUFFER,
sb - mcm->mc_unserialize_stream_and_buffers);
}
static uword
mc_unserialize_process (vlib_main_t * vm,
vlib_node_runtime_t * node, vlib_frame_t * f)
{
mc_main_t *mcm = mc_node_get_main (node);
uword event_type, *event_data = 0;
int i;
while (1)
{
if (event_data)
_vec_len (event_data) = 0;
vlib_process_wait_for_event (vm);
event_type = vlib_process_get_events (vm, &event_data);
switch (event_type)
{
case EVENT_MC_UNSERIALIZE_BUFFER:
for (i = 0; i < vec_len (event_data); i++)
mc_unserialize_internal (mcm, event_data[i]);
break;
case EVENT_MC_UNSERIALIZE_CATCHUP:
for (i = 0; i < vec_len (event_data); i++)
{
u8 *mp = uword_to_pointer (event_data[i], u8 *);
perform_catchup (mcm, (void *) mp);
vec_free (mp);
}
break;
default:
break;
}
}
return 0; /* not likely */
}
void
serialize_mc_main (serialize_main_t * m, va_list * va)
{
mc_main_t *mcm = va_arg (*va, mc_main_t *);
mc_stream_t *s;
mc_serialize_stream_msg_t *sm;
mc_serialize_msg_t *msg;
serialize_integer (m, vec_len (mcm->stream_vector), sizeof (u32));
vec_foreach (s, mcm->stream_vector)
{
/* Stream name. */
serialize_cstring (m, s->config.name);
/* Serialize global names for all sent messages. */
serialize_integer (m, vec_len (s->stream_msgs), sizeof (u32));
vec_foreach (sm, s->stream_msgs)
{
msg = vec_elt (mcm->global_msgs, sm->global_index);
serialize_cstring (m, msg->name);
}
}
}
void
unserialize_mc_main (serialize_main_t * m, va_list * va)
{
mc_main_t *mcm = va_arg (*va, mc_main_t *);
u32 i, n_streams, n_stream_msgs;
char *name;
mc_stream_t *s;
mc_serialize_stream_msg_t *sm;
unserialize_integer (m, &n_streams, sizeof (u32));
for (i = 0; i < n_streams; i++)
{
unserialize_cstring (m, &name);
if (i != MC_STREAM_INDEX_INTERNAL && !mc_stream_by_name (mcm, name))
{
vec_validate (mcm->stream_vector, i);
s = vec_elt_at_index (mcm->stream_vector, i);
mc_stream_init (s);
s->index = s - mcm->stream_vector;
s->config.name = name;
s->state = MC_STREAM_STATE_name_known;
hash_set_mem (mcm->stream_index_by_name, s->config.name, s->index);
}
else
vec_free (name);
s = vec_elt_at_index (mcm->stream_vector, i);
vec_free (s->stream_msgs);
vec_free (s->stream_msg_index_by_global_index);
unserialize_integer (m, &n_stream_msgs, sizeof (u32));
vec_resize (s->stream_msgs, n_stream_msgs);
vec_foreach (sm, s->stream_msgs)
{
uword *p;
u32 si, gi;
unserialize_cstring (m, &name);
p = hash_get (mcm->global_msg_index_by_name, name);
gi = p ? p[0] : ~0;
si = sm - s->stream_msgs;
if (MC_EVENT_LOGGING > 0)
{
/* *INDENT-OFF* */
ELOG_TYPE_DECLARE (e) =
{
.format = "catchup-bind: %s to %d global index %d stream %d",
.format_args = "T4i4i4i4",
};
/* *INDENT-ON* */
struct
{
u32 c[4];
} *ed;
ed = ELOG_DATA (mcm->elog_main, e);
ed->c[0] = elog_id_for_msg_name (mcm, name);
ed->c[1] = si;
ed->c[2] = gi;
ed->c[3] = s->index;
}
vec_free (name);
sm->global_index = gi;
if (gi != ~0)
{
vec_validate_init_empty (s->stream_msg_index_by_global_index,
gi, ~0);
s->stream_msg_index_by_global_index[gi] = si;
}
}
}
}
void
mc_main_init (mc_main_t * mcm, char *tag)
{
vlib_main_t *vm = vlib_get_main ();
mcm->vlib_main = vm;
mcm->elog_main = &vm->elog_main;
mcm->relay_master_peer_id = ~0ULL;
mcm->relay_state = MC_RELAY_STATE_NEGOTIATE;
mcm->stream_index_by_name
= hash_create_string ( /* elts */ 0, /* value size */ sizeof (uword));
{
vlib_node_registration_t r;
memset (&r, 0, sizeof (r));
r.type = VLIB_NODE_TYPE_PROCESS;
/* Point runtime data to main instance. */
r.runtime_data = &mcm;
r.runtime_data_bytes = sizeof (&mcm);
r.name = (char *) format (0, "mc-mastership-%s", tag);
r.function = mc_mastership_process;
mcm->mastership_process = vlib_register_node (vm, &r);
r.name = (char *) format (0, "mc-join-ager-%s", tag);
r.function = mc_join_ager_process;
mcm->join_ager_process = vlib_register_node (vm, &r);
r.name = (char *) format (0, "mc-retry-%s", tag);
r.function = mc_retry_process;
mcm->retry_process = vlib_register_node (vm, &r);
r.name = (char *) format (0, "mc-catchup-%s", tag);
r.function = mc_catchup_process;
mcm->catchup_process = vlib_register_node (vm, &r);
r.name = (char *) format (0, "mc-unserialize-%s", tag);
r.function = mc_unserialize_process;
mcm->unserialize_process = vlib_register_node (vm, &r);
}
if (MC_EVENT_LOGGING > 0)
mhash_init (&mcm->elog_id_by_peer_id, sizeof (uword),
sizeof (mc_peer_id_t));
mhash_init (&mcm->mastership_peer_index_by_id, sizeof (uword),
sizeof (mc_peer_id_t));
mc_serialize_init (mcm);
}
static u8 *
format_mc_relay_state (u8 * s, va_list * args)
{
mc_relay_state_t state = va_arg (*args, mc_relay_state_t);
char *t = 0;
switch (state)
{
case MC_RELAY_STATE_NEGOTIATE:
t = "negotiate";
break;
case MC_RELAY_STATE_MASTER:
t = "master";
break;
case MC_RELAY_STATE_SLAVE:
t = "slave";
break;
default:
return format (s, "unknown 0x%x", state);
}
return format (s, "%s", t);
}
static u8 *
format_mc_stream_state (u8 * s, va_list * args)
{
mc_stream_state_t state = va_arg (*args, mc_stream_state_t);
char *t = 0;
switch (state)
{
#define _(f) case MC_STREAM_STATE_##f: t = #f; break;
foreach_mc_stream_state
#undef _
default:
return format (s, "unknown 0x%x", state);
}
return format (s, "%s", t);
}
static int
mc_peer_comp (void *a1, void *a2)
{
mc_stream_peer_t *p1 = a1;
mc_stream_peer_t *p2 = a2;
return mc_peer_id_compare (p1->id, p2->id);
}
u8 *
format_mc_main (u8 * s, va_list * args)
{
mc_main_t *mcm = va_arg (*args, mc_main_t *);
mc_stream_t *t;
mc_stream_peer_t *p, *ps;
u32 indent = format_get_indent (s);
s = format (s, "MC state %U, %d streams joined, global sequence 0x%x",
format_mc_relay_state, mcm->relay_state,
vec_len (mcm->stream_vector), mcm->relay_global_sequence);
{
mc_mastership_peer_t *mp;
f64 now = vlib_time_now (mcm->vlib_main);
s = format (s, "\n%UMost recent mastership peers:",
format_white_space, indent + 2);
vec_foreach (mp, mcm->mastership_peers)
{
s = format (s, "\n%U%-30U%.4e",
format_white_space, indent + 4,
mcm->transport.format_peer_id, mp->peer_id,
now - mp->time_last_master_assert_received);
}
}
vec_foreach (t, mcm->stream_vector)
{
s = format (s, "\n%Ustream `%s' index %d",
format_white_space, indent + 2, t->config.name, t->index);
s = format (s, "\n%Ustate %U",
format_white_space, indent + 4,
format_mc_stream_state, t->state);
s =
format (s,
"\n%Uretries: interval %.0f sec, limit %d, pool elts %d, %Ld sent",
format_white_space, indent + 4, t->config.retry_interval,
t->config.retry_limit, pool_elts (t->retry_pool),
t->stats.n_retries - t->stats_last_clear.n_retries);
s = format (s, "\n%U%Ld/%Ld user requests sent/received",
format_white_space, indent + 4,
t->user_requests_sent, t->user_requests_received);
s = format (s, "\n%U%d peers, local/global sequence 0x%x/0x%x",
format_white_space, indent + 4,
pool_elts (t->peers),
t->our_local_sequence, t->last_global_sequence_processed);
ps = 0;
/* *INDENT-OFF* */
pool_foreach (p, t->peers,
({
if (clib_bitmap_get (t->all_peer_bitmap, p - t->peers))
vec_add1 (ps, p[0]);
}));
/* *INDENT-ON* */
vec_sort_with_function (ps, mc_peer_comp);
s = format (s, "\n%U%=30s%10s%16s%16s",
format_white_space, indent + 6,
"Peer", "Last seq", "Retries", "Future");
vec_foreach (p, ps)
{
s = format (s, "\n%U%-30U0x%08x%16Ld%16Ld%s",
format_white_space, indent + 6,
mcm->transport.format_peer_id, p->id.as_u64,
p->last_sequence_received,
p->stats.n_msgs_from_past -
p->stats_last_clear.n_msgs_from_past,
p->stats.n_msgs_from_future -
p->stats_last_clear.n_msgs_from_future,
(mcm->transport.our_ack_peer_id.as_u64 ==
p->id.as_u64 ? " (self)" : ""));
}
vec_free (ps);
}
return s;
}
/*
* fd.io coding-style-patch-verification: ON
*
* Local Variables:
* eval: (c-set-style "gnu")
* End:
*/