PAPI: Allow ipaddress object as argument and return values from API calls
The API calls that use any of vl_api_address_t, vl_api_ip4_address,
vl_api_ip6_address_t, vl_api_prefix_t, vl_api_ip4_prefix_t,
vl_api_ip6_prefix_t now accepts either the old style dictionary,
a text string (2001:db8::/32) or an ipaddress ojbect.
Unless it is called with '_no_type_conversion':True, it will
also return an appropriate ipaddress object.
Change-Id: I84e4a1577bd57f6b5ae725f316a523988b6a955b
Signed-off-by: Ole Troan <ot@cisco.com>
diff --git a/src/vpp-api/python/vpp_papi/tests/test_vpp_serializer.py b/src/vpp-api/python/vpp_papi/tests/test_vpp_serializer.py
index ba3190c..ca3afc4 100755
--- a/src/vpp-api/python/vpp_papi/tests/test_vpp_serializer.py
+++ b/src/vpp-api/python/vpp_papi/tests/test_vpp_serializer.py
@@ -4,10 +4,10 @@
from vpp_papi.vpp_serializer import VPPType, VPPEnumType
from vpp_papi.vpp_serializer import VPPUnionType, VPPMessage
from vpp_papi.vpp_serializer import VPPTypeAlias
-from vpp_papi.vpp_format import VPPFormat
from socket import inet_pton, AF_INET, AF_INET6
import logging
import sys
+from ipaddress import *
class TestAddType(unittest.TestCase):
@@ -27,8 +27,10 @@
af = VPPEnumType('vl_api_address_family_t', [["ADDRESS_IP4", 0],
["ADDRESS_IP6", 1],
{"enumtype": "u32"}])
- ip4 = VPPType('vl_api_ip4_address_t', [['u8', 'address', 4]])
- ip6 = VPPType('vl_api_ip6_address_t', [['u8', 'address', 16]])
+ ip4 = VPPTypeAlias('vl_api_ip4_address_t', {'type': 'u8',
+ 'length': 4})
+ ip6 = VPPTypeAlias('vl_api_ip6_address_t', {'type': 'u8',
+ 'length': 16})
VPPUnionType('vl_api_address_union_t',
[["vl_api_ip4_address_t", "ip4"],
["vl_api_ip6_address_t", "ip6"]])
@@ -47,41 +49,34 @@
'vla_address'],
['u8', 'is_cool']])
- b = ip4.pack({'address': inet_pton(AF_INET, '1.1.1.1')})
+ b = ip4.pack(inet_pton(AF_INET, '1.1.1.1'))
self.assertEqual(len(b), 4)
nt, size = ip4.unpack(b)
- self.assertEqual(nt.address, inet_pton(AF_INET, '1.1.1.1'))
+ self.assertEqual(str(nt), '1.1.1.1')
- b = ip6.pack({'address': inet_pton(AF_INET6, '1::1')})
+ b = ip6.pack(inet_pton(AF_INET6, '1::1'))
self.assertEqual(len(b), 16)
b = address.pack({'af': af.ADDRESS_IP4,
'un':
- {'ip4':
- {'address': inet_pton(AF_INET, '2.2.2.2')}}})
+ {'ip4': inet_pton(AF_INET, '2.2.2.2')}})
self.assertEqual(len(b), 20)
nt, size = address.unpack(b)
- self.assertEqual(nt.af, af.ADDRESS_IP4)
- self.assertEqual(nt.un.ip4.address,
- inet_pton(AF_INET, '2.2.2.2'))
- self.assertEqual(nt.un.ip6.address,
- inet_pton(AF_INET6, '0202:0202::'))
+ self.assertEqual(str(nt), '2.2.2.2')
# List of addresses
address_list = []
for i in range(4):
address_list.append({'af': af.ADDRESS_IP4,
'un':
- {'ip4':
- {'address': inet_pton(AF_INET, '2.2.2.2')}}})
+ {'ip4': inet_pton(AF_INET, '2.2.2.2')}})
b = va_address_list.pack({'count': len(address_list),
'addresses': address_list})
self.assertEqual(len(b), 81)
nt, size = va_address_list.unpack(b)
- self.assertEqual(nt.addresses[0].un.ip4.address,
- inet_pton(AF_INET, '2.2.2.2'))
+ self.assertEqual(str(nt.addresses[0]), '2.2.2.2')
b = message_with_va_address_list.pack({'vla_address':
{'count': len(address_list),
@@ -97,6 +92,12 @@
{"enumtype": "u32"}])
ip4 = VPPTypeAlias('vl_api_ip4_address_t', {'type': 'u8',
'length': 4})
+ b = ip4.pack('1.1.1.1')
+ self.assertEqual(len(b), 4)
+ nt, size = ip4.unpack(b)
+
+ self.assertEqual(str(nt), '1.1.1.1')
+
ip6 = VPPTypeAlias('vl_api_ip6_address_t', {'type': 'u8',
'length': 16})
VPPUnionType('vl_api_address_union_t',
@@ -108,44 +109,65 @@
['vl_api_address_union_t', 'un']])
prefix = VPPType('vl_api_prefix_t',
- [['vl_api_address_t', 'address'],
- ['u8', 'address_length']])
+ [['vl_api_address_t', 'address'],
+ ['u8', 'address_length']])
message = VPPMessage('svs',
- [['vl_api_prefix_t', 'prefix']])
+ [['vl_api_prefix_t', 'prefix']])
message_addr = VPPMessage('svs_address',
[['vl_api_address_t', 'address']])
b = message_addr.pack({'address': "1::1"})
self.assertEqual(len(b), 20)
nt, size = message_addr.unpack(b)
- self.assertEqual("1::1", VPPFormat.unformat(nt.address))
+ self.assertEqual("1::1", str(nt.address))
b = message_addr.pack({'address': "1.1.1.1"})
self.assertEqual(len(b), 20)
nt, size = message_addr.unpack(b)
- self.assertEqual("1.1.1.1", VPPFormat.unformat(nt.address))
+ self.assertEqual("1.1.1.1", str(nt.address))
- b = message.pack({'prefix': "1.1.1.1/24"})
+ b = message.pack({'prefix': "1.1.1.0/24"})
self.assertEqual(len(b), 21)
nt, size = message.unpack(b)
- self.assertEqual("1.1.1.1/24", VPPFormat.unformat(nt.prefix))
+ self.assertEqual("1.1.1.0/24", str(nt.prefix))
message_array = VPPMessage('address_array',
- [['vl_api_ip4_address_t',
+ [['vl_api_ip6_address_t',
'addresses', 2]])
- b = message_array.pack({'addresses': ["1::1", "2::2"]})
- self.assertEqual(len(b), 8)
-
+ b = message_array.pack({'addresses': [IPv6Address(u"1::1"), "2::2"]})
+ self.assertEqual(len(b), 32)
message_array_vla = VPPMessage('address_array_vla',
[['u32', 'num'],
- ['vl_api_ip4_address_t',
+ ['vl_api_ip6_address_t',
'addresses', 0, 'num']])
b = message_array_vla.pack({'addresses': ["1::1", "2::2"], 'num': 2})
- self.assertEqual(len(b), 12)
+ self.assertEqual(len(b), 36)
+
+ message_array4 = VPPMessage('address_array4',
+ [['vl_api_ip4_address_t',
+ 'addresses', 2]])
+ b = message_array4.pack({'addresses': ["1.1.1.1", "2.2.2.2"]})
+ self.assertEqual(len(b), 8)
+ b = message_array4.pack({'addresses': [IPv4Address(u"1.1.1.1"),
+ "2.2.2.2"]})
+ self.assertEqual(len(b), 8)
+
+ message = VPPMessage('address', [['vl_api_address_t', 'address']])
+ b = message.pack({'address': '1::1'})
+ self.assertEqual(len(b), 20)
+ b = message.pack({'address': '1.1.1.1'})
+ self.assertEqual(len(b), 20)
+ message = VPPMessage('prefix', [['vl_api_prefix_t', 'prefix']])
+ b = message.pack({'prefix': '1::1/130'})
+ self.assertEqual(len(b), 21)
+ b = message.pack({'prefix': IPv6Network(u'1::/119')})
+ self.assertEqual(len(b), 21)
+ b = message.pack({'prefix': IPv4Network(u'1.1.0.0/16')})
+ self.assertEqual(len(b), 21)
def test_zero_vla(self):
'''Default zero'ed out for VLAs'''
list = VPPType('vl_api_list_t',
- [['u8', 'count', 10]])
+ [['u8', 'count', 10]])
# Define an embedded VLA type
valist = VPPType('vl_api_valist_t',
@@ -153,12 +175,12 @@
['u8', 'string', 0, 'count']])
# Define a message
vamessage = VPPMessage('vamsg',
- [['vl_api_valist_t', 'valist'],
- ['u8', 'is_something']])
+ [['vl_api_valist_t', 'valist'],
+ ['u8', 'is_something']])
message = VPPMessage('msg',
- [['vl_api_list_t', 'list'],
- ['u8', 'is_something']])
+ [['vl_api_list_t', 'list'],
+ ['u8', 'is_something']])
# Pack message without VLA specified
b = message.pack({'is_something': 1})
diff --git a/src/vpp-api/python/vpp_papi/vpp_format.py b/src/vpp-api/python/vpp_papi/vpp_format.py
index 908606a..1b880ec 100644
--- a/src/vpp-api/python/vpp_papi/vpp_format.py
+++ b/src/vpp-api/python/vpp_papi/vpp_format.py
@@ -15,137 +15,107 @@
from socket import inet_pton, inet_ntop, AF_INET6, AF_INET
import socket
+import ipaddress
+
+# Copies from vl_api_address_t definition
+ADDRESS_IP4 = 0
+ADDRESS_IP6 = 1
+
+#
+# Type conversion for input arguments and return values
+#
-class VPPFormatError(Exception):
- pass
+def format_vl_api_address_t(args):
+ try:
+ return {'un': {'ip6': inet_pton(AF_INET6, args)},
+ 'af': ADDRESS_IP6}
+ except socket.error as e:
+ return {'un': {'ip4': inet_pton(AF_INET, args)},
+ 'af': ADDRESS_IP4}
-class VPPFormat(object):
- VPPFormatError = VPPFormatError
+def format_vl_api_prefix_t(args):
+ p, length = args.split('/')
+ return {'address': format_vl_api_address_t(p),
+ 'address_length': int(length)}
- @staticmethod
- def format_vl_api_ip6_prefix_t(args):
- prefix, len = args.split('/')
- return {'prefix': {'address': inet_pton(AF_INET6, prefix)},
- 'len': int(len)}
- @staticmethod
- def unformat_vl_api_ip6_prefix_t(args):
- return "{}/{}".format(inet_ntop(AF_INET6, args.prefix),
- args.len)
+def format_vl_api_ip6_prefix_t(args):
+ p, length = args.split('/')
+ return {'prefix': inet_pton(AF_INET6, p),
+ 'len': int(length)}
- @staticmethod
- def format_vl_api_ip4_prefix_t(args):
- prefix, len = args.split('/')
- return {'prefix': {'address': inet_pton(AF_INET, prefix)},
- 'len': int(len)}
- @staticmethod
- def unformat_vl_api_ip4_prefix_t(args):
- return "{}/{}".format(inet_ntop(AF_INET, args.prefix),
- args.len)
+def format_vl_api_ip4_prefix_t(args):
+ p, length = args.split('/')
+ return {'prefix': inet_pton(AF_INET, p),
+ 'len': int(length)}
- @staticmethod
- def format_vl_api_ip6_address_t(args):
- return {'address': inet_pton(AF_INET6, args)}
- @staticmethod
- def format_vl_api_ip4_address_t(args):
- return {'address': inet_pton(AF_INET, args)}
+conversion_table = {
+ 'vl_api_ip6_address_t':
+ {
+ 'IPv6Address': lambda o: o.packed,
+ 'str': lambda s: inet_pton(AF_INET6, s)
+ },
+ 'vl_api_ip4_address_t':
+ {
+ 'IPv4Address': lambda o: o.packed,
+ 'str': lambda s: inet_pton(AF_INET, s)
+ },
+ 'vl_api_ip6_prefix_t':
+ {
+ 'IPv6Network': lambda o: {'prefix': o.network_address.packed,
+ 'len': o.prefixlen},
+ 'str': lambda s: format_vl_api_ip6_prefix_t(s)
+ },
+ 'vl_api_ip4_prefix_t':
+ {
+ 'IPv4Network': lambda o: {'prefix': o.network_address.packed,
+ 'len': o.prefixlen},
+ 'str': lambda s: format_vl_api_ip4_prefix_t(s)
+ },
+ 'vl_api_address_t':
+ {
+ 'IPv4Address': lambda o: {'af': ADDRESS_IP4, 'un': {'ip4': o.packed}},
+ 'IPv6Address': lambda o: {'af': ADDRESS_IP6, 'un': {'ip6': o.packed}},
+ 'str': lambda s: format_vl_api_address_t(s)
+ },
+ 'vl_api_prefix_t':
+ {
+ 'IPv4Network': lambda o: {'prefix':
+ {'af': ADDRESS_IP4, 'un':
+ {'ip4': o.network_address.packed}},
+ 'len': o.prefixlen},
+ 'IPv6Network': lambda o: {'prefix':
+ {'af': ADDRESS_IP6, 'un':
+ {'ip6': o.network_address.packed}},
+ 'len': o.prefixlen},
+ 'str': lambda s: format_vl_api_prefix_t(s)
+ },
+}
- @staticmethod
- def format_vl_api_address_t(args):
- try:
- return {'un': {'ip6': inet_pton(AF_INET6, args)},
- 'af': int(1)}
- except socket.error as e:
- return {'un': {'ip4': inet_pton(AF_INET, args)},
- 'af': int(0)}
- @staticmethod
- def unformat_vl_api_address_t(arg):
- if arg.af == 1:
- return inet_ntop(AF_INET6, arg.un.ip6)
- if arg.af == 0:
- return inet_ntop(AF_INET, arg.un.ip4)
- raise VPPFormatError
+def unformat_api_address_t(o):
+ if o.af == 1:
+ return ipaddress.IPv6Address(o.un.ip6)
+ if o.af == 0:
+ return ipaddress.IPv4Address(o.un.ip4)
- @staticmethod
- def format_vl_api_prefix_t(args):
- prefix, len = args.split('/')
- return {'address': VPPFormat.format_vl_api_address_t(prefix),
- 'address_length': int(len)}
- @staticmethod
- def unformat_vl_api_prefix_t(arg):
- if arg.address.af == 1:
- return "{}/{}".format(inet_ntop(AF_INET6,
- arg.address.un.ip6),
- arg.address_length)
- if arg.address.af == 0:
- return "{}/{}".format(inet_ntop(AF_INET,
- arg.address.un.ip4),
- arg.address_length)
- raise VPPFormatError
+def unformat_api_prefix_t(o):
+ if isinstance(o.address, ipaddress.IPv4Address):
+ return ipaddress.IPv4Network((o.address, o.address_length), False)
+ if isinstance(o.address, ipaddress.IPv6Address):
+ return ipaddress.IPv6Network((o.address, o.address_length), False)
- @staticmethod
- def format_u8(args):
- try:
- return int(args)
- except Exception as e:
- return args.encode()
- @staticmethod
- def format(typename, args):
- try:
- return getattr(VPPFormat, 'format_' + typename)(args)
- except AttributeError:
- # Default
- return (int(args))
-
- @staticmethod
- def unformat_bytes(args):
- try:
- return args.decode('utf-8')
- except ValueError as e:
- return args
-
- @staticmethod
- def unformat_list(args):
- s = '['
- for f in args:
- t = type(f).__name__
- if type(f) is int:
- s2 = str(f)
- else:
- s2 = VPPFormat.unformat_t(t, f)
- s += '{} '.format(s2)
- return s[:-1] + ']'
-
- @staticmethod
- def unformat(args):
- s = ''
- return VPPFormat.unformat_t(type(args).__name__, args)
- '''
- for i, f in enumerate(args):
- print('F', f)
- t = type(f).__name__
- if type(f) is int:
- s2 = str(f)
- else:
- s2 = VPPFormat.unformat_t(t, f)
- s += '{} {} '.format(args._fields[i], s2)
- return s[:-1]
- '''
-
- @staticmethod
- def unformat_t(typename, args):
- try:
- return getattr(VPPFormat, 'unformat_' + typename)(args)
- except AttributeError:
- # Type without explicit override
- return VPPFormat.unformat(args)
-
- # Default handling
- return args
+conversion_unpacker_table = {
+ 'vl_api_ip6_address_t': lambda o: ipaddress.IPv6Address(o),
+ 'vl_api_ip6_prefix_t': lambda o: ipaddress.IPv6Network((o.prefix, o.len)),
+ 'vl_api_ip4_address_t': lambda o: ipaddress.IPv4Address(o),
+ 'vl_api_ip4_prefix_t': lambda o: ipaddress.IPv4Network((o.prefix, o.len)),
+ 'vl_api_address_t': lambda o: unformat_api_address_t(o),
+ 'vl_api_prefix_t': lambda o: unformat_api_prefix_t(o),
+}
diff --git a/src/vpp-api/python/vpp_papi/vpp_papi.py b/src/vpp-api/python/vpp_papi/vpp_papi.py
index c37334c..32abe14 100644
--- a/src/vpp-api/python/vpp_papi/vpp_papi.py
+++ b/src/vpp-api/python/vpp_papi/vpp_papi.py
@@ -28,7 +28,6 @@
import atexit
from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
-from . vpp_format import VPPFormat
if sys.version[0] == '2':
import Queue as queue
@@ -56,11 +55,11 @@
vpp_instance.logger.debug('Cleaning up VPP on exit')
vpp_instance.disconnect()
-
-def vpp_iterator(d):
- if sys.version[0] == '2':
+if sys.version[0] == '2':
+ def vpp_iterator(d):
return d.iteritems()
- else:
+else:
+ def vpp_iterator(d):
return d.items()
@@ -409,7 +408,8 @@
# Create function for client side messages.
if name in self.services:
- if 'stream' in self.services[name] and self.services[name]['stream']:
+ if 'stream' in self.services[name] and \
+ self.services[name]['stream']:
multipart = True
else:
multipart = False
@@ -493,7 +493,7 @@
else:
raise VPPIOError(2, 'RPC reply message received in event handler')
- def decode_incoming_msg(self, msg):
+ def decode_incoming_msg(self, msg, no_type_conversion=False):
if not msg:
self.logger.warning('vpp_api.read failed')
return
@@ -508,7 +508,7 @@
if not msgobj:
raise VPPIOError(2, 'Reply message undefined')
- r, size = msgobj.unpack(msg)
+ r, size = msgobj.unpack(msg, ntc=no_type_conversion)
return r
def msg_handler_async(self, msg):
@@ -535,7 +535,7 @@
d = set(kwargs.keys()) - set(msg.field_by_name.keys())
if d:
raise VPPValueError('Invalid argument {} to {}'
- .format(list(d), msg.name))
+ .format(list(d), msg.name))
def _call_vpp(self, i, msg, multipart, **kwargs):
"""Given a message, send the message and await a reply.
@@ -560,6 +560,8 @@
context = kwargs['context']
kwargs['_vl_msg_id'] = i
+ no_type_conversion = kwargs.pop('_no_type_conversion', False)
+
try:
if self.transport.socket_index:
kwargs['client_index'] = self.transport.socket_index
@@ -582,7 +584,7 @@
msg = self.transport.read()
if not msg:
raise VPPIOError(2, 'VPP API client: read failed')
- r = self.decode_incoming_msg(msg)
+ r = self.decode_incoming_msg(msg, no_type_conversion)
msgname = type(r).__name__
if context not in r or r.context == 0 or context != r.context:
# Message being queued
diff --git a/src/vpp-api/python/vpp_papi/vpp_serializer.py b/src/vpp-api/python/vpp_papi/vpp_serializer.py
index 13721ff..5dce03b 100644
--- a/src/vpp-api/python/vpp_papi/vpp_serializer.py
+++ b/src/vpp-api/python/vpp_papi/vpp_serializer.py
@@ -17,7 +17,10 @@
import collections
from enum import IntEnum
import logging
-from .vpp_format import VPPFormat
+from . import vpp_format
+import ipaddress
+import sys
+import socket
#
# Set log-level in application by doing e.g.:
@@ -26,6 +29,32 @@
#
logger = logging.getLogger(__name__)
+if sys.version[0] == '2':
+ check = lambda d: type(d) is dict
+else:
+ check = lambda d: type(d) is dict or type(d) is bytes
+
+def conversion_required(data, field_type):
+ if check(data):
+ return False
+ try:
+ if type(data).__name__ in vpp_format.conversion_table[field_type]:
+ return True
+ except KeyError:
+ return False
+
+
+def conversion_packer(data, field_type):
+ t = type(data).__name__
+ return types[field_type].pack(vpp_format.
+ conversion_table[field_type][t](data))
+
+
+def conversion_unpacker(data, field_type):
+ if field_type not in vpp_format.conversion_unpacker_table:
+ return data
+ return vpp_format.conversion_unpacker_table[field_type](data)
+
class BaseTypes(object):
def __init__(self, type, elements=0):
@@ -51,7 +80,7 @@
data = 0
return self.packer.pack(data)
- def unpack(self, data, offset, result=None):
+ def unpack(self, data, offset, result=None, ntc=False):
return self.packer.unpack_from(data, offset)[0], self.packer.size
@@ -79,19 +108,21 @@
self.packer = BaseTypes(field_type, num)
self.size = self.packer.size
- def pack(self, list, kwargs=None):
+ def pack(self, data, kwargs=None):
"""Packs a fixed length bytestring. Left-pads with zeros
if input data is too short."""
- if not list:
+ if not data:
return b'\x00' * self.size
- if len(list) > self.num:
+
+ if len(data) > self.num:
raise VPPSerializerValueError(
'Fixed list length error for "{}", got: {}'
' expected: {}'
- .format(self.name, len(list), self.num))
- return self.packer.pack(list)
+ .format(self.name, len(data), self.num))
- def unpack(self, data, offset=0, result=None):
+ return self.packer.pack(data)
+
+ def unpack(self, data, offset=0, result=None, ntc=False):
if len(data[offset:]) < self.num:
raise VPPSerializerValueError(
'Invalid array length for "{}" got {}'
@@ -105,6 +136,8 @@
self.num = num
self.packer = types[field_type]
self.size = self.packer.size * num
+ self.name = name
+ self.field_type = field_type
def pack(self, list, kwargs):
if len(list) != self.num:
@@ -116,12 +149,12 @@
b += self.packer.pack(e)
return b
- def unpack(self, data, offset=0, result=None):
+ def unpack(self, data, offset=0, result=None, ntc=False):
# Return a list of arguments
result = []
total = 0
for e in range(self.num):
- x, size = self.packer.unpack(data, offset)
+ x, size = self.packer.unpack(data, offset, ntc=ntc)
result.append(x)
offset += size
total += size
@@ -153,7 +186,7 @@
b += self.packer.pack(e)
return b
- def unpack(self, data, offset=0, result=None):
+ def unpack(self, data, offset=0, result=None, ntc=False):
# Return a list of arguments
total = 0
@@ -162,11 +195,11 @@
if result[self.index] == 0:
return b'', 0
p = BaseTypes('u8', result[self.index])
- return p.unpack(data, offset)
+ return p.unpack(data, offset, ntc=ntc)
r = []
for e in range(result[self.index]):
- x, size = self.packer.unpack(data, offset)
+ x, size = self.packer.unpack(data, offset, ntc=ntc)
r.append(x)
offset += size
total += size
@@ -187,7 +220,7 @@
b += self.packer.pack(e)
return b
- def unpack(self, data, offset=0, result=None):
+ def unpack(self, data, offset=0, result=None, ntc=False):
total = 0
# Return a list of arguments
if (len(data) - offset) % self.packer.size:
@@ -196,7 +229,7 @@
elements = int((len(data) - offset) / self.packer.size)
r = []
for e in range(elements):
- x, size = self.packer.unpack(data, offset)
+ x, size = self.packer.unpack(data, offset, ntc=ntc)
r.append(x)
offset += self.packer.size
total += size
@@ -227,7 +260,7 @@
def pack(self, data, kwargs=None):
return types['u32'].pack(data)
- def unpack(self, data, offset=0, result=None):
+ def unpack(self, data, offset=0, result=None, ntc=False):
x, size = types['u32'].unpack(data, offset)
return self.enum(x), size
@@ -272,31 +305,53 @@
r[:len(b)] = b
return r
- def unpack(self, data, offset=0, result=None):
+ def unpack(self, data, offset=0, result=None, ntc=False):
r = []
maxsize = 0
for k, p in self.packers.items():
- x, size = p.unpack(data, offset)
+ x, size = p.unpack(data, offset, ntc=ntc)
if size > maxsize:
maxsize = size
r.append(x)
return self.tuple._make(r), maxsize
-def VPPTypeAlias(name, msgdef):
- t = vpp_get_type(msgdef['type'])
- if not t:
- raise ValueError()
- if 'length' in msgdef:
- if msgdef['length'] == 0:
+class VPPTypeAlias(object):
+ def __init__(self, name, msgdef):
+ self.name = name
+ t = vpp_get_type(msgdef['type'])
+ if not t:
raise ValueError()
- if msgdef['type'] == 'u8':
- types[name] = FixedList_u8(name, msgdef['type'],
- msgdef['length'])
+ if 'length' in msgdef:
+ if msgdef['length'] == 0:
+ raise ValueError()
+ if msgdef['type'] == 'u8':
+ self.packer = FixedList_u8(name, msgdef['type'],
+ msgdef['length'])
+ self.size = self.packer.size
+ else:
+ self.packer = FixedList(name, msgdef['type'], msgdef['length'])
else:
- types[name] = FixedList(name, msgdef['type'], msgdef['length'])
- else:
- types[name] = t
+ self.packer = t
+ self.size = t.size
+
+ types[name] = self
+
+ def pack(self, data, kwargs=None):
+ if data and conversion_required(data, self.name):
+ try:
+ return conversion_packer(data, self.name)
+ # Python 2 and 3 raises different exceptions from inet_pton
+ except(OSError, socket.error, TypeError):
+ pass
+
+ return self.packer.pack(data, kwargs)
+
+ def unpack(self, data, offset=0, result=None, ntc=False):
+ t, size = self.packer.unpack(data, offset, result, ntc=ntc)
+ if not ntc:
+ return conversion_unpacker(t, self.name), size
+ return t, size
class VPPType(object):
@@ -352,9 +407,12 @@
if not kwargs:
kwargs = data
b = bytes()
- for i, a in enumerate(self.fields):
- # Try one of the format functions
+ # Try one of the format functions
+ if data and conversion_required(data, self.name):
+ return conversion_packer(data, self.name)
+
+ for i, a in enumerate(self.fields):
if data and type(data) is not dict and a not in data:
raise VPPSerializerValueError(
"Invalid argument: {} expected {}.{}".
@@ -367,33 +425,27 @@
else:
arg = data[a]
kwarg = kwargs[a] if a in kwargs else None
-
if isinstance(self.packers[i], VPPType):
- try:
- b += self.packers[i].pack(arg, kwarg)
- except ValueError:
- # Invalid argument, can we convert it?
- arg = VPPFormat.format(self.packers[i].name, data[a])
- data[a] = arg
- kwarg = arg
- b += self.packers[i].pack(arg, kwarg)
+ b += self.packers[i].pack(arg, kwarg)
else:
b += self.packers[i].pack(arg, kwargs)
return b
- def unpack(self, data, offset=0, result=None):
+ def unpack(self, data, offset=0, result=None, ntc=False):
# Return a list of arguments
result = []
total = 0
for p in self.packers:
- x, size = p.unpack(data, offset, result)
+ x, size = p.unpack(data, offset, result, ntc)
if type(x) is tuple and len(x) == 1:
x = x[0]
result.append(x)
offset += size
total += size
t = self.tuple._make(result)
+ if not ntc:
+ t = conversion_unpacker(t, self.name)
return t, total
diff --git a/test/test_ip4.py b/test/test_ip4.py
index 5409044..685d089 100644
--- a/test/test_ip4.py
+++ b/test/test_ip4.py
@@ -14,7 +14,7 @@
from util import ppp
from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpMRoute, \
VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
- VppMplsTable, VppIpTable, VppIpAddress
+ VppMplsTable, VppIpTable
from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint
@@ -1123,7 +1123,7 @@
#
# Configure a punt redirect via pg1.
#
- nh_addr = VppIpAddress(self.pg1.remote_ip4).encode()
+ nh_addr = self.pg1.remote_ip4
self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
self.pg1.sw_if_index,
nh_addr)
@@ -1187,7 +1187,7 @@
#
# Configure a punt redirects
#
- nh_address = VppIpAddress(self.pg3.remote_ip4).encode()
+ nh_address = self.pg3.remote_ip4
self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
self.pg3.sw_if_index,
nh_address)
@@ -1196,7 +1196,7 @@
nh_address)
self.vapi.ip_punt_redirect(self.pg2.sw_if_index,
self.pg3.sw_if_index,
- VppIpAddress('0.0.0.0').encode())
+ '0.0.0.0')
#
# Dump pg0 punt redirects
@@ -1212,8 +1212,8 @@
self.assertEqual(len(punts), 3)
for p in punts:
self.assertEqual(p.punt.tx_sw_if_index, self.pg3.sw_if_index)
- self.assertNotEqual(punts[1].punt.nh.un.ip4, self.pg3.remote_ip4)
- self.assertEqual(punts[2].punt.nh.un.ip4, '\x00'*4)
+ self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip4)
+ self.assertEqual(str(punts[2].punt.nh), '0.0.0.0')
class TestIPDeag(VppTestCase):
diff --git a/test/test_ip6.py b/test/test_ip6.py
index 3a9e88c..6c44d79 100644
--- a/test/test_ip6.py
+++ b/test/test_ip6.py
@@ -21,7 +21,7 @@
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
- VppMplsRoute, VppMplsTable, VppIpTable, VppIpAddress
+ VppMplsRoute, VppMplsTable, VppIpTable
from vpp_neighbor import find_nbr, VppNeighbor
from vpp_pg_interface import is_ipv6_misc
from vpp_sub_interface import VppSubInterface, VppDot1QSubint
@@ -1949,7 +1949,7 @@
#
# Configure a punt redirect via pg1.
#
- nh_addr = VppIpAddress(self.pg1.remote_ip6).encode()
+ nh_addr = self.pg1.remote_ip6
self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
self.pg1.sw_if_index,
nh_addr)
@@ -2013,7 +2013,7 @@
#
# Configure a punt redirects
#
- nh_addr = VppIpAddress(self.pg3.remote_ip6).encode()
+ nh_addr = self.pg3.remote_ip6
self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
self.pg3.sw_if_index,
nh_addr)
@@ -2022,7 +2022,7 @@
nh_addr)
self.vapi.ip_punt_redirect(self.pg2.sw_if_index,
self.pg3.sw_if_index,
- VppIpAddress('0::0').encode())
+ '0::0')
#
# Dump pg0 punt redirects
@@ -2039,8 +2039,8 @@
self.assertEqual(len(punts), 3)
for p in punts:
self.assertEqual(p.punt.tx_sw_if_index, self.pg3.sw_if_index)
- self.assertNotEqual(punts[1].punt.nh.un.ip6, self.pg3.remote_ip6)
- self.assertEqual(punts[2].punt.nh.un.ip6, '\x00'*16)
+ self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip6)
+ self.assertEqual(str(punts[2].punt.nh), '::')
class TestIPDeag(VppTestCase):
diff --git a/test/test_l2bd_arp_term.py b/test/test_l2bd_arp_term.py
index ddba79b..f25be57 100644
--- a/test/test_l2bd_arp_term.py
+++ b/test/test_l2bd_arp_term.py
@@ -21,7 +21,6 @@
from framework import VppTestCase, VppTestRunner
from util import Host, ppp
from vpp_mac import VppMacAddress, mactobinary
-from vpp_ip import VppIpAddress
class TestL2bdArpTerm(VppTestCase):
@@ -74,7 +73,7 @@
ip = e.ip4 if is_ipv6 == 0 else e.ip6
self.vapi.bd_ip_mac_add_del(bd_id=bd_id,
mac=VppMacAddress(e.mac).encode(),
- ip=VppIpAddress(ip).encode(),
+ ip=ip,
is_ipv6=is_ipv6,
is_add=is_add)
diff --git a/test/test_map.py b/test/test_map.py
index 3d02853..e98532e 100644
--- a/test/test_map.py
+++ b/test/test_map.py
@@ -6,7 +6,7 @@
from framework import VppTestCase, VppTestRunner
from vpp_ip import *
from vpp_ip_route import VppIpRoute, VppRoutePath
-
+from ipaddress import IPv6Network, IPv4Network
from scapy.layers.l2 import Ether, Raw
from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment
from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
@@ -76,9 +76,9 @@
#
# Add a domain that maps from pg0 to pg1
#
- map_dst = VppIp6Prefix(map_br_pfx, map_br_pfx_len).encode()
- map_src = VppIp6Prefix("3000::1", 128).encode()
- client_pfx = VppIp4Prefix("192.168.0.0", 16).encode()
+ map_dst = '{}/{}'.format(map_br_pfx, map_br_pfx_len)
+ map_src = '3000::1/128'
+ client_pfx = '192.168.0.0/16'
self.vapi.map_add_domain(map_dst, map_src, client_pfx)
#
@@ -168,10 +168,10 @@
#
# Add a domain that maps from pg0 to pg1
#
- map_dst = VppIp6Prefix("2001:db8::", 32).encode()
- map_src = VppIp6Prefix("1234:5678:90ab:cdef::", 64).encode()
- ip4_pfx = VppIp4Prefix("192.168.0.0", 24).encode()
- self.vapi.map_add_domain(map_dst, map_src, ip4_pfx, 16, 6, 4, 1)
+ self.vapi.map_add_domain('2001:db8::/32',
+ '1234:5678:90ab:cdef::/64',
+ '192.168.0.0/24',
+ 16, 6, 4, 1)
# Enable MAP-T on interfaces.
diff --git a/test/test_syslog.py b/test/test_syslog.py
index a545130..9b328be 100644
--- a/test/test_syslog.py
+++ b/test/test_syslog.py
@@ -94,10 +94,10 @@
""" Syslog Protocol test """
self.vapi.syslog_set_sender(self.pg0.remote_ip4n, self.pg0.local_ip4n)
config = self.vapi.syslog_get_sender()
- self.assertEqual(config.collector_address,
- self.pg0.remote_ip4n)
+ self.assertEqual(str(config.collector_address),
+ self.pg0.remote_ip4)
self.assertEqual(config.collector_port, 514)
- self.assertEqual(config.src_address, self.pg0.local_ip4n)
+ self.assertEqual(str(config.src_address), self.pg0.local_ip4)
self.assertEqual(config.vrf_id, 0)
self.assertEqual(config.max_msg_size, 480)
diff --git a/test/vpp_igmp.py b/test/vpp_igmp.py
index c4cbab1..2403fbf 100644
--- a/test/vpp_igmp.py
+++ b/test/vpp_igmp.py
@@ -16,8 +16,7 @@
def find_igmp_state(states, itf, gaddr, saddr):
for s in states:
if s.sw_if_index == itf.sw_if_index and \
- s.gaddr == socket.inet_pton(socket.AF_INET, gaddr) and \
- s.saddr == socket.inet_pton(socket.AF_INET, saddr):
+ str(s.gaddr) == gaddr and str(s.saddr) == saddr:
return True
return False
@@ -25,8 +24,7 @@
def wait_for_igmp_event(test, timeout, itf, gaddr, saddr, ff):
ev = test.vapi.wait_for_event(timeout, "igmp_event")
if ev.sw_if_index == itf.sw_if_index and \
- ev.gaddr == socket.inet_pton(socket.AF_INET, gaddr) and \
- ev.saddr == socket.inet_pton(socket.AF_INET, saddr) and \
+ str(ev.gaddr) == gaddr and str(ev.saddr) == saddr and \
ev.filter == ff:
return True
return False
diff --git a/test/vpp_ip.py b/test/vpp_ip.py
index 7f3a6b0..055e6d7 100644
--- a/test/vpp_ip.py
+++ b/test/vpp_ip.py
@@ -192,20 +192,6 @@
return False
-class VppIp6Prefix():
- def __init__(self, prefix, prefixlen):
- self.ip_prefix = ip_address(unicode(prefix))
- self.prefixlen = prefixlen
-
- def encode(self):
- return {'prefix': self.ip_prefix.packed,
- 'len': self.prefixlen}
-
-
-class VppIp4Prefix(VppIp6Prefix):
- pass
-
-
class VppIpMPrefix():
def __init__(self, saddr, gaddr, len):
self.saddr = saddr
diff --git a/test/vpp_papi_provider.py b/test/vpp_papi_provider.py
index 26d5fcf..320ea12 100644
--- a/test/vpp_papi_provider.py
+++ b/test/vpp_papi_provider.py
@@ -2821,7 +2821,8 @@
def vxlan_gbp_tunnel_dump(self, sw_if_index=0xffffffff):
return self.api(self.papi.vxlan_gbp_tunnel_dump,
- {'sw_if_index': sw_if_index})
+ {'sw_if_index': sw_if_index,
+ '_no_type_conversion': True})
def pppoe_add_del_session(
self,
@@ -3553,7 +3554,8 @@
def gbp_endpoint_dump(self):
""" GBP endpoint Dump """
- return self.api(self.papi.gbp_endpoint_dump, {})
+ return self.api(self.papi.gbp_endpoint_dump,
+ {'_no_type_conversion': True})
def gbp_endpoint_group_add(self, epg, bd,
rd, uplink_sw_if_index):
@@ -3665,7 +3667,8 @@
def gbp_subnet_dump(self):
""" GBP Subnet Dump """
- return self.api(self.papi.gbp_subnet_dump, {})
+ return self.api(self.papi.gbp_subnet_dump,
+ {'_no_type_conversion': True})
def gbp_contract_add_del(self, is_add, src_epg, dst_epg, acl_index, rules):
""" GBP contract Add/Del """
diff --git a/test/vpp_udp_encap.py b/test/vpp_udp_encap.py
index 209115f..5e2df76 100644
--- a/test/vpp_udp_encap.py
+++ b/test/vpp_udp_encap.py
@@ -5,15 +5,14 @@
from vpp_object import *
from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
-from vpp_ip import *
def find_udp_encap(test, ue):
encaps = test.vapi.udp_encap_dump()
for e in encaps:
if ue.id == e.udp_encap.id \
- and ue.src_ip == e.udp_encap.src_ip \
- and ue.dst_ip == e.udp_encap.dst_ip \
+ and ue.src_ip == str(e.udp_encap.src_ip) \
+ and ue.dst_ip == str(e.udp_encap.dst_ip) \
and e.udp_encap.dst_port == ue.dst_port \
and e.udp_encap.src_port == ue.src_port:
return True
@@ -34,15 +33,15 @@
self.table_id = table_id
self.src_ip_s = src_ip
self.dst_ip_s = dst_ip
- self.src_ip = VppIpAddress(src_ip)
- self.dst_ip = VppIpAddress(dst_ip)
+ self.src_ip = src_ip
+ self.dst_ip = dst_ip
self.src_port = src_port
self.dst_port = dst_port
def add_vpp_config(self):
r = self._test.vapi.udp_encap_add(
- self.src_ip.encode(),
- self.dst_ip.encode(),
+ self.src_ip,
+ self.dst_ip,
self.src_port,
self.dst_port,
self.table_id)