vapi: uds transport support

introduce ability to connect over unix socket instead of shared memory

Type: improvement

Change-Id: Id9042c74e33ad4e418896c4d7ae48bb9106195c9
Signed-off-by: Stanislav Zaikin <stanislav.zaikin@46labs.com>
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
diff --git a/src/vpp-api/vapi/vapi.c b/src/vpp-api/vapi/vapi.c
index 46ccf37..022f023 100644
--- a/src/vpp-api/vapi/vapi.c
+++ b/src/vpp-api/vapi/vapi.c
@@ -98,8 +98,11 @@
   bool connected;
   bool handle_keepalives;
   pthread_mutex_t requests_mutex;
+  bool use_uds;
 
   svm_queue_t *vl_input_queue;
+  clib_socket_t client_socket;
+  clib_time_t time;
   u32 my_client_index;
   /** client message index hash table */
   uword *msg_index_by_name_and_crc;
@@ -230,8 +233,8 @@
 
 #endif
 
-void *
-vapi_msg_alloc (vapi_ctx_t ctx, size_t size)
+static void *
+vapi_shm_msg_alloc (vapi_ctx_t ctx, size_t size)
 {
   if (!ctx->connected)
     {
@@ -245,6 +248,23 @@
   return rv;
 }
 
+static void *
+vapi_sock_msg_alloc (size_t size)
+{
+  u8 *rv = 0;
+  vec_validate_init_empty (rv, size - 1, 0);
+  return rv;
+}
+
+void *
+vapi_msg_alloc (vapi_ctx_t ctx, size_t size)
+{
+  if (ctx->use_uds)
+    return vapi_sock_msg_alloc (size);
+
+  return vapi_shm_msg_alloc (ctx, size);
+}
+
 void
 vapi_msg_free (vapi_ctx_t ctx, void *msg)
 {
@@ -252,10 +272,19 @@
     {
       return;
     }
+
 #if VAPI_DEBUG_ALLOC
   vapi_trace_free (msg);
 #endif
-  vl_msg_api_free (msg);
+
+  if (ctx->use_uds)
+    {
+      vec_free (msg);
+    }
+  else
+    {
+      vl_msg_api_free (msg);
+    }
 }
 
 vapi_msg_id_t
@@ -294,6 +323,7 @@
     }
   pthread_mutex_init (&ctx->requests_mutex, NULL);
   *result = ctx;
+  clib_time_init (&ctx->time);
   return VAPI_OK;
 fail:
   vapi_ctx_free (ctx);
@@ -343,6 +373,205 @@
   hash_free (ctx->msg_index_by_name_and_crc);
 }
 
+static vapi_error_e
+vapi_sock_get_errno (int err)
+{
+  switch (err)
+    {
+    case ENOTSOCK:
+      return VAPI_ENOTSOCK;
+    case EACCES:
+      return VAPI_EACCES;
+    case ECONNRESET:
+      return VAPI_ECONNRESET;
+    default:
+      break;
+    }
+  return VAPI_ESOCK_FAILURE;
+}
+
+static vapi_error_e
+vapi_sock_send (vapi_ctx_t ctx, u8 *msg)
+{
+  size_t n;
+  struct msghdr hdr;
+
+  const size_t len = vec_len (msg);
+  const size_t total_len = len + sizeof (msgbuf_t);
+
+  msgbuf_t msgbuf1 = {
+    .q = 0,
+    .gc_mark_timestamp = 0,
+    .data_len = htonl (len),
+  };
+
+  struct iovec bufs[2] = {
+    [0] = { .iov_base = &msgbuf1, .iov_len = sizeof (msgbuf1) },
+    [1] = { .iov_base = msg, .iov_len = len },
+  };
+
+  clib_memset (&hdr, 0, sizeof (hdr));
+  hdr.msg_iov = bufs;
+  hdr.msg_iovlen = 2;
+
+  n = sendmsg (ctx->client_socket.fd, &hdr, 0);
+  if (n < 0)
+    {
+      return vapi_sock_get_errno (errno);
+    }
+
+  if (n < total_len)
+    {
+      return VAPI_EAGAIN;
+    }
+
+  vec_free (msg);
+
+  return VAPI_OK;
+}
+
+static vapi_error_e
+vapi_sock_send2 (vapi_ctx_t ctx, u8 *msg1, u8 *msg2)
+{
+  size_t n;
+  struct msghdr hdr;
+
+  const size_t len1 = vec_len (msg1);
+  const size_t len2 = vec_len (msg2);
+  const size_t total_len = len1 + len2 + 2 * sizeof (msgbuf_t);
+
+  msgbuf_t msgbuf1 = {
+    .q = 0,
+    .gc_mark_timestamp = 0,
+    .data_len = htonl (len1),
+  };
+
+  msgbuf_t msgbuf2 = {
+    .q = 0,
+    .gc_mark_timestamp = 0,
+    .data_len = htonl (len2),
+  };
+
+  struct iovec bufs[4] = {
+    [0] = { .iov_base = &msgbuf1, .iov_len = sizeof (msgbuf1) },
+    [1] = { .iov_base = msg1, .iov_len = len1 },
+    [2] = { .iov_base = &msgbuf2, .iov_len = sizeof (msgbuf2) },
+    [3] = { .iov_base = msg2, .iov_len = len2 },
+  };
+
+  clib_memset (&hdr, 0, sizeof (hdr));
+  hdr.msg_iov = bufs;
+  hdr.msg_iovlen = 4;
+
+  n = sendmsg (ctx->client_socket.fd, &hdr, 0);
+  if (n < 0)
+    {
+      return vapi_sock_get_errno (errno);
+    }
+
+  if (n < total_len)
+    {
+      return VAPI_EAGAIN;
+    }
+
+  vec_free (msg1);
+  vec_free (msg2);
+
+  return VAPI_OK;
+}
+
+static vapi_error_e
+vapi_sock_recv_internal (vapi_ctx_t ctx, u8 **vec_msg, u32 timeout)
+{
+  clib_socket_t *sock = &ctx->client_socket;
+  u32 data_len = 0, msg_size;
+  msgbuf_t *mbp = 0;
+  ssize_t n, current_rx_index;
+  f64 deadline;
+  vapi_error_e rv = VAPI_EAGAIN;
+
+  if (ctx->client_socket.fd == 0)
+    return VAPI_ENOTSOCK;
+
+  deadline = clib_time_now (&ctx->time) + timeout;
+
+  while (1)
+    {
+      current_rx_index = vec_len (sock->rx_buffer);
+      while (current_rx_index < sizeof (*mbp))
+	{
+	  vec_validate (sock->rx_buffer, sizeof (*mbp) - 1);
+	  n = recv (sock->fd, sock->rx_buffer + current_rx_index,
+		    sizeof (*mbp) - current_rx_index, MSG_DONTWAIT);
+	  if (n < 0)
+	    {
+	      if (errno == EAGAIN && clib_time_now (&ctx->time) >= deadline)
+		return VAPI_EAGAIN;
+
+	      if (errno == EAGAIN)
+		continue;
+
+	      clib_unix_warning ("socket_read");
+	      vec_set_len (sock->rx_buffer, current_rx_index);
+	      return vapi_sock_get_errno (errno);
+	    }
+	  current_rx_index += n;
+	}
+      vec_set_len (sock->rx_buffer, current_rx_index);
+
+      mbp = (msgbuf_t *) (sock->rx_buffer);
+      data_len = ntohl (mbp->data_len);
+      current_rx_index = vec_len (sock->rx_buffer);
+      vec_validate (sock->rx_buffer, current_rx_index + data_len);
+      mbp = (msgbuf_t *) (sock->rx_buffer);
+      msg_size = data_len + sizeof (*mbp);
+
+      while (current_rx_index < msg_size)
+	{
+	  n = recv (sock->fd, sock->rx_buffer + current_rx_index,
+		    msg_size - current_rx_index, MSG_DONTWAIT);
+	  if (n < 0)
+	    {
+	      if (errno == EAGAIN && clib_time_now (&ctx->time) >= deadline)
+		return VAPI_EAGAIN;
+
+	      if (errno == EAGAIN)
+		continue;
+
+	      clib_unix_warning ("socket_read");
+	      vec_set_len (sock->rx_buffer, current_rx_index);
+	      return vapi_sock_get_errno (errno);
+	    }
+	  current_rx_index += n;
+	}
+      vec_set_len (sock->rx_buffer, current_rx_index);
+
+      if (vec_len (sock->rx_buffer) >= data_len + sizeof (*mbp))
+	{
+	  if (data_len)
+	    {
+	      vec_add (*vec_msg, mbp->data, data_len);
+	      rv = VAPI_OK;
+	    }
+	  else
+	    {
+	      *vec_msg = 0;
+	    }
+
+	  if (vec_len (sock->rx_buffer) == data_len + sizeof (*mbp))
+	    vec_set_len (sock->rx_buffer, 0);
+	  else
+	    vec_delete (sock->rx_buffer, data_len + sizeof (*mbp), 0);
+	  mbp = 0;
+
+	  /* Quit if we're out of data, and not expecting a ping reply */
+	  if (vec_len (sock->rx_buffer) == 0)
+	    break;
+	}
+    }
+  return rv;
+}
+
 static void
 vapi_memclnt_create_v2_reply_t_handler (vapi_ctx_t ctx,
 					vl_api_memclnt_create_v2_reply_t *mp)
@@ -377,6 +606,28 @@
 }
 
 static void
+vapi_sockclnt_create_reply_t_handler (vapi_ctx_t ctx,
+				      vl_api_sockclnt_create_reply_t *mp)
+{
+  int i;
+  u8 *name_and_crc;
+
+  ctx->my_client_index = mp->index;
+
+  /* Clean out any previous hash table (unlikely) */
+  vapi_api_name_and_crc_free (ctx);
+
+  ctx->msg_index_by_name_and_crc = hash_create_string (0, sizeof (uword));
+
+  for (i = 0; i < be16toh (mp->count); i++)
+    {
+      name_and_crc = format (0, "%s%c", mp->message_table[i].name, 0);
+      hash_set_mem (ctx->msg_index_by_name_and_crc, name_and_crc,
+		    be16toh (mp->message_table[i].index));
+    }
+}
+
+static void
 vapi_memclnt_delete_reply_t_handler (vapi_ctx_t ctx,
 				     vl_api_memclnt_delete_reply_t *mp)
 {
@@ -389,9 +640,17 @@
   ctx->vl_input_queue = 0;
 }
 
+static void
+vapi_sockclnt_delete_reply_t_handler (vapi_ctx_t ctx,
+				      vl_api_sockclnt_delete_reply_t *mp)
+{
+  ctx->my_client_index = ~0;
+  ctx->vl_input_queue = 0;
+}
+
 static int
-vapi_client_connect (vapi_ctx_t ctx, const char *name, int ctx_quota,
-		     int input_queue_size, bool keepalive)
+vapi_shm_client_connect (vapi_ctx_t ctx, const char *name, int ctx_quota,
+			 int input_queue_size, bool keepalive)
 {
   vl_api_memclnt_create_v2_t *mp;
   vl_api_memclnt_create_v2_reply_t *rp;
@@ -406,7 +665,7 @@
   if (shmem_hdr == 0 || shmem_hdr->vl_input_queue == 0)
     {
       clib_warning ("shmem_hdr / input queue NULL");
-      return -1;
+      return VAPI_ECON_FAIL;
     }
 
   clib_mem_unpoison (shmem_hdr, sizeof (*shmem_hdr));
@@ -449,7 +708,7 @@
 	    ts = tsrem;
 	}
       /* Timeout... */
-      return -1;
+      return VAPI_ECON_FAIL;
 
     read_one_msg:
       VL_MSG_API_UNPOISON (rp);
@@ -465,8 +724,83 @@
   return (rv);
 }
 
+static int
+vapi_sock_client_connect (vapi_ctx_t ctx, char *path, const char *name)
+{
+  clib_error_t *error;
+  clib_socket_t *sock;
+  vl_api_sockclnt_create_t *mp;
+  vl_api_sockclnt_create_reply_t *rp;
+  int rv = 0;
+  u8 *msg = 0;
+
+  ctx->my_client_index = ~0;
+
+  if (ctx->client_socket.fd)
+    return VAPI_EINVAL;
+
+  if (name == 0)
+    return VAPI_EINVAL;
+
+  sock = &ctx->client_socket;
+  sock->config = path ? path : API_SOCKET_FILE;
+  sock->flags = CLIB_SOCKET_F_IS_CLIENT;
+
+  if ((error = clib_socket_init (sock)))
+    {
+      clib_error_report (error);
+      return VAPI_ECON_FAIL;
+    }
+
+  mp = vapi_sock_msg_alloc (sizeof (vl_api_sockclnt_create_t));
+  mp->_vl_msg_id = ntohs (VL_API_SOCKCLNT_CREATE);
+  strncpy ((char *) mp->name, name, sizeof (mp->name) - 1);
+
+  if (vapi_sock_send (ctx, (void *) mp) != VAPI_OK)
+    {
+      return VAPI_ECON_FAIL;
+    }
+
+  while (1)
+    {
+      int qstatus;
+      struct timespec ts, tsrem;
+      int i;
+
+      /* Wait up to 10 seconds */
+      for (i = 0; i < 1000; i++)
+	{
+	  qstatus = vapi_sock_recv_internal (ctx, &msg, 0);
+
+	  if (qstatus == 0)
+	    goto read_one_msg;
+	  ts.tv_sec = 0;
+	  ts.tv_nsec = 10000 * 1000; /* 10 ms */
+	  while (nanosleep (&ts, &tsrem) < 0)
+	    ts = tsrem;
+	}
+      /* Timeout... */
+      return -1;
+
+    read_one_msg:
+      if (vec_len (msg) == 0)
+	continue;
+
+      rp = (void *) msg;
+      if (ntohs (rp->_vl_msg_id) != VL_API_SOCKCLNT_CREATE_REPLY)
+	{
+	  clib_warning ("unexpected reply: id %d", ntohs (rp->_vl_msg_id));
+	  continue;
+	}
+      rv = clib_net_to_host_u32 (rp->response);
+      vapi_sockclnt_create_reply_t_handler (ctx, rp);
+      break;
+    }
+  return (rv);
+}
+
 static void
-vapi_client_send_disconnect (vapi_ctx_t ctx, u8 do_cleanup)
+vapi_shm_client_send_disconnect (vapi_ctx_t ctx, u8 do_cleanup)
 {
   vl_api_memclnt_delete_t *mp;
   vl_shmem_hdr_t *shmem_hdr;
@@ -485,8 +819,21 @@
   vl_msg_api_send_shmem (shmem_hdr->vl_input_queue, (u8 *) &mp);
 }
 
+static vapi_error_e
+vapi_sock_client_send_disconnect (vapi_ctx_t ctx)
+{
+  vl_api_sockclnt_delete_t *mp;
+
+  mp = vapi_msg_alloc (ctx, sizeof (vl_api_sockclnt_delete_t));
+  clib_memset (mp, 0, sizeof (*mp));
+  mp->_vl_msg_id = ntohs (VL_API_SOCKCLNT_DELETE);
+  mp->client_index = ctx->my_client_index;
+
+  return vapi_sock_send (ctx, (void *) mp);
+}
+
 static int
-vapi_client_disconnect (vapi_ctx_t ctx)
+vapi_shm_client_disconnect (vapi_ctx_t ctx)
 {
   vl_api_memclnt_delete_reply_t *rp;
   svm_queue_t *vl_input_queue;
@@ -494,7 +841,7 @@
   msgbuf_t *msgbuf;
 
   vl_input_queue = ctx->vl_input_queue;
-  vapi_client_send_disconnect (ctx, 0 /* wait for reply */);
+  vapi_shm_client_send_disconnect (ctx, 0 /* wait for reply */);
 
   /*
    * Have to be careful here, in case the client is disconnecting
@@ -511,7 +858,7 @@
 	{
 	  clib_warning ("peer unresponsive, give up");
 	  ctx->my_client_index = ~0;
-	  return -1;
+	  return VAPI_ENORESP;
 	}
       if (svm_queue_sub (vl_input_queue, (u8 *) &rp, SVM_Q_NOWAIT, 0) < 0)
 	continue;
@@ -535,6 +882,65 @@
   return 0;
 }
 
+static vapi_error_e
+vapi_sock_client_disconnect (vapi_ctx_t ctx)
+{
+  vl_api_sockclnt_delete_reply_t *rp;
+  u8 *msg = 0;
+  msgbuf_t *msgbuf;
+  int rv;
+  f64 deadline;
+
+  deadline = clib_time_now (&ctx->time) + 2;
+
+  do
+    {
+      rv = vapi_sock_client_send_disconnect (ctx);
+    }
+  while (clib_time_now (&ctx->time) < deadline && rv != VAPI_OK);
+
+  while (1)
+    {
+      if (clib_time_now (&ctx->time) >= deadline)
+	{
+	  clib_warning ("peer unresponsive, give up");
+	  ctx->my_client_index = ~0;
+	  return VAPI_ENORESP;
+	}
+
+      if (vapi_sock_recv_internal (ctx, &msg, 0) != VAPI_OK)
+	continue;
+
+      msgbuf = (void *) msg;
+      rp = (void *) msgbuf->data;
+      /* drain the queue */
+      if (ntohs (rp->_vl_msg_id) != VL_API_SOCKCLNT_DELETE_REPLY)
+	{
+	  clib_warning ("queue drain: %d", ntohs (rp->_vl_msg_id));
+	  msgbuf = (msgbuf_t *) ((u8 *) rp - offsetof (msgbuf_t, data));
+	  vl_msg_api_handler ((void *) rp, ntohl (msgbuf->data_len));
+	  continue;
+	}
+      msgbuf = (msgbuf_t *) ((u8 *) rp - offsetof (msgbuf_t, data));
+      vl_msg_api_handler ((void *) rp, ntohl (msgbuf->data_len));
+      break;
+    }
+
+  clib_socket_close (&ctx->client_socket);
+  vapi_api_name_and_crc_free (ctx);
+  return VAPI_OK;
+}
+
+int
+vapi_client_disconnect (vapi_ctx_t ctx)
+{
+  if (ctx->use_uds)
+    {
+      return vapi_sock_client_disconnect (ctx);
+    }
+  return vapi_shm_client_disconnect (ctx);
+}
+
 u32
 vapi_api_get_msg_index (vapi_ctx_t ctx, u8 *name_and_crc)
 {
@@ -550,9 +956,9 @@
 }
 
 vapi_error_e
-vapi_connect (vapi_ctx_t ctx, const char *name, const char *chroot_prefix,
-	      int max_outstanding_requests, int response_queue_size,
-	      vapi_mode_e mode, bool handle_keepalives)
+vapi_connect_ex (vapi_ctx_t ctx, const char *name, const char *path,
+		 int max_outstanding_requests, int response_queue_size,
+		 vapi_mode_e mode, bool handle_keepalives, bool use_uds)
 {
   int rv;
 
@@ -560,7 +966,8 @@
     {
       return VAPI_EINVAL;
     }
-  if (!clib_mem_get_per_cpu_heap () && !clib_mem_init (0, 1024 * 1024 * 32))
+
+  if (!clib_mem_get_per_cpu_heap () && !clib_mem_init (0, 1024L * 1024 * 32))
     {
       return VAPI_ENOMEM;
     }
@@ -576,28 +983,39 @@
   clib_memset (ctx->requests, 0, size);
   /* coverity[MISSING_LOCK] - 177211 requests_mutex is not needed here */
   ctx->requests_start = ctx->requests_count = 0;
+  ctx->use_uds = use_uds;
 
-  if (chroot_prefix)
+  if (use_uds)
     {
-      VAPI_DBG ("set memory root path `%s'", chroot_prefix);
-      vl_set_memory_root_path ((char *) chroot_prefix);
+      if (vapi_sock_client_connect (ctx, (char *) path, name) < 0)
+	{
+	  return VAPI_ECON_FAIL;
+	}
     }
-  static char api_map[] = "/vpe-api";
-  VAPI_DBG ("client api map `%s'", api_map);
-  if ((rv = vl_map_shmem (api_map, 0 /* is_vlib */)) < 0)
+  else
     {
-      return VAPI_EMAP_FAIL;
-    }
-  VAPI_DBG ("connect client `%s'", name);
-  if (vapi_client_connect (ctx, (char *) name, 0, response_queue_size, true) <
-      0)
-    {
-      vl_client_api_unmap ();
-      return VAPI_ECON_FAIL;
-    }
+      if (path)
+	{
+	  VAPI_DBG ("set memory root path `%s'", path);
+	  vl_set_memory_root_path ((char *) path);
+	}
+      static char api_map[] = "/vpe-api";
+      VAPI_DBG ("client api map `%s'", api_map);
+      if ((rv = vl_map_shmem (api_map, 0 /* is_vlib */)) < 0)
+	{
+	  return VAPI_EMAP_FAIL;
+	}
+      VAPI_DBG ("connect client `%s'", name);
+      if (vapi_shm_client_connect (ctx, (char *) name, 0, response_queue_size,
+				   true) < 0)
+	{
+	  vl_client_api_unmap ();
+	  return VAPI_ECON_FAIL;
+	}
 #if VAPI_DEBUG_CONNECT
   VAPI_DBG ("start probing messages");
 #endif
+    }
 
   int i;
   for (i = 0; i < __vapi_metadata.count; ++i)
@@ -670,6 +1088,15 @@
   return rv;
 }
 
+vapi_error_e
+vapi_connect (vapi_ctx_t ctx, const char *name, const char *chroot_prefix,
+	      int max_outstanding_requests, int response_queue_size,
+	      vapi_mode_e mode, bool handle_keepalives)
+{
+  return vapi_connect_ex (ctx, name, chroot_prefix, max_outstanding_requests,
+			  response_queue_size, mode, handle_keepalives, false);
+}
+
 /*
  * API client running in the same process as VPP
  */
@@ -680,6 +1107,11 @@
 {
   int rv;
 
+  if (ctx->use_uds)
+    {
+      return VAPI_ENOTSUP;
+    }
+
   if (response_queue_size <= 0 || max_outstanding_requests <= 0)
     {
       return VAPI_EINVAL;
@@ -698,8 +1130,8 @@
   ctx->requests_start = ctx->requests_count = 0;
 
   VAPI_DBG ("connect client `%s'", name);
-  if (vapi_client_connect (ctx, (char *) name, 0, response_queue_size,
-			   handle_keepalives) < 0)
+  if (vapi_shm_client_connect (ctx, (char *) name, 0, response_queue_size,
+			       handle_keepalives) < 0)
     {
       return VAPI_ECON_FAIL;
     }
@@ -773,11 +1205,17 @@
     {
       return VAPI_EINVAL;
     }
+
+  if (ctx->use_uds)
+    {
+      return VAPI_ENOTSUP;
+    }
+
   vl_api_memclnt_delete_reply_t *rp;
   svm_queue_t *vl_input_queue;
   time_t begin;
   vl_input_queue = ctx->vl_input_queue;
-  vapi_client_send_disconnect (ctx, 0 /* wait for reply */);
+  vapi_shm_client_send_disconnect (ctx, 0 /* wait for reply */);
 
   /*
    * Have to be careful here, in case the client is disconnecting
@@ -821,19 +1259,14 @@
   return rv;
 }
 
-vapi_error_e
-vapi_disconnect (vapi_ctx_t ctx)
+static vapi_error_e
+vapi_shm_disconnect (vapi_ctx_t ctx)
 {
-  if (!ctx->connected)
-    {
-      return VAPI_EINVAL;
-    }
-
   vl_api_memclnt_delete_reply_t *rp;
   svm_queue_t *vl_input_queue;
   time_t begin;
   vl_input_queue = ctx->vl_input_queue;
-  vapi_client_send_disconnect (ctx, 0 /* wait for reply */);
+  vapi_shm_client_send_disconnect (ctx, 0 /* wait for reply */);
 
   /*
    * Have to be careful here, in case the client is disconnecting
@@ -881,12 +1314,128 @@
   return rv;
 }
 
+static vapi_error_e
+vapi_sock_disconnect (vapi_ctx_t ctx)
+{
+  vl_api_sockclnt_delete_reply_t *rp;
+  time_t begin;
+  u8 *msg = 0;
+
+  vapi_sock_client_send_disconnect (ctx);
+
+  begin = time (0);
+  vapi_error_e rv = VAPI_OK;
+  while (1)
+    {
+      time_t now;
+
+      now = time (0);
+
+      if (now >= (begin + 2))
+	{
+	  clib_warning ("peer unresponsive, give up");
+	  ctx->my_client_index = ~0;
+	  rv = VAPI_ENORESP;
+	  goto fail;
+	}
+      if (vapi_sock_recv_internal (ctx, &msg, 0) < 0)
+	continue;
+
+      if (vec_len (msg) == 0)
+	continue;
+
+      rp = (void *) msg;
+
+      /* drain the queue */
+      if (ntohs (rp->_vl_msg_id) != VL_API_SOCKCLNT_DELETE_REPLY)
+	{
+	  clib_warning ("queue drain: %d", ntohs (rp->_vl_msg_id));
+	  continue;
+	}
+      vapi_sockclnt_delete_reply_t_handler (
+	ctx, (void *) rp /*, ntohl (msgbuf->data_len)*/);
+      break;
+    }
+fail:
+  clib_socket_close (&ctx->client_socket);
+  vapi_api_name_and_crc_free (ctx);
+
+  ctx->connected = false;
+  return rv;
+}
+
+vapi_error_e
+vapi_disconnect (vapi_ctx_t ctx)
+{
+  if (!ctx->connected)
+    {
+      return VAPI_EINVAL;
+    }
+
+  if (ctx->use_uds)
+    {
+      return vapi_sock_disconnect (ctx);
+    }
+  return vapi_shm_disconnect (ctx);
+}
+
 vapi_error_e
 vapi_get_fd (vapi_ctx_t ctx, int *fd)
 {
+  if (ctx->use_uds && fd)
+    {
+      *fd = ctx->client_socket.fd;
+      return VAPI_OK;
+    }
   return VAPI_ENOTSUP;
 }
 
+#if VAPI_DEBUG
+static void
+vapi_debug_log (vapi_ctx_t ctx, void *msg, const char *fun)
+{
+  unsigned msgid = be16toh (*(u16 *) msg);
+  if (msgid <= ctx->vl_msg_id_max)
+    {
+      vapi_msg_id_t id = ctx->vl_msg_id_to_vapi_msg_t[msgid];
+      if (id < __vapi_metadata.count)
+	{
+	  VAPI_DBG ("%s msg@%p:%u[%s]", fun, msg, msgid,
+		    __vapi_metadata.msgs[id]->name);
+	}
+      else
+	{
+	  VAPI_DBG ("%s msg@%p:%u[UNKNOWN]", fun, msg, msgid);
+	}
+    }
+  else
+    {
+      VAPI_DBG ("%s msg@%p:%u[UNKNOWN]", fun, msg, msgid);
+    }
+}
+#endif
+
+static vapi_error_e
+vapi_shm_send (vapi_ctx_t ctx, void *msg)
+{
+  int rv = VAPI_OK;
+  int tmp;
+  svm_queue_t *q = vlibapi_get_main ()->shmem_hdr->vl_input_queue;
+#if VAPI_DEBUG
+  vapi_debug_log (ctx, msg, "send");
+#endif
+  tmp =
+    svm_queue_add (q, (u8 *) &msg, VAPI_MODE_BLOCKING == ctx->mode ? 0 : 1);
+  if (tmp < 0)
+    {
+      rv = VAPI_EAGAIN;
+    }
+  else
+    VL_MSG_API_POISON (msg);
+
+  return rv;
+}
+
 vapi_error_e
 vapi_send (vapi_ctx_t ctx, void *msg)
 {
@@ -896,38 +1445,39 @@
       rv = VAPI_EINVAL;
       goto out;
     }
-  int tmp;
-  svm_queue_t *q = vlibapi_get_main ()->shmem_hdr->vl_input_queue;
-#if VAPI_DEBUG
-  unsigned msgid = be16toh (*(u16 *) msg);
-  if (msgid <= ctx->vl_msg_id_max)
+
+  if (ctx->use_uds)
     {
-      vapi_msg_id_t id = ctx->vl_msg_id_to_vapi_msg_t[msgid];
-      if (id < __vapi_metadata.count)
-	{
-	  VAPI_DBG ("send msg@%p:%u[%s]", msg, msgid,
-		    __vapi_metadata.msgs[id]->name);
-	}
-      else
-	{
-	  VAPI_DBG ("send msg@%p:%u[UNKNOWN]", msg, msgid);
-	}
+      rv = vapi_sock_send (ctx, msg);
     }
   else
     {
-      VAPI_DBG ("send msg@%p:%u[UNKNOWN]", msg, msgid);
+      rv = vapi_shm_send (ctx, msg);
     }
+
+out:
+  VAPI_DBG ("vapi_send() rv = %d", rv);
+  return rv;
+}
+
+static vapi_error_e
+vapi_shm_send2 (vapi_ctx_t ctx, void *msg1, void *msg2)
+{
+  vapi_error_e rv = VAPI_OK;
+  svm_queue_t *q = vlibapi_get_main ()->shmem_hdr->vl_input_queue;
+#if VAPI_DEBUG
+  vapi_debug_log (ctx, msg1, "send2");
+  vapi_debug_log (ctx, msg2, "send2");
 #endif
-  tmp = svm_queue_add (q, (u8 *) & msg,
-		       VAPI_MODE_BLOCKING == ctx->mode ? 0 : 1);
+  int tmp = svm_queue_add2 (q, (u8 *) &msg1, (u8 *) &msg2,
+			    VAPI_MODE_BLOCKING == ctx->mode ? 0 : 1);
   if (tmp < 0)
     {
       rv = VAPI_EAGAIN;
     }
   else
-    VL_MSG_API_POISON (msg);
-out:
-  VAPI_DBG ("vapi_send() rv = %d", rv);
+    VL_MSG_API_POISON (msg1);
+
   return rv;
 }
 
@@ -940,63 +1490,39 @@
       rv = VAPI_EINVAL;
       goto out;
     }
-  svm_queue_t *q = vlibapi_get_main ()->shmem_hdr->vl_input_queue;
-#if VAPI_DEBUG
-  unsigned msgid1 = be16toh (*(u16 *) msg1);
-  unsigned msgid2 = be16toh (*(u16 *) msg2);
-  const char *name1 = "UNKNOWN";
-  const char *name2 = "UNKNOWN";
-  if (msgid1 <= ctx->vl_msg_id_max)
+
+  if (ctx->use_uds)
     {
-      vapi_msg_id_t id = ctx->vl_msg_id_to_vapi_msg_t[msgid1];
-      if (id < __vapi_metadata.count)
-	{
-	  name1 = __vapi_metadata.msgs[id]->name;
-	}
-    }
-  if (msgid2 <= ctx->vl_msg_id_max)
-    {
-      vapi_msg_id_t id = ctx->vl_msg_id_to_vapi_msg_t[msgid2];
-      if (id < __vapi_metadata.count)
-	{
-	  name2 = __vapi_metadata.msgs[id]->name;
-	}
-    }
-  VAPI_DBG ("send two: %u[%s], %u[%s]", msgid1, name1, msgid2, name2);
-#endif
-  int tmp = svm_queue_add2 (q, (u8 *) & msg1, (u8 *) & msg2,
-			    VAPI_MODE_BLOCKING == ctx->mode ? 0 : 1);
-  if (tmp < 0)
-    {
-      rv = VAPI_EAGAIN;
+      rv = vapi_sock_send2 (ctx, msg1, msg2);
     }
   else
-    VL_MSG_API_POISON (msg1);
+    {
+      rv = vapi_shm_send2 (ctx, msg1, msg2);
+    }
+
 out:
   VAPI_DBG ("vapi_send() rv = %d", rv);
   return rv;
 }
 
-vapi_error_e
-vapi_recv (vapi_ctx_t ctx, void **msg, size_t * msg_size,
-	   svm_q_conditional_wait_t cond, u32 time)
+static vapi_error_e
+vapi_shm_recv (vapi_ctx_t ctx, void **msg, size_t *msg_size,
+	       svm_q_conditional_wait_t cond, u32 time)
 {
-  if (!ctx || !ctx->connected || !msg || !msg_size)
-    {
-      return VAPI_EINVAL;
-    }
   vapi_error_e rv = VAPI_OK;
   uword data;
 
   svm_queue_t *q = ctx->vl_input_queue;
 
-again:
   VAPI_DBG ("doing shm queue sub");
 
   int tmp = svm_queue_sub (q, (u8 *) & data, cond, time);
 
-  if (tmp == 0)
+  if (tmp != 0)
     {
+      return VAPI_EAGAIN;
+    }
+
       VL_MSG_API_UNPOISON ((void *) data);
 #if VAPI_DEBUG_ALLOC
       vapi_add_to_be_freed ((void *) data);
@@ -1010,60 +1536,94 @@
 	}
       *msg = (u8 *) data;
       *msg_size = ntohl (msgbuf->data_len);
+
 #if VAPI_DEBUG
-      unsigned msgid = be16toh (*(u16 *) * msg);
-      if (msgid <= ctx->vl_msg_id_max)
-	{
-	  vapi_msg_id_t id = ctx->vl_msg_id_to_vapi_msg_t[msgid];
-	  if (id < __vapi_metadata.count)
-	    {
-	      VAPI_DBG ("recv msg@%p:%u[%s]", *msg, msgid,
-			__vapi_metadata.msgs[id]->name);
-	    }
-	  else
-	    {
-	      VAPI_DBG ("recv msg@%p:%u[UNKNOWN]", *msg, msgid);
-	    }
-	}
-      else
-	{
-	  VAPI_DBG ("recv msg@%p:%u[UNKNOWN]", *msg, msgid);
-	}
+      vapi_debug_log (ctx, msg, "recv");
 #endif
-      if (ctx->handle_keepalives)
-	{
-	  unsigned msgid = be16toh (*(u16 *) * msg);
-	  if (msgid ==
-	      vapi_lookup_vl_msg_id (ctx, vapi_msg_id_memclnt_keepalive))
-	    {
-	      vapi_msg_memclnt_keepalive_reply *reply = NULL;
-	      do
-		{
-		  reply = vapi_msg_alloc (ctx, sizeof (*reply));
-		}
-	      while (!reply);
-	      reply->header.context = vapi_get_client_index (ctx);
-	      reply->header._vl_msg_id =
-		vapi_lookup_vl_msg_id (ctx,
-				       vapi_msg_id_memclnt_keepalive_reply);
-	      reply->payload.retval = 0;
-	      vapi_msg_memclnt_keepalive_reply_hton (reply);
-	      while (VAPI_EAGAIN == vapi_send (ctx, reply));
-	      vapi_msg_free (ctx, *msg);
-	      goto again;
-	    }
-	}
+
+      return rv;
+}
+
+static vapi_error_e
+vapi_sock_recv (vapi_ctx_t ctx, void **msg, size_t *msg_size, u32 time)
+{
+  vapi_error_e rv = VAPI_OK;
+  u8 *data = 0;
+  if (time == 0 && ctx->mode == VAPI_MODE_BLOCKING)
+    time = 1;
+
+  rv = vapi_sock_recv_internal (ctx, &data, time);
+
+  if (rv != VAPI_OK)
+    {
+      return rv;
+    }
+
+  *msg = data;
+  *msg_size = vec_len (data);
+
+#if VAPI_DEBUG
+  vapi_debug_log (ctx, msg, "recv");
+#endif
+
+  return rv;
+}
+
+vapi_error_e
+vapi_recv (vapi_ctx_t ctx, void **msg, size_t *msg_size,
+	   svm_q_conditional_wait_t cond, u32 time)
+{
+  if (!ctx || !ctx->connected || !msg || !msg_size)
+    {
+      return VAPI_EINVAL;
+    }
+  vapi_error_e rv = VAPI_OK;
+
+again:
+  if (ctx->use_uds)
+    {
+      rv = vapi_sock_recv (ctx, msg, msg_size, time);
     }
   else
     {
-      rv = VAPI_EAGAIN;
+      rv = vapi_shm_recv (ctx, msg, msg_size, cond, time);
     }
+
+  if (rv != VAPI_OK)
+    return rv;
+
+  if (ctx->handle_keepalives)
+    {
+      unsigned msgid = be16toh (*(u16 *) *msg);
+      if (msgid == vapi_lookup_vl_msg_id (ctx, vapi_msg_id_memclnt_keepalive))
+	{
+	  vapi_msg_memclnt_keepalive_reply *reply = NULL;
+	  do
+	    {
+	      reply = vapi_msg_alloc (ctx, sizeof (*reply));
+	    }
+	  while (!reply);
+	  reply->header.context = vapi_get_client_index (ctx);
+	  reply->header._vl_msg_id =
+	    vapi_lookup_vl_msg_id (ctx, vapi_msg_id_memclnt_keepalive_reply);
+	  reply->payload.retval = 0;
+	  vapi_msg_memclnt_keepalive_reply_hton (reply);
+	  while (VAPI_EAGAIN == vapi_send (ctx, reply))
+	    ;
+	  vapi_msg_free (ctx, *msg);
+	  goto again;
+	}
+    }
+
   return rv;
 }
 
 vapi_error_e
 vapi_wait (vapi_ctx_t ctx)
 {
+  if (ctx->use_uds)
+    return VAPI_ENOTSUP;
+
   svm_queue_lock (ctx->vl_input_queue);
   svm_queue_wait (ctx->vl_input_queue);
   svm_queue_unlock (ctx->vl_input_queue);
diff --git a/src/vpp-api/vapi/vapi.h b/src/vpp-api/vapi/vapi.h
index c131920..970c508 100644
--- a/src/vpp-api/vapi/vapi.h
+++ b/src/vpp-api/vapi/vapi.h
@@ -108,6 +108,25 @@
 			   bool handle_keepalives);
 
 /**
+ * @brief connect to vpp
+ *
+ * @param ctx opaque vapi context, must be allocated using vapi_ctx_alloc first
+ * @param name application name
+ * @param path shared memory prefix or path to unix socket
+ * @param max_outstanding_requests max number of outstanding requests queued
+ * @param response_queue_size size of the response queue
+ * @param mode mode of operation - blocking or nonblocking
+ * @param handle_keepalives - if true, automatically handle memclnt_keepalive
+ * @param use_uds - if true, use unix domain socket transport
+ *
+ * @return VAPI_OK on success, other error code on error
+ */
+vapi_error_e vapi_connect_ex (vapi_ctx_t ctx, const char *name,
+			      const char *path, int max_outstanding_requests,
+			      int response_queue_size, vapi_mode_e mode,
+			      bool handle_keepalives, bool use_uds);
+
+/**
  * @brief connect to vpp from a client in same process
  * @remark This MUST be called from a separate thread. If called
  *         from the main thread, it will deadlock.
diff --git a/src/vpp-api/vapi/vapi.hpp b/src/vpp-api/vapi/vapi.hpp
index e0dd742..109af9f 100644
--- a/src/vpp-api/vapi/vapi.hpp
+++ b/src/vpp-api/vapi/vapi.hpp
@@ -203,13 +203,14 @@
    *
    * @return VAPI_OK on success, other error code on error
    */
-  vapi_error_e connect (const char *name, const char *chroot_prefix,
-                        int max_outstanding_requests, int response_queue_size,
-                        bool handle_keepalives = true)
+  vapi_error_e
+  connect (const char *name, const char *chroot_prefix,
+	   int max_outstanding_requests, int response_queue_size,
+	   bool handle_keepalives = true, bool use_uds = false)
   {
-    return vapi_connect (vapi_ctx, name, chroot_prefix,
-                         max_outstanding_requests, response_queue_size,
-                         VAPI_MODE_BLOCKING, handle_keepalives);
+    return vapi_connect_ex (vapi_ctx, name, chroot_prefix,
+			    max_outstanding_requests, response_queue_size,
+			    VAPI_MODE_BLOCKING, handle_keepalives, use_uds);
   }
 
   /**
diff --git a/src/vpp-api/vapi/vapi_c_test.c b/src/vpp-api/vapi/vapi_c_test.c
index 3f88ded..7a0e462 100644
--- a/src/vpp-api/vapi/vapi_c_test.c
+++ b/src/vpp-api/vapi/vapi_c_test.c
@@ -43,6 +43,7 @@
 
 static char *app_name = NULL;
 static char *api_prefix = NULL;
+static bool use_uds = false;
 static const int max_outstanding_requests = 64;
 static const int response_queue_size = 32;
 
@@ -65,8 +66,9 @@
   ck_assert_ptr_eq (NULL, sv);
   rv = vapi_send (ctx, sv);
   ck_assert_int_eq (VAPI_EINVAL, rv);
-  rv = vapi_connect (ctx, app_name, api_prefix, max_outstanding_requests,
-		     response_queue_size, VAPI_MODE_BLOCKING, true);
+  rv =
+    vapi_connect_ex (ctx, app_name, api_prefix, max_outstanding_requests,
+		     response_queue_size, VAPI_MODE_BLOCKING, true, use_uds);
   ck_assert_int_eq (VAPI_OK, rv);
   rv = vapi_send (ctx, NULL);
   ck_assert_int_eq (VAPI_EINVAL, rv);
@@ -367,8 +369,9 @@
   vapi_ctx_t ctx;
   vapi_error_e rv = vapi_ctx_alloc (&ctx);
   ck_assert_int_eq (VAPI_OK, rv);
-  rv = vapi_connect (ctx, app_name, api_prefix, max_outstanding_requests,
-		     response_queue_size, VAPI_MODE_BLOCKING, true);
+  rv =
+    vapi_connect_ex (ctx, app_name, api_prefix, max_outstanding_requests,
+		     response_queue_size, VAPI_MODE_BLOCKING, true, use_uds);
   ck_assert_int_eq (VAPI_OK, rv);
   rv = vapi_disconnect (ctx);
   ck_assert_int_eq (VAPI_OK, rv);
@@ -384,8 +387,9 @@
 {
   vapi_error_e rv = vapi_ctx_alloc (&ctx);
   ck_assert_int_eq (VAPI_OK, rv);
-  rv = vapi_connect (ctx, app_name, api_prefix, max_outstanding_requests,
-		     response_queue_size, VAPI_MODE_BLOCKING, true);
+  rv =
+    vapi_connect_ex (ctx, app_name, api_prefix, max_outstanding_requests,
+		     response_queue_size, VAPI_MODE_BLOCKING, true, use_uds);
   ck_assert_int_eq (VAPI_OK, rv);
 }
 
@@ -394,8 +398,9 @@
 {
   vapi_error_e rv = vapi_ctx_alloc (&ctx);
   ck_assert_int_eq (VAPI_OK, rv);
-  rv = vapi_connect (ctx, app_name, api_prefix, max_outstanding_requests,
-		     response_queue_size, VAPI_MODE_NONBLOCKING, true);
+  rv = vapi_connect_ex (ctx, app_name, api_prefix, max_outstanding_requests,
+			response_queue_size, VAPI_MODE_NONBLOCKING, true,
+			use_uds);
   ck_assert_int_eq (VAPI_OK, rv);
 }
 
@@ -1065,13 +1070,23 @@
 int
 main (int argc, char *argv[])
 {
-  if (3 != argc)
+  if (4 != argc)
     {
       printf ("Invalid argc==`%d'\n", argc);
       return EXIT_FAILURE;
     }
   app_name = argv[1];
   api_prefix = argv[2];
+  if (!strcmp (argv[3], "shm"))
+    use_uds = 0;
+  else if (!strcmp (argv[3], "uds"))
+    use_uds = 1;
+  else
+    {
+      printf ("Unrecognised required argument '%s', expected 'uds' or 'shm'.",
+	      argv[3]);
+      return EXIT_FAILURE;
+    }
   printf ("App name: `%s', API prefix: `%s'\n", app_name, api_prefix);
 
   int number_failed;
diff --git a/src/vpp-api/vapi/vapi_common.h b/src/vpp-api/vapi/vapi_common.h
index bf438f7..69b9b78 100644
--- a/src/vpp-api/vapi/vapi_common.h
+++ b/src/vpp-api/vapi/vapi_common.h
@@ -22,30 +22,34 @@
 extern "C" {
 #endif
 
-typedef enum
-{
-  VAPI_OK = 0,	      /**< success */
-  VAPI_EINVAL,	      /**< invalid value encountered */
-  VAPI_EAGAIN,	      /**< operation would block */
-  VAPI_ENOTSUP,	      /**< operation not supported */
-  VAPI_ENOMEM,	      /**< out of memory */
-  VAPI_ENORESP,	      /**< no response to request */
-  VAPI_EMAP_FAIL,     /**< failure while mapping api */
-  VAPI_ECON_FAIL,     /**< failure while connecting to vpp */
-  VAPI_EINCOMPATIBLE, /**< fundamental incompatibility while connecting to vpp
-                           (control ping/control ping reply mismatch) */
-  VAPI_MUTEX_FAILURE, /**< failure manipulating internal mutex(es) */
-  VAPI_EUSER,	      /**< user error used for breaking dispatch,
-                           never used by VAPI */
-} vapi_error_e;
+  typedef enum
+  {
+    VAPI_OK = 0,	/**< success */
+    VAPI_EINVAL,	/**< invalid value encountered */
+    VAPI_EAGAIN,	/**< operation would block */
+    VAPI_ENOTSUP,	/**< operation not supported */
+    VAPI_ENOMEM,	/**< out of memory */
+    VAPI_ENORESP,	/**< no response to request */
+    VAPI_EMAP_FAIL,	/**< failure while mapping api */
+    VAPI_ECON_FAIL,	/**< failure while connecting to vpp */
+    VAPI_EINCOMPATIBLE, /**< fundamental incompatibility while connecting to
+			   vpp (control ping/control ping reply mismatch) */
+    VAPI_MUTEX_FAILURE, /**< failure manipulating internal mutex(es) */
+    VAPI_EUSER,		/**< user error used for breaking dispatch,
+			     never used by VAPI */
+    VAPI_ENOTSOCK,	/**< vapi socket doesn't refer to a socket */
+    VAPI_EACCES,	/**< no write permission for socket */
+    VAPI_ECONNRESET,	/**< connection reset by peer*/
+    VAPI_ESOCK_FAILURE, /**< generic socket failure, check errno */
+  } vapi_error_e;
 
-typedef enum
-{
-  VAPI_MODE_BLOCKING = 1,    /**< operations block until response received */
-  VAPI_MODE_NONBLOCKING = 2, /**< operations never block */
-} vapi_mode_e;
+  typedef enum
+  {
+    VAPI_MODE_BLOCKING = 1,    /**< operations block until response received */
+    VAPI_MODE_NONBLOCKING = 2, /**< operations never block */
+  } vapi_mode_e;
 
-typedef unsigned int vapi_msg_id_t;
+  typedef unsigned int vapi_msg_id_t;
 
 #define VAPI_INVALID_MSG_ID ((vapi_msg_id_t)(~0))
 
diff --git a/src/vpp-api/vapi/vapi_cpp_test.cpp b/src/vpp-api/vapi/vapi_cpp_test.cpp
index 56ebb39..918c759 100644
--- a/src/vpp-api/vapi/vapi_cpp_test.cpp
+++ b/src/vpp-api/vapi/vapi_cpp_test.cpp
@@ -35,6 +35,7 @@
 
 static char *app_name = nullptr;
 static char *api_prefix = nullptr;
+static bool use_uds = false;
 static const int max_outstanding_requests = 32;
 static const int response_queue_size = 32;
 
@@ -60,8 +61,9 @@
 
 void setup (void)
 {
-  vapi_error_e rv = con.connect (
-      app_name, api_prefix, max_outstanding_requests, response_queue_size);
+  vapi_error_e rv =
+    con.connect (app_name, api_prefix, max_outstanding_requests,
+		 response_queue_size, true, use_uds);
   ck_assert_int_eq (VAPI_OK, rv);
 }
 
@@ -452,14 +454,25 @@
 
 int main (int argc, char *argv[])
 {
-  if (3 != argc)
+  if (4 != argc)
     {
       printf ("Invalid argc==`%d'\n", argc);
       return EXIT_FAILURE;
     }
   app_name = argv[1];
   api_prefix = argv[2];
-  printf ("App name: `%s', API prefix: `%s'\n", app_name, api_prefix);
+  if (!strcmp (argv[3], "shm"))
+    use_uds = 0;
+  else if (!strcmp (argv[3], "uds"))
+    use_uds = 1;
+  else
+    {
+      printf ("Unrecognised required argument '%s', expected 'uds' or 'shm'.",
+	      argv[3]);
+      return EXIT_FAILURE;
+    }
+  printf ("App name: `%s', API prefix: `%s', use unix sockets %d\n", app_name,
+	  api_prefix, use_uds);
 
   int number_failed;
   Suite *s;