blob: b9abeaeffecee1105aa9a5287403ea6e490529b5 [file] [log] [blame]
Renato Botelho do Coutoead1e532019-10-31 13:31:07 -05001#!/usr/bin/env python3
Neale Ranns7c922dc2018-08-30 06:12:27 -07002
3import unittest
4import socket
5
6from framework import VppTestCase, VppTestRunner
Neale Rannsc0a93142018-09-05 15:42:26 -07007from vpp_ip import DpoProto
Neale Ranns7c922dc2018-08-30 06:12:27 -07008from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
Neale Rannsc0a93142018-09-05 15:42:26 -07009 VppIpTable, VppMplsTable, VppMplsLabel
Neale Ranns7c922dc2018-08-30 06:12:27 -070010from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
11
12from scapy.packet import Raw
13from scapy.layers.l2 import Ether
14from scapy.layers.inet import IP, UDP, ICMP
15from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
16from scapy.contrib.mpls import MPLS
17
18
def verify_filter(capture, sent):
    """Drop IPv6 packets from ``capture`` when its length differs from ``sent``.

    When more (or fewer) packets were captured than were sent, every packet
    that has an IPv6 layer is removed from ``capture`` (the intent is to get
    rid of unsolicited IPv6 RAs that end up in the capture).  When the two
    lengths already match, ``capture`` is left untouched.

    :param capture: the received packets; filtered in place.
    :param sent: the transmitted packets; only its length is used.
    :returns: the same ``capture`` object that was passed in.
    """
    if len(capture) != len(sent):
        # Iterate over a snapshot: removing from the sequence being iterated
        # makes the iterator skip the element that follows each removal, so
        # back-to-back IPv6 packets would otherwise be left behind.
        for p in list(capture):
            if p.haslayer(IPv6):
                capture.remove(p)
    return capture
26
27
def verify_mpls_stack(tst, rx, mpls_labels):
    """Assert that ``rx`` carries exactly the expected MPLS label stack.

    :param tst: the test case whose ``assertEqual`` is used for the checks.
    :param rx: the received packet; must contain Ether and MPLS layers.
    :param mpls_labels: expected labels, outermost first.  Each entry must
        expose ``value``, ``exp`` and ``ttl`` attributes.
    """
    # the frame must be typed as MPLS (ethertype 0x8847)
    tst.assertEqual(rx[Ether].type, 0x8847)

    layer = rx[MPLS]
    last = len(mpls_labels) - 1

    for idx, expected in enumerate(mpls_labels):
        tst.assertEqual(layer.label, expected.value)
        tst.assertEqual(layer.cos, expected.exp)
        tst.assertEqual(layer.ttl, expected.ttl)

        if idx != last:
            # more labels follow: the bottom-of-stack bit must be clear
            tst.assertEqual(layer.s, 0)
            # step past this label to expose the next one
            layer = layer[MPLS].payload
        else:
            # innermost expected label: the bottom-of-stack bit must be set
            tst.assertEqual(layer.s, 1)
47
48
class TestSRMPLS(VppTestCase):
    """ SR-MPLS Test Case """

    # NOTE: the class docstring and the test-method docstring are left
    # byte-for-byte as they were, since test runners may display them.

    @classmethod
    def setUpClass(cls):
        super(TestSRMPLS, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestSRMPLS, cls).tearDownClass()

    def setUp(self):
        """Create four pg interfaces and MPLS table 0, then enable
        IPv4, IPv6 and MPLS on every interface."""
        super(TestSRMPLS, self).setUp()

        # create 4 pg interfaces
        self.create_pg_interfaces(range(4))

        # create MPLS table 0 and keep a reference to it
        self.tables = []

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()
        self.tables.append(tbl)

        # bring each interface up with IPv4, IPv6 and MPLS enabled
        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()
            i.enable_mpls()

    def tearDown(self):
        """Undo the per-interface configuration applied in setUp()."""
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.disable_mpls()
            i.admin_down()
        super(TestSRMPLS, self).tearDown()

    def create_stream_ip4(self, src_if, dst_ip, ip_ttl=64, ip_dscp=0):
        """Build 257 Ether/IP/UDP packets from ``src_if`` towards ``dst_ip``.

        :param src_if: interface the packets will be injected on; supplies
            the MAC addresses and the source IPv4 address.
        :param dst_ip: destination IPv4 address.
        :param ip_ttl: value for the IP TTL field.
        :param ip_dscp: value for the IP ToS field.
        :returns: list of the generated packets.
        """
        self.reset_packet_infos()
        pkts = []
        for i in range(0, 257):
            info = self.create_packet_info(src_if, src_if)
            payload = self.info_to_payload(info)
            p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                 IP(src=src_if.remote_ip4, dst=dst_ip,
                    ttl=ip_ttl, tos=ip_dscp) /
                 UDP(sport=1234, dport=1234) /
                 Raw(payload))
            info.data = p.copy()
            pkts.append(p)
        return pkts

    def verify_capture_labelled_ip4(self, src_if, capture, sent,
                                    mpls_labels, ip_ttl=None):
        """Check each captured packet against the packet that was sent.

        Every received packet must carry the ``mpls_labels`` stack and the
        same IP source/destination as its transmitted counterpart.

        :param src_if: not used by the checks; kept so existing callers
            keep working.
        :param capture: received packets, in the same order as ``sent``.
        :param sent: transmitted packets.
        :param mpls_labels: expected label stack, outermost first.
        :param ip_ttl: expected IP TTL of the received packets.  When it is
            falsy (the default ``None``) the received TTL is expected to be
            one less than the transmitted TTL.
        """
        capture = verify_filter(capture, sent)

        self.assertEqual(len(capture), len(sent))

        for tx, rx in zip(sent, capture):
            tx_ip = tx[IP]
            rx_ip = rx[IP]

            verify_mpls_stack(self, rx, mpls_labels)

            self.assertEqual(rx_ip.src, tx_ip.src)
            self.assertEqual(rx_ip.dst, tx_ip.dst)
            if not ip_ttl:
                # IP processing has decremented the TTL by one
                self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
            else:
                self.assertEqual(rx_ip.ttl, ip_ttl)

    def verify_capture_tunneled_ip4(self, src_if, capture, sent, mpls_labels):
        """Check packets received via the MPLS tunnel against those sent.

        The checks are the ones verify_capture_labelled_ip4() performs when
        no explicit TTL is given: matching label stack, matching IP
        addresses, and an IP TTL decremented by one.
        """
        self.verify_capture_labelled_ip4(src_if, capture, sent, mpls_labels)

    def test_sr_mpls(self):
        """ SR MPLS """

        #
        # A simple MPLS xconnect - neos label in label out
        #
        route_32_eos = VppMplsRoute(self, 32, 0,
                                    [VppRoutePath(self.pg0.remote_ip4,
                                                  self.pg0.sw_if_index,
                                                  labels=[VppMplsLabel(32)])])
        route_32_eos.add_vpp_config()

        #
        # A binding SID with only one label
        #
        self.vapi.sr_mpls_policy_add(999, 1, 0, [32])

        #
        # A labeled IP route that resolves thru the binding SID
        #
        ip_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
                                 [VppRoutePath("0.0.0.0",
                                               0xffffffff,
                                               nh_via_label=999,
                                               labels=[VppMplsLabel(55)])])
        ip_10_0_0_1.add_vpp_config()

        tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(55)])

        #
        # An unlabeled IP route that resolves thru the binding SID
        #
        ip_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
                                 [VppRoutePath("0.0.0.0",
                                               0xffffffff,
                                               nh_via_label=999)])
        ip_10_0_0_2.add_vpp_config()

        tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32)])

        self.vapi.sr_mpls_policy_del(999)

        #
        # this time the SID has many labels pushed
        #
        self.vapi.sr_mpls_policy_add(999, 1, 0, [32, 33, 34])

        tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(33),
                                          VppMplsLabel(34),
                                          VppMplsLabel(55)])
        tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(33),
                                          VppMplsLabel(34)])

        #
        # Resolve an MPLS tunnel via the SID
        #
        mpls_tun = VppMPLSTunnelInterface(
            self,
            [VppRoutePath("0.0.0.0",
                          0xffffffff,
                          nh_via_label=999,
                          labels=[VppMplsLabel(44),
                                  VppMplsLabel(46)])])
        mpls_tun.add_vpp_config()
        mpls_tun.admin_up()

        #
        # add an unlabelled route through the new tunnel
        #
        route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
                                    [VppRoutePath("0.0.0.0",
                                                  mpls_tun._sw_if_index)])
        route_10_0_0_3.add_vpp_config()
        self.logger.info(self.vapi.cli("sh mpls tun 0"))
        self.logger.info(self.vapi.cli("sh adj 21"))

        tx = self.create_stream_ip4(self.pg1, "10.0.0.3")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(33),
                                          VppMplsLabel(34),
                                          VppMplsLabel(44),
                                          VppMplsLabel(46)])

        #
        # add a labelled route through the new tunnel
        #
        route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
                                    [VppRoutePath("0.0.0.0",
                                                  mpls_tun._sw_if_index,
                                                  labels=[VppMplsLabel(55)])])
        route_10_0_0_4.add_vpp_config()

        tx = self.create_stream_ip4(self.pg1, "10.0.0.4")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(33),
                                          VppMplsLabel(34),
                                          VppMplsLabel(44),
                                          VppMplsLabel(46),
                                          VppMplsLabel(55)])

        self.vapi.sr_mpls_policy_del(999)
274
275
# Allow this test module to be executed directly as a script.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)