session: add wrk context
Change-Id: I66ca0ddea872948507d078e405eb90f9f3a0e897
Signed-off-by: Florin Coras <fcoras@cisco.com>
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 9c246a1..1802c0e 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -121,26 +121,24 @@
stream_session_t *
session_alloc (u32 thread_index)
{
- session_manager_main_t *smm = &session_manager_main;
+ session_manager_worker_t *wrk = &session_manager_main.wrk[thread_index];
stream_session_t *s;
u8 will_expand = 0;
- pool_get_aligned_will_expand (smm->sessions[thread_index], will_expand,
+ pool_get_aligned_will_expand (wrk->sessions, will_expand,
CLIB_CACHE_LINE_BYTES);
/* If we have peekers, let them finish */
if (PREDICT_FALSE (will_expand && vlib_num_workers ()))
{
- clib_rwlock_writer_lock (&smm->peekers_rw_locks[thread_index]);
- pool_get_aligned (session_manager_main.sessions[thread_index], s,
- CLIB_CACHE_LINE_BYTES);
- clib_rwlock_writer_unlock (&smm->peekers_rw_locks[thread_index]);
+ clib_rwlock_writer_lock (&wrk->peekers_rw_locks);
+ pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
+ clib_rwlock_writer_unlock (&wrk->peekers_rw_locks);
}
else
{
- pool_get_aligned (session_manager_main.sessions[thread_index], s,
- CLIB_CACHE_LINE_BYTES);
+ pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
}
clib_memset (s, 0, sizeof (*s));
- s->session_index = s - session_manager_main.sessions[thread_index];
+ s->session_index = s - wrk->sessions;
s->thread_index = thread_index;
return s;
}
@@ -148,7 +146,7 @@
void
session_free (stream_session_t * s)
{
- pool_put (session_manager_main.sessions[s->thread_index], s);
+ pool_put (session_manager_main.wrk[s->thread_index].sessions, s);
if (CLIB_DEBUG)
clib_memset (s, 0xFA, sizeof (*s));
}
@@ -391,22 +389,19 @@
{
/* Queue RX event on this fifo. Eventually these will need to be flushed
* by calling stream_server_flush_enqueue_events () */
- session_manager_main_t *smm = vnet_get_session_manager_main ();
- u32 thread_index = s->thread_index;
- u64 enqueue_epoch = smm->current_enqueue_epoch[tc->proto][thread_index];
+ session_manager_worker_t *wrk;
- if (s->enqueue_epoch != enqueue_epoch)
+ wrk = session_manager_get_worker (s->thread_index);
+ if (s->enqueue_epoch != wrk->current_enqueue_epoch[tc->proto])
{
- s->enqueue_epoch = enqueue_epoch;
- vec_add1 (smm->session_to_enqueue[tc->proto][thread_index],
- s->session_index);
+ s->enqueue_epoch = wrk->current_enqueue_epoch[tc->proto];
+ vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
}
}
return enqueued;
}
-
int
session_enqueue_dgram_connection (stream_session_t * s,
session_dgram_hdr_t * hdr,
@@ -432,15 +427,13 @@
{
/* Queue RX event on this fifo. Eventually these will need to be flushed
* by calling stream_server_flush_enqueue_events () */
- session_manager_main_t *smm = vnet_get_session_manager_main ();
- u32 thread_index = s->thread_index;
- u64 enqueue_epoch = smm->current_enqueue_epoch[proto][thread_index];
+ session_manager_worker_t *wrk;
- if (s->enqueue_epoch != enqueue_epoch)
+ wrk = session_manager_get_worker (s->thread_index);
+ if (s->enqueue_epoch != wrk->current_enqueue_epoch[proto])
{
- s->enqueue_epoch = enqueue_epoch;
- vec_add1 (smm->session_to_enqueue[proto][thread_index],
- s->session_index);
+ s->enqueue_epoch = wrk->current_enqueue_epoch[proto];
+ vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
}
}
return enqueued;
@@ -539,12 +532,12 @@
int
session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index)
{
- session_manager_main_t *smm = &session_manager_main;
+ session_manager_worker_t *wrk = session_manager_get_worker (thread_index);
stream_session_t *s;
int i, errors = 0;
u32 *indices;
- indices = smm->session_to_enqueue[transport_proto][thread_index];
+ indices = wrk->session_to_enqueue[transport_proto];
for (i = 0; i < vec_len (indices); i++)
{
@@ -559,8 +552,8 @@
}
vec_reset_length (indices);
- smm->session_to_enqueue[transport_proto][thread_index] = indices;
- smm->current_enqueue_epoch[transport_proto][thread_index]++;
+ wrk->session_to_enqueue[transport_proto] = indices;
+ wrk->current_enqueue_epoch[transport_proto]++;
return errors;
}
@@ -1068,7 +1061,7 @@
stream_session_disconnect (stream_session_t * s)
{
u32 thread_index = vlib_get_thread_index ();
- session_manager_main_t *smm = &session_manager_main;
+ session_manager_worker_t *wrk;
session_event_t *evt;
if (!s)
@@ -1088,7 +1081,8 @@
* held, just append a new event to pending disconnects vector. */
if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
{
- vec_add2 (smm->pending_disconnects[s->thread_index], evt, 1);
+ wrk = session_manager_get_worker (s->thread_index);
+ vec_add2 (wrk->pending_disconnects, evt, 1);
clib_memset (evt, 0, sizeof (*evt));
evt->session_handle = session_handle (s);
evt->event_type = FIFO_EVENT_DISCONNECT;
@@ -1207,7 +1201,7 @@
else
oldheap = svm_push_data_heap (am->vlib_rp);
- for (i = 0; i < vec_len (smm->vpp_event_queues); i++)
+ for (i = 0; i < vec_len (smm->wrk); i++)
{
svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
u32 notif_q_size = clib_max (16, evt_q_length >> 4);
@@ -1220,10 +1214,10 @@
cfg->n_rings = 2;
cfg->q_nitems = evt_q_length;
cfg->ring_cfgs = rc;
- smm->vpp_event_queues[i] = svm_msg_q_alloc (cfg);
+ smm->wrk[i].vpp_event_queue = svm_msg_q_alloc (cfg);
if (smm->evt_qs_use_memfd_seg)
{
- if (svm_msg_q_alloc_consumer_eventfd (smm->vpp_event_queues[i]))
+ if (svm_msg_q_alloc_consumer_eventfd (smm->wrk[i].vpp_event_queue))
clib_warning ("eventfd returned");
}
}
@@ -1342,6 +1336,7 @@
session_manager_main_t *smm = &session_manager_main;
vlib_thread_main_t *vtm = vlib_get_thread_main ();
u32 num_threads, preallocated_sessions_per_worker;
+ session_manager_worker_t *wrk;
int i, j;
num_threads = 1 /* main thread */ + vtm->n_threads;
@@ -1350,38 +1345,28 @@
return clib_error_return (0, "n_thread_stacks not set");
/* configure per-thread ** vectors */
- vec_validate (smm->sessions, num_threads - 1);
- vec_validate (smm->tx_buffers, num_threads - 1);
- vec_validate (smm->pending_event_vector, num_threads - 1);
- vec_validate (smm->pending_disconnects, num_threads - 1);
- vec_validate (smm->free_event_vector, num_threads - 1);
- vec_validate (smm->vpp_event_queues, num_threads - 1);
- vec_validate (smm->peekers_rw_locks, num_threads - 1);
- vec_validate (smm->dispatch_period, num_threads - 1);
- vec_validate (smm->last_vlib_time, num_threads - 1);
- vec_validate_aligned (smm->ctx, num_threads - 1, CLIB_CACHE_LINE_BYTES);
+ vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
for (i = 0; i < TRANSPORT_N_PROTO; i++)
{
- vec_validate (smm->current_enqueue_epoch[i], num_threads - 1);
- vec_validate (smm->session_to_enqueue[i], num_threads - 1);
for (j = 0; j < num_threads; j++)
- smm->current_enqueue_epoch[i][j] = 1;
+ smm->wrk[j].current_enqueue_epoch[i] = 1;
}
for (i = 0; i < num_threads; i++)
{
- vec_validate (smm->free_event_vector[i], 0);
- _vec_len (smm->free_event_vector[i]) = 0;
- vec_validate (smm->pending_event_vector[i], 0);
- _vec_len (smm->pending_event_vector[i]) = 0;
- vec_validate (smm->pending_disconnects[i], 0);
- _vec_len (smm->pending_disconnects[i]) = 0;
+ wrk = &smm->wrk[i];
+ vec_validate (wrk->free_event_vector, 0);
+ _vec_len (wrk->free_event_vector) = 0;
+ vec_validate (wrk->pending_event_vector, 0);
+ _vec_len (wrk->pending_event_vector) = 0;
+ vec_validate (wrk->pending_disconnects, 0);
+ _vec_len (wrk->pending_disconnects) = 0;
- smm->last_vlib_time[i] = vlib_time_now (vlib_mains[i]);
+ wrk->last_vlib_time = vlib_time_now (vlib_mains[i]);
if (num_threads > 1)
- clib_rwlock_init (&smm->peekers_rw_locks[i]);
+ clib_rwlock_init (&wrk->peekers_rw_locks);
}
#if SESSION_DEBUG
@@ -1401,7 +1386,7 @@
{
if (num_threads == 1)
{
- pool_init_fixed (smm->sessions[0], smm->preallocated_sessions);
+ pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
}
else
{
@@ -1412,7 +1397,7 @@
for (j = 1; j < num_threads; j++)
{
- pool_init_fixed (smm->sessions[j],
+ pool_init_fixed (smm->wrk[j].sessions,
preallocated_sessions_per_worker);
}
}