blob: e1a7059f31799b50b58c21275cff138cb41dd6df [file] [log] [blame]
Ole Troana03f4ef2016-12-02 12:53:55 +01001#!/usr/bin/env python
Ole Troan5f9dcff2016-08-01 04:59:13 +02002#
3# Copyright (c) 2016 Cisco and/or its affiliates.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at:
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
Ole Troana03f4ef2016-12-02 12:53:55 +010015#
Ole Troan5f9dcff2016-08-01 04:59:13 +020016
Ole Troan5f9dcff2016-08-01 04:59:13 +020017from __future__ import print_function
Ole Troana7564e82018-06-12 21:06:44 +020018from __future__ import absolute_import
Ole Troan4df97162017-07-07 16:06:08 +020019import sys
20import os
21import logging
22import collections
23import struct
24import json
25import threading
Chris Luke52bf22e2017-11-03 23:32:38 -040026import fnmatch
Klement Sekera180402d2018-02-17 10:58:37 +010027import weakref
Ole Troan4df97162017-07-07 16:06:08 +020028import atexit
Ole Troana7564e82018-06-12 21:06:44 +020029from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
30from . vpp_serializer import VPPMessage
Ole Troan4df97162017-07-07 16:06:08 +020031
32if sys.version[0] == '2':
33 import Queue as queue
34else:
35 import queue as queue
36
Ole Troanafddd832018-02-28 14:55:20 +010037
Klement Sekera180402d2018-02-17 10:58:37 +010038def vpp_atexit(vpp_weakref):
Ole Troan5016f992017-01-19 09:44:44 +010039 """Clean up VPP connection on shutdown."""
Klement Sekera180402d2018-02-17 10:58:37 +010040 vpp_instance = vpp_weakref()
Ole Troan94495f22018-08-02 11:58:12 +020041 if vpp_instance and vpp_instance.transport.connected:
Klement Sekera180402d2018-02-17 10:58:37 +010042 vpp_instance.logger.debug('Cleaning up VPP on exit')
43 vpp_instance.disconnect()
Ole Troan5016f992017-01-19 09:44:44 +010044
Ole Troanafddd832018-02-28 14:55:20 +010045
Ole Troan4df97162017-07-07 16:06:08 +020046def vpp_iterator(d):
47 if sys.version[0] == '2':
48 return d.iteritems()
49 else:
50 return d.items()
51
52
Klement Sekera8aedf5e2018-07-06 11:07:21 +020053class VppApiDynamicMethodHolder(object):
Klement Sekera7112c542017-03-01 09:53:19 +010054 pass
55
56
57class FuncWrapper(object):
58 def __init__(self, func):
59 self._func = func
60 self.__name__ = func.__name__
61
62 def __call__(self, **kwargs):
63 return self._func(**kwargs)
64
65
Ole Troana03f4ef2016-12-02 12:53:55 +010066class VPP():
Ole Troan5016f992017-01-19 09:44:44 +010067 """VPP interface.
68
69 This class provides the APIs to VPP. The APIs are loaded
70 from provided .api.json files and makes functions accordingly.
71 These functions are documented in the VPP .api files, as they
72 are dynamically created.
73
74 Additionally, VPP can send callback messages; this class
75 provides a means to register a callback function to receive
76 these messages in a background thread.
77 """
Ole Troana7564e82018-06-12 21:06:44 +020078
79 def process_json_file(self, apidef_file):
80 api = json.load(apidef_file)
81 types = {}
82 for t in api['enums']:
83 t[0] = 'vl_api_' + t[0] + '_t'
84 types[t[0]] = {'type': 'enum', 'data': t}
85 for t in api['unions']:
86 t[0] = 'vl_api_' + t[0] + '_t'
87 types[t[0]] = {'type': 'union', 'data': t}
88 for t in api['types']:
89 t[0] = 'vl_api_' + t[0] + '_t'
90 types[t[0]] = {'type': 'type', 'data': t}
91
92 i = 0
93 while True:
94 unresolved = {}
95 for k, v in types.items():
96 t = v['data']
97 if v['type'] == 'enum':
98 try:
99 VPPEnumType(t[0], t[1:])
100 except ValueError:
101 unresolved[k] = v
102 elif v['type'] == 'union':
103 try:
104 VPPUnionType(t[0], t[1:])
105 except ValueError:
106 unresolved[k] = v
107 elif v['type'] == 'type':
108 try:
109 VPPType(t[0], t[1:])
110 except ValueError:
111 unresolved[k] = v
112 if len(unresolved) == 0:
113 break
114 if i > 3:
115 raise ValueError('Unresolved type definitions {}'
116 .format(unresolved))
117 types = unresolved
118 i += 1
119
120 for m in api['messages']:
121 try:
122 self.messages[m[0]] = VPPMessage(m[0], m[1:])
123 except NotImplementedError:
124 self.logger.error('Not implemented error for {}'.format(m[0]))
125
Ole Troan4df97162017-07-07 16:06:08 +0200126 def __init__(self, apifiles=None, testmode=False, async_thread=True,
Ole Troana7564e82018-06-12 21:06:44 +0200127 logger=logging.getLogger('vpp_papi'), loglevel='debug',
Ole Troan94495f22018-08-02 11:58:12 +0200128 read_timeout=5, use_socket=False,
129 server_address='/run/vpp-api.sock'):
Ole Troan5016f992017-01-19 09:44:44 +0100130 """Create a VPP API object.
131
132 apifiles is a list of files containing API
133 descriptions that will be loaded - methods will be
134 dynamically created reflecting these APIs. If not
135 provided this will load the API files from VPP's
136 default install location.
Ian Wellsd0e812f2018-06-06 14:12:27 +0100137
138 logger, if supplied, is the logging logger object to log to.
139 loglevel, if supplied, is the log level this logger is set
140 to report at (from the loglevels in the logging module).
Ole Troan5016f992017-01-19 09:44:44 +0100141 """
Ian Wellsd0e812f2018-06-06 14:12:27 +0100142 if logger is None:
143 logger = logging.getLogger(__name__)
144 if loglevel is not None:
145 logger.setLevel(loglevel)
Ole Troan3cc49712017-03-08 12:02:24 +0100146 self.logger = logger
Ole Troan3cc49712017-03-08 12:02:24 +0100147
Ole Troana03f4ef2016-12-02 12:53:55 +0100148 self.messages = {}
149 self.id_names = []
150 self.id_msgdef = []
Ole Troana7564e82018-06-12 21:06:44 +0200151 self.header = VPPType('header', [['u16', 'msgid'],
152 ['u32', 'client_index']])
Ole Troan5016f992017-01-19 09:44:44 +0100153 self.apifiles = []
Ole Troan3d31f002017-01-26 11:13:00 +0100154 self.event_callback = None
Ole Troan4df97162017-07-07 16:06:08 +0200155 self.message_queue = queue.Queue()
dongjuan84937522017-11-09 14:46:36 +0800156 self.read_timeout = read_timeout
Klement Sekera180402d2018-02-17 10:58:37 +0100157 self.async_thread = async_thread
Ole Troan5f9dcff2016-08-01 04:59:13 +0200158
Ole Troan94495f22018-08-02 11:58:12 +0200159 if use_socket:
160 from . vpp_transport_socket import VppTransport
161 else:
162 from . vpp_transport_shmem import VppTransport
163
Ole Troanf5984bd2016-12-18 13:15:08 +0100164 if not apifiles:
165 # Pick up API definitions from default directory
Chris Luke52bf22e2017-11-03 23:32:38 -0400166 try:
167 apifiles = self.find_api_files()
168 except RuntimeError:
169 # In test mode we don't care that we can't find the API files
170 if testmode:
171 apifiles = []
172 else:
173 raise
Ole Troanf5984bd2016-12-18 13:15:08 +0100174
Ole Troana03f4ef2016-12-02 12:53:55 +0100175 for file in apifiles:
Ole Troana03f4ef2016-12-02 12:53:55 +0100176 with open(file) as apidef_file:
Ole Troana7564e82018-06-12 21:06:44 +0200177 self.process_json_file(apidef_file)
Ole Troan5f9dcff2016-08-01 04:59:13 +0200178
Ole Troan4df97162017-07-07 16:06:08 +0200179 self.apifiles = apifiles
Ole Troan5f9dcff2016-08-01 04:59:13 +0200180
Ole Troana03f4ef2016-12-02 12:53:55 +0100181 # Basic sanity check
Ole Troanf5984bd2016-12-18 13:15:08 +0100182 if len(self.messages) == 0 and not testmode:
183 raise ValueError(1, 'Missing JSON message definitions')
Ole Troan5f9dcff2016-08-01 04:59:13 +0200184
Ole Troan94495f22018-08-02 11:58:12 +0200185 self.transport = VppTransport(self, read_timeout=read_timeout,
186 server_address=server_address)
Ole Troan5016f992017-01-19 09:44:44 +0100187 # Make sure we allow VPP to clean up the message rings.
Klement Sekera180402d2018-02-17 10:58:37 +0100188 atexit.register(vpp_atexit, weakref.ref(self))
Ole Troan5f9dcff2016-08-01 04:59:13 +0200189
Ole Troana03f4ef2016-12-02 12:53:55 +0100190 class ContextId(object):
Ole Troan5016f992017-01-19 09:44:44 +0100191 """Thread-safe provider of unique context IDs."""
Ole Troana03f4ef2016-12-02 12:53:55 +0100192 def __init__(self):
193 self.context = 0
Ole Troan4df97162017-07-07 16:06:08 +0200194 self.lock = threading.Lock()
195
Ole Troana03f4ef2016-12-02 12:53:55 +0100196 def __call__(self):
Ole Troan5016f992017-01-19 09:44:44 +0100197 """Get a new unique (or, at least, not recently used) context."""
Ole Troan4df97162017-07-07 16:06:08 +0200198 with self.lock:
199 self.context += 1
200 return self.context
Ole Troana03f4ef2016-12-02 12:53:55 +0100201 get_context = ContextId()
Ole Troan5f9dcff2016-08-01 04:59:13 +0200202
Chris Luke52bf22e2017-11-03 23:32:38 -0400203 @classmethod
204 def find_api_dir(cls):
205 """Attempt to find the best directory in which API definition
206 files may reside. If the value VPP_API_DIR exists in the environment
207 then it is first on the search list. If we're inside a recognized
208 location in a VPP source tree (src/scripts and src/vpp-api/python)
209 then entries from there to the likely locations in build-root are
210 added. Finally the location used by system packages is added.
211
212 :returns: A single directory name, or None if no such directory
213 could be found.
214 """
215 dirs = []
216
217 if 'VPP_API_DIR' in os.environ:
218 dirs.append(os.environ['VPP_API_DIR'])
219
220 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
221 # in which case, plot a course to likely places in the src tree
222 import __main__ as main
223 if hasattr(main, '__file__'):
224 # get the path of the calling script
225 localdir = os.path.dirname(os.path.realpath(main.__file__))
226 else:
227 # use cwd if there is no calling script
Andrey "Zed" Zaikin68e2ffb2018-04-24 14:50:02 +0300228 localdir = os.getcwd()
Chris Luke52bf22e2017-11-03 23:32:38 -0400229 localdir_s = localdir.split(os.path.sep)
230
231 def dmatch(dir):
232 """Match dir against right-hand components of the script dir"""
233 d = dir.split('/') # param 'dir' assumes a / separator
Ole Troanafddd832018-02-28 14:55:20 +0100234 length = len(d)
235 return len(localdir_s) > length and localdir_s[-length:] == d
Chris Luke52bf22e2017-11-03 23:32:38 -0400236
237 def sdir(srcdir, variant):
238 """Build a path from srcdir to the staged API files of
239 'variant' (typically '' or '_debug')"""
240 # Since 'core' and 'plugin' files are staged
241 # in separate directories, we target the parent dir.
242 return os.path.sep.join((
243 srcdir,
244 'build-root',
245 'install-vpp%s-native' % variant,
246 'vpp',
247 'share',
248 'vpp',
249 'api',
250 ))
251
252 srcdir = None
253 if dmatch('src/scripts'):
254 srcdir = os.path.sep.join(localdir_s[:-2])
255 elif dmatch('src/vpp-api/python'):
256 srcdir = os.path.sep.join(localdir_s[:-3])
257 elif dmatch('test'):
258 # we're apparently running tests
259 srcdir = os.path.sep.join(localdir_s[:-1])
260
261 if srcdir:
262 # we're in the source tree, try both the debug and release
263 # variants.
Chris Luke52bf22e2017-11-03 23:32:38 -0400264 dirs.append(sdir(srcdir, '_debug'))
265 dirs.append(sdir(srcdir, ''))
266
267 # Test for staged copies of the scripts
268 # For these, since we explicitly know if we're running a debug versus
269 # release variant, target only the relevant directory
270 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
271 srcdir = os.path.sep.join(localdir_s[:-4])
272 dirs.append(sdir(srcdir, '_debug'))
273 if dmatch('build-root/install-vpp-native/vpp/bin'):
274 srcdir = os.path.sep.join(localdir_s[:-4])
275 dirs.append(sdir(srcdir, ''))
276
277 # finally, try the location system packages typically install into
278 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
279
280 # check the directories for existance; first one wins
281 for dir in dirs:
282 if os.path.isdir(dir):
283 return dir
284
285 return None
286
287 @classmethod
288 def find_api_files(cls, api_dir=None, patterns='*'):
289 """Find API definition files from the given directory tree with the
290 given pattern. If no directory is given then find_api_dir() is used
291 to locate one. If no pattern is given then all definition files found
292 in the directory tree are used.
293
294 :param api_dir: A directory tree in which to locate API definition
295 files; subdirectories are descended into.
296 If this is None then find_api_dir() is called to discover it.
297 :param patterns: A list of patterns to use in each visited directory
298 when looking for files.
299 This can be a list/tuple object or a comma-separated string of
300 patterns. Each value in the list will have leading/trialing
301 whitespace stripped.
302 The pattern specifies the first part of the filename, '.api.json'
303 is appended.
304 The results are de-duplicated, thus overlapping patterns are fine.
305 If this is None it defaults to '*' meaning "all API files".
306 :returns: A list of file paths for the API files found.
307 """
308 if api_dir is None:
309 api_dir = cls.find_api_dir()
310 if api_dir is None:
311 raise RuntimeError("api_dir cannot be located")
312
313 if isinstance(patterns, list) or isinstance(patterns, tuple):
314 patterns = [p.strip() + '.api.json' for p in patterns]
315 else:
316 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
317
318 api_files = []
319 for root, dirnames, files in os.walk(api_dir):
320 # iterate all given patterns and de-dup the result
321 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
322 for filename in files:
323 api_files.append(os.path.join(root, filename))
324
325 return api_files
326
Klement Sekera7112c542017-03-01 09:53:19 +0100327 @property
328 def api(self):
329 if not hasattr(self, "_api"):
330 raise Exception("Not connected, api definitions not available")
331 return self._api
332
Ole Troaneabd6072018-08-09 12:50:55 +0200333 def make_function(self, msg, i, multipart, do_async):
334 if (do_async):
Ole Troana7564e82018-06-12 21:06:44 +0200335 def f(**kwargs):
336 return self._call_vpp_async(i, msg, **kwargs)
337 else:
338 def f(**kwargs):
339 return self._call_vpp(i, msg, multipart, **kwargs)
340
341 f.__name__ = str(msg.name)
342 f.__doc__ = ", ".join(["%s %s" %
343 (msg.fieldtypes[j], k)
344 for j, k in enumerate(msg.fields)])
345 return f
346
Ole Troaneabd6072018-08-09 12:50:55 +0200347 def _register_functions(self, do_async=False):
Ole Troana03f4ef2016-12-02 12:53:55 +0100348 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
349 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
Klement Sekera8aedf5e2018-07-06 11:07:21 +0200350 self._api = VppApiDynamicMethodHolder()
Ole Troana7564e82018-06-12 21:06:44 +0200351 for name, msg in vpp_iterator(self.messages):
352 n = name + '_' + msg.crc[2:]
Ole Troan94495f22018-08-02 11:58:12 +0200353 i = self.transport.get_msg_index(n.encode())
Ole Troan3cc49712017-03-08 12:02:24 +0100354 if i > 0:
Ole Troana7564e82018-06-12 21:06:44 +0200355 self.id_msgdef[i] = msg
Ole Troana03f4ef2016-12-02 12:53:55 +0100356 self.id_names[i] = name
Ole Troana7564e82018-06-12 21:06:44 +0200357 # TODO: Fix multipart (use services)
Ole Troana03f4ef2016-12-02 12:53:55 +0100358 multipart = True if name.find('_dump') > 0 else False
Ole Troaneabd6072018-08-09 12:50:55 +0200359 f = self.make_function(msg, i, multipart, do_async)
Klement Sekera7112c542017-03-01 09:53:19 +0100360 setattr(self._api, name, FuncWrapper(f))
Ole Troan3cc49712017-03-08 12:02:24 +0100361 else:
Ole Troan4df97162017-07-07 16:06:08 +0200362 self.logger.debug(
363 'No such message type or failed CRC checksum: %s', n)
Ole Troana03f4ef2016-12-02 12:53:55 +0100364
Ole Troan4df97162017-07-07 16:06:08 +0200365 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
Ole Troaneabd6072018-08-09 12:50:55 +0200366 do_async):
Ole Troan94495f22018-08-02 11:58:12 +0200367 pfx = chroot_prefix.encode() if chroot_prefix else None
368
369 rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
Ole Troana03f4ef2016-12-02 12:53:55 +0100370 if rv != 0:
371 raise IOError(2, 'Connect failed')
Ole Troan94495f22018-08-02 11:58:12 +0200372 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
Ole Troaneabd6072018-08-09 12:50:55 +0200373 self._register_functions(do_async=do_async)
Ole Troana03f4ef2016-12-02 12:53:55 +0100374
375 # Initialise control ping
Ole Troana7564e82018-06-12 21:06:44 +0200376 crc = self.messages['control_ping'].crc
Ole Troan94495f22018-08-02 11:58:12 +0200377 self.control_ping_index = self.transport.get_msg_index(
Ole Troan4df97162017-07-07 16:06:08 +0200378 ('control_ping' + '_' + crc[2:]).encode())
Ole Troana03f4ef2016-12-02 12:53:55 +0100379 self.control_ping_msgdef = self.messages['control_ping']
Klement Sekera180402d2018-02-17 10:58:37 +0100380 if self.async_thread:
381 self.event_thread = threading.Thread(
382 target=self.thread_msg_handler)
383 self.event_thread.daemon = True
384 self.event_thread.start()
Ole Troan4df97162017-07-07 16:06:08 +0200385 return rv
Ole Troana03f4ef2016-12-02 12:53:55 +0100386
Ole Troaneabd6072018-08-09 12:50:55 +0200387 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
Ole Troandfc9b7c2017-03-06 23:51:57 +0100388 """Attach to VPP.
389
390 name - the name of the client.
391 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
Ole Troaneabd6072018-08-09 12:50:55 +0200392 do_async - if true, messages are sent without waiting for a reply
Ole Troandfc9b7c2017-03-06 23:51:57 +0100393 rx_qlen - the length of the VPP message receive queue between
394 client and server.
395 """
Ole Troan94495f22018-08-02 11:58:12 +0200396 msg_handler = self.transport.get_callback(do_async)
Ole Troandfc9b7c2017-03-06 23:51:57 +0100397 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
Ole Troaneabd6072018-08-09 12:50:55 +0200398 do_async)
Ole Troandfc9b7c2017-03-06 23:51:57 +0100399
Ole Troan6bf177c2017-08-17 10:34:32 +0200400 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
Ole Troandfc9b7c2017-03-06 23:51:57 +0100401 """Attach to VPP in synchronous mode. Application must poll for events.
402
403 name - the name of the client.
404 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
405 rx_qlen - the length of the VPP message receive queue between
406 client and server.
407 """
408
Ole Troan94495f22018-08-02 11:58:12 +0200409 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
Ole Troaneabd6072018-08-09 12:50:55 +0200410 do_async=False)
Ole Troandfc9b7c2017-03-06 23:51:57 +0100411
Ole Troana03f4ef2016-12-02 12:53:55 +0100412 def disconnect(self):
Ole Troan5016f992017-01-19 09:44:44 +0100413 """Detach from VPP."""
Ole Troan94495f22018-08-02 11:58:12 +0200414 rv = self.transport.disconnect()
Klement Sekera180402d2018-02-17 10:58:37 +0100415 self.message_queue.put("terminate event thread")
Ole Troana03f4ef2016-12-02 12:53:55 +0100416 return rv
417
Ole Troan5016f992017-01-19 09:44:44 +0100418 def msg_handler_sync(self, msg):
419 """Process an incoming message from VPP in sync mode.
420
421 The message may be a reply or it may be an async notification.
422 """
423 r = self.decode_incoming_msg(msg)
424 if r is None:
Ole Troana03f4ef2016-12-02 12:53:55 +0100425 return
426
Ole Troan5016f992017-01-19 09:44:44 +0100427 # If we have a context, then use the context to find any
428 # request waiting for a reply
429 context = 0
430 if hasattr(r, 'context') and r.context > 0:
431 context = r.context
Ole Troan5f9dcff2016-08-01 04:59:13 +0200432
Ole Troan5016f992017-01-19 09:44:44 +0100433 if context == 0:
434 # No context -> async notification that we feed to the callback
Ole Troandfc9b7c2017-03-06 23:51:57 +0100435 self.message_queue.put_nowait(r)
Ole Troana03f4ef2016-12-02 12:53:55 +0100436 else:
Ole Troandfc9b7c2017-03-06 23:51:57 +0100437 raise IOError(2, 'RPC reply message received in event handler')
Ole Troan5016f992017-01-19 09:44:44 +0100438
439 def decode_incoming_msg(self, msg):
440 if not msg:
Ole Troan3cc49712017-03-08 12:02:24 +0100441 self.logger.warning('vpp_api.read failed')
Ole Troan5016f992017-01-19 09:44:44 +0100442 return
Ole Troanc84cbad2018-09-06 22:58:05 +0200443 (i, ci), size = self.header.unpack(msg, 0)
Ole Troan5016f992017-01-19 09:44:44 +0100444 if self.id_names[i] == 'rx_thread_exit':
445 return
446
447 #
448 # Decode message and returns a tuple.
449 #
Ole Troana7564e82018-06-12 21:06:44 +0200450 msgobj = self.id_msgdef[i]
451 if not msgobj:
Ole Troan5016f992017-01-19 09:44:44 +0100452 raise IOError(2, 'Reply message undefined')
453
Ole Troanc84cbad2018-09-06 22:58:05 +0200454 r, size = msgobj.unpack(msg)
Ole Troana03f4ef2016-12-02 12:53:55 +0100455 return r
456
Ole Troan5016f992017-01-19 09:44:44 +0100457 def msg_handler_async(self, msg):
458 """Process a message from VPP in async mode.
459
460 In async mode, all messages are returned to the callback.
461 """
462 r = self.decode_incoming_msg(msg)
463 if r is None:
464 return
465
466 msgname = type(r).__name__
467
Ole Troan4df97162017-07-07 16:06:08 +0200468 if self.event_callback:
469 self.event_callback(msgname, r)
Ole Troan5016f992017-01-19 09:44:44 +0100470
471 def _control_ping(self, context):
472 """Send a ping command."""
473 self._call_vpp_async(self.control_ping_index,
Ole Troan4df97162017-07-07 16:06:08 +0200474 self.control_ping_msgdef,
Ole Troan5016f992017-01-19 09:44:44 +0100475 context=context)
476
Ole Troana7564e82018-06-12 21:06:44 +0200477 def validate_args(self, msg, kwargs):
478 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
479 if d:
480 raise ValueError('Invalid argument {} to {}'
481 .format(list(d), msg.name))
482
483 def _call_vpp(self, i, msg, multipart, **kwargs):
Ole Troan5016f992017-01-19 09:44:44 +0100484 """Given a message, send the message and await a reply.
485
486 msgdef - the message packing definition
487 i - the message type index
488 multipart - True if the message returns multiple
489 messages in return.
490 context - context number - chosen at random if not
491 supplied.
492 The remainder of the kwargs are the arguments to the API call.
493
494 The return value is the message or message array containing
495 the response. It will raise an IOError exception if there was
496 no response within the timeout window.
497 """
498
Ole Troan4df97162017-07-07 16:06:08 +0200499 if 'context' not in kwargs:
Ole Troandfc9b7c2017-03-06 23:51:57 +0100500 context = self.get_context()
501 kwargs['context'] = context
502 else:
503 context = kwargs['context']
504 kwargs['_vl_msg_id'] = i
Ole Troan5016f992017-01-19 09:44:44 +0100505
Ole Troan94495f22018-08-02 11:58:12 +0200506 try:
507 if self.transport.socket_index:
508 kwargs['client_index'] = self.transport.socket_index
509 except AttributeError:
510 pass
Ole Troana7564e82018-06-12 21:06:44 +0200511 self.validate_args(msg, kwargs)
512 b = msg.pack(kwargs)
Ole Troan94495f22018-08-02 11:58:12 +0200513 self.transport.suspend()
514
515 self.transport.write(b)
Ole Troan5016f992017-01-19 09:44:44 +0100516
517 if multipart:
518 # Send a ping after the request - we use its response
519 # to detect that we have seen all results.
520 self._control_ping(context)
521
522 # Block until we get a reply.
Ole Troandfc9b7c2017-03-06 23:51:57 +0100523 rl = []
524 while (True):
Ole Troan94495f22018-08-02 11:58:12 +0200525 msg = self.transport.read()
Ole Troandfc9b7c2017-03-06 23:51:57 +0100526 if not msg:
Ole Troan4df97162017-07-07 16:06:08 +0200527 raise IOError(2, 'VPP API client: read failed')
Ole Troandfc9b7c2017-03-06 23:51:57 +0100528 r = self.decode_incoming_msg(msg)
529 msgname = type(r).__name__
Ole Troan4df97162017-07-07 16:06:08 +0200530 if context not in r or r.context == 0 or context != r.context:
Ole Troan94495f22018-08-02 11:58:12 +0200531 # Message being queued
Ole Troandfc9b7c2017-03-06 23:51:57 +0100532 self.message_queue.put_nowait(r)
533 continue
534
535 if not multipart:
536 rl = r
537 break
538 if msgname == 'control_ping_reply':
539 break
540
541 rl.append(r)
542
Ole Troan94495f22018-08-02 11:58:12 +0200543 self.transport.resume()
Ole Troandfc9b7c2017-03-06 23:51:57 +0100544
545 return rl
Ole Troan5016f992017-01-19 09:44:44 +0100546
Ole Troana7564e82018-06-12 21:06:44 +0200547 def _call_vpp_async(self, i, msg, **kwargs):
Ole Troan5016f992017-01-19 09:44:44 +0100548 """Given a message, send the message and await a reply.
549
550 msgdef - the message packing definition
551 i - the message type index
552 context - context number - chosen at random if not
553 supplied.
554 The remainder of the kwargs are the arguments to the API call.
555 """
Ole Troan4df97162017-07-07 16:06:08 +0200556 if 'context' not in kwargs:
Ole Troan7e3a8752016-12-05 10:27:09 +0100557 context = self.get_context()
558 kwargs['context'] = context
559 else:
560 context = kwargs['context']
Ole Troan94495f22018-08-02 11:58:12 +0200561 try:
562 if self.transport.socket_index:
563 kwargs['client_index'] = self.transport.socket_index
564 except AttributeError:
565 kwargs['client_index'] = 0
Ole Troan7e3a8752016-12-05 10:27:09 +0100566 kwargs['_vl_msg_id'] = i
Ole Troana7564e82018-06-12 21:06:44 +0200567 b = msg.pack(kwargs)
Ole Troan7e3a8752016-12-05 10:27:09 +0100568
Ole Troan94495f22018-08-02 11:58:12 +0200569 self.transport.write(b)
Ole Troan7e3a8752016-12-05 10:27:09 +0100570
Ole Troana03f4ef2016-12-02 12:53:55 +0100571 def register_event_callback(self, callback):
Ole Troan5016f992017-01-19 09:44:44 +0100572 """Register a callback for async messages.
Ole Troana03f4ef2016-12-02 12:53:55 +0100573
Ole Troan5016f992017-01-19 09:44:44 +0100574 This will be called for async notifications in sync mode,
575 and all messages in async mode. In sync mode, replies to
576 requests will not come here.
577
578 callback is a fn(msg_type_name, msg_type) that will be
579 called when a message comes in. While this function is
580 executing, note that (a) you are in a background thread and
581 may wish to use threading.Lock to protect your datastructures,
582 and (b) message processing from VPP will stop (so if you take
583 a long while about it you may provoke reply timeouts or cause
584 VPP to fill the RX buffer). Passing None will disable the
585 callback.
586 """
587 self.event_callback = callback
Ole Troandfc9b7c2017-03-06 23:51:57 +0100588
589 def thread_msg_handler(self):
Ole Troan94495f22018-08-02 11:58:12 +0200590 """Python thread calling the user registered message handler.
Ole Troandfc9b7c2017-03-06 23:51:57 +0100591
592 This is to emulate the old style event callback scheme. Modern
593 clients should provide their own thread to poll the event
594 queue.
595 """
596 while True:
597 r = self.message_queue.get()
Klement Sekera180402d2018-02-17 10:58:37 +0100598 if r == "terminate event thread":
599 break
Ole Troandfc9b7c2017-03-06 23:51:57 +0100600 msgname = type(r).__name__
Ole Troan4df97162017-07-07 16:06:08 +0200601 if self.event_callback:
602 self.event_callback(msgname, r)
Chris Luke52bf22e2017-11-03 23:32:38 -0400603
604
605# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4