blob: d8461398765ff2157ea8868cd786ae843f47a761 [file] [log] [blame]
#
# Copyright (c) 2018 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import collections
from enum import IntFlag
import logging
import socket
import struct
import sys
from . import vpp_format
#
# Set log-level in application by doing e.g.:
# logger = logging.getLogger('vpp_papi.serializer')
# logger.setLevel(logging.DEBUG)
#
logger = logging.getLogger("vpp_papi.serializer")
def check(d):
    """Return True when ``d`` is exactly a ``dict`` or a ``bytes`` object.

    The exact type is compared, so instances of subclasses do not match.
    """
    return type(d) in (dict, bytes)
def conversion_required(data, field_type):
    """Tell whether ``data`` has a registered converter for ``field_type``.

    :param data: value supplied by the caller for the field.
    :param field_type: name of the field's type, used as the key into
        ``vpp_format.conversion_table``.
    :returns: ``True`` when the Python type name of ``data`` has an entry in
        the conversion table for ``field_type``; ``False`` otherwise.  Values
        that are exactly ``dict`` or ``bytes`` never need conversion.
    """
    if check(data):
        return False
    try:
        converters = vpp_format.conversion_table[field_type]
    except KeyError:
        # No converters are registered for this field type at all.
        return False
    # Always answer with a real bool instead of falling off the end (None).
    return type(data).__name__ in converters
def conversion_packer(data, field_type):
    """Convert ``data`` with the registered converter and pack the result.

    The converter is looked up in ``vpp_format.conversion_table`` by
    ``field_type`` and by the Python type name of ``data``; the converted
    value is handed to the packer registered for ``field_type``.
    """
    pack = types[field_type].pack
    source_type = type(data).__name__
    convert = vpp_format.conversion_table[field_type][source_type]
    return pack(convert(data))
def conversion_unpacker(data, field_type):
    """Post-process an unpacked value with the converter for ``field_type``.

    When ``vpp_format.conversion_unpacker_table`` has no entry for
    ``field_type`` the value is returned unchanged.
    """
    table = vpp_format.conversion_unpacker_table
    if field_type in table:
        return table[field_type](data)
    return data
class Packer:
    """Base class of every packer in this module.

    Subclasses implement ``pack`` and ``unpack``; both raise
    ``NotImplementedError`` here.
    """

    options = {}

    def pack(self, data, kwargs):
        """Serialise ``data``; must be provided by subclasses."""
        raise NotImplementedError

    def unpack(self, data, offset, result=None, ntc=False):
        """Deserialise from ``data`` at ``offset``; must be provided by subclasses."""
        raise NotImplementedError

    # override as appropriate in subclasses
    @staticmethod
    def _get_packer_with_options(f_type, options):
        """Default hook: ignore ``options`` and return the registered packer."""
        return types[f_type]

    def get_packer_with_options(self, f_type, options):
        """Ask the class registered for ``f_type`` for a packer using ``options``.

        Returns ``None`` when ``options`` is ``None``.
        """
        if options is None:
            return None
        try:
            packer_class = types[f_type].__class__
            return packer_class._get_packer_with_options(f_type, options)
        except IndexError:
            # NOTE(review): a failed lookup in the ``types`` dict raises
            # KeyError, which this handler does not catch - confirm intent.
            message = "Options not supported for {}{} ({})".format(
                f_type, types[f_type].__class__, options
            )
            raise VPPSerializerValueError(message)
class BaseTypes(Packer):
    """Packer for the primitive wire types, backed by a ``struct.Struct``."""

    def __init__(self, type, elements=0, options=None):
        """Select the struct format for ``type``.

        A positive ``elements`` together with type ``u8`` or ``string``
        produces a byte-string format of that many bytes; otherwise the
        format comes from the fixed table below.
        """
        self._type = type
        self._elements = elements
        # All formats are big-endian except f64, which uses native byte order.
        formats = {
            "u8": ">B",
            "i8": ">b",
            "string": ">s",
            "u16": ">H",
            "i16": ">h",
            "u32": ">I",
            "i32": ">i",
            "u64": ">Q",
            "i64": ">q",
            "f64": "=d",
            "bool": ">?",
            "header": ">HI",
        }
        is_byte_string = elements > 0 and type in ("u8", "string")
        fmt = ">%ss" % elements if is_byte_string else formats[type]
        self.packer = struct.Struct(fmt)
        self.size = self.packer.size
        self.options = options

    def pack(self, data, kwargs=None):
        """Pack ``data``; ``None`` becomes the ``default`` option or 0."""
        if data is None:  # Default to zero if not specified
            opts = self.options
            data = opts["default"] if opts and "default" in opts else 0
        return self.packer.pack(data)

    def unpack(self, data, offset, result=None, ntc=False):
        """Return ``(first unpacked value, struct size)`` read at ``offset``."""
        values = self.packer.unpack_from(data, offset)
        return values[0], self.packer.size

    @staticmethod
    def _get_packer_with_options(f_type, options):
        """Build a fresh packer for ``f_type`` carrying ``options``."""
        return BaseTypes(f_type, options=options)

    def __repr__(self):
        details = (self._type, self._elements, self.options)
        return "BaseTypes(type=%s, elements=%s, options=%s)" % details
class String(Packer):
    """Packer for string fields, either fixed-size or length-prefixed.

    A non-zero ``num`` selects the fixed layout (NUL-padded bytes).  A zero
    ``num`` selects the variable layout: a ``u32`` length field followed by
    the ASCII-encoded text.
    """

    def __init__(self, name, num, options):
        """Configure the packer.

        :param name: field name, used in error messages.
        :param num: declared array size; zero means variable length.
        :param options: mapping; an optional ``limit`` entry overrides ``num``
            as the maximum size.
        :raises VPPSerializerValueError: for a fixed string whose limit is
            falsy.
        """
        self.name = name
        self.num = num
        self.size = 1
        self.length_field_packer = BaseTypes("u32")
        self.limit = options["limit"] if "limit" in options else num
        self.fixed = bool(num)
        if self.fixed and not self.limit:
            raise VPPSerializerValueError(
                "Invalid combination for: {}, {} fixed:{} limit:{}".format(
                    name, options, self.fixed, self.limit
                )
            )

    def pack(self, list, kwargs=None):
        """Encode the text ``list`` as ASCII in the configured layout.

        An empty/None value packs as all-zero bytes (fixed) or as a zero
        length field (variable).

        :raises VPPSerializerValueError: when a limit is set and the text is
            longer than ``limit - 1`` characters.
        """
        if not list:
            if self.fixed:
                return b"\x00" * self.limit
            return self.length_field_packer.pack(0)
        if self.limit and len(list) > self.limit - 1:
            raise VPPSerializerValueError(
                "Invalid argument length for: {}, {} maximum {}".format(
                    list, len(list), self.limit - 1
                )
            )
        encoded = list.encode("ascii")
        if self.fixed:
            return encoded.ljust(self.limit, b"\x00")
        return self.length_field_packer.pack(len(list)) + encoded

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Decode a string at ``offset``.

        :returns: ``(text, bytes consumed)``.  Fixed strings are cut at the
            first NUL byte and consume ``num`` bytes; variable strings consume
            the length field plus the payload.
        """
        if self.fixed:
            raw, _ = BaseTypes("u8", self.num).unpack(data, offset)
            text = raw.split(b"\0", 1)[0]
            return (text.decode("ascii"), self.num)

        length, length_field_size = self.length_field_packer.unpack(data, offset)
        if length == 0:
            # The length prefix itself was still read from the buffer, so it
            # must be counted as consumed even though the payload is empty.
            return "", length_field_size
        payload, size = BaseTypes("u8", length).unpack(
            data, offset + length_field_size
        )
        return (payload.decode("ascii", errors="replace"), size + length_field_size)
# Registry of packers keyed by type name.  The primitive entries are
# BaseTypes instances; "string" maps to the String class itself.
types = {
    base_name: BaseTypes(base_name)
    for base_name in (
        "u8",
        "i8",
        "u16",
        "i16",
        "u32",
        "i32",
        "u64",
        "i64",
        "f64",
        "bool",
    )
}
types["string"] = String

# Filled in by enum packers with the class that registered each type name.
class_types = {}
def vpp_get_type(name):
    """Return the packer registered under ``name``, or ``None`` if absent."""
    return types.get(name)
class VPPSerializerValueError(ValueError):
    """Raised by the packers in this module for invalid values or layouts."""
class FixedList_u8(Packer):
    """Packer for a fixed-length byte string of ``num`` bytes."""

    def __init__(self, name, field_type, num):
        self.name = name
        self.num = num
        self.packer = BaseTypes(field_type, num)
        self.size = self.packer.size
        self.field_type = field_type

    def pack(self, data, kwargs=None):
        """Pack a fixed length bytestring.

        Falsy input yields ``size`` zero bytes.  Input shorter than ``num``
        is padded by the struct ``s`` format, which appends NUL bytes on the
        right.  Input longer than ``num``, or input struct cannot pack,
        raises VPPSerializerValueError.
        """
        if not data:
            return b"\x00" * self.size
        length = len(data)
        if length > self.num:
            raise VPPSerializerValueError(
                'Fixed list length error for "{}", got: {}'
                " expected: {}".format(self.name, length, self.num)
            )
        try:
            packed = self.packer.pack(data)
        except struct.error:
            raise VPPSerializerValueError(
                'Packing failed for "{}" {}'.format(self.name, kwargs)
            )
        return packed

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Read ``num`` bytes at ``offset``; raise if fewer bytes remain."""
        available = len(data[offset:])
        if available < self.num:
            raise VPPSerializerValueError(
                'Invalid array length for "{}" got {}'
                " expected {}".format(self.name, available, self.num)
            )
        return self.packer.unpack(data, offset)

    def __repr__(self):
        details = (self.name, self.field_type, self.num)
        return "FixedList_u8(name=%s, field_type=%s, num=%s)" % details
class FixedList(Packer):
    """Packer for an array of exactly ``num`` elements of one registered type."""

    def __init__(self, name, field_type, num):
        """Look up the element packer for ``field_type`` in ``types``."""
        self.num = num
        self.packer = types[field_type]
        self.size = self.packer.size * num
        self.name = name
        self.field_type = field_type

    def pack(self, list, kwargs=None):
        """Pack every element of ``list`` back to back.

        :param list: sequence of exactly ``num`` elements, or ``None`` when
            the caller omitted the field; an omitted field is packed as
            ``num`` elements each defaulted by the element packer.
        :param kwargs: unused here; optional so the signature matches the
            other packers.
        :raises VPPSerializerValueError: when the length is not ``num``.
        """
        if list is None:
            # Omitted field: let the element packer apply its own default
            # for every slot instead of failing on len(None).
            return b"".join(self.packer.pack(None) for _ in range(self.num))
        if len(list) != self.num:
            raise VPPSerializerValueError(
                "Fixed list length error, got: {} expected: {}".format(
                    len(list), self.num
                )
            )
        return b"".join(self.packer.pack(e) for e in list)

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Unpack ``num`` elements; return ``(list of values, bytes consumed)``."""
        # The incoming ``result`` is not used; a fresh list is returned.
        result = []
        total = 0
        for _ in range(self.num):
            x, size = self.packer.unpack(data, offset, ntc=ntc)
            result.append(x)
            offset += size
            total += size
        return result, total

    def __repr__(self):
        return "FixedList(name=%s, field_type=%s, num=%s)" % (
            self.name,
            self.field_type,
            self.num,
        )
class VLAList(Packer):
    """Packer for an array whose element count lives in another field.

    ``len_field_name`` names that field in the keyword arguments used when
    packing; ``index`` is its position in the partial result list used when
    unpacking.
    """

    def __init__(self, name, field_type, len_field_name, index):
        self.name = name
        self.field_type = field_type
        self.index = index
        self.packer = types[field_type]
        self.size = self.packer.size
        self.length_field = len_field_name

    def _is_u8_array(self):
        # Byte arrays are moved as one bytes object instead of per element.
        return self.packer.size == 1 and self.field_type == "u8"

    def pack(self, lst, kwargs=None):
        """Pack ``lst`` after checking it against the declared length field."""
        if not lst:
            return b""
        actual = len(lst)
        expected = kwargs[self.length_field]
        if actual != expected:
            raise VPPSerializerValueError(
                "Variable length error, got: {} expected: {}".format(
                    actual, expected
                )
            )
        # u8 array
        if self._is_u8_array():
            return b"".join(lst) if isinstance(lst, list) else bytes(lst)
        return b"".join(self.packer.pack(element) for element in lst)

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Unpack as many elements as the already-decoded length field says."""
        # u8 array
        if self._is_u8_array():
            count = result[self.index]
            if count == 0:
                return b"", 0
            return BaseTypes("u8", count).unpack(data, offset, ntc=ntc)
        # Return a list of arguments
        values = []
        consumed = 0
        for _ in range(result[self.index]):
            value, used = self.packer.unpack(data, offset, ntc=ntc)
            values.append(value)
            offset += used
            consumed += used
        return values, consumed

    def __repr__(self):
        details = (self.name, self.field_type, self.length_field, self.index)
        return "VLAList(name=%s, field_type=%s, " "len_field_name=%s, index=%s)" % details
class VLAList_legacy(Packer):
    """Packer for a trailing array with no length field.

    On unpack the element count is derived from the number of bytes left in
    the buffer after ``offset``.
    """

    def __init__(self, name, field_type):
        """Look up the element packer for ``field_type`` in ``types``."""
        self.name = name
        self.field_type = field_type
        self.packer = types[field_type]
        self.size = self.packer.size

    def pack(self, list, kwargs=None):
        """Pack all elements of ``list`` back to back.

        An omitted (``None``) or empty value packs to no bytes at all.
        Single-byte element types are converted with ``bytes()`` directly.
        """
        if not list:
            # Omitted field: an array without a length field is simply empty.
            return b""
        if self.packer.size == 1:
            return bytes(list)
        return b"".join(self.packer.pack(e) for e in list)

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Unpack every element that fits in the remainder of ``data``.

        :returns: ``(list of values, bytes consumed)``.
        :raises VPPSerializerValueError: when the remaining byte count is not
            a multiple of the element size.
        """
        element_size = self.packer.size
        remaining = len(data) - offset
        if remaining % element_size:
            raise VPPSerializerValueError(
                "Legacy Variable Length Array length mismatch."
            )
        # Exact integer division; the remainder was checked above.
        elements = remaining // element_size
        r = []
        total = 0
        for _ in range(elements):
            x, size = self.packer.unpack(data, offset, ntc=ntc)
            r.append(x)
            offset += element_size
            total += size
        return r, total

    def __repr__(self):
        return "VLAList_legacy(name=%s, field_type=%s)" % (self.name, self.field_type)
# Will change to IntEnum after 21.04 release
class VPPEnumType(Packer):
    """Packer for an enumeration defined by a message definition.

    The enum members are exposed as attributes of the packer instance, and
    the instance registers itself in ``types`` and ``class_types``.
    """

    output_class = IntFlag

    def __init__(self, name, msgdef, options=None):
        """Build the enum from ``msgdef``.

        ``msgdef`` entries are ``(member name, value)`` pairs, plus an
        optional dict with an ``enumtype`` key selecting the underlying
        integer type (``u32`` when absent).
        """
        self.size = types["u32"].size
        self.name = name
        self.enumtype = "u32"
        self.msgdef = msgdef
        members = {}
        for entry in msgdef:
            if type(entry) is dict and "enumtype" in entry:
                declared = entry["enumtype"]
                if declared != "u32":
                    self.size = types[declared].size
                    self.enumtype = declared
                continue
            member_name, member_value = entry
            members[member_name] = member_value
        self.enum = self.output_class(name, members)
        types[name] = self
        class_types[name] = self.__class__
        self.options = options

    def __getattr__(self, name):
        # NOTE(review): an unknown member surfaces as KeyError from the enum
        # lookup, not AttributeError - confirm callers expect that.
        return self.enum[name]

    def __bool__(self):
        return True

    def pack(self, data, kwargs=None):
        """Pack ``data``; ``None`` becomes the ``default`` option or 0."""
        if data is None:  # Default to zero if not specified
            opts = self.options
            data = opts["default"] if opts and "default" in opts else 0
        return types[self.enumtype].pack(data)

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Return ``(enum value, bytes consumed)`` read at ``offset``."""
        raw, size = types[self.enumtype].unpack(data, offset)
        return self.enum(raw), size

    @classmethod
    def _get_packer_with_options(cls, f_type, options):
        """Create a new packer from the registered definition, with ``options``."""
        return cls(f_type, types[f_type].msgdef, options=options)

    def __repr__(self):
        details = (self.__class__.__name__, self.name, self.msgdef, self.options)
        return "%s(name=%s, msgdef=%s, options=%s)" % details
class VPPEnumFlagType(VPPEnumType):
    """Enum packer whose values are produced as ``IntFlag`` members."""

    output_class = IntFlag

    def __init__(self, name, msgdef, options=None):
        super().__init__(name, msgdef, options)
class VPPUnionType(Packer):
    """Packer for a union: all members share the same bytes.

    The union is as large as its largest member and registers itself in
    ``types`` under ``name``.
    """

    def __init__(self, name, msgdef):
        """Collect one packer per member from ``msgdef``.

        Entries are ``(type name, member name)`` pairs, plus an optional
        dict carrying a ``crc``.  An unregistered member type raises
        VPPSerializerValueError.
        """
        self.name = name
        self.msgdef = msgdef
        self.size = 0
        self.maxindex = 0
        member_names = []
        self.packers = collections.OrderedDict()
        for position, entry in enumerate(msgdef):
            if type(entry) is dict and "crc" in entry:
                self.crc = entry["crc"]
                continue
            member_type, member_name = entry
            if member_type not in types:
                logger.debug("Unknown union type {}".format(member_type))
                raise VPPSerializerValueError(
                    "Unknown message type {}".format(member_type)
                )
            member_names.append(member_name)
            member_packer = types[member_type]
            member_size = member_packer.size
            self.packers[member_name] = types[member_type]
            # Track the largest member; it determines the union's size.
            if member_size > self.size:
                self.size = member_size
                self.maxindex = position

        types[name] = self
        self.tuple = collections.namedtuple(name, member_names, rename=True)

    # Union of variable length?
    def pack(self, data, kwargs=None):
        """Pack the first member found in ``data`` into a ``size``-byte buffer.

        Falsy ``data`` packs as all zero bytes.
        """
        if not data:
            return b"\x00" * self.size

        # Only the first key/value pair of the mapping is used.
        for member, value in data.items():
            logger.debug("Key: {} Value: {}".format(member, value))
            packed = self.packers[member].pack(value, kwargs)
            break
        buffer = bytearray(self.size)
        buffer[: len(packed)] = packed
        return buffer

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Decode every member from the same offset.

        Returns ``(namedtuple of all members, largest size consumed)``.
        """
        values = []
        widest = 0
        for member_packer in self.packers.values():
            value, used = member_packer.unpack(data, offset, ntc=ntc)
            widest = max(widest, used)
            values.append(value)
        return self.tuple._make(values), widest

    def __repr__(self):
        return "VPPUnionType(name=%s, msgdef=%r)" % (self.name, self.msgdef)
class VPPTypeAlias(Packer):
    """Packer for a named alias of another type, optionally a fixed array.

    The alias registers itself in ``types`` under ``name``.
    """

    def __init__(self, name, msgdef, options=None):
        """Resolve the aliased type described by ``msgdef``.

        :param name: alias name to register.
        :param msgdef: mapping with a ``type`` key and an optional ``length``
            key; with ``length`` the alias is a fixed array of that type.
        :param options: optional mapping; ``default`` is used when packing
            ``None``.
        :raises ValueError: when the aliased type is unknown or ``length``
            is zero.
        """
        self.name = name
        self.msgdef = msgdef
        t = vpp_get_type(msgdef["type"])
        if not t:
            raise ValueError("No such type: {}".format(msgdef["type"]))
        if "length" in msgdef:
            if msgdef["length"] == 0:
                raise ValueError("Invalid zero length for alias: {}".format(name))
            if msgdef["type"] == "u8":
                self.packer = FixedList_u8(name, msgdef["type"], msgdef["length"])
            else:
                self.packer = FixedList(name, msgdef["type"], msgdef["length"])
            # Both array packers expose their size; record it in either case
            # so the alias always has a ``size`` attribute.
            self.size = self.packer.size
        else:
            self.packer = t
            self.size = t.size

        types[name] = self
        # Kept for compatibility; unpack() tracks conversion in a local.
        self.toplevelconversion = False
        self.options = options

    def pack(self, data, kwargs=None):
        """Pack ``data``, converting it first when a converter is registered.

        ``None`` becomes the ``default`` option or 0.
        """
        if data and conversion_required(data, self.name):
            try:
                return conversion_packer(data, self.name)
            # Python 2 and 3 raises different exceptions from inet_pton
            except (OSError, socket.error, TypeError):
                # Conversion failed: fall through and pack the raw value.
                pass
        if data is None:  # Default to zero if not specified
            if self.options and "default" in self.options:
                data = self.options["default"]
            else:
                data = 0

        return self.packer.pack(data, kwargs)

    @staticmethod
    def _get_packer_with_options(f_type, options):
        """Create a new alias from the registered definition, with ``options``."""
        return VPPTypeAlias(f_type, types[f_type].msgdef, options=options)

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Unpack the aliased value; return ``(value, bytes consumed)``.

        When ``ntc`` is False and an unpack converter is registered for this
        alias, the raw value is converted here and conversion is switched off
        for the wrapped packer.
        """
        # A local is used so a failing inner unpack cannot leave conversion
        # state behind on this shared instance.
        convert = ntc is False and self.name in vpp_format.conversion_unpacker_table
        if convert:
            # Disable type conversion for dependent types
            ntc = True
        t, size = self.packer.unpack(data, offset, result, ntc=ntc)
        if convert:
            return conversion_unpacker(t, self.name), size
        return t, size

    def __repr__(self):
        return "VPPTypeAlias(name=%s, msgdef=%s, options=%s)" % (
            self.name,
            self.msgdef,
            self.options,
        )
class VPPType(Packer):
    """Packer for a composite type made of an ordered list of fields.

    The type registers itself in ``types`` under ``name``.
    """

    # Set everything up to be able to pack / unpack
    def __init__(self, name, msgdef):
        """Create one packer per field described in ``msgdef``.

        Each field entry starts with ``(type name, field name)`` and may
        carry an element count, the name of a length field, and a dict of
        options.  A dict entry with a ``crc`` key sets ``self.crc``.

        :raises VPPSerializerValueError: when a field type is not registered.
        """
        self.name = name
        self.msgdef = msgdef
        self.packers = []
        self.fields = []
        self.fieldtypes = []
        self.field_by_name = {}
        size = 0
        for f in msgdef:
            if type(f) is dict and "crc" in f:
                self.crc = f["crc"]
                continue
            f_type, f_name = f[:2]
            self.fields.append(f_name)
            self.field_by_name[f_name] = None
            self.fieldtypes.append(f_type)
            if f_type not in types:
                logger.debug("Unknown type {}".format(f_type))
                raise VPPSerializerValueError("Unknown message type {}".format(f_type))

            # An options dict does not count towards the field's arity.
            fieldlen = len(f)
            options = [x for x in f if type(x) is dict]
            if len(options):
                self.options = options[0]
                fieldlen -= 1
            else:
                self.options = {}
            if fieldlen == 3:  # list
                list_elements = f[2]
                if list_elements == 0:
                    # Zero count: variable-length string or legacy VLA; these
                    # do not add to the fixed size.
                    if f_type == "string":
                        p = String(f_name, 0, self.options)
                    else:
                        p = VLAList_legacy(f_name, f_type)
                    self.packers.append(p)
                elif f_type == "u8":
                    p = FixedList_u8(f_name, f_type, list_elements)
                    self.packers.append(p)
                    size += p.size
                elif f_type == "string":
                    p = String(f_name, list_elements, self.options)
                    self.packers.append(p)
                    size += p.size
                else:
                    p = FixedList(f_name, f_type, list_elements)
                    self.packers.append(p)
                    size += p.size
            elif fieldlen == 4:  # Variable length list
                length_index = self.fields.index(f[3])
                p = VLAList(f_name, f_type, f[3], length_index)
                self.packers.append(p)
            else:
                # default support for types that decay to basetype
                if "default" in self.options:
                    p = self.get_packer_with_options(f_type, self.options)
                else:
                    p = types[f_type]

                self.packers.append(p)
                size += p.size

        self.size = size
        self.tuple = collections.namedtuple(name, self.fields, rename=True)
        types[name] = self
        # Kept for compatibility; unpack() tracks conversion in a local.
        self.toplevelconversion = False

    def pack(self, data, kwargs=None):
        """Pack ``data`` field by field; missing fields are packed as ``None``.

        :param data: mapping of field names to values, or a value for which
            a converter is registered for this type.
        :param kwargs: values used by variable-length fields to find their
            length; defaults to ``data``.
        :raises VPPSerializerValueError: when non-dict ``data`` lacks a field.
        """
        if not kwargs:
            kwargs = data
        b = bytearray()

        # Try one of the format functions
        if data and conversion_required(data, self.name):
            return conversion_packer(data, self.name)

        for i, a in enumerate(self.fields):
            if data and type(data) is not dict and a not in data:
                raise VPPSerializerValueError(
                    "Invalid argument: {} expected {}.{}".format(data, self.name, a)
                )

            # Defaulting to zero.
            if not data or a not in data:  # Default to 0
                arg = None
                kwarg = None  # No default for VLA
            else:
                arg = data[a]
                kwarg = kwargs[a] if a in kwargs else None
            # Nested composite types get only their own sub-mapping.
            if isinstance(self.packers[i], VPPType):
                b += self.packers[i].pack(arg, kwarg)
            else:
                b += self.packers[i].pack(arg, kwargs)

        return bytes(b)

    def unpack(self, data, offset=0, result=None, ntc=False):
        """Unpack all fields starting at ``offset``.

        :returns: ``(namedtuple of field values, bytes consumed)``; the tuple
            is passed through the registered unpack converter when ``ntc`` is
            False and one exists for this type.
        """
        # Return a list of arguments
        result = []
        total = 0
        # A local is used so a failing field unpack cannot leave conversion
        # state behind on this shared instance.
        convert = ntc is False and self.name in vpp_format.conversion_unpacker_table
        if convert:
            # Disable type conversion for dependent types
            ntc = True
        for p in self.packers:
            x, size = p.unpack(data, offset, result, ntc)
            if type(x) is tuple and len(x) == 1:
                x = x[0]
            result.append(x)
            offset += size
            total += size
        t = self.tuple._make(result)

        if convert:
            t = conversion_unpacker(t, self.name)
        return t, total

    def __repr__(self):
        return "%s(name=%s, msgdef=%s)" % (
            self.__class__.__name__,
            self.name,
            self.msgdef,
        )
class VPPMessage(VPPType):
    """Message packer; adds nothing to ``VPPType``."""