blob: 5e98f92cecd54541186bfaca4a6f18e88a9bd446 [file] [log] [blame]
Ole Troana03f4ef2016-12-02 12:53:55 +01001#!/usr/bin/env python
Ole Troan5f9dcff2016-08-01 04:59:13 +02002#
3# Copyright (c) 2016 Cisco and/or its affiliates.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at:
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
Ole Troana03f4ef2016-12-02 12:53:55 +010015#
Ole Troan5f9dcff2016-08-01 04:59:13 +020016
Ole Troan5f9dcff2016-08-01 04:59:13 +020017from __future__ import print_function
Ole Troana7564e82018-06-12 21:06:44 +020018from __future__ import absolute_import
Ole Troan4df97162017-07-07 16:06:08 +020019import sys
20import os
21import logging
22import collections
23import struct
24import json
25import threading
Chris Luke52bf22e2017-11-03 23:32:38 -040026import fnmatch
Klement Sekera180402d2018-02-17 10:58:37 +010027import weakref
Ole Troan4df97162017-07-07 16:06:08 +020028import atexit
Ole Troana7564e82018-06-12 21:06:44 +020029from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
Ole Troan0685da42018-10-16 14:42:50 +020030from . vpp_serializer import VPPMessage, vpp_get_type
Ole Troan4df97162017-07-07 16:06:08 +020031
32if sys.version[0] == '2':
33 import Queue as queue
34else:
35 import queue as queue
36
Ole Troanafddd832018-02-28 14:55:20 +010037
Ole Troan0685da42018-10-16 14:42:50 +020038class VppEnumType(type):
39 def __getattr__(cls, name):
40 t = vpp_get_type(name)
41 return t.enum
42
43
44# Python3
45# class VppEnum(metaclass=VppEnumType):
46# pass
47class VppEnum:
48 __metaclass__ = VppEnumType
49
50
Klement Sekera180402d2018-02-17 10:58:37 +010051def vpp_atexit(vpp_weakref):
Ole Troan5016f992017-01-19 09:44:44 +010052 """Clean up VPP connection on shutdown."""
Klement Sekera180402d2018-02-17 10:58:37 +010053 vpp_instance = vpp_weakref()
Ole Troan94495f22018-08-02 11:58:12 +020054 if vpp_instance and vpp_instance.transport.connected:
Klement Sekera180402d2018-02-17 10:58:37 +010055 vpp_instance.logger.debug('Cleaning up VPP on exit')
56 vpp_instance.disconnect()
Ole Troan5016f992017-01-19 09:44:44 +010057
Ole Troanafddd832018-02-28 14:55:20 +010058
Ole Troan4df97162017-07-07 16:06:08 +020059def vpp_iterator(d):
60 if sys.version[0] == '2':
61 return d.iteritems()
62 else:
63 return d.items()
64
65
Klement Sekera8aedf5e2018-07-06 11:07:21 +020066class VppApiDynamicMethodHolder(object):
Klement Sekera7112c542017-03-01 09:53:19 +010067 pass
68
69
70class FuncWrapper(object):
71 def __init__(self, func):
72 self._func = func
73 self.__name__ = func.__name__
74
75 def __call__(self, **kwargs):
76 return self._func(**kwargs)
77
78
Ole Troana03f4ef2016-12-02 12:53:55 +010079class VPP():
Ole Troan5016f992017-01-19 09:44:44 +010080 """VPP interface.
81
82 This class provides the APIs to VPP. The APIs are loaded
83 from provided .api.json files and makes functions accordingly.
84 These functions are documented in the VPP .api files, as they
85 are dynamically created.
86
87 Additionally, VPP can send callback messages; this class
88 provides a means to register a callback function to receive
89 these messages in a background thread.
90 """
Ole Troana7564e82018-06-12 21:06:44 +020091
92 def process_json_file(self, apidef_file):
93 api = json.load(apidef_file)
94 types = {}
95 for t in api['enums']:
96 t[0] = 'vl_api_' + t[0] + '_t'
97 types[t[0]] = {'type': 'enum', 'data': t}
98 for t in api['unions']:
99 t[0] = 'vl_api_' + t[0] + '_t'
100 types[t[0]] = {'type': 'union', 'data': t}
101 for t in api['types']:
102 t[0] = 'vl_api_' + t[0] + '_t'
103 types[t[0]] = {'type': 'type', 'data': t}
104
105 i = 0
106 while True:
107 unresolved = {}
108 for k, v in types.items():
109 t = v['data']
Ole Troan0685da42018-10-16 14:42:50 +0200110 if not vpp_get_type(t[0]):
111 if v['type'] == 'enum':
112 try:
113 VPPEnumType(t[0], t[1:])
114 except ValueError:
115 unresolved[k] = v
116 elif v['type'] == 'union':
117 try:
118 VPPUnionType(t[0], t[1:])
119 except ValueError:
120 unresolved[k] = v
121 elif v['type'] == 'type':
122 try:
123 VPPType(t[0], t[1:])
124 except ValueError:
125 unresolved[k] = v
Ole Troana7564e82018-06-12 21:06:44 +0200126 if len(unresolved) == 0:
127 break
128 if i > 3:
129 raise ValueError('Unresolved type definitions {}'
130 .format(unresolved))
131 types = unresolved
132 i += 1
133
134 for m in api['messages']:
135 try:
136 self.messages[m[0]] = VPPMessage(m[0], m[1:])
137 except NotImplementedError:
138 self.logger.error('Not implemented error for {}'.format(m[0]))
139
Ole Troan4df97162017-07-07 16:06:08 +0200140 def __init__(self, apifiles=None, testmode=False, async_thread=True,
Ole Troana7564e82018-06-12 21:06:44 +0200141 logger=logging.getLogger('vpp_papi'), loglevel='debug',
Ole Troan94495f22018-08-02 11:58:12 +0200142 read_timeout=5, use_socket=False,
143 server_address='/run/vpp-api.sock'):
Ole Troan5016f992017-01-19 09:44:44 +0100144 """Create a VPP API object.
145
146 apifiles is a list of files containing API
147 descriptions that will be loaded - methods will be
148 dynamically created reflecting these APIs. If not
149 provided this will load the API files from VPP's
150 default install location.
Ian Wellsd0e812f2018-06-06 14:12:27 +0100151
152 logger, if supplied, is the logging logger object to log to.
153 loglevel, if supplied, is the log level this logger is set
154 to report at (from the loglevels in the logging module).
Ole Troan5016f992017-01-19 09:44:44 +0100155 """
Ian Wellsd0e812f2018-06-06 14:12:27 +0100156 if logger is None:
157 logger = logging.getLogger(__name__)
158 if loglevel is not None:
159 logger.setLevel(loglevel)
Ole Troan3cc49712017-03-08 12:02:24 +0100160 self.logger = logger
Ole Troan3cc49712017-03-08 12:02:24 +0100161
Ole Troana03f4ef2016-12-02 12:53:55 +0100162 self.messages = {}
163 self.id_names = []
164 self.id_msgdef = []
Ole Troana7564e82018-06-12 21:06:44 +0200165 self.header = VPPType('header', [['u16', 'msgid'],
166 ['u32', 'client_index']])
Ole Troan5016f992017-01-19 09:44:44 +0100167 self.apifiles = []
Ole Troan3d31f002017-01-26 11:13:00 +0100168 self.event_callback = None
Ole Troan4df97162017-07-07 16:06:08 +0200169 self.message_queue = queue.Queue()
dongjuan84937522017-11-09 14:46:36 +0800170 self.read_timeout = read_timeout
Klement Sekera180402d2018-02-17 10:58:37 +0100171 self.async_thread = async_thread
Ole Troan5f9dcff2016-08-01 04:59:13 +0200172
Ole Troan94495f22018-08-02 11:58:12 +0200173 if use_socket:
174 from . vpp_transport_socket import VppTransport
175 else:
176 from . vpp_transport_shmem import VppTransport
177
Ole Troanf5984bd2016-12-18 13:15:08 +0100178 if not apifiles:
179 # Pick up API definitions from default directory
Chris Luke52bf22e2017-11-03 23:32:38 -0400180 try:
181 apifiles = self.find_api_files()
182 except RuntimeError:
183 # In test mode we don't care that we can't find the API files
184 if testmode:
185 apifiles = []
186 else:
187 raise
Ole Troanf5984bd2016-12-18 13:15:08 +0100188
Ole Troana03f4ef2016-12-02 12:53:55 +0100189 for file in apifiles:
Ole Troana03f4ef2016-12-02 12:53:55 +0100190 with open(file) as apidef_file:
Ole Troana7564e82018-06-12 21:06:44 +0200191 self.process_json_file(apidef_file)
Ole Troan5f9dcff2016-08-01 04:59:13 +0200192
Ole Troan4df97162017-07-07 16:06:08 +0200193 self.apifiles = apifiles
Ole Troan5f9dcff2016-08-01 04:59:13 +0200194
Ole Troana03f4ef2016-12-02 12:53:55 +0100195 # Basic sanity check
Ole Troanf5984bd2016-12-18 13:15:08 +0100196 if len(self.messages) == 0 and not testmode:
197 raise ValueError(1, 'Missing JSON message definitions')
Ole Troan5f9dcff2016-08-01 04:59:13 +0200198
Ole Troan94495f22018-08-02 11:58:12 +0200199 self.transport = VppTransport(self, read_timeout=read_timeout,
200 server_address=server_address)
Ole Troan5016f992017-01-19 09:44:44 +0100201 # Make sure we allow VPP to clean up the message rings.
Klement Sekera180402d2018-02-17 10:58:37 +0100202 atexit.register(vpp_atexit, weakref.ref(self))
Ole Troan5f9dcff2016-08-01 04:59:13 +0200203
Ole Troana03f4ef2016-12-02 12:53:55 +0100204 class ContextId(object):
Ole Troan5016f992017-01-19 09:44:44 +0100205 """Thread-safe provider of unique context IDs."""
Ole Troana03f4ef2016-12-02 12:53:55 +0100206 def __init__(self):
207 self.context = 0
Ole Troan4df97162017-07-07 16:06:08 +0200208 self.lock = threading.Lock()
209
Ole Troana03f4ef2016-12-02 12:53:55 +0100210 def __call__(self):
Ole Troan5016f992017-01-19 09:44:44 +0100211 """Get a new unique (or, at least, not recently used) context."""
Ole Troan4df97162017-07-07 16:06:08 +0200212 with self.lock:
213 self.context += 1
214 return self.context
Ole Troana03f4ef2016-12-02 12:53:55 +0100215 get_context = ContextId()
Ole Troan5f9dcff2016-08-01 04:59:13 +0200216
Ole Troan0685da42018-10-16 14:42:50 +0200217 def get_type(self, name):
218 return vpp_get_type(name)
219
Chris Luke52bf22e2017-11-03 23:32:38 -0400220 @classmethod
221 def find_api_dir(cls):
222 """Attempt to find the best directory in which API definition
223 files may reside. If the value VPP_API_DIR exists in the environment
224 then it is first on the search list. If we're inside a recognized
225 location in a VPP source tree (src/scripts and src/vpp-api/python)
226 then entries from there to the likely locations in build-root are
227 added. Finally the location used by system packages is added.
228
229 :returns: A single directory name, or None if no such directory
230 could be found.
231 """
232 dirs = []
233
234 if 'VPP_API_DIR' in os.environ:
235 dirs.append(os.environ['VPP_API_DIR'])
236
237 # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir;
238 # in which case, plot a course to likely places in the src tree
239 import __main__ as main
240 if hasattr(main, '__file__'):
241 # get the path of the calling script
242 localdir = os.path.dirname(os.path.realpath(main.__file__))
243 else:
244 # use cwd if there is no calling script
Andrey "Zed" Zaikin68e2ffb2018-04-24 14:50:02 +0300245 localdir = os.getcwd()
Chris Luke52bf22e2017-11-03 23:32:38 -0400246 localdir_s = localdir.split(os.path.sep)
247
248 def dmatch(dir):
249 """Match dir against right-hand components of the script dir"""
250 d = dir.split('/') # param 'dir' assumes a / separator
Ole Troanafddd832018-02-28 14:55:20 +0100251 length = len(d)
252 return len(localdir_s) > length and localdir_s[-length:] == d
Chris Luke52bf22e2017-11-03 23:32:38 -0400253
254 def sdir(srcdir, variant):
255 """Build a path from srcdir to the staged API files of
256 'variant' (typically '' or '_debug')"""
257 # Since 'core' and 'plugin' files are staged
258 # in separate directories, we target the parent dir.
259 return os.path.sep.join((
260 srcdir,
261 'build-root',
262 'install-vpp%s-native' % variant,
263 'vpp',
264 'share',
265 'vpp',
266 'api',
267 ))
268
269 srcdir = None
270 if dmatch('src/scripts'):
271 srcdir = os.path.sep.join(localdir_s[:-2])
272 elif dmatch('src/vpp-api/python'):
273 srcdir = os.path.sep.join(localdir_s[:-3])
274 elif dmatch('test'):
275 # we're apparently running tests
276 srcdir = os.path.sep.join(localdir_s[:-1])
277
278 if srcdir:
279 # we're in the source tree, try both the debug and release
280 # variants.
Chris Luke52bf22e2017-11-03 23:32:38 -0400281 dirs.append(sdir(srcdir, '_debug'))
282 dirs.append(sdir(srcdir, ''))
283
284 # Test for staged copies of the scripts
285 # For these, since we explicitly know if we're running a debug versus
286 # release variant, target only the relevant directory
287 if dmatch('build-root/install-vpp_debug-native/vpp/bin'):
288 srcdir = os.path.sep.join(localdir_s[:-4])
289 dirs.append(sdir(srcdir, '_debug'))
290 if dmatch('build-root/install-vpp-native/vpp/bin'):
291 srcdir = os.path.sep.join(localdir_s[:-4])
292 dirs.append(sdir(srcdir, ''))
293
294 # finally, try the location system packages typically install into
295 dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api')))
296
297 # check the directories for existance; first one wins
298 for dir in dirs:
299 if os.path.isdir(dir):
300 return dir
301
302 return None
303
304 @classmethod
305 def find_api_files(cls, api_dir=None, patterns='*'):
306 """Find API definition files from the given directory tree with the
307 given pattern. If no directory is given then find_api_dir() is used
308 to locate one. If no pattern is given then all definition files found
309 in the directory tree are used.
310
311 :param api_dir: A directory tree in which to locate API definition
312 files; subdirectories are descended into.
313 If this is None then find_api_dir() is called to discover it.
314 :param patterns: A list of patterns to use in each visited directory
315 when looking for files.
316 This can be a list/tuple object or a comma-separated string of
317 patterns. Each value in the list will have leading/trialing
318 whitespace stripped.
319 The pattern specifies the first part of the filename, '.api.json'
320 is appended.
321 The results are de-duplicated, thus overlapping patterns are fine.
322 If this is None it defaults to '*' meaning "all API files".
323 :returns: A list of file paths for the API files found.
324 """
325 if api_dir is None:
326 api_dir = cls.find_api_dir()
327 if api_dir is None:
328 raise RuntimeError("api_dir cannot be located")
329
330 if isinstance(patterns, list) or isinstance(patterns, tuple):
331 patterns = [p.strip() + '.api.json' for p in patterns]
332 else:
333 patterns = [p.strip() + '.api.json' for p in patterns.split(",")]
334
335 api_files = []
336 for root, dirnames, files in os.walk(api_dir):
337 # iterate all given patterns and de-dup the result
338 files = set(sum([fnmatch.filter(files, p) for p in patterns], []))
339 for filename in files:
340 api_files.append(os.path.join(root, filename))
341
342 return api_files
343
Klement Sekera7112c542017-03-01 09:53:19 +0100344 @property
345 def api(self):
346 if not hasattr(self, "_api"):
347 raise Exception("Not connected, api definitions not available")
348 return self._api
349
Ole Troaneabd6072018-08-09 12:50:55 +0200350 def make_function(self, msg, i, multipart, do_async):
351 if (do_async):
Ole Troana7564e82018-06-12 21:06:44 +0200352 def f(**kwargs):
353 return self._call_vpp_async(i, msg, **kwargs)
354 else:
355 def f(**kwargs):
356 return self._call_vpp(i, msg, multipart, **kwargs)
357
358 f.__name__ = str(msg.name)
359 f.__doc__ = ", ".join(["%s %s" %
360 (msg.fieldtypes[j], k)
361 for j, k in enumerate(msg.fields)])
362 return f
363
Ole Troaneabd6072018-08-09 12:50:55 +0200364 def _register_functions(self, do_async=False):
Ole Troana03f4ef2016-12-02 12:53:55 +0100365 self.id_names = [None] * (self.vpp_dictionary_maxid + 1)
366 self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1)
Klement Sekera8aedf5e2018-07-06 11:07:21 +0200367 self._api = VppApiDynamicMethodHolder()
Ole Troana7564e82018-06-12 21:06:44 +0200368 for name, msg in vpp_iterator(self.messages):
369 n = name + '_' + msg.crc[2:]
Ole Troan94495f22018-08-02 11:58:12 +0200370 i = self.transport.get_msg_index(n.encode())
Ole Troan3cc49712017-03-08 12:02:24 +0100371 if i > 0:
Ole Troana7564e82018-06-12 21:06:44 +0200372 self.id_msgdef[i] = msg
Ole Troana03f4ef2016-12-02 12:53:55 +0100373 self.id_names[i] = name
Ole Troana7564e82018-06-12 21:06:44 +0200374 # TODO: Fix multipart (use services)
Ole Troana03f4ef2016-12-02 12:53:55 +0100375 multipart = True if name.find('_dump') > 0 else False
Ole Troaneabd6072018-08-09 12:50:55 +0200376 f = self.make_function(msg, i, multipart, do_async)
Klement Sekera7112c542017-03-01 09:53:19 +0100377 setattr(self._api, name, FuncWrapper(f))
Ole Troan3cc49712017-03-08 12:02:24 +0100378 else:
Ole Troan4df97162017-07-07 16:06:08 +0200379 self.logger.debug(
380 'No such message type or failed CRC checksum: %s', n)
Ole Troana03f4ef2016-12-02 12:53:55 +0100381
Ole Troan4df97162017-07-07 16:06:08 +0200382 def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
Ole Troaneabd6072018-08-09 12:50:55 +0200383 do_async):
Ole Troan94495f22018-08-02 11:58:12 +0200384 pfx = chroot_prefix.encode() if chroot_prefix else None
385
386 rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
Ole Troana03f4ef2016-12-02 12:53:55 +0100387 if rv != 0:
388 raise IOError(2, 'Connect failed')
Ole Troan94495f22018-08-02 11:58:12 +0200389 self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
Ole Troaneabd6072018-08-09 12:50:55 +0200390 self._register_functions(do_async=do_async)
Ole Troana03f4ef2016-12-02 12:53:55 +0100391
392 # Initialise control ping
Ole Troana7564e82018-06-12 21:06:44 +0200393 crc = self.messages['control_ping'].crc
Ole Troan94495f22018-08-02 11:58:12 +0200394 self.control_ping_index = self.transport.get_msg_index(
Ole Troan4df97162017-07-07 16:06:08 +0200395 ('control_ping' + '_' + crc[2:]).encode())
Ole Troana03f4ef2016-12-02 12:53:55 +0100396 self.control_ping_msgdef = self.messages['control_ping']
Klement Sekera180402d2018-02-17 10:58:37 +0100397 if self.async_thread:
398 self.event_thread = threading.Thread(
399 target=self.thread_msg_handler)
400 self.event_thread.daemon = True
401 self.event_thread.start()
Ole Troan4df97162017-07-07 16:06:08 +0200402 return rv
Ole Troana03f4ef2016-12-02 12:53:55 +0100403
Ole Troaneabd6072018-08-09 12:50:55 +0200404 def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
Ole Troandfc9b7c2017-03-06 23:51:57 +0100405 """Attach to VPP.
406
407 name - the name of the client.
408 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
Ole Troaneabd6072018-08-09 12:50:55 +0200409 do_async - if true, messages are sent without waiting for a reply
Ole Troandfc9b7c2017-03-06 23:51:57 +0100410 rx_qlen - the length of the VPP message receive queue between
411 client and server.
412 """
Ole Troan94495f22018-08-02 11:58:12 +0200413 msg_handler = self.transport.get_callback(do_async)
Ole Troandfc9b7c2017-03-06 23:51:57 +0100414 return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
Ole Troaneabd6072018-08-09 12:50:55 +0200415 do_async)
Ole Troandfc9b7c2017-03-06 23:51:57 +0100416
Ole Troan6bf177c2017-08-17 10:34:32 +0200417 def connect_sync(self, name, chroot_prefix=None, rx_qlen=32):
Ole Troandfc9b7c2017-03-06 23:51:57 +0100418 """Attach to VPP in synchronous mode. Application must poll for events.
419
420 name - the name of the client.
421 chroot_prefix - if VPP is chroot'ed, the prefix of the jail
422 rx_qlen - the length of the VPP message receive queue between
423 client and server.
424 """
425
Ole Troan94495f22018-08-02 11:58:12 +0200426 return self.connect_internal(name, None, chroot_prefix, rx_qlen,
Ole Troaneabd6072018-08-09 12:50:55 +0200427 do_async=False)
Ole Troandfc9b7c2017-03-06 23:51:57 +0100428
Ole Troana03f4ef2016-12-02 12:53:55 +0100429 def disconnect(self):
Ole Troan5016f992017-01-19 09:44:44 +0100430 """Detach from VPP."""
Ole Troan94495f22018-08-02 11:58:12 +0200431 rv = self.transport.disconnect()
Klement Sekera180402d2018-02-17 10:58:37 +0100432 self.message_queue.put("terminate event thread")
Ole Troana03f4ef2016-12-02 12:53:55 +0100433 return rv
434
Ole Troan5016f992017-01-19 09:44:44 +0100435 def msg_handler_sync(self, msg):
436 """Process an incoming message from VPP in sync mode.
437
438 The message may be a reply or it may be an async notification.
439 """
440 r = self.decode_incoming_msg(msg)
441 if r is None:
Ole Troana03f4ef2016-12-02 12:53:55 +0100442 return
443
Ole Troan5016f992017-01-19 09:44:44 +0100444 # If we have a context, then use the context to find any
445 # request waiting for a reply
446 context = 0
447 if hasattr(r, 'context') and r.context > 0:
448 context = r.context
Ole Troan5f9dcff2016-08-01 04:59:13 +0200449
Ole Troan5016f992017-01-19 09:44:44 +0100450 if context == 0:
451 # No context -> async notification that we feed to the callback
Ole Troandfc9b7c2017-03-06 23:51:57 +0100452 self.message_queue.put_nowait(r)
Ole Troana03f4ef2016-12-02 12:53:55 +0100453 else:
Ole Troandfc9b7c2017-03-06 23:51:57 +0100454 raise IOError(2, 'RPC reply message received in event handler')
Ole Troan5016f992017-01-19 09:44:44 +0100455
456 def decode_incoming_msg(self, msg):
457 if not msg:
Ole Troan3cc49712017-03-08 12:02:24 +0100458 self.logger.warning('vpp_api.read failed')
Ole Troan5016f992017-01-19 09:44:44 +0100459 return
Ole Troanc84cbad2018-09-06 22:58:05 +0200460 (i, ci), size = self.header.unpack(msg, 0)
Ole Troan5016f992017-01-19 09:44:44 +0100461 if self.id_names[i] == 'rx_thread_exit':
462 return
463
464 #
465 # Decode message and returns a tuple.
466 #
Ole Troana7564e82018-06-12 21:06:44 +0200467 msgobj = self.id_msgdef[i]
468 if not msgobj:
Ole Troan5016f992017-01-19 09:44:44 +0100469 raise IOError(2, 'Reply message undefined')
470
Ole Troanc84cbad2018-09-06 22:58:05 +0200471 r, size = msgobj.unpack(msg)
Ole Troana03f4ef2016-12-02 12:53:55 +0100472 return r
473
Ole Troan5016f992017-01-19 09:44:44 +0100474 def msg_handler_async(self, msg):
475 """Process a message from VPP in async mode.
476
477 In async mode, all messages are returned to the callback.
478 """
479 r = self.decode_incoming_msg(msg)
480 if r is None:
481 return
482
483 msgname = type(r).__name__
484
Ole Troan4df97162017-07-07 16:06:08 +0200485 if self.event_callback:
486 self.event_callback(msgname, r)
Ole Troan5016f992017-01-19 09:44:44 +0100487
488 def _control_ping(self, context):
489 """Send a ping command."""
490 self._call_vpp_async(self.control_ping_index,
Ole Troan4df97162017-07-07 16:06:08 +0200491 self.control_ping_msgdef,
Ole Troan5016f992017-01-19 09:44:44 +0100492 context=context)
493
Ole Troana7564e82018-06-12 21:06:44 +0200494 def validate_args(self, msg, kwargs):
495 d = set(kwargs.keys()) - set(msg.field_by_name.keys())
496 if d:
497 raise ValueError('Invalid argument {} to {}'
498 .format(list(d), msg.name))
499
500 def _call_vpp(self, i, msg, multipart, **kwargs):
Ole Troan5016f992017-01-19 09:44:44 +0100501 """Given a message, send the message and await a reply.
502
503 msgdef - the message packing definition
504 i - the message type index
505 multipart - True if the message returns multiple
506 messages in return.
507 context - context number - chosen at random if not
508 supplied.
509 The remainder of the kwargs are the arguments to the API call.
510
511 The return value is the message or message array containing
512 the response. It will raise an IOError exception if there was
513 no response within the timeout window.
514 """
515
Ole Troan4df97162017-07-07 16:06:08 +0200516 if 'context' not in kwargs:
Ole Troandfc9b7c2017-03-06 23:51:57 +0100517 context = self.get_context()
518 kwargs['context'] = context
519 else:
520 context = kwargs['context']
521 kwargs['_vl_msg_id'] = i
Ole Troan5016f992017-01-19 09:44:44 +0100522
Ole Troan94495f22018-08-02 11:58:12 +0200523 try:
524 if self.transport.socket_index:
525 kwargs['client_index'] = self.transport.socket_index
526 except AttributeError:
527 pass
Ole Troana7564e82018-06-12 21:06:44 +0200528 self.validate_args(msg, kwargs)
529 b = msg.pack(kwargs)
Ole Troan94495f22018-08-02 11:58:12 +0200530 self.transport.suspend()
531
532 self.transport.write(b)
Ole Troan5016f992017-01-19 09:44:44 +0100533
534 if multipart:
535 # Send a ping after the request - we use its response
536 # to detect that we have seen all results.
537 self._control_ping(context)
538
539 # Block until we get a reply.
Ole Troandfc9b7c2017-03-06 23:51:57 +0100540 rl = []
541 while (True):
Ole Troan94495f22018-08-02 11:58:12 +0200542 msg = self.transport.read()
Ole Troandfc9b7c2017-03-06 23:51:57 +0100543 if not msg:
Ole Troan4df97162017-07-07 16:06:08 +0200544 raise IOError(2, 'VPP API client: read failed')
Ole Troandfc9b7c2017-03-06 23:51:57 +0100545 r = self.decode_incoming_msg(msg)
546 msgname = type(r).__name__
Ole Troan4df97162017-07-07 16:06:08 +0200547 if context not in r or r.context == 0 or context != r.context:
Ole Troan94495f22018-08-02 11:58:12 +0200548 # Message being queued
Ole Troandfc9b7c2017-03-06 23:51:57 +0100549 self.message_queue.put_nowait(r)
550 continue
551
552 if not multipart:
553 rl = r
554 break
555 if msgname == 'control_ping_reply':
556 break
557
558 rl.append(r)
559
Ole Troan94495f22018-08-02 11:58:12 +0200560 self.transport.resume()
Ole Troandfc9b7c2017-03-06 23:51:57 +0100561
562 return rl
Ole Troan5016f992017-01-19 09:44:44 +0100563
Ole Troana7564e82018-06-12 21:06:44 +0200564 def _call_vpp_async(self, i, msg, **kwargs):
Ole Troan5016f992017-01-19 09:44:44 +0100565 """Given a message, send the message and await a reply.
566
567 msgdef - the message packing definition
568 i - the message type index
569 context - context number - chosen at random if not
570 supplied.
571 The remainder of the kwargs are the arguments to the API call.
572 """
Ole Troan4df97162017-07-07 16:06:08 +0200573 if 'context' not in kwargs:
Ole Troan7e3a8752016-12-05 10:27:09 +0100574 context = self.get_context()
575 kwargs['context'] = context
576 else:
577 context = kwargs['context']
Ole Troan94495f22018-08-02 11:58:12 +0200578 try:
579 if self.transport.socket_index:
580 kwargs['client_index'] = self.transport.socket_index
581 except AttributeError:
582 kwargs['client_index'] = 0
Ole Troan7e3a8752016-12-05 10:27:09 +0100583 kwargs['_vl_msg_id'] = i
Ole Troana7564e82018-06-12 21:06:44 +0200584 b = msg.pack(kwargs)
Ole Troan7e3a8752016-12-05 10:27:09 +0100585
Ole Troan94495f22018-08-02 11:58:12 +0200586 self.transport.write(b)
Ole Troan7e3a8752016-12-05 10:27:09 +0100587
Ole Troana03f4ef2016-12-02 12:53:55 +0100588 def register_event_callback(self, callback):
Ole Troan5016f992017-01-19 09:44:44 +0100589 """Register a callback for async messages.
Ole Troana03f4ef2016-12-02 12:53:55 +0100590
Ole Troan5016f992017-01-19 09:44:44 +0100591 This will be called for async notifications in sync mode,
592 and all messages in async mode. In sync mode, replies to
593 requests will not come here.
594
595 callback is a fn(msg_type_name, msg_type) that will be
596 called when a message comes in. While this function is
597 executing, note that (a) you are in a background thread and
598 may wish to use threading.Lock to protect your datastructures,
599 and (b) message processing from VPP will stop (so if you take
600 a long while about it you may provoke reply timeouts or cause
601 VPP to fill the RX buffer). Passing None will disable the
602 callback.
603 """
604 self.event_callback = callback
Ole Troandfc9b7c2017-03-06 23:51:57 +0100605
606 def thread_msg_handler(self):
Ole Troan94495f22018-08-02 11:58:12 +0200607 """Python thread calling the user registered message handler.
Ole Troandfc9b7c2017-03-06 23:51:57 +0100608
609 This is to emulate the old style event callback scheme. Modern
610 clients should provide their own thread to poll the event
611 queue.
612 """
613 while True:
614 r = self.message_queue.get()
Klement Sekera180402d2018-02-17 10:58:37 +0100615 if r == "terminate event thread":
616 break
Ole Troandfc9b7c2017-03-06 23:51:57 +0100617 msgname = type(r).__name__
Ole Troan4df97162017-07-07 16:06:08 +0200618 if self.event_callback:
619 self.event_callback(msgname, r)
Chris Luke52bf22e2017-11-03 23:32:38 -0400620
621
622# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4