session: add wrk context

Change-Id: I66ca0ddea872948507d078e405eb90f9f3a0e897
Signed-off-by: Florin Coras <fcoras@cisco.com>
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 9c246a1..1802c0e 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -121,26 +121,24 @@
 stream_session_t *
 session_alloc (u32 thread_index)
 {
-  session_manager_main_t *smm = &session_manager_main;
+  session_manager_worker_t *wrk = &session_manager_main.wrk[thread_index];
   stream_session_t *s;
   u8 will_expand = 0;
-  pool_get_aligned_will_expand (smm->sessions[thread_index], will_expand,
+  pool_get_aligned_will_expand (wrk->sessions, will_expand,
 				CLIB_CACHE_LINE_BYTES);
   /* If we have peekers, let them finish */
   if (PREDICT_FALSE (will_expand && vlib_num_workers ()))
     {
-      clib_rwlock_writer_lock (&smm->peekers_rw_locks[thread_index]);
-      pool_get_aligned (session_manager_main.sessions[thread_index], s,
-			CLIB_CACHE_LINE_BYTES);
-      clib_rwlock_writer_unlock (&smm->peekers_rw_locks[thread_index]);
+      clib_rwlock_writer_lock (&wrk->peekers_rw_locks);
+      pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
+      clib_rwlock_writer_unlock (&wrk->peekers_rw_locks);
     }
   else
     {
-      pool_get_aligned (session_manager_main.sessions[thread_index], s,
-			CLIB_CACHE_LINE_BYTES);
+      pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
     }
   clib_memset (s, 0, sizeof (*s));
-  s->session_index = s - session_manager_main.sessions[thread_index];
+  s->session_index = s - wrk->sessions;
   s->thread_index = thread_index;
   return s;
 }
@@ -148,7 +146,7 @@
 void
 session_free (stream_session_t * s)
 {
-  pool_put (session_manager_main.sessions[s->thread_index], s);
+  pool_put (session_manager_main.wrk[s->thread_index].sessions, s);
   if (CLIB_DEBUG)
     clib_memset (s, 0xFA, sizeof (*s));
 }
@@ -391,22 +389,19 @@
     {
       /* Queue RX event on this fifo. Eventually these will need to be flushed
        * by calling stream_server_flush_enqueue_events () */
-      session_manager_main_t *smm = vnet_get_session_manager_main ();
-      u32 thread_index = s->thread_index;
-      u64 enqueue_epoch = smm->current_enqueue_epoch[tc->proto][thread_index];
+      session_manager_worker_t *wrk;
 
-      if (s->enqueue_epoch != enqueue_epoch)
+      wrk = session_manager_get_worker (s->thread_index);
+      if (s->enqueue_epoch != wrk->current_enqueue_epoch[tc->proto])
 	{
-	  s->enqueue_epoch = enqueue_epoch;
-	  vec_add1 (smm->session_to_enqueue[tc->proto][thread_index],
-		    s->session_index);
+	  s->enqueue_epoch = wrk->current_enqueue_epoch[tc->proto];
+	  vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
 	}
     }
 
   return enqueued;
 }
 
-
 int
 session_enqueue_dgram_connection (stream_session_t * s,
 				  session_dgram_hdr_t * hdr,
@@ -432,15 +427,13 @@
     {
       /* Queue RX event on this fifo. Eventually these will need to be flushed
        * by calling stream_server_flush_enqueue_events () */
-      session_manager_main_t *smm = vnet_get_session_manager_main ();
-      u32 thread_index = s->thread_index;
-      u64 enqueue_epoch = smm->current_enqueue_epoch[proto][thread_index];
+      session_manager_worker_t *wrk;
 
-      if (s->enqueue_epoch != enqueue_epoch)
+      wrk = session_manager_get_worker (s->thread_index);
+      if (s->enqueue_epoch != wrk->current_enqueue_epoch[proto])
 	{
-	  s->enqueue_epoch = enqueue_epoch;
-	  vec_add1 (smm->session_to_enqueue[proto][thread_index],
-		    s->session_index);
+	  s->enqueue_epoch = wrk->current_enqueue_epoch[proto];
+	  vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
 	}
     }
   return enqueued;
@@ -539,12 +532,12 @@
 int
 session_manager_flush_enqueue_events (u8 transport_proto, u32 thread_index)
 {
-  session_manager_main_t *smm = &session_manager_main;
+  session_manager_worker_t *wrk = session_manager_get_worker (thread_index);
   stream_session_t *s;
   int i, errors = 0;
   u32 *indices;
 
-  indices = smm->session_to_enqueue[transport_proto][thread_index];
+  indices = wrk->session_to_enqueue[transport_proto];
 
   for (i = 0; i < vec_len (indices); i++)
     {
@@ -559,8 +552,8 @@
     }
 
   vec_reset_length (indices);
-  smm->session_to_enqueue[transport_proto][thread_index] = indices;
-  smm->current_enqueue_epoch[transport_proto][thread_index]++;
+  wrk->session_to_enqueue[transport_proto] = indices;
+  wrk->current_enqueue_epoch[transport_proto]++;
 
   return errors;
 }
@@ -1068,7 +1061,7 @@
 stream_session_disconnect (stream_session_t * s)
 {
   u32 thread_index = vlib_get_thread_index ();
-  session_manager_main_t *smm = &session_manager_main;
+  session_manager_worker_t *wrk;
   session_event_t *evt;
 
   if (!s)
@@ -1088,7 +1081,8 @@
    * held, just append a new event to pending disconnects vector. */
   if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
     {
-      vec_add2 (smm->pending_disconnects[s->thread_index], evt, 1);
+      wrk = session_manager_get_worker (s->thread_index);
+      vec_add2 (wrk->pending_disconnects, evt, 1);
       clib_memset (evt, 0, sizeof (*evt));
       evt->session_handle = session_handle (s);
       evt->event_type = FIFO_EVENT_DISCONNECT;
@@ -1207,7 +1201,7 @@
   else
     oldheap = svm_push_data_heap (am->vlib_rp);
 
-  for (i = 0; i < vec_len (smm->vpp_event_queues); i++)
+  for (i = 0; i < vec_len (smm->wrk); i++)
     {
       svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
       u32 notif_q_size = clib_max (16, evt_q_length >> 4);
@@ -1220,10 +1214,10 @@
       cfg->n_rings = 2;
       cfg->q_nitems = evt_q_length;
       cfg->ring_cfgs = rc;
-      smm->vpp_event_queues[i] = svm_msg_q_alloc (cfg);
+      smm->wrk[i].vpp_event_queue = svm_msg_q_alloc (cfg);
       if (smm->evt_qs_use_memfd_seg)
 	{
-	  if (svm_msg_q_alloc_consumer_eventfd (smm->vpp_event_queues[i]))
+	  if (svm_msg_q_alloc_consumer_eventfd (smm->wrk[i].vpp_event_queue))
 	    clib_warning ("eventfd returned");
 	}
     }
@@ -1342,6 +1336,7 @@
   session_manager_main_t *smm = &session_manager_main;
   vlib_thread_main_t *vtm = vlib_get_thread_main ();
   u32 num_threads, preallocated_sessions_per_worker;
+  session_manager_worker_t *wrk;
   int i, j;
 
   num_threads = 1 /* main thread */  + vtm->n_threads;
@@ -1350,38 +1345,28 @@
     return clib_error_return (0, "n_thread_stacks not set");
 
   /* configure per-thread ** vectors */
-  vec_validate (smm->sessions, num_threads - 1);
-  vec_validate (smm->tx_buffers, num_threads - 1);
-  vec_validate (smm->pending_event_vector, num_threads - 1);
-  vec_validate (smm->pending_disconnects, num_threads - 1);
-  vec_validate (smm->free_event_vector, num_threads - 1);
-  vec_validate (smm->vpp_event_queues, num_threads - 1);
-  vec_validate (smm->peekers_rw_locks, num_threads - 1);
-  vec_validate (smm->dispatch_period, num_threads - 1);
-  vec_validate (smm->last_vlib_time, num_threads - 1);
-  vec_validate_aligned (smm->ctx, num_threads - 1, CLIB_CACHE_LINE_BYTES);
+  vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
 
   for (i = 0; i < TRANSPORT_N_PROTO; i++)
     {
-      vec_validate (smm->current_enqueue_epoch[i], num_threads - 1);
-      vec_validate (smm->session_to_enqueue[i], num_threads - 1);
       for (j = 0; j < num_threads; j++)
-	smm->current_enqueue_epoch[i][j] = 1;
+	smm->wrk[j].current_enqueue_epoch[i] = 1;
     }
 
   for (i = 0; i < num_threads; i++)
     {
-      vec_validate (smm->free_event_vector[i], 0);
-      _vec_len (smm->free_event_vector[i]) = 0;
-      vec_validate (smm->pending_event_vector[i], 0);
-      _vec_len (smm->pending_event_vector[i]) = 0;
-      vec_validate (smm->pending_disconnects[i], 0);
-      _vec_len (smm->pending_disconnects[i]) = 0;
+      wrk = &smm->wrk[i];
+      vec_validate (wrk->free_event_vector, 0);
+      _vec_len (wrk->free_event_vector) = 0;
+      vec_validate (wrk->pending_event_vector, 0);
+      _vec_len (wrk->pending_event_vector) = 0;
+      vec_validate (wrk->pending_disconnects, 0);
+      _vec_len (wrk->pending_disconnects) = 0;
 
-      smm->last_vlib_time[i] = vlib_time_now (vlib_mains[i]);
+      wrk->last_vlib_time = vlib_time_now (vlib_mains[i]);
 
       if (num_threads > 1)
-	clib_rwlock_init (&smm->peekers_rw_locks[i]);
+	clib_rwlock_init (&wrk->peekers_rw_locks);
     }
 
 #if SESSION_DEBUG
@@ -1401,7 +1386,7 @@
     {
       if (num_threads == 1)
 	{
-	  pool_init_fixed (smm->sessions[0], smm->preallocated_sessions);
+	  pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
 	}
       else
 	{
@@ -1412,7 +1397,7 @@
 
 	  for (j = 1; j < num_threads; j++)
 	    {
-	      pool_init_fixed (smm->sessions[j],
+	      pool_init_fixed (smm->wrk[j].sessions,
 			       preallocated_sessions_per_worker);
 	    }
 	}