| """ test framework utilities """ |
| |
| import socket |
| import sys |
| from abc import abstractmethod, ABCMeta |
| from cStringIO import StringIO |
| from scapy.layers.inet6 import in6_mactoifaceid |
| |
| from scapy.layers.l2 import Ether |
| from scapy.packet import Raw |
| from scapy.layers.inet import IP, UDP, TCP |
| from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest |
| from scapy.packet import Packet |
| from socket import inet_pton, AF_INET, AF_INET6 |
| |
| |
| def ppp(headline, packet): |
| """ Return string containing the output of scapy packet.show() call. """ |
| o = StringIO() |
| old_stdout = sys.stdout |
| sys.stdout = o |
| print(headline) |
| packet.show() |
| sys.stdout = old_stdout |
| return o.getvalue() |
| |
| |
| def ppc(headline, capture, limit=10): |
| """ Return string containing ppp() printout for a capture. |
| |
| :param headline: printed as first line of output |
| :param capture: packets to print |
| :param limit: limit the print to # of packets |
| """ |
| if not capture: |
| return headline |
| tail = "" |
| if limit < len(capture): |
| tail = "\nPrint limit reached, %s out of %s packets printed" % ( |
| len(capture), limit) |
| limit = len(capture) |
| body = "".join([ppp("Packet #%s:" % count, p) |
| for count, p in zip(range(0, limit), capture)]) |
| return "%s\n%s%s" % (headline, body, tail) |
| |
| |
| def ip4_range(ip4, s, e): |
| tmp = ip4.rsplit('.', 1)[0] |
| return ("%s.%d" % (tmp, i) for i in range(s, e)) |
| |
| |
| def ip4n_range(ip4n, s, e): |
| ip4 = socket.inet_ntop(socket.AF_INET, ip4n) |
| return (socket.inet_pton(socket.AF_INET, ip) |
| for ip in ip4_range(ip4, s, e)) |
| |
| |
| def mactobinary(mac): |
| """ Convert the : separated format into binary packet data for the API """ |
| return mac.replace(':', '').decode('hex') |
| |
| |
| def mk_ll_addr(mac): |
| euid = in6_mactoifaceid(mac) |
| addr = "fe80::" + euid |
| return addr |
| |
| |
| class NumericConstant(object): |
| __metaclass__ = ABCMeta |
| |
| desc_dict = {} |
| |
| @abstractmethod |
| def __init__(self, value): |
| self._value = value |
| |
| def __int__(self): |
| return self._value |
| |
| def __long__(self): |
| return self._value |
| |
| def __str__(self): |
| if self._value in self.desc_dict: |
| return self.desc_dict[self._value] |
| return "" |
| |
| |
| class Host(object): |
| """ Generic test host "connected" to VPPs interface. """ |
| |
| @property |
| def mac(self): |
| """ MAC address """ |
| return self._mac |
| |
| @property |
| def bin_mac(self): |
| """ MAC address """ |
| return mactobinary(self._mac) |
| |
| @property |
| def ip4(self): |
| """ IPv4 address - string """ |
| return self._ip4 |
| |
| @property |
| def ip4n(self): |
| """ IPv4 address of remote host - raw, suitable as API parameter.""" |
| return socket.inet_pton(socket.AF_INET, self._ip4) |
| |
| @property |
| def ip6(self): |
| """ IPv6 address - string """ |
| return self._ip6 |
| |
| @property |
| def ip6n(self): |
| """ IPv6 address of remote host - raw, suitable as API parameter.""" |
| return socket.inet_pton(socket.AF_INET6, self._ip6) |
| |
| @property |
| def ip6_ll(self): |
| """ IPv6 link-local address - string """ |
| return self._ip6_ll |
| |
| @property |
| def ip6n_ll(self): |
| """ IPv6 link-local address of remote host - |
| raw, suitable as API parameter.""" |
| return socket.inet_pton(socket.AF_INET6, self._ip6_ll) |
| |
| def __eq__(self, other): |
| if isinstance(other, Host): |
| return (self.mac == other.mac and |
| self.ip4 == other.ip4 and |
| self.ip6 == other.ip6 and |
| self.ip6_ll == other.ip6_ll) |
| else: |
| return False |
| |
| def __ne__(self, other): |
| return not self.__eq__(other) |
| |
| def __repr__(self): |
| return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (self.mac, |
| self.ip4, |
| self.ip6, |
| self.ip6_ll) |
| |
| def __hash__(self): |
| return hash(self.__repr__()) |
| |
| def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None): |
| self._mac = mac |
| self._ip4 = ip4 |
| self._ip6 = ip6 |
| self._ip6_ll = ip6_ll |
| |
| |
| class ForeignAddressFactory(object): |
| count = 0 |
| prefix_len = 24 |
| net_template = '10.10.10.{}' |
| net = net_template.format(0) + '/' + str(prefix_len) |
| |
| def get_ip4(self): |
| if self.count > 255: |
| raise Exception("Network host address exhaustion") |
| self.count += 1 |
| return self.net_template.format(self.count) |
| |
| |
| class L4_Conn(): |
| """ L4 'connection' tied to two VPP interfaces """ |
| def __init__(self, testcase, if1, if2, af, l4proto, port1, port2): |
| self.testcase = testcase |
| self.ifs = [None, None] |
| self.ifs[0] = if1 |
| self.ifs[1] = if2 |
| self.address_family = af |
| self.l4proto = l4proto |
| self.ports = [None, None] |
| self.ports[0] = port1 |
| self.ports[1] = port2 |
| self |
| |
| def pkt(self, side, l4args={}, payload="x"): |
| is_ip6 = 1 if self.address_family == AF_INET6 else 0 |
| s0 = side |
| s1 = 1-side |
| src_if = self.ifs[s0] |
| dst_if = self.ifs[s1] |
| layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4), |
| IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)] |
| merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]} |
| merged_l4args.update(l4args) |
| p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / |
| layer_3[is_ip6] / |
| self.l4proto(**merged_l4args) / |
| Raw(payload)) |
| return p |
| |
| def send(self, side, flags=None, payload=""): |
| l4args = {} |
| if flags is not None: |
| l4args['flags'] = flags |
| self.ifs[side].add_stream(self.pkt(side, |
| l4args=l4args, payload=payload)) |
| self.ifs[1-side].enable_capture() |
| self.testcase.pg_start() |
| |
| def recv(self, side): |
| p = self.ifs[side].wait_for_packet(1) |
| return p |
| |
| def send_through(self, side, flags=None, payload=""): |
| self.send(side, flags, payload) |
| p = self.recv(1-side) |
| return p |
| |
| def send_pingpong(self, side, flags1=None, flags2=None): |
| p1 = self.send_through(side, flags1) |
| p2 = self.send_through(1-side, flags2) |
| return [p1, p2] |
| |
| |
| class L4_CONN_SIDE: |
| L4_CONN_SIDE_ZERO = 0 |
| L4_CONN_SIDE_ONE = 1 |