PAPI: Use UNIX domain sockets instead of shared memory
Adds support for running the API purely across Unix domain sockets.
Usage: vpp = VPP(use_socket=True)
Change-Id: Iafc1301e03dd3edc3f4d702dd6c0b98d3b50b69e
Signed-off-by: Ole Troan <ot@cisco.com>
diff --git a/src/vlibapi/api_common.h b/src/vlibapi/api_common.h
index 9432082..497a1e8 100644
--- a/src/vlibapi/api_common.h
+++ b/src/vlibapi/api_common.h
@@ -64,7 +64,7 @@
u32 clib_file_index; /**< Socket only: file index */
i8 *unprocessed_input; /**< Socket only: pending input */
u32 unprocessed_msg_length; /**< Socket only: unprocssed length */
- u8 *output_vector; /**< Socket only: output vecto */
+ u8 *output_vector; /**< Socket only: output vector */
int *additional_fds_to_close;
/* socket client only */
diff --git a/src/vlibmemory/api.h b/src/vlibmemory/api.h
index 2146b16..d66c439 100644
--- a/src/vlibmemory/api.h
+++ b/src/vlibmemory/api.h
@@ -55,10 +55,11 @@
always_inline vl_api_registration_t *
vl_api_client_index_to_registration (u32 index)
{
- if (PREDICT_FALSE (socket_main.current_rp != 0))
- return socket_main.current_rp;
-
- return (vl_mem_api_client_index_to_registration (index));
+ vl_api_registration_t *reg =
+ vl_socket_api_client_index_to_registration (index);
+ if (reg && reg->registration_type != REGISTRATION_TYPE_FREE)
+ return reg;
+ return vl_mem_api_client_index_to_registration (index);
}
always_inline u32
diff --git a/src/vlibmemory/memclnt.api b/src/vlibmemory/memclnt.api
index f88e5bd..451bc0e 100644
--- a/src/vlibmemory/memclnt.api
+++ b/src/vlibmemory/memclnt.api
@@ -17,7 +17,7 @@
option version = "2.0.0";
/*
- * Define services not following the normal convetions here
+ * Define services not following the normal conventions here
*/
service {
rpc memclnt_rx_thread_suspend returns null;
@@ -145,6 +145,12 @@
u16 last_msg_id;
};
+typedef message_table_entry
+{
+ u16 index;
+ u8 name[64];
+};
+
/*
* Create a socket client registration.
*/
@@ -154,23 +160,26 @@
};
define sockclnt_create_reply {
+ u32 client_index;
u32 context; /* opaque value from the create request */
i32 response; /* Non-negative = success */
- u64 handle; /* handle by which vlib knows this client */
u32 index; /* index, used e.g. by API trace replay */
+ u16 count;
+ vl_api_message_table_entry_t message_table[count];
};
/*
* Delete a client registration
*/
define sockclnt_delete {
+ u32 client_index;
+ u32 context;
u32 index; /* index, used e.g. by API trace replay */
- u64 handle; /* handle by which vlib knows this client */
};
define sockclnt_delete_reply {
+ u32 context;
i32 response; /* Non-negative = success */
- u64 handle; /* in case the client wonders */
};
/*
diff --git a/src/vlibmemory/socket_api.c b/src/vlibmemory/socket_api.c
index afe02d2..4787069 100644
--- a/src/vlibmemory/socket_api.c
+++ b/src/vlibmemory/socket_api.c
@@ -1,6 +1,6 @@
/*
*------------------------------------------------------------------
- * socksvr_vlib.c
+ * socket_api.c
*
* Copyright (c) 2009 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -74,6 +74,20 @@
/* *INDENT-ON* */
}
+vl_api_registration_t *
+vl_socket_api_client_index_to_registration (u32 index)
+{
+ socket_main_t *sm = &socket_main;
+ if (pool_is_free_index (sm->registration_pool, ntohl (index)))
+ {
+#if DEBUG > 2
+ clib_warning ("Invalid index %d\n", ntohl (index));
+#endif
+ return 0;
+ }
+ return pool_elt_at_index (sm->registration_pool, ntohl (index));
+}
+
void
vl_socket_api_send (vl_api_registration_t * rp, u8 * elem)
{
@@ -289,26 +303,37 @@
rp = pool_elt_at_index (socket_main.registration_pool, uf->private_data);
/* Flush output vector. */
- n = write (uf->file_descriptor, rp->output_vector,
- vec_len (rp->output_vector));
- if (n < 0)
+ size_t total_bytes = vec_len (rp->output_vector);
+ size_t bytes_to_send, remaining_bytes = total_bytes;
+ void *p = rp->output_vector;
+ while (remaining_bytes > 0)
{
-#if DEBUG > 2
- clib_warning ("write error, close the file...\n");
-#endif
- clib_file_del (fm, uf);
- vl_socket_free_registration_index (rp - socket_main.registration_pool);
- return 0;
- }
- else if (n > 0)
- {
- vec_delete (rp->output_vector, n, 0);
- if (vec_len (rp->output_vector) <= 0
- && (uf->flags & UNIX_FILE_DATA_AVAILABLE_TO_WRITE))
+ bytes_to_send = remaining_bytes > 4096 ? 4096 : remaining_bytes;
+ n = write (uf->file_descriptor, p, bytes_to_send);
+ if (n < 0)
{
- uf->flags &= ~UNIX_FILE_DATA_AVAILABLE_TO_WRITE;
- fm->file_update (uf, UNIX_FILE_UPDATE_MODIFY);
+ if (errno == EAGAIN)
+ {
+ break;
+ }
+#if DEBUG > 2
+ clib_warning ("write error, close the file...\n");
+#endif
+ clib_file_del (fm, uf);
+ vl_socket_free_registration_index (rp -
+ socket_main.registration_pool);
+ return 0;
}
+ remaining_bytes -= bytes_to_send;
+ p += bytes_to_send;
+ }
+
+ vec_delete (rp->output_vector, total_bytes - remaining_bytes, 0);
+ if (vec_len (rp->output_vector) <= 0
+ && (uf->flags & UNIX_FILE_DATA_AVAILABLE_TO_WRITE))
+ {
+ uf->flags &= ~UNIX_FILE_DATA_AVAILABLE_TO_WRITE;
+ fm->file_update (uf, UNIX_FILE_UPDATE_MODIFY);
}
return 0;
@@ -379,7 +404,11 @@
{
vl_api_registration_t *regp;
vl_api_sockclnt_create_reply_t *rp;
+ api_main_t *am = &api_main;
+ hash_pair_t *hp;
int rv = 0;
+ u32 nmsg = hash_elts (am->msg_index_by_name_and_crc);
+ u32 i = 0;
regp = socket_main.current_rp;
@@ -387,13 +416,22 @@
regp->name = format (0, "%s%c", mp->name, 0);
- rp = vl_msg_api_alloc (sizeof (*rp));
+ u32 size = sizeof (*rp) + (nmsg * sizeof (vl_api_message_table_entry_t));
+ rp = vl_msg_api_alloc (size);
rp->_vl_msg_id = htons (VL_API_SOCKCLNT_CREATE_REPLY);
- rp->handle = (uword) regp;
- rp->index = (uword) regp->vl_api_registration_pool_index;
+ rp->index = htonl (regp->vl_api_registration_pool_index);
rp->context = mp->context;
rp->response = htonl (rv);
+ rp->count = htons (nmsg);
+ /* *INDENT-OFF* */
+ hash_foreach_pair (hp, am->msg_index_by_name_and_crc,
+ ({
+ rp->message_table[i].index = htons(hp->value[0]);
+ strncpy((char *)rp->message_table[i].name, (char *)hp->key, 64-1);
+ i++;
+ }));
+ /* *INDENT-ON* */
vl_api_send_msg (regp, (u8 *) rp);
}
@@ -406,23 +444,28 @@
vl_api_registration_t *regp;
vl_api_sockclnt_delete_reply_t *rp;
- if (!pool_is_free_index (socket_main.registration_pool, mp->index))
+ regp = vl_api_client_index_to_registration (mp->client_index);
+ if (!regp)
+ return;
+
+ u32 reg_index = ntohl (mp->index);
+ rp = vl_msg_api_alloc (sizeof (*rp));
+ rp->_vl_msg_id = htons (VL_API_SOCKCLNT_DELETE_REPLY);
+ rp->context = mp->context;
+
+ if (!pool_is_free_index (socket_main.registration_pool, reg_index))
{
- regp = pool_elt_at_index (socket_main.registration_pool, mp->index);
-
- rp = vl_msg_api_alloc (sizeof (*rp));
- rp->_vl_msg_id = htons (VL_API_SOCKCLNT_DELETE_REPLY);
- rp->handle = mp->handle;
rp->response = htonl (1);
-
vl_api_send_msg (regp, (u8 *) rp);
vl_api_registration_del_file (regp);
- vl_socket_free_registration_index (mp->index);
+ vl_socket_free_registration_index (reg_index);
}
else
{
- clib_warning ("unknown client ID %d", mp->index);
+ clib_warning ("unknown client ID %d", reg_index);
+ rp->response = htonl (-1);
+ vl_api_send_msg (regp, (u8 *) rp);
}
}
diff --git a/src/vlibmemory/socket_api.h b/src/vlibmemory/socket_api.h
index 1e99550..00985fe 100644
--- a/src/vlibmemory/socket_api.h
+++ b/src/vlibmemory/socket_api.h
@@ -75,6 +75,8 @@
clib_error_t *vl_sock_api_recv_fd_msg (int socket_fd, int fds[], int n_fds,
u32 wait);
+vl_api_registration_t *vl_socket_api_client_index_to_registration (u32 index);
+
#endif /* SRC_VLIBMEMORY_SOCKET_API_H_ */
/*
diff --git a/src/vlibmemory/vlib_api.c b/src/vlibmemory/vlib_api.c
index 65fa34b..15a0ba8 100644
--- a/src/vlibmemory/vlib_api.c
+++ b/src/vlibmemory/vlib_api.c
@@ -165,8 +165,8 @@
vl_api_send_msg (reg, (u8 *) rmp);
}
-#define foreach_vlib_api_msg \
-_(GET_FIRST_MSG_ID, get_first_msg_id) \
+#define foreach_vlib_api_msg \
+_(GET_FIRST_MSG_ID, get_first_msg_id) \
_(API_VERSIONS, api_versions)
/*
diff --git a/src/vpp-api/client/client.c b/src/vpp-api/client/client.c
index 6c922db..68269bb 100644
--- a/src/vpp-api/client/client.c
+++ b/src/vpp-api/client/client.c
@@ -507,7 +507,7 @@
u32 client_index;
}) vl_api_header_t;
-static unsigned int
+static u32
vac_client_index (void)
{
return (api_main.my_client_index);
diff --git a/src/vpp-api/python/vpp_papi/vpp_papi.py b/src/vpp-api/python/vpp_papi/vpp_papi.py
index 4f765ec..e1a7059 100644
--- a/src/vpp-api/python/vpp_papi/vpp_papi.py
+++ b/src/vpp-api/python/vpp_papi/vpp_papi.py
@@ -26,8 +26,6 @@
import fnmatch
import weakref
import atexit
-from cffi import FFI
-import cffi
from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
from . vpp_serializer import VPPMessage
@@ -36,41 +34,15 @@
else:
import queue as queue
-ffi = FFI()
-ffi.cdef("""
-typedef void (*vac_callback_t)(unsigned char * data, int len);
-typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
-int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
- int rx_qlen);
-int vac_disconnect(void);
-int vac_read(char **data, int *l, unsigned short timeout);
-int vac_write(char *data, int len);
-void vac_free(void * msg);
-
-int vac_get_msg_index(unsigned char * name);
-int vac_msg_table_size(void);
-int vac_msg_table_max_index(void);
-
-void vac_rx_suspend (void);
-void vac_rx_resume (void);
-void vac_set_error_handler(vac_error_callback_t);
- """)
-
-# Barfs on failure, no need to check success.
-vpp_api = ffi.dlopen('libvppapiclient.so')
-
def vpp_atexit(vpp_weakref):
"""Clean up VPP connection on shutdown."""
vpp_instance = vpp_weakref()
- if vpp_instance and vpp_instance.connected:
+ if vpp_instance and vpp_instance.transport.connected:
vpp_instance.logger.debug('Cleaning up VPP on exit')
vpp_instance.disconnect()
-vpp_object = None
-
-
def vpp_iterator(d):
if sys.version[0] == '2':
return d.iteritems()
@@ -78,21 +50,6 @@
return d.items()
-@ffi.callback("void(unsigned char *, int)")
-def vac_callback_sync(data, len):
- vpp_object.msg_handler_sync(ffi.buffer(data, len))
-
-
-@ffi.callback("void(unsigned char *, int)")
-def vac_callback_async(data, len):
- vpp_object.msg_handler_async(ffi.buffer(data, len))
-
-
-@ffi.callback("void(void *, unsigned char *, int)")
-def vac_error_handler(arg, msg, msg_len):
- vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
-
-
class VppApiDynamicMethodHolder(object):
pass
@@ -168,7 +125,8 @@
def __init__(self, apifiles=None, testmode=False, async_thread=True,
logger=logging.getLogger('vpp_papi'), loglevel='debug',
- read_timeout=0):
+ read_timeout=5, use_socket=False,
+ server_address='/run/vpp-api.sock'):
"""Create a VPP API object.
apifiles is a list of files containing API
@@ -181,9 +139,6 @@
loglevel, if supplied, is the log level this logger is set
to report at (from the loglevels in the logging module).
"""
- global vpp_object
- vpp_object = self
-
if logger is None:
logger = logging.getLogger(__name__)
if loglevel is not None:
@@ -193,16 +148,19 @@
self.messages = {}
self.id_names = []
self.id_msgdef = []
- self.connected = False
self.header = VPPType('header', [['u16', 'msgid'],
['u32', 'client_index']])
self.apifiles = []
self.event_callback = None
self.message_queue = queue.Queue()
self.read_timeout = read_timeout
- self.vpp_api = vpp_api
self.async_thread = async_thread
+ if use_socket:
+ from . vpp_transport_socket import VppTransport
+ else:
+ from . vpp_transport_shmem import VppTransport
+
if not apifiles:
# Pick up API definitions from default directory
try:
@@ -224,22 +182,11 @@
if len(self.messages) == 0 and not testmode:
raise ValueError(1, 'Missing JSON message definitions')
+ self.transport = VppTransport(self, read_timeout=read_timeout,
+ server_address=server_address)
# Make sure we allow VPP to clean up the message rings.
atexit.register(vpp_atexit, weakref.ref(self))
- # Register error handler
- if not testmode:
- vpp_api.vac_set_error_handler(vac_error_handler)
-
- # Support legacy CFFI
- # from_buffer supported from 1.8.0
- (major, minor, patch) = [int(s) for s in
- cffi.__version__.split('.', 3)]
- if major >= 1 and minor >= 8:
- self._write = self._write_new_cffi
- else:
- self._write = self._write_legacy_cffi
-
class ContextId(object):
"""Thread-safe provider of unique context IDs."""
def __init__(self):
@@ -377,11 +324,6 @@
return api_files
- def status(self):
- """Debug function: report current VPP API status to stdout."""
- print('Connected') if self.connected else print('Not Connected')
- print('Read API definitions from', ', '.join(self.apifiles))
-
@property
def api(self):
if not hasattr(self, "_api"):
@@ -408,7 +350,7 @@
self._api = VppApiDynamicMethodHolder()
for name, msg in vpp_iterator(self.messages):
n = name + '_' + msg.crc[2:]
- i = vpp_api.vac_get_msg_index(n.encode())
+ i = self.transport.get_msg_index(n.encode())
if i > 0:
self.id_msgdef[i] = msg
self.id_names[i] = name
@@ -420,43 +362,19 @@
self.logger.debug(
'No such message type or failed CRC checksum: %s', n)
- def _write_new_cffi(self, buf):
- """Send a binary-packed message to VPP."""
- if not self.connected:
- raise IOError(1, 'Not connected')
- return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
-
- def _write_legacy_cffi(self, buf):
- """Send a binary-packed message to VPP."""
- if not self.connected:
- raise IOError(1, 'Not connected')
- return vpp_api.vac_write(bytes(buf), len(buf))
-
- def _read(self):
- if not self.connected:
- raise IOError(1, 'Not connected')
- mem = ffi.new("char **")
- size = ffi.new("int *")
- rv = vpp_api.vac_read(mem, size, self.read_timeout)
- if rv:
- raise IOError(rv, 'vac_read failed')
- msg = bytes(ffi.buffer(mem[0], size[0]))
- vpp_api.vac_free(mem[0])
- return msg
-
def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
do_async):
- pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
- rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
+ pfx = chroot_prefix.encode() if chroot_prefix else None
+
+ rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
if rv != 0:
raise IOError(2, 'Connect failed')
- self.connected = True
- self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
+ self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
self._register_functions(do_async=do_async)
# Initialise control ping
crc = self.messages['control_ping'].crc
- self.control_ping_index = vpp_api.vac_get_msg_index(
+ self.control_ping_index = self.transport.get_msg_index(
('control_ping' + '_' + crc[2:]).encode())
self.control_ping_msgdef = self.messages['control_ping']
if self.async_thread:
@@ -475,7 +393,7 @@
rx_qlen - the length of the VPP message receive queue between
client and server.
"""
- msg_handler = vac_callback_sync if not do_async else vac_callback_async
+ msg_handler = self.transport.get_callback(do_async)
return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
do_async)
@@ -488,13 +406,12 @@
client and server.
"""
- return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
+ return self.connect_internal(name, None, chroot_prefix, rx_qlen,
do_async=False)
def disconnect(self):
"""Detach from VPP."""
- rv = vpp_api.vac_disconnect()
- self.connected = False
+ rv = self.transport.disconnect()
self.message_queue.put("terminate event thread")
return rv
@@ -586,10 +503,16 @@
context = kwargs['context']
kwargs['_vl_msg_id'] = i
+ try:
+ if self.transport.socket_index:
+ kwargs['client_index'] = self.transport.socket_index
+ except AttributeError:
+ pass
self.validate_args(msg, kwargs)
b = msg.pack(kwargs)
- vpp_api.vac_rx_suspend()
- self._write(b)
+ self.transport.suspend()
+
+ self.transport.write(b)
if multipart:
# Send a ping after the request - we use its response
@@ -599,12 +522,13 @@
# Block until we get a reply.
rl = []
while (True):
- msg = self._read()
+ msg = self.transport.read()
if not msg:
raise IOError(2, 'VPP API client: read failed')
r = self.decode_incoming_msg(msg)
msgname = type(r).__name__
if context not in r or r.context == 0 or context != r.context:
+ # Message being queued
self.message_queue.put_nowait(r)
continue
@@ -616,7 +540,7 @@
rl.append(r)
- vpp_api.vac_rx_resume()
+ self.transport.resume()
return rl
@@ -634,11 +558,15 @@
kwargs['context'] = context
else:
context = kwargs['context']
- kwargs['client_index'] = 0
+ try:
+ if self.transport.socket_index:
+ kwargs['client_index'] = self.transport.socket_index
+ except AttributeError:
+ kwargs['client_index'] = 0
kwargs['_vl_msg_id'] = i
b = msg.pack(kwargs)
- self._write(b)
+ self.transport.write(b)
def register_event_callback(self, callback):
"""Register a callback for async messages.
@@ -659,7 +587,7 @@
self.event_callback = callback
def thread_msg_handler(self):
- """Python thread calling the user registerd message handler.
+ """Python thread calling the user registered message handler.
This is to emulate the old style event callback scheme. Modern
clients should provide their own thread to poll the event
diff --git a/src/vpp-api/python/vpp_papi/vpp_serializer.py b/src/vpp-api/python/vpp_papi/vpp_serializer.py
index 2177cdb..103a078 100644
--- a/src/vpp-api/python/vpp_papi/vpp_serializer.py
+++ b/src/vpp-api/python/vpp_papi/vpp_serializer.py
@@ -80,7 +80,7 @@
if len(data[offset:]) < self.num:
raise ValueError('Invalid array length for "{}" got {}'
' expected {}'
- .format(self.name, len(data), self.num))
+ .format(self.name, len(data[offset:]), self.num))
return self.packer.unpack(data, offset)
diff --git a/src/vpp-api/python/vpp_papi/vpp_transport_shmem.py b/src/vpp-api/python/vpp_papi/vpp_transport_shmem.py
new file mode 100644
index 0000000..a20295b
--- /dev/null
+++ b/src/vpp-api/python/vpp_papi/vpp_transport_shmem.py
@@ -0,0 +1,117 @@
+#
+# A transport class. With two implementations.
+# One for socket and one for shared memory.
+#
+
+from cffi import FFI
+import cffi
+
+ffi = FFI()
+ffi.cdef("""
+typedef void (*vac_callback_t)(unsigned char * data, int len);
+typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
+int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
+ int rx_qlen);
+int vac_disconnect(void);
+int vac_read(char **data, int *l, unsigned short timeout);
+int vac_write(char *data, int len);
+void vac_free(void * msg);
+
+int vac_get_msg_index(unsigned char * name);
+int vac_msg_table_size(void);
+int vac_msg_table_max_index(void);
+
+void vac_rx_suspend (void);
+void vac_rx_resume (void);
+void vac_set_error_handler(vac_error_callback_t);
+ """)
+
+vpp_object = None
+
+# Barfs on failure, no need to check success.
+vpp_api = ffi.dlopen('libvppapiclient.so')
+
+
+@ffi.callback("void(unsigned char *, int)")
+def vac_callback_sync(data, len):
+ vpp_object.msg_handler_sync(ffi.buffer(data, len))
+
+
+@ffi.callback("void(unsigned char *, int)")
+def vac_callback_async(data, len):
+ vpp_object.msg_handler_async(ffi.buffer(data, len))
+
+
+@ffi.callback("void(void *, unsigned char *, int)")
+def vac_error_handler(arg, msg, msg_len):
+ vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
+
+
+class VppTransport:
+ def __init__(self, parent, read_timeout, server_address):
+ self.connected = False
+ self.read_timeout = read_timeout
+ self.parent = parent
+ global vpp_object
+ vpp_object = parent
+
+ # Register error handler
+ vpp_api.vac_set_error_handler(vac_error_handler)
+
+ # Support legacy CFFI
+ # from_buffer supported from 1.8.0
+ (major, minor, patch) = [int(s) for s in
+ cffi.__version__.split('.', 3)]
+ if major >= 1 and minor >= 8:
+ self.write = self._write_new_cffi
+ else:
+ self.write = self._write_legacy_cffi
+
+ def connect(self, name, pfx, msg_handler, rx_qlen):
+ self.connected = True
+ if not pfx:
+ pfx = ffi.NULL
+ return vpp_api.vac_connect(name, pfx, msg_handler, rx_qlen)
+
+ def disconnect(self):
+ self.connected = False
+ vpp_api.vac_disconnect()
+
+ def suspend(self):
+ vpp_api.vac_rx_suspend()
+
+ def resume(self):
+ vpp_api.vac_rx_resume()
+
+ def get_callback(self, async):
+ return vac_callback_sync if not async else vac_callback_async
+
+ def get_msg_index(self, name):
+ return vpp_api.vac_get_msg_index(name)
+
+ def msg_table_max_index(self):
+ return vpp_api.vac_msg_table_max_index()
+
+ def _write_new_cffi(self, buf):
+ """Send a binary-packed message to VPP."""
+ if not self.connected:
+ raise IOError(1, 'Not connected')
+ return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
+
+ def _write_legacy_cffi(self, buf):
+ """Send a binary-packed message to VPP."""
+ if not self.connected:
+ raise IOError(1, 'Not connected')
+ return vpp_api.vac_write(bytes(buf), len(buf))
+
+ def read(self):
+ if not self.connected:
+ raise IOError(1, 'Not connected')
+ mem = ffi.new("char **")
+ size = ffi.new("int *")
+ rv = vpp_api.vac_read(mem, size, self.read_timeout)
+ if rv:
+ raise IOError(rv, 'vac_read failed')
+ msg = bytes(ffi.buffer(mem[0], size[0]))
+ vpp_api.vac_free(mem[0])
+ return msg
diff --git a/src/vpp-api/python/vpp_papi/vpp_transport_socket.py b/src/vpp-api/python/vpp_papi/vpp_transport_socket.py
new file mode 100644
index 0000000..1822deb
--- /dev/null
+++ b/src/vpp-api/python/vpp_papi/vpp_transport_socket.py
@@ -0,0 +1,176 @@
+#
+# VPP Unix Domain Socket Transport.
+#
+import socket
+import struct
+import threading
+import select
+import multiprocessing
+import logging
+
+
+class VppTransport:
+ def __init__(self, parent, read_timeout, server_address):
+ self.connected = False
+ self.read_timeout = read_timeout if read_timeout > 0 else 1
+ self.parent = parent
+ self.server_address = server_address
+ self.header = struct.Struct('>QII')
+ self.message_table = {}
+ self.sque = multiprocessing.Queue()
+ self.q = multiprocessing.Queue()
+ self.message_thread = threading.Thread(target=self.msg_thread_func)
+
+ def msg_thread_func(self):
+ while True:
+ try:
+ rlist, _, _ = select.select([self.socket,
+ self.sque._reader], [], [])
+ except socket.error:
+ # Terminate thread
+ logging.error('select failed')
+ self.q.put(None)
+ return
+
+ for r in rlist:
+ if r == self.sque._reader:
+ # Terminate
+ self.q.put(None)
+ return
+
+ elif r == self.socket:
+ try:
+ msg = self._read()
+ if not msg:
+ self.q.put(None)
+ return
+ except socket.error:
+ self.q.put(None)
+ return
+ # Put either to local queue or if context == 0
+ # callback queue
+ r = self.parent.decode_incoming_msg(msg)
+ if hasattr(r, 'context') and r.context > 0:
+ self.q.put(msg)
+ else:
+ self.parent.msg_handler_async(msg)
+ else:
+ raise IOError(2, 'Unknown response from select')
+
+ def connect(self, name, pfx, msg_handler, rx_qlen):
+
+ # Create a UDS socket
+ self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
+ self.socket.settimeout(self.read_timeout)
+
+ # Connect the socket to the port where the server is listening
+ try:
+ self.socket.connect(self.server_address)
+ except socket.error as msg:
+ logging.error(msg)
+ raise
+
+ self.connected = True
+
+ # Initialise sockclnt_create
+ sockclnt_create = self.parent.messages['sockclnt_create']
+ sockclnt_create_reply = self.parent.messages['sockclnt_create_reply']
+
+ args = {'_vl_msg_id': 15,
+ 'name': name,
+ 'context': 124}
+ b = sockclnt_create.pack(args)
+ self.write(b)
+ msg = self._read()
+ hdr, length = self.parent.header.unpack(msg, 0)
+ if hdr.msgid != 16:
+ raise IOError('Invalid reply message')
+
+ r, length = sockclnt_create_reply.unpack(msg)
+ self.socket_index = r.index
+ for m in r.message_table:
+ n = m.name.rstrip(b'\x00\x13')
+ self.message_table[n] = m.index
+
+ self.message_thread.daemon = True
+ self.message_thread.start()
+
+ return 0
+
+ def disconnect(self):
+ try: # Might fail, if VPP closes socket before packet makes it out
+ rv = self.parent.api.sockclnt_delete(index=self.socket_index)
+ except IOError:
+ pass
+ self.connected = False
+ self.socket.close()
+ self.sque.put(True) # Terminate listening thread
+ self.message_thread.join()
+
+ def suspend(self):
+ pass
+
+ def resume(self):
+ pass
+
+ def callback(self):
+ raise NotImplemented
+
+ def get_callback(self, async):
+ return self.callback
+
+ def get_msg_index(self, name):
+ try:
+ return self.message_table[name]
+ except KeyError:
+ return 0
+
+ def msg_table_max_index(self):
+ return len(self.message_table)
+
+ def write(self, buf):
+ """Send a binary-packed message to VPP."""
+ if not self.connected:
+ raise IOError(1, 'Not connected')
+
+ # Send header
+ header = self.header.pack(0, len(buf), 0)
+ n = self.socket.send(header)
+ n = self.socket.send(buf)
+
+ def _read(self):
+ # Header and message
+ try:
+ msg = self.socket.recv(4096)
+ if len(msg) == 0:
+ return None
+ except socket.error as message:
+ logging.error(message)
+ raise
+
+ (_, l, _) = self.header.unpack(msg[:16])
+
+ if l > len(msg):
+ buf = bytearray(l + 16)
+ view = memoryview(buf)
+ view[:4096] = msg
+ view = view[4096:]
+ # Read rest of message
+ remaining_bytes = l - 4096 + 16
+ while remaining_bytes > 0:
+ bytes_to_read = (remaining_bytes if remaining_bytes
+ <= 4096 else 4096)
+ nbytes = self.socket.recv_into(view, bytes_to_read)
+ if nbytes == 0:
+ logging.error('recv failed')
+ break
+ view = view[nbytes:]
+ remaining_bytes -= nbytes
+ else:
+ buf = msg
+ return buf[16:]
+
+ def read(self):
+ if not self.connected:
+ raise IOError(1, 'Not connected')
+ return self.q.get()