PAPI: Use UNIX domain sockets instead of shared memory

Adds support for running the API purely across Unix domain sockets.
Usage: vpp = VPP(use_socket=True)

Change-Id: Iafc1301e03dd3edc3f4d702dd6c0b98d3b50b69e
Signed-off-by: Ole Troan <ot@cisco.com>
diff --git a/src/vlibapi/api_common.h b/src/vlibapi/api_common.h
index 9432082..497a1e8 100644
--- a/src/vlibapi/api_common.h
+++ b/src/vlibapi/api_common.h
@@ -64,7 +64,7 @@
   u32 clib_file_index;		/**< Socket only: file index */
   i8 *unprocessed_input;	/**< Socket only: pending input */
   u32 unprocessed_msg_length;	/**< Socket only: unprocssed length */
-  u8 *output_vector;		/**< Socket only: output vecto  */
+  u8 *output_vector;		/**< Socket only: output vector */
   int *additional_fds_to_close;
 
   /* socket client only */
diff --git a/src/vlibmemory/api.h b/src/vlibmemory/api.h
index 2146b16..d66c439 100644
--- a/src/vlibmemory/api.h
+++ b/src/vlibmemory/api.h
@@ -55,10 +55,11 @@
 always_inline vl_api_registration_t *
 vl_api_client_index_to_registration (u32 index)
 {
-  if (PREDICT_FALSE (socket_main.current_rp != 0))
-    return socket_main.current_rp;
-
-  return (vl_mem_api_client_index_to_registration (index));
+  vl_api_registration_t *reg =
+    vl_socket_api_client_index_to_registration (index);
+  if (reg && reg->registration_type != REGISTRATION_TYPE_FREE)
+    return reg;
+  return vl_mem_api_client_index_to_registration (index);
 }
 
 always_inline u32
diff --git a/src/vlibmemory/memclnt.api b/src/vlibmemory/memclnt.api
index f88e5bd..451bc0e 100644
--- a/src/vlibmemory/memclnt.api
+++ b/src/vlibmemory/memclnt.api
@@ -17,7 +17,7 @@
 option version = "2.0.0";
 
 /*
- * Define services not following the normal convetions here
+ * Define services not following the normal conventions here
  */
 service {
   rpc memclnt_rx_thread_suspend returns null;
@@ -145,6 +145,12 @@
     u16 last_msg_id;
 };
 
+typedef message_table_entry
+{
+  u16 index;
+  u8 name[64];
+};
+
 /*
  * Create a socket client registration. 
  */
@@ -154,23 +160,26 @@
 };
 
 define sockclnt_create_reply {
+    u32 client_index;
     u32 context;                /* opaque value from the create request */
     i32 response;               /* Non-negative = success */
-    u64 handle;           	/* handle by which vlib knows this client */
     u32 index;                  /* index, used e.g. by API trace replay */
+    u16 count;
+    vl_api_message_table_entry_t message_table[count];
 };
 
 /*
  * Delete a client registration 
  */
 define sockclnt_delete {
+    u32 client_index;
+    u32 context;
     u32 index;                  /* index, used e.g. by API trace replay */
-    u64 handle;               	/* handle by which vlib knows this client */
 };
 
 define sockclnt_delete_reply {
+    u32 context;
     i32 response;               /* Non-negative = success */
-    u64 handle;               	/* in case the client wonders */
 };
 
 /*
diff --git a/src/vlibmemory/socket_api.c b/src/vlibmemory/socket_api.c
index afe02d2..4787069 100644
--- a/src/vlibmemory/socket_api.c
+++ b/src/vlibmemory/socket_api.c
@@ -1,6 +1,6 @@
 /*
  *------------------------------------------------------------------
- * socksvr_vlib.c
+ * socket_api.c
  *
  * Copyright (c) 2009 Cisco and/or its affiliates.
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -74,6 +74,20 @@
 /* *INDENT-ON* */
 }
 
+vl_api_registration_t *
+vl_socket_api_client_index_to_registration (u32 index)
+{
+  socket_main_t *sm = &socket_main;
+  if (pool_is_free_index (sm->registration_pool, ntohl (index)))
+    {
+#if DEBUG > 2
+      clib_warning ("Invalid index %d\n", ntohl (index));
+#endif
+      return 0;
+    }
+  return pool_elt_at_index (sm->registration_pool, ntohl (index));
+}
+
 void
 vl_socket_api_send (vl_api_registration_t * rp, u8 * elem)
 {
@@ -289,26 +303,37 @@
   rp = pool_elt_at_index (socket_main.registration_pool, uf->private_data);
 
   /* Flush output vector. */
-  n = write (uf->file_descriptor, rp->output_vector,
-	     vec_len (rp->output_vector));
-  if (n < 0)
+  size_t total_bytes = vec_len (rp->output_vector);
+  size_t bytes_to_send, remaining_bytes = total_bytes;
+  void *p = rp->output_vector;
+  while (remaining_bytes > 0)
     {
-#if DEBUG > 2
-      clib_warning ("write error, close the file...\n");
-#endif
-      clib_file_del (fm, uf);
-      vl_socket_free_registration_index (rp - socket_main.registration_pool);
-      return 0;
-    }
-  else if (n > 0)
-    {
-      vec_delete (rp->output_vector, n, 0);
-      if (vec_len (rp->output_vector) <= 0
-	  && (uf->flags & UNIX_FILE_DATA_AVAILABLE_TO_WRITE))
+      bytes_to_send = remaining_bytes > 4096 ? 4096 : remaining_bytes;
+      n = write (uf->file_descriptor, p, bytes_to_send);
+      if (n < 0)
 	{
-	  uf->flags &= ~UNIX_FILE_DATA_AVAILABLE_TO_WRITE;
-	  fm->file_update (uf, UNIX_FILE_UPDATE_MODIFY);
+	  if (errno == EAGAIN)
+	    {
+	      break;
+	    }
+#if DEBUG > 2
+	  clib_warning ("write error, close the file...\n");
+#endif
+	  clib_file_del (fm, uf);
+	  vl_socket_free_registration_index (rp -
+					     socket_main.registration_pool);
+	  return 0;
 	}
+      remaining_bytes -= n;
+      p += n;
+    }
+
+  vec_delete (rp->output_vector, total_bytes - remaining_bytes, 0);
+  if (vec_len (rp->output_vector) <= 0
+      && (uf->flags & UNIX_FILE_DATA_AVAILABLE_TO_WRITE))
+    {
+      uf->flags &= ~UNIX_FILE_DATA_AVAILABLE_TO_WRITE;
+      fm->file_update (uf, UNIX_FILE_UPDATE_MODIFY);
     }
 
   return 0;
@@ -379,7 +404,11 @@
 {
   vl_api_registration_t *regp;
   vl_api_sockclnt_create_reply_t *rp;
+  api_main_t *am = &api_main;
+  hash_pair_t *hp;
   int rv = 0;
+  u32 nmsg = hash_elts (am->msg_index_by_name_and_crc);
+  u32 i = 0;
 
   regp = socket_main.current_rp;
 
@@ -387,13 +416,22 @@
 
   regp->name = format (0, "%s%c", mp->name, 0);
 
-  rp = vl_msg_api_alloc (sizeof (*rp));
+  u32 size = sizeof (*rp) + (nmsg * sizeof (vl_api_message_table_entry_t));
+  rp = vl_msg_api_alloc (size);
   rp->_vl_msg_id = htons (VL_API_SOCKCLNT_CREATE_REPLY);
-  rp->handle = (uword) regp;
-  rp->index = (uword) regp->vl_api_registration_pool_index;
+  rp->index = htonl (regp->vl_api_registration_pool_index);
   rp->context = mp->context;
   rp->response = htonl (rv);
+  rp->count = htons (nmsg);
 
+  /* *INDENT-OFF* */
+  hash_foreach_pair (hp, am->msg_index_by_name_and_crc,
+  ({
+    rp->message_table[i].index = htons(hp->value[0]);
+    snprintf((char *)rp->message_table[i].name, 64, "%s", (char *)hp->key);
+    i++;
+  }));
+  /* *INDENT-ON* */
   vl_api_send_msg (regp, (u8 *) rp);
 }
 
@@ -406,23 +444,28 @@
   vl_api_registration_t *regp;
   vl_api_sockclnt_delete_reply_t *rp;
 
-  if (!pool_is_free_index (socket_main.registration_pool, mp->index))
+  regp = vl_api_client_index_to_registration (mp->client_index);
+  if (!regp)
+    return;
+
+  u32 reg_index = ntohl (mp->index);
+  rp = vl_msg_api_alloc (sizeof (*rp));
+  rp->_vl_msg_id = htons (VL_API_SOCKCLNT_DELETE_REPLY);
+  rp->context = mp->context;
+
+  if (!pool_is_free_index (socket_main.registration_pool, reg_index))
     {
-      regp = pool_elt_at_index (socket_main.registration_pool, mp->index);
-
-      rp = vl_msg_api_alloc (sizeof (*rp));
-      rp->_vl_msg_id = htons (VL_API_SOCKCLNT_DELETE_REPLY);
-      rp->handle = mp->handle;
       rp->response = htonl (1);
-
       vl_api_send_msg (regp, (u8 *) rp);
 
       vl_api_registration_del_file (regp);
-      vl_socket_free_registration_index (mp->index);
+      vl_socket_free_registration_index (reg_index);
     }
   else
     {
-      clib_warning ("unknown client ID %d", mp->index);
+      clib_warning ("unknown client ID %d", reg_index);
+      rp->response = htonl (-1);
+      vl_api_send_msg (regp, (u8 *) rp);
     }
 }
 
diff --git a/src/vlibmemory/socket_api.h b/src/vlibmemory/socket_api.h
index 1e99550..00985fe 100644
--- a/src/vlibmemory/socket_api.h
+++ b/src/vlibmemory/socket_api.h
@@ -75,6 +75,8 @@
 clib_error_t *vl_sock_api_recv_fd_msg (int socket_fd, int fds[], int n_fds,
 				       u32 wait);
 
+vl_api_registration_t *vl_socket_api_client_index_to_registration (u32 index);
+
 #endif /* SRC_VLIBMEMORY_SOCKET_API_H_ */
 
 /*
diff --git a/src/vlibmemory/vlib_api.c b/src/vlibmemory/vlib_api.c
index 65fa34b..15a0ba8 100644
--- a/src/vlibmemory/vlib_api.c
+++ b/src/vlibmemory/vlib_api.c
@@ -165,8 +165,8 @@
   vl_api_send_msg (reg, (u8 *) rmp);
 }
 
-#define foreach_vlib_api_msg                            \
-_(GET_FIRST_MSG_ID, get_first_msg_id)                   \
+#define foreach_vlib_api_msg				\
+_(GET_FIRST_MSG_ID, get_first_msg_id)			\
 _(API_VERSIONS, api_versions)
 
 /*
diff --git a/src/vpp-api/client/client.c b/src/vpp-api/client/client.c
index 6c922db..68269bb 100644
--- a/src/vpp-api/client/client.c
+++ b/src/vpp-api/client/client.c
@@ -507,7 +507,7 @@
   u32 client_index;
 }) vl_api_header_t;
 
-static unsigned int
+static u32
 vac_client_index (void)
 {
   return (api_main.my_client_index);
diff --git a/src/vpp-api/python/vpp_papi/vpp_papi.py b/src/vpp-api/python/vpp_papi/vpp_papi.py
index 4f765ec..e1a7059 100644
--- a/src/vpp-api/python/vpp_papi/vpp_papi.py
+++ b/src/vpp-api/python/vpp_papi/vpp_papi.py
@@ -26,8 +26,6 @@
 import fnmatch
 import weakref
 import atexit
-from cffi import FFI
-import cffi
 from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
 from . vpp_serializer import VPPMessage
 
@@ -36,41 +34,15 @@
 else:
     import queue as queue
 
-ffi = FFI()
-ffi.cdef("""
-typedef void (*vac_callback_t)(unsigned char * data, int len);
-typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
-int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
-    int rx_qlen);
-int vac_disconnect(void);
-int vac_read(char **data, int *l, unsigned short timeout);
-int vac_write(char *data, int len);
-void vac_free(void * msg);
-
-int vac_get_msg_index(unsigned char * name);
-int vac_msg_table_size(void);
-int vac_msg_table_max_index(void);
-
-void vac_rx_suspend (void);
-void vac_rx_resume (void);
-void vac_set_error_handler(vac_error_callback_t);
- """)
-
-# Barfs on failure, no need to check success.
-vpp_api = ffi.dlopen('libvppapiclient.so')
-
 
 def vpp_atexit(vpp_weakref):
     """Clean up VPP connection on shutdown."""
     vpp_instance = vpp_weakref()
-    if vpp_instance and vpp_instance.connected:
+    if vpp_instance and vpp_instance.transport.connected:
         vpp_instance.logger.debug('Cleaning up VPP on exit')
         vpp_instance.disconnect()
 
 
-vpp_object = None
-
-
 def vpp_iterator(d):
     if sys.version[0] == '2':
         return d.iteritems()
@@ -78,21 +50,6 @@
         return d.items()
 
 
-@ffi.callback("void(unsigned char *, int)")
-def vac_callback_sync(data, len):
-    vpp_object.msg_handler_sync(ffi.buffer(data, len))
-
-
-@ffi.callback("void(unsigned char *, int)")
-def vac_callback_async(data, len):
-    vpp_object.msg_handler_async(ffi.buffer(data, len))
-
-
-@ffi.callback("void(void *, unsigned char *, int)")
-def vac_error_handler(arg, msg, msg_len):
-    vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
-
-
 class VppApiDynamicMethodHolder(object):
     pass
 
@@ -168,7 +125,8 @@
 
     def __init__(self, apifiles=None, testmode=False, async_thread=True,
                  logger=logging.getLogger('vpp_papi'), loglevel='debug',
-                 read_timeout=0):
+                 read_timeout=5, use_socket=False,
+                 server_address='/run/vpp-api.sock'):
         """Create a VPP API object.
 
         apifiles is a list of files containing API
@@ -181,9 +139,6 @@
         loglevel, if supplied, is the log level this logger is set
         to report at (from the loglevels in the logging module).
         """
-        global vpp_object
-        vpp_object = self
-
         if logger is None:
             logger = logging.getLogger(__name__)
             if loglevel is not None:
@@ -193,16 +148,19 @@
         self.messages = {}
         self.id_names = []
         self.id_msgdef = []
-        self.connected = False
         self.header = VPPType('header', [['u16', 'msgid'],
                                          ['u32', 'client_index']])
         self.apifiles = []
         self.event_callback = None
         self.message_queue = queue.Queue()
         self.read_timeout = read_timeout
-        self.vpp_api = vpp_api
         self.async_thread = async_thread
 
+        if use_socket:
+            from . vpp_transport_socket import VppTransport
+        else:
+            from . vpp_transport_shmem import VppTransport
+
         if not apifiles:
             # Pick up API definitions from default directory
             try:
@@ -224,22 +182,11 @@
         if len(self.messages) == 0 and not testmode:
             raise ValueError(1, 'Missing JSON message definitions')
 
+        self.transport = VppTransport(self, read_timeout=read_timeout,
+                                      server_address=server_address)
         # Make sure we allow VPP to clean up the message rings.
         atexit.register(vpp_atexit, weakref.ref(self))
 
-        # Register error handler
-        if not testmode:
-            vpp_api.vac_set_error_handler(vac_error_handler)
-
-        # Support legacy CFFI
-        # from_buffer supported from 1.8.0
-        (major, minor, patch) = [int(s) for s in
-                                 cffi.__version__.split('.', 3)]
-        if major >= 1 and minor >= 8:
-            self._write = self._write_new_cffi
-        else:
-            self._write = self._write_legacy_cffi
-
     class ContextId(object):
         """Thread-safe provider of unique context IDs."""
         def __init__(self):
@@ -377,11 +324,6 @@
 
         return api_files
 
-    def status(self):
-        """Debug function: report current VPP API status to stdout."""
-        print('Connected') if self.connected else print('Not Connected')
-        print('Read API definitions from', ', '.join(self.apifiles))
-
     @property
     def api(self):
         if not hasattr(self, "_api"):
@@ -408,7 +350,7 @@
         self._api = VppApiDynamicMethodHolder()
         for name, msg in vpp_iterator(self.messages):
             n = name + '_' + msg.crc[2:]
-            i = vpp_api.vac_get_msg_index(n.encode())
+            i = self.transport.get_msg_index(n.encode())
             if i > 0:
                 self.id_msgdef[i] = msg
                 self.id_names[i] = name
@@ -420,43 +362,19 @@
                 self.logger.debug(
                     'No such message type or failed CRC checksum: %s', n)
 
-    def _write_new_cffi(self, buf):
-        """Send a binary-packed message to VPP."""
-        if not self.connected:
-            raise IOError(1, 'Not connected')
-        return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
-
-    def _write_legacy_cffi(self, buf):
-        """Send a binary-packed message to VPP."""
-        if not self.connected:
-            raise IOError(1, 'Not connected')
-        return vpp_api.vac_write(bytes(buf), len(buf))
-
-    def _read(self):
-        if not self.connected:
-            raise IOError(1, 'Not connected')
-        mem = ffi.new("char **")
-        size = ffi.new("int *")
-        rv = vpp_api.vac_read(mem, size, self.read_timeout)
-        if rv:
-            raise IOError(rv, 'vac_read failed')
-        msg = bytes(ffi.buffer(mem[0], size[0]))
-        vpp_api.vac_free(mem[0])
-        return msg
-
     def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
                          do_async):
-        pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
-        rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
+        pfx = chroot_prefix.encode() if chroot_prefix else None
+
+        rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
         if rv != 0:
             raise IOError(2, 'Connect failed')
-        self.connected = True
-        self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
+        self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
         self._register_functions(do_async=do_async)
 
         # Initialise control ping
         crc = self.messages['control_ping'].crc
-        self.control_ping_index = vpp_api.vac_get_msg_index(
+        self.control_ping_index = self.transport.get_msg_index(
             ('control_ping' + '_' + crc[2:]).encode())
         self.control_ping_msgdef = self.messages['control_ping']
         if self.async_thread:
@@ -475,7 +393,7 @@
         rx_qlen - the length of the VPP message receive queue between
         client and server.
         """
-        msg_handler = vac_callback_sync if not do_async else vac_callback_async
+        msg_handler = self.transport.get_callback(do_async)
         return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
                                      do_async)
 
@@ -488,13 +406,12 @@
         client and server.
         """
 
-        return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
+        return self.connect_internal(name, None, chroot_prefix, rx_qlen,
                                      do_async=False)
 
     def disconnect(self):
         """Detach from VPP."""
-        rv = vpp_api.vac_disconnect()
-        self.connected = False
+        rv = self.transport.disconnect()
         self.message_queue.put("terminate event thread")
         return rv
 
@@ -586,10 +503,16 @@
             context = kwargs['context']
         kwargs['_vl_msg_id'] = i
 
+        try:
+            if self.transport.socket_index:
+                kwargs['client_index'] = self.transport.socket_index
+        except AttributeError:
+            pass
         self.validate_args(msg, kwargs)
         b = msg.pack(kwargs)
-        vpp_api.vac_rx_suspend()
-        self._write(b)
+        self.transport.suspend()
+
+        self.transport.write(b)
 
         if multipart:
             # Send a ping after the request - we use its response
@@ -599,12 +522,13 @@
         # Block until we get a reply.
         rl = []
         while (True):
-            msg = self._read()
+            msg = self.transport.read()
             if not msg:
                 raise IOError(2, 'VPP API client: read failed')
             r = self.decode_incoming_msg(msg)
             msgname = type(r).__name__
             if context not in r or r.context == 0 or context != r.context:
+                # Message being queued
                 self.message_queue.put_nowait(r)
                 continue
 
@@ -616,7 +540,7 @@
 
             rl.append(r)
 
-        vpp_api.vac_rx_resume()
+        self.transport.resume()
 
         return rl
 
@@ -634,11 +558,15 @@
             kwargs['context'] = context
         else:
             context = kwargs['context']
-        kwargs['client_index'] = 0
+        try:
+            if self.transport.socket_index:
+                kwargs['client_index'] = self.transport.socket_index
+        except AttributeError:
+            kwargs['client_index'] = 0
         kwargs['_vl_msg_id'] = i
         b = msg.pack(kwargs)
 
-        self._write(b)
+        self.transport.write(b)
 
     def register_event_callback(self, callback):
         """Register a callback for async messages.
@@ -659,7 +587,7 @@
         self.event_callback = callback
 
     def thread_msg_handler(self):
-        """Python thread calling the user registerd message handler.
+        """Python thread calling the user registered message handler.
 
         This is to emulate the old style event callback scheme. Modern
         clients should provide their own thread to poll the event
diff --git a/src/vpp-api/python/vpp_papi/vpp_serializer.py b/src/vpp-api/python/vpp_papi/vpp_serializer.py
index 2177cdb..103a078 100644
--- a/src/vpp-api/python/vpp_papi/vpp_serializer.py
+++ b/src/vpp-api/python/vpp_papi/vpp_serializer.py
@@ -80,7 +80,7 @@
         if len(data[offset:]) < self.num:
             raise ValueError('Invalid array length for "{}" got {}'
                              ' expected {}'
-                             .format(self.name, len(data), self.num))
+                             .format(self.name, len(data[offset:]), self.num))
         return self.packer.unpack(data, offset)
 
 
diff --git a/src/vpp-api/python/vpp_papi/vpp_transport_shmem.py b/src/vpp-api/python/vpp_papi/vpp_transport_shmem.py
new file mode 100644
index 0000000..a20295b
--- /dev/null
+++ b/src/vpp-api/python/vpp_papi/vpp_transport_shmem.py
@@ -0,0 +1,117 @@
+#
+# A transport class. With two implementations.
+# One for socket and one for shared memory.
+#
+
+from cffi import FFI
+import cffi
+
+ffi = FFI()
+ffi.cdef("""
+typedef void (*vac_callback_t)(unsigned char * data, int len);
+typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
+int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
+    int rx_qlen);
+int vac_disconnect(void);
+int vac_read(char **data, int *l, unsigned short timeout);
+int vac_write(char *data, int len);
+void vac_free(void * msg);
+
+int vac_get_msg_index(unsigned char * name);
+int vac_msg_table_size(void);
+int vac_msg_table_max_index(void);
+
+void vac_rx_suspend (void);
+void vac_rx_resume (void);
+void vac_set_error_handler(vac_error_callback_t);
+ """)
+
+vpp_object = None
+
+# Barfs on failure, no need to check success.
+vpp_api = ffi.dlopen('libvppapiclient.so')
+
+
+@ffi.callback("void(unsigned char *, int)")
+def vac_callback_sync(data, len):
+    vpp_object.msg_handler_sync(ffi.buffer(data, len))
+
+
+@ffi.callback("void(unsigned char *, int)")
+def vac_callback_async(data, len):
+    vpp_object.msg_handler_async(ffi.buffer(data, len))
+
+
+@ffi.callback("void(void *, unsigned char *, int)")
+def vac_error_handler(arg, msg, msg_len):
+    vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
+
+
+class VppTransport:
+    def __init__(self, parent, read_timeout, server_address):
+        self.connected = False
+        self.read_timeout = read_timeout
+        self.parent = parent
+        global vpp_object
+        vpp_object = parent
+
+        # Register error handler
+        vpp_api.vac_set_error_handler(vac_error_handler)
+
+        # Support legacy CFFI
+        # from_buffer supported from 1.8.0
+        (major, minor, patch) = [int(s) for s in
+                                 cffi.__version__.split('.', 3)]
+        if major >= 1 and minor >= 8:
+            self.write = self._write_new_cffi
+        else:
+            self.write = self._write_legacy_cffi
+
+    def connect(self, name, pfx, msg_handler, rx_qlen):
+        self.connected = True
+        if not pfx:
+            pfx = ffi.NULL
+        return vpp_api.vac_connect(name, pfx, msg_handler, rx_qlen)
+
+    def disconnect(self):
+        self.connected = False
+        vpp_api.vac_disconnect()
+
+    def suspend(self):
+        vpp_api.vac_rx_suspend()
+
+    def resume(self):
+        vpp_api.vac_rx_resume()
+
+    def get_callback(self, do_async):
+        return vac_callback_sync if not do_async else vac_callback_async
+
+    def get_msg_index(self, name):
+        return vpp_api.vac_get_msg_index(name)
+
+    def msg_table_max_index(self):
+        return vpp_api.vac_msg_table_max_index()
+
+    def _write_new_cffi(self, buf):
+        """Send a binary-packed message to VPP."""
+        if not self.connected:
+            raise IOError(1, 'Not connected')
+        return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
+
+    def _write_legacy_cffi(self, buf):
+        """Send a binary-packed message to VPP."""
+        if not self.connected:
+            raise IOError(1, 'Not connected')
+        return vpp_api.vac_write(bytes(buf), len(buf))
+
+    def read(self):
+        if not self.connected:
+            raise IOError(1, 'Not connected')
+        mem = ffi.new("char **")
+        size = ffi.new("int *")
+        rv = vpp_api.vac_read(mem, size, self.read_timeout)
+        if rv:
+            raise IOError(rv, 'vac_read failed')
+        msg = bytes(ffi.buffer(mem[0], size[0]))
+        vpp_api.vac_free(mem[0])
+        return msg
diff --git a/src/vpp-api/python/vpp_papi/vpp_transport_socket.py b/src/vpp-api/python/vpp_papi/vpp_transport_socket.py
new file mode 100644
index 0000000..1822deb
--- /dev/null
+++ b/src/vpp-api/python/vpp_papi/vpp_transport_socket.py
@@ -0,0 +1,176 @@
+#
+# VPP Unix Domain Socket Transport.
+#
+import socket
+import struct
+import threading
+import select
+import multiprocessing
+import logging
+
+
+class VppTransport:
+    def __init__(self, parent, read_timeout, server_address):
+        self.connected = False
+        self.read_timeout = read_timeout if read_timeout > 0 else 1
+        self.parent = parent
+        self.server_address = server_address
+        self.header = struct.Struct('>QII')
+        self.message_table = {}
+        self.sque = multiprocessing.Queue()
+        self.q = multiprocessing.Queue()
+        self.message_thread = threading.Thread(target=self.msg_thread_func)
+
+    def msg_thread_func(self):
+        while True:
+            try:
+                rlist, _, _ = select.select([self.socket,
+                                             self.sque._reader], [], [])
+            except socket.error:
+                # Terminate thread
+                logging.error('select failed')
+                self.q.put(None)
+                return
+
+            for r in rlist:
+                if r == self.sque._reader:
+                    # Terminate
+                    self.q.put(None)
+                    return
+
+                elif r == self.socket:
+                    try:
+                        msg = self._read()
+                        if not msg:
+                            self.q.put(None)
+                            return
+                    except socket.error:
+                        self.q.put(None)
+                        return
+                    # Put either to local queue or if context == 0
+                    # callback queue
+                    r = self.parent.decode_incoming_msg(msg)
+                    if hasattr(r, 'context') and r.context > 0:
+                        self.q.put(msg)
+                    else:
+                        self.parent.msg_handler_async(msg)
+                else:
+                    raise IOError(2, 'Unknown response from select')
+
+    def connect(self, name, pfx, msg_handler, rx_qlen):
+
+        # Create a UDS socket
+        self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
+        self.socket.settimeout(self.read_timeout)
+
+        # Connect the socket to the port where the server is listening
+        try:
+            self.socket.connect(self.server_address)
+        except socket.error as msg:
+            logging.error(msg)
+            raise
+
+        self.connected = True
+
+        # Initialise sockclnt_create
+        sockclnt_create = self.parent.messages['sockclnt_create']
+        sockclnt_create_reply = self.parent.messages['sockclnt_create_reply']
+
+        args = {'_vl_msg_id': 15,
+                'name': name,
+                'context': 124}
+        b = sockclnt_create.pack(args)
+        self.write(b)
+        msg = self._read()
+        hdr, length = self.parent.header.unpack(msg, 0)
+        if hdr.msgid != 16:
+            raise IOError('Invalid reply message')
+
+        r, length = sockclnt_create_reply.unpack(msg)
+        self.socket_index = r.index
+        for m in r.message_table:
+            n = m.name.rstrip(b'\x00\x13')
+            self.message_table[n] = m.index
+
+        self.message_thread.daemon = True
+        self.message_thread.start()
+
+        return 0
+
+    def disconnect(self):
+        try:  # Might fail, if VPP closes socket before packet makes it out
+            rv = self.parent.api.sockclnt_delete(index=self.socket_index)
+        except IOError:
+            pass
+        self.connected = False
+        self.socket.close()
+        self.sque.put(True)  # Terminate listening thread
+        self.message_thread.join()
+
+    def suspend(self):
+        pass
+
+    def resume(self):
+        pass
+
+    def callback(self):
+        raise NotImplementedError
+
+    def get_callback(self, do_async):
+        return self.callback
+
+    def get_msg_index(self, name):
+        try:
+            return self.message_table[name]
+        except KeyError:
+            return 0
+
+    def msg_table_max_index(self):
+        return len(self.message_table)
+
+    def write(self, buf):
+        """Send a binary-packed message to VPP."""
+        if not self.connected:
+            raise IOError(1, 'Not connected')
+
+        # Send header
+        header = self.header.pack(0, len(buf), 0)
+        n = self.socket.send(header)
+        n = self.socket.send(buf)
+
+    def _read(self):
+        # Header and message
+        try:
+            msg = self.socket.recv(4096)
+            if len(msg) == 0:
+                return None
+        except socket.error as message:
+            logging.error(message)
+            raise
+
+        (_, l, _) = self.header.unpack(msg[:16])
+
+        if l > len(msg):
+            buf = bytearray(l + 16)
+            view = memoryview(buf)
+            view[:len(msg)] = msg
+            view = view[len(msg):]
+            # Read rest of message
+            remaining_bytes = l + 16 - len(msg)
+            while remaining_bytes > 0:
+                bytes_to_read = (remaining_bytes if remaining_bytes
+                                 <= 4096 else 4096)
+                nbytes = self.socket.recv_into(view, bytes_to_read)
+                if nbytes == 0:
+                    logging.error('recv failed')
+                    break
+                view = view[nbytes:]
+                remaining_bytes -= nbytes
+        else:
+            buf = msg
+        return buf[16:]
+
+    def read(self):
+        if not self.connected:
+            raise IOError(1, 'Not connected')
+        return self.q.get()