vapi: support api clients within vpp process

Add vapi_connect_from_vpp() and vapi_disconnect_from_vpp()
calls to allow API clients to run within the VPP process.

Add a new memclnt_create version (memclnt_create_v2) that gives
the user a knob to enable or disable dead-client scans (keepalive).
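
A minimal usage sketch for an in-process client (the client name and
queue sizes are illustrative; error handling trimmed):

  vapi_ctx_t ctx;

  if (vapi_ctx_alloc (&ctx) != VAPI_OK)
    return;

  /* up to 64 outstanding requests, response queue of 32 entries,
     keepalives handled by vapi */
  if (vapi_connect_from_vpp (ctx, "my-inproc-client", 64, 32,
                             VAPI_MODE_BLOCKING, true) != VAPI_OK)
    {
      vapi_ctx_free (ctx);
      return;
    }

  /* ... issue requests through the generated vapi_* wrappers ... */

  vapi_disconnect_from_vpp (ctx);
  vapi_ctx_free (ctx);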

Type: feature
Signed-off-by: Ole Troan <ot@cisco.com>
Change-Id: Id0b7bb89308db3a3aed2d3fcbedf4e1282dcd03f
diff --git a/src/vpp-api/vapi/vapi.c b/src/vpp-api/vapi/vapi.c
index ca46f8d..7808bec 100644
--- a/src/vpp-api/vapi/vapi.c
+++ b/src/vpp-api/vapi/vapi.c
@@ -30,10 +30,18 @@
 #include <vlib/vlib.h>
 #include <vlibapi/api_common.h>
 #include <vlibmemory/memory_client.h>
+#include <vlibmemory/memory_api.h>
+#include <vlibmemory/api.h>
 
 #include <vapi/memclnt.api.vapi.h>
 #include <vapi/vlib.api.vapi.h>
 
+#include <vlibmemory/vl_memory_msg_enum.h>
+
+#define vl_typedefs /* define message structures */
+#include <vlibmemory/vl_memory_api_h.h>
+#undef vl_typedefs
+
 /* we need to use control pings for some stuff and because we're forced to put
  * the code in headers, we need a way to be able to grab the ids of these
  * messages - so declare them here as extern */
@@ -89,6 +97,11 @@
   bool connected;
   bool handle_keepalives;
   pthread_mutex_t requests_mutex;
+
+  svm_queue_t *vl_input_queue;
+  u32 my_client_index;
+  /** client message index hash table */
+  uword *msg_index_by_name_and_crc;
 };
 
 u32
@@ -221,7 +234,7 @@
     {
       return NULL;
     }
-  void *rv = vl_msg_api_alloc_or_null (size);
+  void *rv = vl_msg_api_alloc_as_if_client_or_null (size);
   if (rv)
     {
       clib_memset (rv, 0, size);
@@ -302,13 +315,174 @@
   return vapi_lookup_vl_msg_id (ctx, id) != UINT16_MAX;
 }
 
-vapi_error_e
-vapi_connect (vapi_ctx_t ctx, const char *name,
-	      const char *chroot_prefix,
-	      int max_outstanding_requests,
-	      int response_queue_size, vapi_mode_e mode,
-	      bool handle_keepalives)
+/* Cut and paste to avoid adding a dependency on the client library */
+__clib_nosanitize_addr static void
+VL_API_VEC_UNPOISON (const void *v)
 {
+  const vec_header_t *vh = &((vec_header_t *) v)[-1];
+  clib_mem_unpoison (vh, sizeof (*vh) + vec_len (v));
+}
+
+static void
+vapi_api_name_and_crc_free (vapi_ctx_t ctx)
+{
+  int i;
+  u8 **keys = 0;
+  hash_pair_t *hp;
+
+  if (!ctx->msg_index_by_name_and_crc)
+    return;
+  hash_foreach_pair (hp, ctx->msg_index_by_name_and_crc,
+		     ({ vec_add1 (keys, (u8 *) hp->key); }));
+  for (i = 0; i < vec_len (keys); i++)
+    vec_free (keys[i]);
+  vec_free (keys);
+  hash_free (ctx->msg_index_by_name_and_crc);
+}
+
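+/*
+ * memclnt_create_v2 reply: record our client index and rebuild the
+ * name_crc -> message index table from the serialized table carried
+ * in the reply.
+ */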
+static void
+vapi_memclnt_create_v2_reply_t_handler (vapi_ctx_t ctx,
+					vl_api_memclnt_create_v2_reply_t *mp)
+{
+  serialize_main_t _sm, *sm = &_sm;
+  u8 *tblv;
+  u32 nmsgs;
+  int i;
+  u8 *name_and_crc;
+  u32 msg_index;
+
+  ctx->my_client_index = mp->index;
+
+  /* Clean out any previous hash table (unlikely) */
+  vapi_api_name_and_crc_free (ctx);
+
+  ctx->msg_index_by_name_and_crc = hash_create_string (0, sizeof (uword));
+
+  /* Recreate the vnet-side API message handler table */
+  tblv = uword_to_pointer (mp->message_table, u8 *);
+  unserialize_open_data (sm, tblv, vec_len (tblv));
+  unserialize_integer (sm, &nmsgs, sizeof (u32));
+
+  VL_API_VEC_UNPOISON (tblv);
+
+  for (i = 0; i < nmsgs; i++)
+    {
+      msg_index = unserialize_likely_small_unsigned_integer (sm);
+      unserialize_cstring (sm, (char **) &name_and_crc);
+      hash_set_mem (ctx->msg_index_by_name_and_crc, name_and_crc, msg_index);
+    }
+}
+
+static void
+vapi_memclnt_delete_reply_t_handler (vapi_ctx_t ctx,
+				     vl_api_memclnt_delete_reply_t *mp)
+{
+  void *oldheap;
+  oldheap = vl_msg_push_heap ();
+  svm_queue_free (ctx->vl_input_queue);
+  vl_msg_pop_heap (oldheap);
+
+  ctx->my_client_index = ~0;
+  ctx->vl_input_queue = 0;
+}
+
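+/*
+ * Bootstrap connect: allocate our input queue on the shared-memory
+ * API heap, send memclnt_create_v2 to VPP's input queue and poll up
+ * to 10 seconds for the reply carrying our client index and the
+ * message table.
+ */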
+int
+vapi_client_connect (vapi_ctx_t ctx, const char *name, int ctx_quota,
+		     int input_queue_size, bool keepalive)
+{
+  vl_api_memclnt_create_v2_t *mp;
+  vl_api_memclnt_create_v2_reply_t *rp;
+  svm_queue_t *vl_input_queue;
+  vl_shmem_hdr_t *shmem_hdr;
+  int rv = 0;
+  void *oldheap;
+  api_main_t *am = vlibapi_get_main ();
+
+  shmem_hdr = am->shmem_hdr;
+
+  if (shmem_hdr == 0 || shmem_hdr->vl_input_queue == 0)
+    {
+      clib_warning ("shmem_hdr / input queue NULL");
+      return -1;
+    }
+
+  clib_mem_unpoison (shmem_hdr, sizeof (*shmem_hdr));
+  VL_MSG_API_SVM_QUEUE_UNPOISON (shmem_hdr->vl_input_queue);
+
+  oldheap = vl_msg_push_heap ();
+  vl_input_queue =
+    svm_queue_alloc_and_init (input_queue_size, sizeof (uword), getpid ());
+  vl_msg_pop_heap (oldheap);
+
+  ctx->my_client_index = ~0;
+  ctx->vl_input_queue = vl_input_queue;
+
+  mp = vl_msg_api_alloc_as_if_client (sizeof (vl_api_memclnt_create_v2_t));
+  clib_memset (mp, 0, sizeof (*mp));
+  mp->_vl_msg_id = ntohs (VL_API_MEMCLNT_CREATE_V2);
+  mp->ctx_quota = ctx_quota;
+  mp->input_queue = (uword) vl_input_queue;
+  strncpy ((char *) mp->name, name, sizeof (mp->name) - 1);
+  mp->keepalive = keepalive;
+
+  vl_msg_api_send_shmem (shmem_hdr->vl_input_queue, (u8 *) &mp);
+
+  while (1)
+    {
+      int qstatus;
+      struct timespec ts, tsrem;
+      int i;
+
+      /* Wait up to 10 seconds */
+      for (i = 0; i < 1000; i++)
+	{
+	  qstatus =
+	    svm_queue_sub (vl_input_queue, (u8 *) &rp, SVM_Q_NOWAIT, 0);
+	  if (qstatus == 0)
+	    goto read_one_msg;
+	  ts.tv_sec = 0;
+	  ts.tv_nsec = 10000 * 1000; /* 10 ms */
+	  while (nanosleep (&ts, &tsrem) < 0)
+	    ts = tsrem;
+	}
+      /* Timeout... */
+      return -1;
+
+    read_one_msg:
+      VL_MSG_API_UNPOISON (rp);
+      if (ntohs (rp->_vl_msg_id) != VL_API_MEMCLNT_CREATE_V2_REPLY)
+	{
+	  clib_warning ("unexpected reply: id %d", ntohs (rp->_vl_msg_id));
+	  continue;
+	}
+      rv = clib_net_to_host_u32 (rp->response);
+      vapi_memclnt_create_v2_reply_t_handler (ctx, rp);
+      break;
+    }
+  return (rv);
+}
+
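+/* Look up the message index VPP assigned to a "name_crc" string */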
+u32
+vapi_api_get_msg_index (vapi_ctx_t ctx, u8 *name_and_crc)
+{
+  uword *p;
+
+  if (ctx->msg_index_by_name_and_crc)
+    {
+      p = hash_get_mem (ctx->msg_index_by_name_and_crc, name_and_crc);
+      if (p)
+	return p[0];
+    }
+  return ~0;
+}
+
+vapi_error_e
+vapi_connect (vapi_ctx_t ctx, const char *name, const char *chroot_prefix,
+	      int max_outstanding_requests, int response_queue_size,
+	      vapi_mode_e mode, bool handle_keepalives)
+{
+  int rv;
+
   if (response_queue_size <= 0 || max_outstanding_requests <= 0)
     {
       return VAPI_EINVAL;
@@ -317,6 +491,7 @@
     {
       return VAPI_ENOMEM;
     }
+
   ctx->requests_size = max_outstanding_requests;
   const size_t size = ctx->requests_size * sizeof (*ctx->requests);
   void *tmp = realloc (ctx->requests, size);
@@ -328,6 +503,7 @@
   clib_memset (ctx->requests, 0, size);
   /* coverity[MISSING_LOCK] - 177211 requests_mutex is not needed here */
   ctx->requests_start = ctx->requests_count = 0;
+
   if (chroot_prefix)
     {
       VAPI_DBG ("set memory root path `%s'", chroot_prefix);
@@ -335,12 +511,13 @@
     }
   static char api_map[] = "/vpe-api";
   VAPI_DBG ("client api map `%s'", api_map);
-  if ((vl_client_api_map (api_map)) < 0)
+  if ((rv = vl_map_shmem (api_map, 0 /* is_vlib */)) < 0)
     {
       return VAPI_EMAP_FAIL;
     }
   VAPI_DBG ("connect client `%s'", name);
-  if (vl_client_connect ((char *) name, 0, response_queue_size) < 0)
+  if (vapi_client_connect (ctx, (char *) name, 0, response_queue_size, true) <
+      0)
     {
       vl_client_api_unmap ();
       return VAPI_ECON_FAIL;
@@ -348,14 +525,15 @@
 #if VAPI_DEBUG_CONNECT
   VAPI_DBG ("start probing messages");
 #endif
-  int rv;
+
   int i;
   for (i = 0; i < __vapi_metadata.count; ++i)
     {
       vapi_message_desc_t *m = __vapi_metadata.msgs[i];
       u8 scratch[m->name_with_crc_len + 1];
       memcpy (scratch, m->name_with_crc, m->name_with_crc_len + 1);
-      u32 id = vl_msg_api_get_msg_index (scratch);
+      u32 id = vapi_api_get_msg_index (ctx, scratch);
+
       if (VAPI_INVALID_MSG_ID != id)
 	{
 	  if (id > UINT16_MAX)
@@ -367,10 +545,9 @@
 	    }
 	  if (id > ctx->vl_msg_id_max)
 	    {
-	      vapi_msg_id_t *tmp = realloc (ctx->vl_msg_id_to_vapi_msg_t,
-					    sizeof
-					    (*ctx->vl_msg_id_to_vapi_msg_t) *
-					    (id + 1));
+	      vapi_msg_id_t *tmp =
+		realloc (ctx->vl_msg_id_to_vapi_msg_t,
+			 sizeof (*ctx->vl_msg_id_to_vapi_msg_t) * (id + 1));
 	      if (!tmp)
 		{
 		  rv = VAPI_ENOMEM;
@@ -398,8 +575,8 @@
   if (!vapi_is_msg_available (ctx, vapi_msg_id_control_ping) ||
       !vapi_is_msg_available (ctx, vapi_msg_id_control_ping_reply))
     {
-      VAPI_ERR
-	("control ping or control ping reply not available, cannot connect");
+      VAPI_ERR (
+	"control ping or control ping reply not available, cannot connect");
       rv = VAPI_EINCOMPATIBLE;
       goto fail;
     }
@@ -420,6 +597,157 @@
   return rv;
 }
 
+/*
+ * API client running in the same process as VPP
+ */
+vapi_error_e
+vapi_connect_from_vpp (vapi_ctx_t ctx, const char *name,
+		       int max_outstanding_requests, int response_queue_size,
+		       vapi_mode_e mode, bool handle_keepalives)
+{
+  int rv;
+
+  if (response_queue_size <= 0 || max_outstanding_requests <= 0)
+    {
+      return VAPI_EINVAL;
+    }
+
+  ctx->requests_size = max_outstanding_requests;
+  const size_t size = ctx->requests_size * sizeof (*ctx->requests);
+  void *tmp = realloc (ctx->requests, size);
+  if (!tmp)
+    {
+      return VAPI_ENOMEM;
+    }
+  ctx->requests = tmp;
+  clib_memset (ctx->requests, 0, size);
+  /* coverity[MISSING_LOCK] - 177211 requests_mutex is not needed here */
+  ctx->requests_start = ctx->requests_count = 0;
+
+  VAPI_DBG ("connect client `%s'", name);
+  if (vapi_client_connect (ctx, (char *) name, 0, response_queue_size,
+			   handle_keepalives) < 0)
+    {
+      return VAPI_ECON_FAIL;
+    }
+
+  int i;
+  for (i = 0; i < __vapi_metadata.count; ++i)
+    {
+      vapi_message_desc_t *m = __vapi_metadata.msgs[i];
+      u8 scratch[m->name_with_crc_len + 1];
+      memcpy (scratch, m->name_with_crc, m->name_with_crc_len + 1);
+      u32 id = vapi_api_get_msg_index (ctx, scratch);
+      if (VAPI_INVALID_MSG_ID != id)
+	{
+	  if (id > UINT16_MAX)
+	    {
+	      VAPI_ERR ("Returned vl_msg_id `%u' > UINT16MAX `%u'!", id,
+			UINT16_MAX);
+	      rv = VAPI_EINVAL;
+	      goto fail;
+	    }
+	  if (id > ctx->vl_msg_id_max)
+	    {
+	      vapi_msg_id_t *tmp =
+		realloc (ctx->vl_msg_id_to_vapi_msg_t,
+			 sizeof (*ctx->vl_msg_id_to_vapi_msg_t) * (id + 1));
+	      if (!tmp)
+		{
+		  rv = VAPI_ENOMEM;
+		  goto fail;
+		}
+	      ctx->vl_msg_id_to_vapi_msg_t = tmp;
+	      ctx->vl_msg_id_max = id;
+	    }
+	  ctx->vl_msg_id_to_vapi_msg_t[id] = m->id;
+	  ctx->vapi_msg_id_t_to_vl_msg_id[m->id] = id;
+	}
+      else
+	{
+	  ctx->vapi_msg_id_t_to_vl_msg_id[m->id] = UINT16_MAX;
+	  VAPI_DBG ("Message `%s' not available", m->name_with_crc);
+	}
+    }
+  if (!vapi_is_msg_available (ctx, vapi_msg_id_control_ping) ||
+      !vapi_is_msg_available (ctx, vapi_msg_id_control_ping_reply))
+    {
+      VAPI_ERR (
+	"control ping or control ping reply not available, cannot connect");
+      rv = VAPI_EINCOMPATIBLE;
+      goto fail;
+    }
+  ctx->mode = mode;
+  ctx->connected = true;
+  if (vapi_is_msg_available (ctx, vapi_msg_id_memclnt_keepalive))
+    {
+      ctx->handle_keepalives = handle_keepalives;
+    }
+  else
+    {
+      ctx->handle_keepalives = false;
+    }
+  return VAPI_OK;
+fail:
+  vl_client_disconnect ();
+  return rv;
+}
+
+vapi_error_e
+vapi_disconnect_from_vpp (vapi_ctx_t ctx)
+{
+  if (!ctx->connected)
+    {
+      return VAPI_EINVAL;
+    }
+  vl_api_memclnt_delete_reply_t *rp;
+  svm_queue_t *vl_input_queue;
+  time_t begin;
+  vl_input_queue = ctx->vl_input_queue;
+  vl_client_send_disconnect (0 /* wait for reply */);
+
+  /*
+   * Have to be careful here, in case the client is disconnecting
+   * because e.g. the vlib process died, or is unresponsive.
+   */
+  begin = time (0);
+  vapi_error_e rv = VAPI_OK;
+  while (1)
+    {
+      time_t now;
+
+      now = time (0);
+
+      if (now >= (begin + 2))
+	{
+	  clib_warning ("peer unresponsive, give up");
+	  ctx->my_client_index = ~0;
+	  rv = VAPI_ENORESP;
+	  goto fail;
+	}
+      if (svm_queue_sub (vl_input_queue, (u8 *) &rp, SVM_Q_NOWAIT, 0) < 0)
+	continue;
+
+      VL_MSG_API_UNPOISON (rp);
+
+      /* drain the queue */
+      if (ntohs (rp->_vl_msg_id) != VL_API_MEMCLNT_DELETE_REPLY)
+	{
+	  clib_warning ("queue drain: %d", ntohs (rp->_vl_msg_id));
+	  vl_msg_api_free (rp);
+	  continue;
+	}
+      vapi_memclnt_delete_reply_t_handler (
+	ctx, (void *) rp /*, ntohl (msgbuf->data_len)*/);
+      break;
+    }
+fail:
+  vapi_api_name_and_crc_free (ctx);
+
+  ctx->connected = false;
+  return rv;
+}
+
 vapi_error_e
 vapi_disconnect (vapi_ctx_t ctx)
 {
@@ -427,13 +755,57 @@
     {
       return VAPI_EINVAL;
     }
-  vl_client_disconnect ();
+
+  vl_api_memclnt_delete_reply_t *rp;
+  svm_queue_t *vl_input_queue;
+  time_t begin;
+  vl_input_queue = ctx->vl_input_queue;
+  vl_client_send_disconnect (0 /* wait for reply */);
+
+  /*
+   * Have to be careful here, in case the client is disconnecting
+   * because e.g. the vlib process died, or is unresponsive.
+   */
+  begin = time (0);
+  vapi_error_e rv = VAPI_OK;
+  while (1)
+    {
+      time_t now;
+
+      now = time (0);
+
+      if (now >= (begin + 2))
+	{
+	  clib_warning ("peer unresponsive, give up");
+	  ctx->my_client_index = ~0;
+	  rv = VAPI_ENORESP;
+	  goto fail;
+	}
+      if (svm_queue_sub (vl_input_queue, (u8 *) &rp, SVM_Q_NOWAIT, 0) < 0)
+	continue;
+
+      VL_MSG_API_UNPOISON (rp);
+
+      /* drain the queue */
+      if (ntohs (rp->_vl_msg_id) != VL_API_MEMCLNT_DELETE_REPLY)
+	{
+	  clib_warning ("queue drain: %d", ntohs (rp->_vl_msg_id));
+	  vl_msg_api_free (rp);
+	  continue;
+	}
+      vapi_memclnt_delete_reply_t_handler (
+	ctx, (void *) rp /*, ntohl (msgbuf->data_len)*/);
+      break;
+    }
+fail:
+  vapi_api_name_and_crc_free (ctx);
+
   vl_client_api_unmap ();
 #if VAPI_DEBUG_ALLOC
   vapi_to_be_freed_validate ();
 #endif
   ctx->connected = false;
-  return VAPI_OK;
+  return rv;
 }
 
 vapi_error_e
@@ -541,15 +913,10 @@
       return VAPI_EINVAL;
     }
   vapi_error_e rv = VAPI_OK;
-  api_main_t *am = vlibapi_get_main ();
   uword data;
 
-  if (am->our_pid == 0)
-    {
-      return VAPI_EINVAL;
-    }
+  svm_queue_t *q = ctx->vl_input_queue;
 
-  svm_queue_t *q = am->vl_input_queue;
 again:
   VAPI_DBG ("doing shm queue sub");
 
@@ -610,7 +977,6 @@
 	      vapi_msg_memclnt_keepalive_reply_hton (reply);
 	      while (VAPI_EAGAIN == vapi_send (ctx, reply));
 	      vapi_msg_free (ctx, *msg);
-	      VAPI_DBG ("autohandled memclnt_keepalive");
 	      goto again;
 	    }
 	}
@@ -689,9 +1055,8 @@
 	}
       if (payload_offset != -1)
 	{
-	  rv =
-	    ctx->requests[tmp].callback (ctx, ctx->requests[tmp].callback_ctx,
-					 VAPI_OK, is_last, payload);
+	  rv = ctx->requests[tmp].callback (
+	    ctx, ctx->requests[tmp].callback_ctx, VAPI_OK, is_last, payload);
 	}
       else
 	{
@@ -870,7 +1235,7 @@
 int
 vapi_get_client_index (vapi_ctx_t ctx)
 {
-  return vlibapi_get_main ()->my_client_index;
+  return ctx->my_client_index;
 }
 
 bool