Repair vlib API socket server

- Teach vpp_api_test to send/receive API messages over sockets
- Add memfd-based shared memory
- Add api messages to create memfd-based shared memory segments
- vpp_api_test supports both socket and shared memory segment connections
- vpp_api_test pivot from socket to shared memory API messaging
- add socket client support to libvlibclient.so
- dead client reaper sends ping messages, container-friendly
- dead client reaper falls back to kill (<pid>, 0) live checking
  if e.g. a python app goes silent for tens of seconds
- handle ping messages in python client support code
- teach show api ring about pairwise shared-memory segments
- fix ip probing of already resolved destinations (VPP-998)

We'll need this work to implement proper host-stack client isolation

Change-Id: Ic23b65f75c854d0393d9a2e9d6b122a9551be769
Signed-off-by: Dave Barach <dave@barachs.net>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Signed-off-by: Florin Coras <fcoras@cisco.com>
diff --git a/src/vlibmemory/memory_vlib.c b/src/vlibmemory/memory_vlib.c
index d305ea6..c9b3183 100644
--- a/src/vlibmemory/memory_vlib.c
+++ b/src/vlibmemory/memory_vlib.c
@@ -96,17 +96,7 @@
 #include <vlibmemory/vl_memory_api_h.h>
 #undef vl_endianfun
 
-void vl_socket_api_send (vl_api_registration_t * rp, u8 * elem)
-  __attribute__ ((weak));
-
-void
-vl_socket_api_send (vl_api_registration_t * rp, u8 * elem)
-{
-  static int count;
-
-  if (count++ < 5)
-    clib_warning ("need to link against -lvlibsocket, msg not sent!");
-}
+extern void vl_socket_api_send (vl_api_registration_t * rp, u8 * elem);
 
 void
 vl_msg_api_send (vl_api_registration_t * rp, u8 * elem)
@@ -117,7 +107,7 @@
     }
   else
     {
-      vl_msg_api_send_shmem (rp->vl_input_queue, elem);
+      vl_msg_api_send_shmem (rp->vl_input_queue, (u8 *) & elem);
     }
 }
 
@@ -196,6 +186,7 @@
   int rv = 0;
   void *oldheap;
   api_main_t *am = &api_main;
+  u8 *serialized_message_table_in_shmem;
 
   /*
    * This is tortured. Maintain a vlib-address-space private
@@ -235,6 +226,8 @@
   memset (regp, 0, sizeof (*regp));
   regp->registration_type = REGISTRATION_TYPE_SHMEM;
   regp->vl_api_registration_pool_index = regpp - am->vl_clients;
+  regp->vlib_rp = svm;
+  regp->shmem_hdr = am->shmem_hdr;
 
   q = regp->vl_input_queue = (unix_shared_memory_queue_t *) (uword)
     mp->input_queue;
@@ -242,11 +235,11 @@
   regp->name = format (0, "%s", mp->name);
   vec_add1 (regp->name, 0);
 
+  serialized_message_table_in_shmem = vl_api_serialize_message_table (am, 0);
+
   pthread_mutex_unlock (&svm->mutex);
   svm_pop_heap (oldheap);
 
-  ASSERT (am->serialized_message_table_in_shmem);
-
   rp = vl_msg_api_alloc (sizeof (*rp));
   rp->_vl_msg_id = ntohs (VL_API_MEMCLNT_CREATE_REPLY);
   rp->handle = (uword) regp;
@@ -255,8 +248,7 @@
      am->shmem_hdr->application_restarts);
   rp->context = mp->context;
   rp->response = ntohl (rv);
-  rp->message_table =
-    pointer_to_uword (am->serialized_message_table_in_shmem);
+  rp->message_table = pointer_to_uword (serialized_message_table_in_shmem);
 
   vl_msg_api_send_shmem (q, (u8 *) & rp);
 }
@@ -313,11 +305,15 @@
 
   if (!pool_is_free (am->vl_clients, regpp))
     {
+      int i;
       regp = *regpp;
       svm = am->vlib_rp;
+      int private_registration = 0;
 
-      /* $$$ check the input queue for e.g. punted sf's */
-
+      /*
+       * Note: the API message handling path will set am->vlib_rp
+       * as appropriate for pairwise / private memory segments
+       */
       rp = vl_msg_api_alloc (sizeof (*rp));
       rp->_vl_msg_id = ntohs (VL_API_MEMCLNT_DELETE_REPLY);
       rp->handle = mp->handle;
@@ -333,18 +329,56 @@
 	  return;
 	}
 
+      /* For horizontal scaling, add a hash table... */
+      for (i = 0; i < vec_len (am->vlib_private_rps); i++)
+	{
+	  /* Is this a pairwise / private API segment? */
+	  if (am->vlib_private_rps[i] == svm)
+	    {
+	      /* Note: account for the memfd header page */
+	      u64 virtual_base = svm->virtual_base - MMAP_PAGESIZE;
+	      u64 virtual_size = svm->virtual_size + MMAP_PAGESIZE;
+
+	      /*
+	       * Kill the registration pool element before we make
+	       * the index vanish forever
+	       */
+	      pool_put_index (am->vl_clients,
+			      regp->vl_api_registration_pool_index);
+
+	      vec_delete (am->vlib_private_rps, 1, i);
+	      /* Kill it, accounting for the memfd header page */
+	      if (munmap ((void *) virtual_base, virtual_size) < 0)
+		clib_unix_warning ("munmap");
+	      /* Reset the queue-length-address cache */
+	      vec_reset_length (vl_api_queue_cursizes);
+	      private_registration = 1;
+	      break;
+	    }
+	}
+
       /* No dangling references, please */
       *regpp = 0;
 
-      pool_put_index (am->vl_clients, regp->vl_api_registration_pool_index);
-
-      pthread_mutex_lock (&svm->mutex);
-      oldheap = svm_push_data_heap (svm);
-      /* Poison the old registration */
-      memset (regp, 0xF1, sizeof (*regp));
-      clib_mem_free (regp);
-      pthread_mutex_unlock (&svm->mutex);
-      svm_pop_heap (oldheap);
+      if (private_registration == 0)
+	{
+	  pool_put_index (am->vl_clients,
+			  regp->vl_api_registration_pool_index);
+	  pthread_mutex_lock (&svm->mutex);
+	  oldheap = svm_push_data_heap (svm);
+	  /* Poison the old registration */
+	  memset (regp, 0xF1, sizeof (*regp));
+	  clib_mem_free (regp);
+	  pthread_mutex_unlock (&svm->mutex);
+	  svm_pop_heap (oldheap);
+	  /*
+	   * These messages must be freed manually, since they're set up
+	   * as "bounce" messages. In the private_registration == 1 case,
+	   * we kill the shared-memory segment which contains the message
+	   * with munmap.
+	   */
+	  vl_msg_api_free (mp);
+	}
     }
   else
     {
@@ -392,10 +426,54 @@
   vl_msg_api_send_shmem (q, (u8 *) & rmp);
 }
 
-#define foreach_vlib_api_msg                    \
-_(MEMCLNT_CREATE, memclnt_create)               \
-_(MEMCLNT_DELETE, memclnt_delete)               \
-_(GET_FIRST_MSG_ID, get_first_msg_id)
+/**
+ * client answered a ping, stave off the grim reaper...
+ */
+
+void
+  vl_api_memclnt_keepalive_reply_t_handler
+  (vl_api_memclnt_keepalive_reply_t * mp)
+{
+  vl_api_registration_t *regp;
+  vlib_main_t *vm = vlib_get_main ();
+
+  regp = vl_api_client_index_to_registration (mp->context);
+  if (regp)
+    {
+      regp->last_heard = vlib_time_now (vm);
+      regp->unanswered_pings = 0;
+    }
+  else
+    clib_warning ("BUG: anonymous memclnt_keepalive_reply");
+}
+
+/**
+ * We can send ourselves these messages if someone uses the
+ * builtin binary api test tool...
+ */
+static void
+vl_api_memclnt_keepalive_t_handler (vl_api_memclnt_keepalive_t * mp)
+{
+  vl_api_memclnt_keepalive_reply_t *rmp;
+  api_main_t *am;
+  vl_shmem_hdr_t *shmem_hdr;
+
+  am = &api_main;
+  shmem_hdr = am->shmem_hdr;
+
+  rmp = vl_msg_api_alloc_as_if_client (sizeof (*rmp));
+  memset (rmp, 0, sizeof (*rmp));
+  rmp->_vl_msg_id = ntohs (VL_API_MEMCLNT_KEEPALIVE_REPLY);
+  rmp->context = mp->context;
+  vl_msg_api_send_shmem (shmem_hdr->vl_input_queue, (u8 *) & rmp);
+}
+
+#define foreach_vlib_api_msg                            \
+_(MEMCLNT_CREATE, memclnt_create)                       \
+_(MEMCLNT_DELETE, memclnt_delete)                       \
+_(GET_FIRST_MSG_ID, get_first_msg_id)                   \
+_(MEMCLNT_KEEPALIVE, memclnt_keepalive)                 \
+_(MEMCLNT_KEEPALIVE_REPLY, memclnt_keepalive_reply)
 
 /*
  * vl_api_init
@@ -404,6 +482,7 @@
 memory_api_init (const char *region_name)
 {
   int rv;
+  api_main_t *am = &api_main;
   vl_msg_api_msg_config_t cfg;
   vl_msg_api_msg_config_t *c = &cfg;
 
@@ -428,6 +507,13 @@
   foreach_vlib_api_msg;
 #undef _
 
+  /*
+   * special-case freeing of memclnt_delete messages, so we can
+   * simply munmap pairwise / private API segments...
+   */
+  am->message_bounce[VL_API_MEMCLNT_DELETE] = 1;
+  am->is_mp_safe[VL_API_MEMCLNT_KEEPALIVE_REPLY] = 1;
+
   return 0;
 }
 
@@ -474,6 +560,203 @@
   vl_msg_api_send_shmem (q, (u8 *) & mp);
 }
 
+static void
+send_memclnt_keepalive (vl_api_registration_t * regp, f64 now)
+{
+  vl_api_memclnt_keepalive_t *mp;
+  unix_shared_memory_queue_t *q;
+  api_main_t *am = &api_main;
+  svm_region_t *save_vlib_rp = am->vlib_rp;
+  vl_shmem_hdr_t *save_shmem_hdr = am->shmem_hdr;
+
+  q = regp->vl_input_queue;
+
+  /*
+   * If the queue head is moving, assume that the client is processing
+   * messages and skip the ping. This heuristic may fail if the queue
+   * is in the same position as last time, net of wrapping; in which
+   * case, the client will receive a keepalive.
+   */
+  if (regp->last_queue_head != q->head)
+    {
+      regp->last_heard = now;
+      regp->unanswered_pings = 0;
+      regp->last_queue_head = q->head;
+      return;
+    }
+
+  /*
+   * push/pop shared memory segment, so this routine
+   * will work with "normal" as well as "private segment"
+   * memory clients..
+   */
+
+  am->vlib_rp = regp->vlib_rp;
+  am->shmem_hdr = regp->shmem_hdr;
+
+  mp = vl_msg_api_alloc (sizeof (*mp));
+  memset (mp, 0, sizeof (*mp));
+  mp->_vl_msg_id = clib_host_to_net_u16 (VL_API_MEMCLNT_KEEPALIVE);
+  mp->context = mp->client_index =
+    vl_msg_api_handle_from_index_and_epoch
+    (regp->vl_api_registration_pool_index,
+     am->shmem_hdr->application_restarts);
+
+  regp->unanswered_pings++;
+
+  /* Failure-to-send due to a stuffed queue is absolutely expected */
+  if (unix_shared_memory_queue_add (q, (u8 *) & mp, 1 /* nowait */ ))
+    vl_msg_api_free (mp);
+
+  am->vlib_rp = save_vlib_rp;
+  am->shmem_hdr = save_shmem_hdr;
+}
+
+static void
+dead_client_scan (api_main_t * am, vl_shmem_hdr_t * shm, f64 now)
+{
+
+  vl_api_registration_t **regpp;
+  vl_api_registration_t *regp;
+  static u32 *dead_indices;
+  static u32 *confused_indices;
+
+  vec_reset_length (dead_indices);
+  vec_reset_length (confused_indices);
+
+  /* *INDENT-OFF* */
+  pool_foreach (regpp, am->vl_clients,
+  ({
+    regp = *regpp;
+    if (regp)
+      {
+        /* If we haven't heard from this client recently... */
+        if (regp->last_heard < (now - 10.0))
+          {
+            if (regp->unanswered_pings == 2)
+              {
+                unix_shared_memory_queue_t *q;
+                q = regp->vl_input_queue;
+                if (kill (q->consumer_pid, 0) >=0)
+                  {
+                    clib_warning ("REAPER: lazy binary API client '%s'",
+                                  regp->name);
+                    regp->unanswered_pings = 0;
+                    regp->last_heard = now;
+                  }
+                else
+                  {
+                    clib_warning ("REAPER: binary API client '%s' died",
+                                  regp->name);
+                    vec_add1(dead_indices, regpp - am->vl_clients);
+                  }
+              }
+            else
+              send_memclnt_keepalive (regp, now);
+          }
+        else
+          regp->unanswered_pings = 0;
+      }
+    else
+      {
+        clib_warning ("NULL client registration index %d",
+                      regpp - am->vl_clients);
+        vec_add1 (confused_indices, regpp - am->vl_clients);
+      }
+  }));
+  /* *INDENT-ON* */
+  /* This should "never happen," but if it does, fix it... */
+  if (PREDICT_FALSE (vec_len (confused_indices) > 0))
+    {
+      int i;
+      for (i = 0; i < vec_len (confused_indices); i++)
+	{
+	  pool_put_index (am->vl_clients, confused_indices[i]);
+	}
+    }
+
+  if (PREDICT_FALSE (vec_len (dead_indices) > 0))
+    {
+      int i;
+      svm_region_t *svm;
+      void *oldheap;
+
+      /* Allow the application to clean up its registrations */
+      for (i = 0; i < vec_len (dead_indices); i++)
+	{
+	  regpp = pool_elt_at_index (am->vl_clients, dead_indices[i]);
+	  if (regpp)
+	    {
+	      u32 handle;
+
+	      handle = vl_msg_api_handle_from_index_and_epoch
+		(dead_indices[i], shm->application_restarts);
+	      (void) call_reaper_functions (handle);
+	    }
+	}
+
+      svm = am->vlib_rp;
+      pthread_mutex_lock (&svm->mutex);
+      oldheap = svm_push_data_heap (svm);
+
+      for (i = 0; i < vec_len (dead_indices); i++)
+	{
+	  regpp = pool_elt_at_index (am->vl_clients, dead_indices[i]);
+	  if (regpp)
+	    {
+	      /* Is this a pairwise SVM segment? */
+	      if ((*regpp)->vlib_rp != svm)
+		{
+		  int i;
+		  svm_region_t *dead_rp = (*regpp)->vlib_rp;
+		  /* Note: account for the memfd header page */
+		  u64 virtual_base = dead_rp->virtual_base - MMAP_PAGESIZE;
+		  u64 virtual_size = dead_rp->virtual_size + MMAP_PAGESIZE;
+
+		  /* For horizontal scaling, add a hash table... */
+		  for (i = 0; i < vec_len (am->vlib_private_rps); i++)
+		    if (am->vlib_private_rps[i] == dead_rp)
+		      {
+			vec_delete (am->vlib_private_rps, 1, i);
+			goto found;
+		      }
+		  clib_warning ("private rp %llx AWOL", dead_rp);
+
+		found:
+		  /* Kill it, accounting for the memfd header page */
+		  if (munmap ((void *) virtual_base, virtual_size) < 0)
+		    clib_unix_warning ("munmap");
+		  /* Reset the queue-length-address cache */
+		  vec_reset_length (vl_api_queue_cursizes);
+		}
+	      else
+		{
+		  /* Poison the old registration */
+		  memset (*regpp, 0xF3, sizeof (**regpp));
+		  clib_mem_free (*regpp);
+		}
+	      /* no dangling references, please */
+	      *regpp = 0;
+	    }
+	  else
+	    {
+	      svm_pop_heap (oldheap);
+	      clib_warning ("Duplicate free, client index %d",
+			    regpp - am->vl_clients);
+	      oldheap = svm_push_data_heap (svm);
+	    }
+	}
+
+      svm_client_scan_this_region_nolock (am->vlib_rp);
+
+      pthread_mutex_unlock (&svm->mutex);
+      svm_pop_heap (oldheap);
+      for (i = 0; i < vec_len (dead_indices); i++)
+	pool_put_index (am->vl_clients, dead_indices[i]);
+    }
+}
+
+
 static uword
 memclnt_process (vlib_main_t * vm,
 		 vlib_node_runtime_t * node, vlib_frame_t * f)
@@ -487,17 +770,29 @@
   f64 dead_client_scan_time;
   f64 sleep_time, start_time;
   f64 vector_rate;
+  clib_error_t *socksvr_api_init (vlib_main_t * vm);
+  clib_error_t *error;
   int i;
-  u8 *serialized_message_table = 0;
-  svm_region_t *svm;
-  void *oldheap;
+  vl_socket_args_for_process_t *a;
+  uword event_type;
+  uword *event_data = 0;
+  int private_segment_rotor = 0;
+  svm_region_t *vlib_rp;
+  f64 now;
 
   vlib_set_queue_signal_callback (vm, memclnt_queue_callback);
 
   if ((rv = memory_api_init (am->region_name)) < 0)
     {
-      clib_warning ("memory_api_init returned %d, wait for godot...", rv);
-      vlib_process_suspend (vm, 1e70);
+      clib_warning ("memory_api_init returned %d, quitting...", rv);
+      return 0;
+    }
+
+  if ((error = socksvr_api_init (vm)))
+    {
+      clib_error_report (error);
+      clib_warning ("socksvr_api_init failed, quitting...");
+      return 0;
     }
 
   shm = am->shmem_hdr;
@@ -510,8 +805,8 @@
   if (e)
     clib_error_report (e);
 
-  sleep_time = 20.0;
-  dead_client_scan_time = vlib_time_now (vm) + 20.0;
+  sleep_time = 10.0;
+  dead_client_scan_time = vlib_time_now (vm) + 10.0;
 
   /*
    * Send plugin message range messages for each plugin we loaded
@@ -524,26 +819,17 @@
     }
 
   /*
-   * Snapshoot the api message table.
-   */
-  serialized_message_table = vl_api_serialize_message_table (am, 0);
-
-  svm = am->vlib_rp;
-  pthread_mutex_lock (&svm->mutex);
-  oldheap = svm_push_data_heap (svm);
-
-  am->serialized_message_table_in_shmem = vec_dup (serialized_message_table);
-
-  pthread_mutex_unlock (&svm->mutex);
-  svm_pop_heap (oldheap);
-
-  /*
    * Save the api message table snapshot, if configured
    */
   if (am->save_msg_table_filename)
     {
       int fd, rv;
       u8 *chroot_file;
+      u8 *serialized_message_table;
+
+      /*
+       * Snapshoot the api message table.
+       */
       if (strstr ((char *) am->save_msg_table_filename, "..")
 	  || index ((char *) am->save_msg_table_filename, '/'))
 	{
@@ -561,6 +847,9 @@
 	  clib_unix_warning ("creat");
 	  goto skip_save;
 	}
+
+      serialized_message_table = vl_api_serialize_message_table (am, 0);
+
       rv = write (fd, serialized_message_table,
 		  vec_len (serialized_message_table));
 
@@ -572,15 +861,14 @@
 	clib_unix_warning ("close");
 
       vec_free (chroot_file);
+      vec_free (serialized_message_table);
     }
 
 skip_save:
-  vec_free (serialized_message_table);
 
   /* $$$ pay attention to frame size, control CPU usage */
   while (1)
     {
-      uword event_type __attribute__ ((unused));
       i8 *headp;
       int need_broadcast;
 
@@ -665,104 +953,89 @@
 	    }
 	}
 
-      event_type = vlib_process_wait_for_event_or_clock (vm, sleep_time);
-      vm->queue_signal_pending = 0;
-      vlib_process_get_events (vm, 0 /* event_data */ );
-
-      if (vlib_time_now (vm) > dead_client_scan_time)
+      /*
+       * see if we have any private api shared-memory segments
+       * If so, push required context variables, and process
+       * a message.
+       */
+      if (PREDICT_FALSE (vec_len (am->vlib_private_rps)))
 	{
-	  vl_api_registration_t **regpp;
-	  vl_api_registration_t *regp;
-	  unix_shared_memory_queue_t *q;
-	  static u32 *dead_indices;
-	  static u32 *confused_indices;
+	  unix_shared_memory_queue_t *save_vlib_input_queue = q;
+	  vl_shmem_hdr_t *save_shmem_hdr = am->shmem_hdr;
+	  svm_region_t *save_vlib_rp = am->vlib_rp;
 
-	  vec_reset_length (dead_indices);
-	  vec_reset_length (confused_indices);
+	  vlib_rp = am->vlib_rp = am->vlib_private_rps[private_segment_rotor];
 
-          /* *INDENT-OFF* */
-          pool_foreach (regpp, am->vl_clients,
-          ({
-            regp = *regpp;
-            if (regp)
-              {
-                q = regp->vl_input_queue;
-                if (kill (q->consumer_pid, 0) < 0)
-                  {
-                    vec_add1(dead_indices, regpp - am->vl_clients);
-                  }
-              }
-            else
-              {
-                clib_warning ("NULL client registration index %d",
-                              regpp - am->vl_clients);
-                vec_add1 (confused_indices, regpp - am->vl_clients);
-              }
-          }));
-          /* *INDENT-ON* */
-	  /* This should "never happen," but if it does, fix it... */
-	  if (PREDICT_FALSE (vec_len (confused_indices) > 0))
+	  am->shmem_hdr = (void *) vlib_rp->user_ctx;
+	  q = am->shmem_hdr->vl_input_queue;
+
+	  pthread_mutex_lock (&q->mutex);
+	  if (q->cursize > 0)
 	    {
-	      int i;
-	      for (i = 0; i < vec_len (confused_indices); i++)
-		{
-		  pool_put_index (am->vl_clients, confused_indices[i]);
-		}
-	    }
+	      headp = (i8 *) (q->data + sizeof (uword) * q->head);
+	      clib_memcpy (&mp, headp, sizeof (uword));
 
-	  if (PREDICT_FALSE (vec_len (dead_indices) > 0))
+	      q->head++;
+	      need_broadcast = (q->cursize == q->maxsize / 2);
+	      q->cursize--;
+
+	      if (PREDICT_FALSE (q->head == q->maxsize))
+		q->head = 0;
+	      pthread_mutex_unlock (&q->mutex);
+	      if (need_broadcast)
+		(void) pthread_cond_broadcast (&q->condvar);
+
+	      pthread_mutex_unlock (&q->mutex);
+
+	      vl_msg_api_handler_with_vm_node (am, (void *) mp, vm, node);
+	    }
+	  else
+	    pthread_mutex_unlock (&q->mutex);
+
+	  q = save_vlib_input_queue;
+	  am->shmem_hdr = save_shmem_hdr;
+	  am->vlib_rp = save_vlib_rp;
+
+	  private_segment_rotor++;
+	  if (private_segment_rotor >= vec_len (am->vlib_private_rps))
+	    private_segment_rotor = 0;
+	}
+
+      vlib_process_wait_for_event_or_clock (vm, sleep_time);
+      vec_reset_length (event_data);
+      event_type = vlib_process_get_events (vm, &event_data);
+      now = vlib_time_now (vm);
+
+      switch (event_type)
+	{
+	case QUEUE_SIGNAL_EVENT:
+	  vm->queue_signal_pending = 0;
+	  break;
+
+	case SOCKET_READ_EVENT:
+	  for (i = 0; i < vec_len (event_data); i++)
 	    {
-	      int i;
-	      svm_region_t *svm;
-	      void *oldheap;
-
-	      /* Allow the application to clean up its registrations */
-	      for (i = 0; i < vec_len (dead_indices); i++)
-		{
-		  regpp = pool_elt_at_index (am->vl_clients, dead_indices[i]);
-		  if (regpp)
-		    {
-		      u32 handle;
-
-		      handle = vl_msg_api_handle_from_index_and_epoch
-			(dead_indices[i], shm->application_restarts);
-		      (void) call_reaper_functions (handle);
-		    }
-		}
-
-	      svm = am->vlib_rp;
-	      pthread_mutex_lock (&svm->mutex);
-	      oldheap = svm_push_data_heap (svm);
-
-	      for (i = 0; i < vec_len (dead_indices); i++)
-		{
-		  regpp = pool_elt_at_index (am->vl_clients, dead_indices[i]);
-		  if (regpp)
-		    {
-		      /* Poison the old registration */
-		      memset (*regpp, 0xF3, sizeof (**regpp));
-		      clib_mem_free (*regpp);
-		      /* no dangling references, please */
-		      *regpp = 0;
-		    }
-		  else
-		    {
-		      svm_pop_heap (oldheap);
-		      clib_warning ("Duplicate free, client index %d",
-				    regpp - am->vl_clients);
-		      oldheap = svm_push_data_heap (svm);
-		    }
-		}
-
-	      svm_client_scan_this_region_nolock (am->vlib_rp);
-
-	      pthread_mutex_unlock (&svm->mutex);
-	      svm_pop_heap (oldheap);
-	      for (i = 0; i < vec_len (dead_indices); i++)
-		pool_put_index (am->vl_clients, dead_indices[i]);
+	      a = pool_elt_at_index (socket_main.process_args, event_data[i]);
+	      vl_api_socket_process_msg (a->clib_file, a->regp,
+					 (i8 *) a->data);
+	      vec_free (a->data);
+	      pool_put (socket_main.process_args, a);
 	    }
+	  break;
 
-	  dead_client_scan_time = vlib_time_now (vm) + 20.0;
+	  /* Timeout... */
+	case -1:
+	  break;
+
+	default:
+	  clib_warning ("unknown event type %d", event_type);
+	  break;
+	}
+
+      if (now > dead_client_scan_time)
+	{
+	  dead_client_scan (am, shm, now);
+	  dead_client_scan_time = vlib_time_now (vm) + 10.0;
 	}
 
       if (TRACE_VLIB_MEMORY_QUEUE)
@@ -785,11 +1058,12 @@
   return 0;
 }
 /* *INDENT-OFF* */
-VLIB_REGISTER_NODE (memclnt_node,static) = {
-    .function = memclnt_process,
-    .type = VLIB_NODE_TYPE_PROCESS,
-    .name = "api-rx-from-ring",
-    .state = VLIB_NODE_STATE_DISABLED,
+VLIB_REGISTER_NODE (memclnt_node) =
+{
+  .function = memclnt_process,
+  .type = VLIB_NODE_TYPE_PROCESS,
+  .name = "api-rx-from-ring",
+  .state = VLIB_NODE_STATE_DISABLED,
 };
 /* *INDENT-ON* */
 
@@ -865,14 +1139,17 @@
 };
 /* *INDENT-ON* */
 
+volatile int **vl_api_queue_cursizes;
+
 static void
 memclnt_queue_callback (vlib_main_t * vm)
 {
-  static volatile int *cursizep;
+  int i;
+  api_main_t *am = &api_main;
 
-  if (PREDICT_FALSE (cursizep == 0))
+  if (PREDICT_FALSE (vec_len (vl_api_queue_cursizes) !=
+		     1 + vec_len (am->vlib_private_rps)))
     {
-      api_main_t *am = &api_main;
       vl_shmem_hdr_t *shmem_hdr = am->shmem_hdr;
       unix_shared_memory_queue_t *q;
 
@@ -882,15 +1159,30 @@
       q = shmem_hdr->vl_input_queue;
       if (q == 0)
 	return;
-      cursizep = &q->cursize;
+
+      vec_add1 (vl_api_queue_cursizes, &q->cursize);
+
+      for (i = 0; i < vec_len (am->vlib_private_rps); i++)
+	{
+	  svm_region_t *vlib_rp = am->vlib_private_rps[i];
+
+	  shmem_hdr = (void *) vlib_rp->user_ctx;
+	  q = shmem_hdr->vl_input_queue;
+	  vec_add1 (vl_api_queue_cursizes, &q->cursize);
+	}
     }
 
-  if (*cursizep >= 1)
+  for (i = 0; i < vec_len (vl_api_queue_cursizes); i++)
     {
-      vm->queue_signal_pending = 1;
-      vm->api_queue_nonempty = 1;
-      vlib_process_signal_event (vm, memclnt_node.index,
-				 /* event_type */ 0, /* event_data */ 0);
+      if (*vl_api_queue_cursizes[i])
+	{
+	  vm->queue_signal_pending = 1;
+	  vm->api_queue_nonempty = 1;
+	  vlib_process_signal_event (vm, memclnt_node.index,
+				     /* event_type */ QUEUE_SIGNAL_EVENT,
+				     /* event_data */ 0);
+	  break;
+	}
     }
 }
 
@@ -971,13 +1263,55 @@
 
 VLIB_INIT_FUNCTION (setup_memclnt_exit);
 
+u8 *
+format_api_message_rings (u8 * s, va_list * args)
+{
+  api_main_t *am = va_arg (*args, api_main_t *);
+  vl_shmem_hdr_t *shmem_hdr = va_arg (*args, vl_shmem_hdr_t *);
+  int main_segment = va_arg (*args, int);
+  ring_alloc_t *ap;
+  int i;
+
+  if (shmem_hdr == 0)
+    return format (s, "%8s %8s %8s %8s %8s\n",
+		   "Owner", "Size", "Nitems", "Hits", "Misses");
+
+  ap = shmem_hdr->vl_rings;
+
+  for (i = 0; i < vec_len (shmem_hdr->vl_rings); i++)
+    {
+      s = format (s, "%8s %8d %8d %8d %8d\n",
+		  "vlib", ap->size, ap->nitems, ap->hits, ap->misses);
+      ap++;
+    }
+
+  ap = shmem_hdr->client_rings;
+
+  for (i = 0; i < vec_len (shmem_hdr->client_rings); i++)
+    {
+      s = format (s, "%8s %8d %8d %8d %8d\n",
+		  "clnt", ap->size, ap->nitems, ap->hits, ap->misses);
+      ap++;
+    }
+
+  if (main_segment)
+    {
+      s = format (s, "%d ring miss fallback allocations\n", am->ring_misses);
+      s = format
+	(s,
+	 "%d application restarts, %d reclaimed msgs, %d garbage collects\n",
+	 shmem_hdr->application_restarts, shmem_hdr->restart_reclaims,
+	 shmem_hdr->garbage_collects);
+    }
+  return s;
+}
+
 
 static clib_error_t *
 vl_api_ring_command (vlib_main_t * vm,
 		     unformat_input_t * input, vlib_cli_command_t * cli_cmd)
 {
   int i;
-  ring_alloc_t *ap;
   vl_shmem_hdr_t *shmem_hdr;
   api_main_t *am = &api_main;
 
@@ -989,34 +1323,38 @@
       return 0;
     }
 
-  vlib_cli_output (vm, "%8s %8s %8s %8s %8s\n",
-		   "Owner", "Size", "Nitems", "Hits", "Misses");
+  vlib_cli_output (vm, "Main API segment rings:");
 
-  ap = shmem_hdr->vl_rings;
+  vlib_cli_output (vm, "%U", format_api_message_rings, am,
+		   0 /* print header */ , 0 /* notused */ );
 
-  for (i = 0; i < vec_len (shmem_hdr->vl_rings); i++)
+  vlib_cli_output (vm, "%U", format_api_message_rings, am,
+		   shmem_hdr, 1 /* main segment */ );
+
+  for (i = 0; i < vec_len (am->vlib_private_rps); i++)
     {
-      vlib_cli_output (vm, "%8s %8d %8d %8d %8d\n",
-		       "vlib", ap->size, ap->nitems, ap->hits, ap->misses);
-      ap++;
+      svm_region_t *vlib_rp = am->vlib_private_rps[i];
+      shmem_hdr = (void *) vlib_rp->user_ctx;
+      vl_api_registration_t **regpp;
+      vl_api_registration_t *regp;
+
+      /* For horizontal scaling, add a hash table... */
+      /* *INDENT-OFF* */
+      pool_foreach (regpp, am->vl_clients,
+      ({
+        regp = *regpp;
+        if (regp && regp->vlib_rp == vlib_rp)
+          {
+            vlib_cli_output (vm, "%s segment rings:", regp->name);
+            goto found;
+          }
+      }));
+      /* *INDENT-ON* */
+    found:
+      vlib_cli_output (vm, "%U", format_api_message_rings, am,
+		       shmem_hdr, 0 /* main segment */ );
     }
 
-  ap = shmem_hdr->client_rings;
-
-  for (i = 0; i < vec_len (shmem_hdr->client_rings); i++)
-    {
-      vlib_cli_output (vm, "%8s %8d %8d %8d %8d\n",
-		       "clnt", ap->size, ap->nitems, ap->hits, ap->misses);
-      ap++;
-    }
-
-  vlib_cli_output (vm, "%d ring miss fallback allocations\n",
-		   am->ring_misses);
-
-  vlib_cli_output
-    (vm, "%d application restarts, %d reclaimed msgs, %d garbage collects\n",
-     shmem_hdr->application_restarts,
-     shmem_hdr->restart_reclaims, shmem_hdr->garbage_collects);
   return 0;
 }
 
@@ -1051,15 +1389,13 @@
 
     if (regp)
       {
-        q = regp->vl_input_queue;
-        if (kill (q->consumer_pid, 0) < 0)
-          {
-            health = "DEAD";
-          }
+        if (regp->unanswered_pings > 0)
+          health = "questionable";
         else
-          {
-            health = "alive";
-          }
+          health = "OK";
+
+        q = regp->vl_input_queue;
+
         vlib_cli_output (vm, "%16s %8d %14d 0x%016llx %s\n",
                          regp->name, q->consumer_pid, q->cursize,
                          q, health);
@@ -1306,6 +1642,7 @@
 {
   api_main_t *am = &api_main;
   svm_map_region_args_t _a, *a = &_a;
+  clib_error_t *error;
 
   memset (a, 0, sizeof (*a));
   a->root_path = am->root_path;
@@ -1321,7 +1658,10 @@
      0) ? am->global_pvt_heap_size : SVM_PVT_MHEAP_SIZE;
 
   svm_region_init_args (a);
-  return 0;
+
+  error = vlib_call_init_function (vm, vlibsocket_init);
+
+  return error;
 }
 
 VLIB_INIT_FUNCTION (vlibmemory_init);
@@ -2227,7 +2567,7 @@
 
   /* Load the serialized message table from the table dump */
 
-  error = unserialize_open_unix_file (sm, (char *) filename);
+  error = unserialize_open_clib_file (sm, (char *) filename);
 
   if (error)
     return error;
@@ -2251,7 +2591,7 @@
   if (compare_current)
     {
       /* Append the current message table */
-      u8 *tblv = vec_dup (am->serialized_message_table_in_shmem);
+      u8 *tblv = vl_api_serialize_message_table (am, 0);
 
       serialize_open_vector (sm, tblv);
       unserialize_integer (sm, &nmsgs, sizeof (u32));
@@ -2268,6 +2608,7 @@
 	  item->crc = extract_crc (name_and_crc);
 	  item->which = 1;	/* current_image */
 	}
+      vec_free (tblv);
     }
 
   /* Sort the table. */