blob: 59ef5efc864328229123bc9bfe9dd46232ce05b1 [file] [log] [blame]
Pavel Kotucekf6e3dc42016-11-04 09:58:01 +01001#!/usr/bin/env python
2
3import unittest
4import random
5
6from scapy.packet import Raw
7from scapy.layers.l2 import Ether
8from scapy.layers.inet import IP, UDP
9from logging import *
10
11from framework import VppTestCase, VppTestRunner
12from util import Host
13
14
class TestSpan(VppTestCase):
    """ SPAN Test Case """

    # Test variables
    hosts_nr = 10  # Number of hosts created per pg interface
    pkts_per_burst = 257  # Number of packets per burst

    @classmethod
    def setUpClass(cls):
        super(TestSpan, cls).setUpClass()

    def setUp(self):
        """ Prepare the topology used by the SPAN test.

        Creates three pg interfaces, cross-connects pg0 and pg1 at L2 in
        both directions, brings every interface up and enables SPAN on pg0
        with pg2 as the mirror destination.
        """
        super(TestSpan, self).setUp()

        # create 3 pg interfaces
        self.create_pg_interfaces(range(3))

        # packet flows mapping: packets sent into pg0 are expected on pg1
        self.flows = dict()
        self.flows[self.pg0] = [self.pg1]

        # packet sizes (larger frames such as 1518 and 9018 are not sent)
        self.pg_if_packet_sizes = [64, 512]

        self.interfaces = list(self.pg_interfaces)

        # Create host MAC and IPv4 lists
        self.create_host_lists(TestSpan.hosts_nr)

        # Create bi-directional cross-connects between pg0 and pg1
        self.vapi.sw_interface_set_l2_xconnect(
            self.pg0.sw_if_index, self.pg1.sw_if_index, enable=1)
        self.vapi.sw_interface_set_l2_xconnect(
            self.pg1.sw_if_index, self.pg0.sw_if_index, enable=1)

        # setup all interfaces
        for i in self.interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

        # Enable SPAN on pg0 (mirrored to pg2)
        self.vapi.sw_interface_span_enable_disable(
            self.pg0.sw_if_index, self.pg2.sw_if_index)

    def tearDown(self):
        super(TestSpan, self).tearDown()

    def create_host_lists(self, count):
        """ Method to create required number of MAC and IPv4 addresses.
        Create required number of host MAC addresses and distribute them among
        interfaces. Create host IPv4 address for every host MAC address too.

        The result is stored in ``self.hosts_by_pg_idx``, a dict keyed by the
        interface ``sw_if_index`` whose values are lists of ``Host`` objects.

        :param count: Number of hosts to create MAC and IPv4 addresses for.
        """
        # mapping between packet-generator index and lists of test hosts
        self.hosts_by_pg_idx = dict()

        for pg_if in self.pg_interfaces:
            # The third octet is formatted in decimal: a hexadecimal octet
            # would yield an invalid address once sw_if_index reaches 10.
            self.hosts_by_pg_idx[pg_if.sw_if_index] = [
                Host("00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
                     "172.17.1%02u.%u" % (pg_if.sw_if_index, j))
                for j in range(count)]

    def create_stream(self, src_if, packet_sizes):
        """ Build one burst of UDP packets to be sent into ``src_if``.

        Each packet carries a payload generated from a freshly created packet
        info record, and an unpadded copy of the packet is stored in that
        record's ``data`` attribute for later verification.

        :param src_if: Interface the packets are sent from; must be a key of
            ``self.flows``.
        :param packet_sizes: Non-empty list of sizes handed to
            ``extend_packet``. Every size is used for two consecutive packets
            before moving on to the next one, wrapping around at the end.
        :returns: List of ``TestSpan.pkts_per_burst`` packets.
        """
        # The destination does not change within a burst.
        dst_if = self.flows[src_if][0]
        pkts = []
        for i in range(TestSpan.pkts_per_burst):
            pkt_info = self.create_packet_info(
                src_if.sw_if_index, dst_if.sw_if_index)
            payload = self.info_to_payload(pkt_info)
            p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                 IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
                 UDP(sport=1234, dport=1234) /
                 Raw(payload))
            pkt_info.data = p.copy()
            # Floor division keeps the index an integer under true division.
            size = packet_sizes[(i // 2) % len(packet_sizes)]
            self.extend_packet(p, size)
            pkts.append(p)
        return pkts

    def verify_capture(self, dst_if, capture_pg1, capture_pg2):
        """ Check that the mirrored capture matches the forwarded capture.

        Both captures must have the same length and be pairwise identical
        layer by layer. Every forwarded packet is additionally matched
        against the packet info saved when the stream was created.

        :param dst_if: Interface on which ``capture_pg1`` was captured.
        :param capture_pg1: Packets captured on the forwarding destination.
        :param capture_pg2: Packets captured on the SPAN (mirror) interface.
        :raises AssertionError: If the captures differ or an expected packet
            is missing.
        """
        last_info = dict()
        for i in self.interfaces:
            last_info[i.sw_if_index] = None
        dst_sw_if_index = dst_if.sw_if_index

        self.assertEqual(
            len(capture_pg1), len(capture_pg2),
            "Different number of outgoing and mirrored packets: %u != %u"
            % (len(capture_pg1), len(capture_pg2)))

        # Layers compared between the forwarded and the mirrored packet.
        compared_layers = ((Ether, "ethernet header"),
                           (IP, "ip header"),
                           (UDP, "udp header"),
                           (Raw, "raw data"))

        for pkt_pg1, pkt_pg2 in zip(capture_pg1, capture_pg2):
            try:
                ip1 = pkt_pg1[IP]
                udp1 = pkt_pg1[UDP]
                raw1 = pkt_pg1[Raw]

                for layer, label in compared_layers:
                    if pkt_pg1[layer] != pkt_pg2[layer]:
                        self.fail("Different %s of outgoing and mirrored "
                                  "packet" % label)

                payload_info = self.payload_to_info(str(raw1))
                packet_index = payload_info.index
                self.assertEqual(payload_info.dst, dst_sw_if_index)
                debug("Got packet on port %s: src=%u (id=%u)" %
                      (dst_if.name, payload_info.src, packet_index))
                next_info = self.get_next_packet_info_for_interface2(
                    payload_info.src, dst_sw_if_index,
                    last_info[payload_info.src])
                last_info[payload_info.src] = next_info
                self.assertIsNotNone(next_info)
                self.assertEqual(packet_index, next_info.index)
                saved_packet = next_info.data
                # Check standard fields
                self.assertEqual(ip1.src, saved_packet[IP].src)
                self.assertEqual(ip1.dst, saved_packet[IP].dst)
                self.assertEqual(udp1.sport, saved_packet[UDP].sport)
                self.assertEqual(udp1.dport, saved_packet[UDP].dport)
            except Exception:
                # Dump both packets to ease debugging, then propagate.
                error("Unexpected or invalid packet:")
                pkt_pg1.show()
                pkt_pg2.show()
                raise

        # No packet info may be left unmatched for any source interface.
        # The source is passed as sw_if_index, same as in the loop above.
        for i in self.interfaces:
            remaining_packet = self.get_next_packet_info_for_interface2(
                i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
            self.assertIsNone(
                remaining_packet,
                "Port %u: Packet expected from source %u didn't"
                " arrive" % (dst_sw_if_index, i.sw_if_index))

    def test_span(self):
        """ SPAN test

        Test scenario:
            1. config
                3 interfaces, pg0 l2xconnected with pg1
            2. sending l2 eth packets between 2 interfaces (pg0, pg1) and
               mirrored to pg2
                64B, 512B (ether_size)
                burst of packets per interface
        """

        # Create incoming packet streams for packet-generator interfaces
        pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes)
        self.pg0.add_stream(pkts)

        # Enable packet capturing and start packet sending
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Verify outgoing packet streams on the mirrored interface (pg2)
        info("Verifying capture on interfaces %s and %s"
             % (self.pg1.name, self.pg2.name))
        self.verify_capture(
            self.pg1, self.pg1.get_capture(), self.pg2.get_capture())
183
184
# Allow running this module directly as a script, using VppTestRunner as the
# unittest test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)