blob: 6ba7d648bdb5b9018d1ae74618a3d348b2bc9fcf [file] [log] [blame]
Ole Troana03f4ef2016-12-02 12:53:55 +01001#!/usr/bin/env python
Ole Troan5f9dcff2016-08-01 04:59:13 +02002#
3# Copyright (c) 2016 Cisco and/or its affiliates.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at:
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
Ole Troana03f4ef2016-12-02 12:53:55 +010015#
Ole Troan5f9dcff2016-08-01 04:59:13 +020016
Ole Troan5f9dcff2016-08-01 04:59:13 +020017from __future__ import print_function
Ole Troana7564e82018-06-12 21:06:44 +020018from __future__ import absolute_import
Paul Vinciguerra2af6e922019-06-06 07:06:09 -040019import ctypes
Ole Troan4df97162017-07-07 16:06:08 +020020import sys
Paul Vinciguerra2af6e922019-06-06 07:06:09 -040021import multiprocessing as mp
Ole Troan4df97162017-07-07 16:06:08 +020022import os
23import logging
Paul Vinciguerra5fced042019-02-26 20:39:44 -080024import functools
Ole Troan4df97162017-07-07 16:06:08 +020025import json
26import threading
Chris Luke52bf22e2017-11-03 23:32:38 -040027import fnmatch
Klement Sekera180402d2018-02-17 10:58:37 +010028import weakref
Ole Troan4df97162017-07-07 16:06:08 +020029import atexit
Paul Vinciguerraae8819f2019-06-07 13:35:37 -040030from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType
Ole Troan53fffa12018-11-13 12:36:56 +010031from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
Ole Troan4df97162017-07-07 16:06:08 +020032
33if sys.version[0] == '2':
34 import Queue as queue
35else:
36 import queue as queue
37
Paul Vinciguerraae8819f2019-06-07 13:35:37 -040038__all__ = ('FuncWrapper', 'VPP', 'VppApiDynamicMethodHolder',
39 'VppEnum', 'VppEnumType',
40 'VPPIOError', 'VPPRuntimeError', 'VPPValueError',
41 'VPPApiClient', )
42
Ole Troanafddd832018-02-28 14:55:20 +010043
Paul Vinciguerra5fced042019-02-26 20:39:44 -080044def metaclass(metaclass):
45 @functools.wraps(metaclass)
46 def wrapper(cls):
47 return metaclass(cls.__name__, cls.__bases__, cls.__dict__.copy())
48
49 return wrapper
50
51
Ole Troan0685da42018-10-16 14:42:50 +020052class VppEnumType(type):
53 def __getattr__(cls, name):
54 t = vpp_get_type(name)
55 return t.enum
56
57
Paul Vinciguerra5fced042019-02-26 20:39:44 -080058@metaclass(VppEnumType)
Paul Vinciguerra7e713f12018-11-26 12:04:48 -080059class VppEnum(object):
Paul Vinciguerra5fced042019-02-26 20:39:44 -080060 pass
Ole Troan0685da42018-10-16 14:42:50 +020061
62
Klement Sekera180402d2018-02-17 10:58:37 +010063def vpp_atexit(vpp_weakref):
Ole Troan5016f992017-01-19 09:44:44 +010064 """Clean up VPP connection on shutdown."""
Klement Sekera180402d2018-02-17 10:58:37 +010065 vpp_instance = vpp_weakref()
Ole Troan94495f22018-08-02 11:58:12 +020066 if vpp_instance and vpp_instance.transport.connected:
Klement Sekera180402d2018-02-17 10:58:37 +010067 vpp_instance.logger.debug('Cleaning up VPP on exit')
68 vpp_instance.disconnect()
Ole Troan5016f992017-01-19 09:44:44 +010069
Ole Troan8006c6a2018-12-17 12:02:26 +010070
Ole Troan0bcad322018-12-11 13:04:01 +010071if sys.version[0] == '2':
72 def vpp_iterator(d):
Ole Troan4df97162017-07-07 16:06:08 +020073 return d.iteritems()
Ole Troan0bcad322018-12-11 13:04:01 +010074else:
75 def vpp_iterator(d):
Ole Troan4df97162017-07-07 16:06:08 +020076 return d.items()
77
78
Klement Sekera8aedf5e2018-07-06 11:07:21 +020079class VppApiDynamicMethodHolder(object):
Klement Sekera7112c542017-03-01 09:53:19 +010080 pass
81
82
83class FuncWrapper(object):
84 def __init__(self, func):
85 self._func = func
86 self.__name__ = func.__name__
Paul Vinciguerrab8daa252019-03-19 15:04:17 -070087 self.__doc__ = func.__doc__
Klement Sekera7112c542017-03-01 09:53:19 +010088
89 def __call__(self, **kwargs):
90 return self._func(**kwargs)
91
Paul Vinciguerra48664592019-06-19 22:19:02 -040092 def __repr__(self):
93 return '<FuncWrapper(func=<%s(%s)>)>' % (self.__name__, self.__doc__)
94
Klement Sekera7112c542017-03-01 09:53:19 +010095
Paul Vinciguerra6ccc6e92018-11-27 08:15:22 -080096class VPPApiError(Exception):
97 pass
98
99
100class VPPNotImplementedError(NotImplementedError):
101 pass
102
103
104class VPPIOError(IOError):
105 pass
106
107
108class VPPRuntimeError(RuntimeError):
109 pass
110
111
112class VPPValueError(ValueError):
113 pass
114
Ole Troanedfe2c02019-07-30 15:38:13 +0200115class VPPApiJSONFiles(object):
Chris Luke52bf22e2017-11-03 23:32:38 -0400116 @classmethod
Ole Troanedfe2c02019-07-30 15:38:13 +0200117 def find_api_dir(cls, dirs):
Chris Luke52bf22e2017-11-03 23:32:38 -0400118 """Attempt to find the best directory in which API definition
119 files may reside. If the value VPP_API_DIR exists in the environment
120 then it is first on the search list. If we're inside a recognized
121 location in a VPP source tree (src/scripts and src/vpp-api/python)
122 then entries from there to the likely locations in build-root are
123 added. Finally the location used by system packages is added.
124
125 :returns: A single directory name, or None if no such directory
126 could be found.
127 """
Chris Luke52bf22e2017-11-03 23:32:38 -0400128
129 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
130 # in which case, plot a course to likely places in the src tree
131 import __main__ as main
132 if hasattr(main, '__file__'):
133 # get the path of the calling script
134 localdir = os.path.dirname(os.path.realpath(main.__file__))
135 else:
136 # use cwd if there is no calling script
Andrey "Zed" Zaikin68e2ffb2018-04-24 14:50:02 +0300137 localdir = os.getcwd()
Chris Luke52bf22e2017-11-03 23:32:38 -0400138 localdir_s = localdir.split(os.path.sep)
139
140 def dmatch(dir):
141 """Match dir against right-hand components of the script dir"""
142 d = dir.split('/') # param 'dir' assumes a / separator
Ole Troanafddd832018-02-28 14:55:20 +0100143 length = len(d)
144 return len(localdir_s) > length and localdir_s[-length:] == d
Chris Luke52bf22e2017-11-03 23:32:38 -0400145
146 def sdir(srcdir, variant):
147 """Build a path from srcdir to the staged API files of
148 'variant' (typically '' or '_debug')"""
149 # Since 'core' and 'plugin' files are staged
150 # in separate directories, we target the parent dir.
151 return os.path.sep.join((
152 srcdir,
153 'build-root',
154 'install-vpp%s-native' % variant,
155 'vpp',
156 'share',
157 'vpp',
158 'api',
159 ))
160
161 srcdir = None
162 if dmatch('src/scripts'):
163 srcdir = os.path.sep.join(localdir_s[:-2])
164 elif dmatch('src/vpp-api/python'):
165 srcdir = os.path.sep.join(localdir_s[:-3])
166 elif dmatch('test'):
167 # we're apparently running tests
168 srcdir = os.path.sep.join(localdir_s[:-1])
169
170 if srcdir:
171 # we're in the source tree, try both the debug and release
172 # variants.
Chris Luke52bf22e2017-11-03 23:32:38 -0400173 dirs.append(sdir(srcdir, '_debug'))
174 dirs.append(sdir(srcdir, ''))
175
176 # Test for staged copies of the scripts
177 # For these, since we explicitly know if we're running a debug versus
178 # release variant, target only the relevant directory
179 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
180 srcdir = os.path.sep.join(localdir_s[:-4])
181 dirs.append(sdir(srcdir, '_debug'))
182 if dmatch('build-root/install-vpp-native/vpp/bin'):
183 srcdir = os.path.sep.join(localdir_s[:-4])
184 dirs.append(sdir(srcdir, ''))
185
186 # finally, try the location system packages typically install into
187 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
188
Paul Vinciguerra19542292019-03-17 17:34:46 -0700189 # check the directories for existence; first one wins
Chris Luke52bf22e2017-11-03 23:32:38 -0400190 for dir in dirs:
191 if os.path.isdir(dir):
192 return dir
193
194 return None
195
196 @classmethod
197 def find_api_files(cls, api_dir=None, patterns='*'):
198 """Find API definition files from the given directory tree with the
199 given pattern. If no directory is given then find_api_dir() is used
200 to locate one. If no pattern is given then all definition files found
201 in the directory tree are used.
202
203 :param api_dir: A directory tree in which to locate API definition
204 files; subdirectories are descended into.
205 If this is None then find_api_dir() is called to discover it.
206 :param patterns: A list of patterns to use in each visited directory
207 when looking for files.
208 This can be a list/tuple object or a comma-separated string of
209 patterns. Each value in the list will have leading/trialing
210 whitespace stripped.
211 The pattern specifies the first part of the filename, '.api.json'
212 is appended.
213 The results are de-duplicated, thus overlapping patterns are fine.
214 If this is None it defaults to '*' meaning "all API files".
215 :returns: A list of file paths for the API files found.
216 """
217 if api_dir is None:
Ole Troanedfe2c02019-07-30 15:38:13 +0200218 api_dir = cls.find_api_dir([])
Chris Luke52bf22e2017-11-03 23:32:38 -0400219 if api_dir is None:
Paul Vinciguerra6ccc6e92018-11-27 08:15:22 -0800220 raise VPPApiError("api_dir cannot be located")
Chris Luke52bf22e2017-11-03 23:32:38 -0400221
222 if isinstance(patterns, list) or isinstance(patterns, tuple):
223 patterns = [p.strip() + '.api.json' for p in patterns]
224 else:
225 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
226
227 api_files = []
228 for root, dirnames, files in os.walk(api_dir):
229 # iterate all given patterns and de-dup the result
230 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
231 for filename in files:
232 api_files.append(os.path.join(root, filename))
233
234 return api_files
235
Ole Troanedfe2c02019-07-30 15:38:13 +0200236 @classmethod
237 def process_json_file(self, apidef_file):
238 api = json.load(apidef_file)
239 types = {}
240 services = {}
241 messages = {}
242 for t in api['enums']:
243 t[0] = 'vl_api_' + t[0] + '_t'
244 types[t[0]] = {'type': 'enum', 'data': t}
245 for t in api['unions']:
246 t[0] = 'vl_api_' + t[0] + '_t'
247 types[t[0]] = {'type': 'union', 'data': t}
248 for t in api['types']:
249 t[0] = 'vl_api_' + t[0] + '_t'
250 types[t[0]] = {'type': 'type', 'data': t}
251 for t, v in api['aliases'].items():
252 types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v}
253 services.update(api['services'])
254
255 i = 0
256 while True:
257 unresolved = {}
258 for k, v in types.items():
259 t = v['data']
260 if not vpp_get_type(k):
261 if v['type'] == 'enum':
262 try:
263 VPPEnumType(t[0], t[1:])
264 except ValueError:
265 unresolved[k] = v
266 elif v['type'] == 'union':
267 try:
268 VPPUnionType(t[0], t[1:])
269 except ValueError:
270 unresolved[k] = v
271 elif v['type'] == 'type':
272 try:
273 VPPType(t[0], t[1:])
274 except ValueError:
275 unresolved[k] = v
276 elif v['type'] == 'alias':
277 try:
278 VPPTypeAlias(k, t)
279 except ValueError:
280 unresolved[k] = v
281 if len(unresolved) == 0:
282 break
283 if i > 3:
284 raise VPPValueError('Unresolved type definitions {}'
285 .format(unresolved))
286 types = unresolved
287 i += 1
288
289 for m in api['messages']:
290 try:
291 messages[m[0]] = VPPMessage(m[0], m[1:])
292 except VPPNotImplementedError:
293 ### OLE FIXME
294 self.logger.error('Not implemented error for {}'.format(m[0]))
295 return messages, services
296
297class VPPApiClient(object):
298 """VPP interface.
299
300 This class provides the APIs to VPP. The APIs are loaded
301 from provided .api.json files and makes functions accordingly.
302 These functions are documented in the VPP .api files, as they
303 are dynamically created.
304
305 Additionally, VPP can send callback messages; this class
306 provides a means to register a callback function to receive
307 these messages in a background thread.
308 """
309 apidir = None
310 VPPApiError = VPPApiError
311 VPPRuntimeError = VPPRuntimeError
312 VPPValueError = VPPValueError
313 VPPNotImplementedError = VPPNotImplementedError
314 VPPIOError = VPPIOError
315
316
317 def __init__(self, apifiles=None, testmode=False, async_thread=True,
318 logger=None, loglevel=None,
319 read_timeout=5, use_socket=False,
320 server_address='/run/vpp/api.sock'):
321 """Create a VPP API object.
322
323 apifiles is a list of files containing API
324 descriptions that will be loaded - methods will be
325 dynamically created reflecting these APIs. If not
326 provided this will load the API files from VPP's
327 default install location.
328
329 logger, if supplied, is the logging logger object to log to.
330 loglevel, if supplied, is the log level this logger is set
331 to report at (from the loglevels in the logging module).
332 """
333 if logger is None:
334 logger = logging.getLogger(
335 "{}.{}".format(__name__, self.__class__.__name__))
336 if loglevel is not None:
337 logger.setLevel(loglevel)
338 self.logger = logger
339
340 self.messages = {}
341 self.services = {}
342 self.id_names = []
343 self.id_msgdef = []
344 self.header = VPPType('header', [['u16', 'msgid'],
345 ['u32', 'client_index']])
346 self.apifiles = []
347 self.event_callback = None
348 self.message_queue = queue.Queue()
349 self.read_timeout = read_timeout
350 self.async_thread = async_thread
351 self.event_thread = None
352 self.testmode = testmode
353 self.use_socket = use_socket
354 self.server_address = server_address
355 self._apifiles = apifiles
356
357 if use_socket:
358 from . vpp_transport_socket import VppTransport
359 else:
360 from . vpp_transport_shmem import VppTransport
361
362 if not apifiles:
363 # Pick up API definitions from default directory
364 try:
365 apifiles = VPPApiJSONFiles.find_api_files(self.apidir)
366 except RuntimeError:
367 # In test mode we don't care that we can't find the API files
368 if testmode:
369 apifiles = []
370 else:
371 raise VPPRuntimeError
372
373 for file in apifiles:
374 with open(file) as apidef_file:
375 m, s = VPPApiJSONFiles.process_json_file(apidef_file)
376 self.messages.update(m)
377 self.services.update(s)
378
379 self.apifiles = apifiles
380
381 # Basic sanity check
382 if len(self.messages) == 0 and not testmode:
383 raise VPPValueError(1, 'Missing JSON message definitions')
384
385 self.transport = VppTransport(self, read_timeout=read_timeout,
386 server_address=server_address)
387 # Make sure we allow VPP to clean up the message rings.
388 atexit.register(vpp_atexit, weakref.ref(self))
389
390 def get_function(self, name):
391 return getattr(self._api, name)
392
393
394 class ContextId(object):
395 """Multiprocessing-safe provider of unique context IDs."""
396 def __init__(self):
397 self.context = mp.Value(ctypes.c_uint, 0)
398 self.lock = mp.Lock()
399
400 def __call__(self):
401 """Get a new unique (or, at least, not recently used) context."""
402 with self.lock:
403 self.context.value += 1
404 return self.context.value
405 get_context = ContextId()
406
407 def get_type(self, name):
408 return vpp_get_type(name)
409
Klement Sekera7112c542017-03-01 09:53:19 +0100410 @property
411 def api(self):
412 if not hasattr(self, "_api"):
Paul Vinciguerra6ccc6e92018-11-27 08:15:22 -0800413 raise VPPApiError("Not connected, api definitions not available")
Klement Sekera7112c542017-03-01 09:53:19 +0100414 return self._api
415
Ole Troaneabd6072018-08-09 12:50:55 +0200416 def make_function(self, msg, i, multipart, do_async):
417 if (do_async):
Ole Troana7564e82018-06-12 21:06:44 +0200418 def f(**kwargs):
419 return self._call_vpp_async(i, msg, **kwargs)
420 else:
421 def f(**kwargs):
422 return self._call_vpp(i, msg, multipart, **kwargs)
423
424 f.__name__ = str(msg.name)
425 f.__doc__ = ", ".join(["%s %s" %
426 (msg.fieldtypes[j], k)
427 for j, k in enumerate(msg.fields)])
Ole Troanf159f582019-02-28 20:20:47 +0100428 f.msg = msg
429
Ole Troana7564e82018-06-12 21:06:44 +0200430 return f
431
Ole Troaneabd6072018-08-09 12:50:55 +0200432 def _register_functions(self, do_async=False):
Ole Troana03f4ef2016-12-02 12:53:55 +0100433 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
434 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
Klement Sekera8aedf5e2018-07-06 11:07:21 +0200435 self._api = VppApiDynamicMethodHolder()
Ole Troana7564e82018-06-12 21:06:44 +0200436 for name, msg in vpp_iterator(self.messages):
437 n = name + '_' + msg.crc[2:]
Ole Troandaa4bff2019-08-28 14:12:02 +0200438 i = self.transport.get_msg_index(n)
Ole Troan3cc49712017-03-08 12:02:24 +0100439 if i > 0:
Ole Troana7564e82018-06-12 21:06:44 +0200440 self.id_msgdef[i] = msg
Ole Troana03f4ef2016-12-02 12:53:55 +0100441 self.id_names[i] = name
Ole Troandfb984d2018-12-07 14:31:16 +0100442
443 # Create function for client side messages.
444 if name in self.services:
Ole Troan0bcad322018-12-11 13:04:01 +0100445 if 'stream' in self.services[name] and \
446 self.services[name]['stream']:
Ole Troandfb984d2018-12-07 14:31:16 +0100447 multipart = True
448 else:
449 multipart = False
450 f = self.make_function(msg, i, multipart, do_async)
451 setattr(self._api, name, FuncWrapper(f))
Ole Troan3cc49712017-03-08 12:02:24 +0100452 else:
Ole Troan4df97162017-07-07 16:06:08 +0200453 self.logger.debug(
454 'No such message type or failed CRC checksum: %s', n)
Ole Troana03f4ef2016-12-02 12:53:55 +0100455
Ole Troan4df97162017-07-07 16:06:08 +0200456 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
Ole Troaneabd6072018-08-09 12:50:55 +0200457 do_async):
Paul Vinciguerra3d04e332019-03-19 17:32:39 -0700458 pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None
Ole Troan94495f22018-08-02 11:58:12 +0200459
Ole Troandaa4bff2019-08-28 14:12:02 +0200460 rv = self.transport.connect(name, pfx,
Paul Vinciguerra3d04e332019-03-19 17:32:39 -0700461 msg_handler, rx_qlen)
Ole Troana03f4ef2016-12-02 12:53:55 +0100462 if rv != 0:
Paul Vinciguerra6ccc6e92018-11-27 08:15:22 -0800463 raise VPPIOError(2, 'Connect failed')
Ole Troan94495f22018-08-02 11:58:12 +0200464 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
Ole Troaneabd6072018-08-09 12:50:55 +0200465 self._register_functions(do_async=do_async)
Ole Troana03f4ef2016-12-02 12:53:55 +0100466
467 # Initialise control ping
Ole Troana7564e82018-06-12 21:06:44 +0200468 crc = self.messages['control_ping'].crc
Ole Troan94495f22018-08-02 11:58:12 +0200469 self.control_ping_index = self.transport.get_msg_index(
Ole Troandaa4bff2019-08-28 14:12:02 +0200470 ('control_ping' + '_' + crc[2:]))
Ole Troana03f4ef2016-12-02 12:53:55 +0100471 self.control_ping_msgdef = self.messages['control_ping']
Klement Sekera180402d2018-02-17 10:58:37 +0100472 if self.async_thread:
473 self.event_thread = threading.Thread(
474 target=self.thread_msg_handler)
475 self.event_thread.daemon = True
476 self.event_thread.start()
Vratko Polak94e45312019-05-27 18:36:23 +0200477 else:
478 self.event_thread = None
Ole Troan4df97162017-07-07 16:06:08 +0200479 return rv
Ole Troana03f4ef2016-12-02 12:53:55 +0100480
Ole Troaneabd6072018-08-09 12:50:55 +0200481 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
Ole Troandfc9b7c2017-03-06 23:51:57 +0100482 """Attach to VPP.
483
484 name - the name of the client.
485 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
Ole Troaneabd6072018-08-09 12:50:55 +0200486 do_async - if true, messages are sent without waiting for a reply
Ole Troandfc9b7c2017-03-06 23:51:57 +0100487 rx_qlen - the length of the VPP message receive queue between
488 client and server.
489 """
Ole Troan94495f22018-08-02 11:58:12 +0200490 msg_handler = self.transport.get_callback(do_async)
Ole Troandfc9b7c2017-03-06 23:51:57 +0100491 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
Ole Troaneabd6072018-08-09 12:50:55 +0200492 do_async)
Ole Troandfc9b7c2017-03-06 23:51:57 +0100493
Ole Troan6bf177c2017-08-17 10:34:32 +0200494 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
Ole Troandfc9b7c2017-03-06 23:51:57 +0100495 """Attach to VPP in synchronous mode. Application must poll for events.
496
497 name - the name of the client.
498 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
499 rx_qlen - the length of the VPP message receive queue between
500 client and server.
501 """
502
Ole Troan94495f22018-08-02 11:58:12 +0200503 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
Ole Troaneabd6072018-08-09 12:50:55 +0200504 do_async=False)
Ole Troandfc9b7c2017-03-06 23:51:57 +0100505
Ole Troana03f4ef2016-12-02 12:53:55 +0100506 def disconnect(self):
Ole Troan5016f992017-01-19 09:44:44 +0100507 """Detach from VPP."""
Ole Troan94495f22018-08-02 11:58:12 +0200508 rv = self.transport.disconnect()
Vratko Polak94e45312019-05-27 18:36:23 +0200509 if self.event_thread is not None:
510 self.message_queue.put("terminate event thread")
Ole Troana03f4ef2016-12-02 12:53:55 +0100511 return rv
512
Ole Troan5016f992017-01-19 09:44:44 +0100513 def msg_handler_sync(self, msg):
514 """Process an incoming message from VPP in sync mode.
515
516 The message may be a reply or it may be an async notification.
517 """
518 r = self.decode_incoming_msg(msg)
519 if r is None:
Ole Troana03f4ef2016-12-02 12:53:55 +0100520 return
521
Ole Troan5016f992017-01-19 09:44:44 +0100522 # If we have a context, then use the context to find any
523 # request waiting for a reply
524 context = 0
525 if hasattr(r, 'context') and r.context > 0:
526 context = r.context
Ole Troan5f9dcff2016-08-01 04:59:13 +0200527
Ole Troan5016f992017-01-19 09:44:44 +0100528 if context == 0:
529 # No context -> async notification that we feed to the callback
Ole Troandfc9b7c2017-03-06 23:51:57 +0100530 self.message_queue.put_nowait(r)
Ole Troana03f4ef2016-12-02 12:53:55 +0100531 else:
Paul Vinciguerra6ccc6e92018-11-27 08:15:22 -0800532 raise VPPIOError(2, 'RPC reply message received in event handler')
Ole Troan5016f992017-01-19 09:44:44 +0100533
Ole Troan413f4a52018-11-28 11:36:05 +0100534 def has_context(self, msg):
535 if len(msg) < 10:
536 return False
537
538 header = VPPType('header_with_context', [['u16', 'msgid'],
539 ['u32', 'client_index'],
540 ['u32', 'context']])
541
542 (i, ci, context), size = header.unpack(msg, 0)
543 if self.id_names[i] == 'rx_thread_exit':
544 return
545
546 #
547 # Decode message and returns a tuple.
548 #
549 msgobj = self.id_msgdef[i]
550 if 'context' in msgobj.field_by_name and context >= 0:
551 return True
552 return False
553
Ole Troan0bcad322018-12-11 13:04:01 +0100554 def decode_incoming_msg(self, msg, no_type_conversion=False):
Ole Troan5016f992017-01-19 09:44:44 +0100555 if not msg:
Ole Troan3cc49712017-03-08 12:02:24 +0100556 self.logger.warning('vpp_api.read failed')
Ole Troan5016f992017-01-19 09:44:44 +0100557 return
Ole Troan413f4a52018-11-28 11:36:05 +0100558
Ole Troanc84cbad2018-09-06 22:58:05 +0200559 (i, ci), size = self.header.unpack(msg, 0)
Ole Troan5016f992017-01-19 09:44:44 +0100560 if self.id_names[i] == 'rx_thread_exit':
561 return
562
563 #
564 # Decode message and returns a tuple.
565 #
Ole Troana7564e82018-06-12 21:06:44 +0200566 msgobj = self.id_msgdef[i]
567 if not msgobj:
Paul Vinciguerra6ccc6e92018-11-27 08:15:22 -0800568 raise VPPIOError(2, 'Reply message undefined')
Ole Troan5016f992017-01-19 09:44:44 +0100569
Ole Troan0bcad322018-12-11 13:04:01 +0100570 r, size = msgobj.unpack(msg, ntc=no_type_conversion)
Ole Troana03f4ef2016-12-02 12:53:55 +0100571 return r
572
Ole Troan5016f992017-01-19 09:44:44 +0100573 def msg_handler_async(self, msg):
574 """Process a message from VPP in async mode.
575
576 In async mode, all messages are returned to the callback.
577 """
578 r = self.decode_incoming_msg(msg)
579 if r is None:
580 return
581
582 msgname = type(r).__name__
583
Ole Troan4df97162017-07-07 16:06:08 +0200584 if self.event_callback:
585 self.event_callback(msgname, r)
Ole Troan5016f992017-01-19 09:44:44 +0100586
587 def _control_ping(self, context):
588 """Send a ping command."""
589 self._call_vpp_async(self.control_ping_index,
Ole Troan4df97162017-07-07 16:06:08 +0200590 self.control_ping_msgdef,
Ole Troan5016f992017-01-19 09:44:44 +0100591 context=context)
592
Ole Troana7564e82018-06-12 21:06:44 +0200593 def validate_args(self, msg, kwargs):
594 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
595 if d:
Paul Vinciguerra6ccc6e92018-11-27 08:15:22 -0800596 raise VPPValueError('Invalid argument {} to {}'
Ole Troan0bcad322018-12-11 13:04:01 +0100597 .format(list(d), msg.name))
Ole Troana7564e82018-06-12 21:06:44 +0200598
Ole Troan413f4a52018-11-28 11:36:05 +0100599 def _call_vpp(self, i, msgdef, multipart, **kwargs):
Ole Troan5016f992017-01-19 09:44:44 +0100600 """Given a message, send the message and await a reply.
601
602 msgdef - the message packing definition
603 i - the message type index
604 multipart - True if the message returns multiple
605 messages in return.
606 context - context number - chosen at random if not
607 supplied.
608 The remainder of the kwargs are the arguments to the API call.
609
610 The return value is the message or message array containing
611 the response. It will raise an IOError exception if there was
612 no response within the timeout window.
613 """
614
Ole Troan4df97162017-07-07 16:06:08 +0200615 if 'context' not in kwargs:
Ole Troandfc9b7c2017-03-06 23:51:57 +0100616 context = self.get_context()
617 kwargs['context'] = context
618 else:
619 context = kwargs['context']
620 kwargs['_vl_msg_id'] = i
Ole Troan5016f992017-01-19 09:44:44 +0100621
Ole Troan0bcad322018-12-11 13:04:01 +0100622 no_type_conversion = kwargs.pop('_no_type_conversion', False)
623
Ole Troan94495f22018-08-02 11:58:12 +0200624 try:
625 if self.transport.socket_index:
626 kwargs['client_index'] = self.transport.socket_index
627 except AttributeError:
628 pass
Ole Troan413f4a52018-11-28 11:36:05 +0100629 self.validate_args(msgdef, kwargs)
630
Vratko Polakb6590202019-07-16 14:32:55 +0200631 s = 'Calling {}({})'.format(msgdef.name,
632 ','.join(['{!r}:{!r}'.format(k, v) for k, v in kwargs.items()]))
633 self.logger.debug(s)
Ole Troan413f4a52018-11-28 11:36:05 +0100634
635 b = msgdef.pack(kwargs)
Ole Troan94495f22018-08-02 11:58:12 +0200636 self.transport.suspend()
637
638 self.transport.write(b)
Ole Troan5016f992017-01-19 09:44:44 +0100639
640 if multipart:
641 # Send a ping after the request - we use its response
642 # to detect that we have seen all results.
643 self._control_ping(context)
644
645 # Block until we get a reply.
Ole Troandfc9b7c2017-03-06 23:51:57 +0100646 rl = []
647 while (True):
Ole Troan94495f22018-08-02 11:58:12 +0200648 msg = self.transport.read()
Ole Troandfc9b7c2017-03-06 23:51:57 +0100649 if not msg:
Paul Vinciguerra6ccc6e92018-11-27 08:15:22 -0800650 raise VPPIOError(2, 'VPP API client: read failed')
Ole Troan0bcad322018-12-11 13:04:01 +0100651 r = self.decode_incoming_msg(msg, no_type_conversion)
Ole Troandfc9b7c2017-03-06 23:51:57 +0100652 msgname = type(r).__name__
Ole Troan4df97162017-07-07 16:06:08 +0200653 if context not in r or r.context == 0 or context != r.context:
Ole Troan94495f22018-08-02 11:58:12 +0200654 # Message being queued
Ole Troandfc9b7c2017-03-06 23:51:57 +0100655 self.message_queue.put_nowait(r)
656 continue
657
658 if not multipart:
659 rl = r
660 break
661 if msgname == 'control_ping_reply':
662 break
663
664 rl.append(r)
665
Ole Troan94495f22018-08-02 11:58:12 +0200666 self.transport.resume()
Ole Troandfc9b7c2017-03-06 23:51:57 +0100667
Klement Sekera5e2f84d2019-09-12 09:01:06 +0000668 s = 'Return value: {!r}'.format(r)
669 if len(s) > 80:
670 s = s[:80] + "..."
671 self.logger.debug(s)
Ole Troandfc9b7c2017-03-06 23:51:57 +0100672 return rl
Ole Troan5016f992017-01-19 09:44:44 +0100673
Ole Troana7564e82018-06-12 21:06:44 +0200674 def _call_vpp_async(self, i, msg, **kwargs):
Vratko Polak2f6e0c62019-09-06 15:20:07 +0200675 """Given a message, send the message and return the context.
Ole Troan5016f992017-01-19 09:44:44 +0100676
677 msgdef - the message packing definition
678 i - the message type index
679 context - context number - chosen at random if not
680 supplied.
681 The remainder of the kwargs are the arguments to the API call.
Vratko Polak2f6e0c62019-09-06 15:20:07 +0200682
683 The reply message(s) will be delivered later to the registered callback.
684 The returned context will help with assigning which call
685 the reply belongs to.
Ole Troan5016f992017-01-19 09:44:44 +0100686 """
Ole Troan4df97162017-07-07 16:06:08 +0200687 if 'context' not in kwargs:
Ole Troan7e3a8752016-12-05 10:27:09 +0100688 context = self.get_context()
689 kwargs['context'] = context
690 else:
691 context = kwargs['context']
Ole Troan94495f22018-08-02 11:58:12 +0200692 try:
693 if self.transport.socket_index:
694 kwargs['client_index'] = self.transport.socket_index
695 except AttributeError:
696 kwargs['client_index'] = 0
Ole Troan7e3a8752016-12-05 10:27:09 +0100697 kwargs['_vl_msg_id'] = i
Ole Troana7564e82018-06-12 21:06:44 +0200698 b = msg.pack(kwargs)
Ole Troan7e3a8752016-12-05 10:27:09 +0100699
Ole Troan94495f22018-08-02 11:58:12 +0200700 self.transport.write(b)
Vratko Polak2f6e0c62019-09-06 15:20:07 +0200701 return context
Ole Troan7e3a8752016-12-05 10:27:09 +0100702
Ole Troana03f4ef2016-12-02 12:53:55 +0100703 def register_event_callback(self, callback):
Ole Troan5016f992017-01-19 09:44:44 +0100704 """Register a callback for async messages.
Ole Troana03f4ef2016-12-02 12:53:55 +0100705
Ole Troan5016f992017-01-19 09:44:44 +0100706 This will be called for async notifications in sync mode,
707 and all messages in async mode. In sync mode, replies to
708 requests will not come here.
709
710 callback is a fn(msg_type_name, msg_type) that will be
711 called when a message comes in. While this function is
712 executing, note that (a) you are in a background thread and
713 may wish to use threading.Lock to protect your datastructures,
714 and (b) message processing from VPP will stop (so if you take
715 a long while about it you may provoke reply timeouts or cause
716 VPP to fill the RX buffer). Passing None will disable the
717 callback.
718 """
719 self.event_callback = callback
Ole Troandfc9b7c2017-03-06 23:51:57 +0100720
721 def thread_msg_handler(self):
Ole Troan94495f22018-08-02 11:58:12 +0200722 """Python thread calling the user registered message handler.
Ole Troandfc9b7c2017-03-06 23:51:57 +0100723
724 This is to emulate the old style event callback scheme. Modern
725 clients should provide their own thread to poll the event
726 queue.
727 """
728 while True:
729 r = self.message_queue.get()
Klement Sekera180402d2018-02-17 10:58:37 +0100730 if r == "terminate event thread":
731 break
Ole Troandfc9b7c2017-03-06 23:51:57 +0100732 msgname = type(r).__name__
Ole Troan4df97162017-07-07 16:06:08 +0200733 if self.event_callback:
734 self.event_callback(msgname, r)
Chris Luke52bf22e2017-11-03 23:32:38 -0400735
Paul Vinciguerra8a2fa3b2019-06-16 21:23:31 -0400736 def __repr__(self):
737 return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \
738 "logger=%s, read_timeout=%s, use_socket=%s, " \
739 "server_address='%s'>" % (
740 self._apifiles, self.testmode, self.async_thread,
741 self.logger, self.read_timeout, self.use_socket,
742 self.server_address)
743
744
Paul Vinciguerra19542292019-03-17 17:34:46 -0700745# Provide the old name for backward compatibility.
746VPP = VPPApiClient
Chris Luke52bf22e2017-11-03 23:32:38 -0400747
748# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4