blob: 7e729658b43e920c87ee019b32dd2db745975b61 [file] [log] [blame]
Neale Ranns7c922dc2018-08-30 06:12:27 -07001#!/usr/bin/env python
2
3import unittest
4import socket
5
6from framework import VppTestCase, VppTestRunner
Neale Rannsc0a93142018-09-05 15:42:26 -07007from vpp_ip import DpoProto
Neale Ranns7c922dc2018-08-30 06:12:27 -07008from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
Neale Rannsc0a93142018-09-05 15:42:26 -07009 VppIpTable, VppMplsTable, VppMplsLabel
Neale Ranns7c922dc2018-08-30 06:12:27 -070010from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
11
12from scapy.packet import Raw
13from scapy.layers.l2 import Ether
14from scapy.layers.inet import IP, UDP, ICMP
15from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
16from scapy.contrib.mpls import MPLS
17
18
def verify_filter(capture, sent):
    """Drop stray IPv6 packets from a capture of unexpected size.

    When ``capture`` does not hold the same number of packets as ``sent``,
    every packet that reports an IPv6 layer (e.g. a router advertisement
    that slipped into the capture) is removed from ``capture`` in place.
    A capture that already has the expected length is left untouched.

    :param capture: mutable sequence of received packets; each packet must
        provide ``haslayer()`` and the sequence must provide ``remove()``.
    :param sent: the packets that were transmitted; only its length is used.
    :returns: the same ``capture`` object, possibly with packets removed.
    """
    if len(capture) != len(sent):
        # Walk a snapshot of the capture. Removing from the sequence that
        # is being iterated makes the iterator step over the element that
        # follows each removal, so back-to-back IPv6 packets were left in.
        for pkt in list(capture):
            if pkt.haslayer(IPv6):
                capture.remove(pkt)
    return capture
26
27
def verify_mpls_stack(tst, rx, mpls_labels):
    """Assert that ``rx`` carries the expected MPLS label stack.

    The stack is walked from the first MPLS header inwards and each header
    is compared with the entry of ``mpls_labels`` at the same depth.

    :param tst: test case object; only its ``assertEqual`` is used.
    :param rx: received packet, indexable by the ``Ether`` and ``MPLS``
        layer classes.
    :param mpls_labels: expected labels, outermost first; every entry must
        expose ``value``, ``exp`` and ``ttl``.
    """
    # the frame's ethertype must be 0x8847 (MPLS unicast)
    tst.assertEqual(rx[Ether].type, 0x8847)

    rx_mpls = rx[MPLS]
    bottom = len(mpls_labels) - 1

    for depth, expected in enumerate(mpls_labels):
        tst.assertEqual(rx_mpls.label, expected.value)
        tst.assertEqual(rx_mpls.cos, expected.exp)
        tst.assertEqual(rx_mpls.ttl, expected.ttl)

        if depth == bottom:
            # last expected label: the bottom-of-stack bit must be set
            tst.assertEqual(rx_mpls.s, 1)
        else:
            # not end of stack
            tst.assertEqual(rx_mpls.s, 0)
            # step past this header to expose the next one
            rx_mpls = rx_mpls[MPLS].payload
47
48
class TestSRMPLS(VppTestCase):
    """ SR-MPLS Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestSRMPLS, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestSRMPLS, cls).tearDownClass()

    def setUp(self):
        """Create the MPLS table and bring up four MPLS-enabled pg ports."""
        super(TestSRMPLS, self).setUp()

        # create 4 pg interfaces
        self.create_pg_interfaces(range(4))

        # MPLS table 0 is configured before MPLS is enabled on any port
        self.tables = []

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()
        self.tables.append(tbl)

        # every interface gets IPv4, IPv6 and MPLS
        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()
            i.enable_mpls()

    def tearDown(self):
        """Strip the per-interface configuration applied by setUp()."""
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.ip6_disable()
            i.disable_mpls()
            i.admin_down()
        super(TestSRMPLS, self).tearDown()

    def create_stream_ip4(self, src_if, dst_ip, ip_ttl=64, ip_dscp=0):
        """Build 257 IPv4/UDP packets to inject on ``src_if``.

        The packet-info registry is reset first, so only the packets of
        the most recently created stream are tracked.

        :param src_if: interface the packets are sent from; supplies the
            Ethernet addresses and the IPv4 source address.
        :param dst_ip: destination IPv4 address.
        :param ip_ttl: TTL placed in the IPv4 header.
        :param ip_dscp: value placed in the IPv4 ``tos`` field.
        :returns: list of the packets built.
        """
        self.reset_packet_infos()
        pkts = []
        for _ in range(257):
            info = self.create_packet_info(src_if, src_if)
            payload = self.info_to_payload(info)
            p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                 IP(src=src_if.remote_ip4, dst=dst_ip,
                    ttl=ip_ttl, tos=ip_dscp) /
                 UDP(sport=1234, dport=1234) /
                 Raw(payload))
            info.data = p.copy()
            pkts.append(p)
        return pkts

    def verify_capture_labelled_ip4(self, src_if, capture, sent,
                                    mpls_labels, ip_ttl=None):
        """Check each captured packet is its sent packet plus MPLS labels.

        :param src_if: unused; kept for the callers' signature.
        :param capture: packets received.
        :param sent: packets transmitted, in the same order.
        :param mpls_labels: expected label stack, outermost first.
        :param ip_ttl: expected TTL of the received IPv4 header. When
            ``None`` the TTL must be exactly one less than the sent TTL.
        """
        capture = verify_filter(capture, sent)

        self.assertEqual(len(capture), len(sent))

        # lengths were just asserted equal, so pair the packets up
        for tx, rx in zip(sent, capture):
            tx_ip = tx[IP]
            rx_ip = rx[IP]

            verify_mpls_stack(self, rx, mpls_labels)

            self.assertEqual(rx_ip.src, tx_ip.src)
            self.assertEqual(rx_ip.dst, tx_ip.dst)
            if ip_ttl is None:
                # forwarding is expected to have decremented the IP TTL
                self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
            else:
                self.assertEqual(rx_ip.ttl, ip_ttl)

    def verify_capture_tunneled_ip4(self, src_if, capture, sent, mpls_labels):
        """Check packets sent through an MPLS tunnel.

        The checks are the same as for a labelled route with the default
        TTL expectation (received TTL is one less than the sent TTL), so
        this delegates to verify_capture_labelled_ip4().

        :param src_if: unused; kept for the callers' signature.
        :param capture: packets received.
        :param sent: packets transmitted, in the same order.
        :param mpls_labels: expected label stack, outermost first.
        """
        self.verify_capture_labelled_ip4(src_if, capture, sent, mpls_labels)

    def test_sr_mpls(self):
        """ SR MPLS """

        #
        # A simple MPLS xconnect - neos label in label out
        #
        xconnect_32 = VppMplsRoute(self, 32, 0,
                                   [VppRoutePath(self.pg0.remote_ip4,
                                                 self.pg0.sw_if_index,
                                                 labels=[VppMplsLabel(32)])])
        xconnect_32.add_vpp_config()

        #
        # A binding SID with only one label
        #
        self.vapi.sr_mpls_policy_add(999, 1, 0, [32])

        #
        # A labeled IP route that resolves thru the binding SID
        #
        route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
                                    [VppRoutePath("0.0.0.0",
                                                  0xffffffff,
                                                  nh_via_label=999,
                                                  labels=[VppMplsLabel(55)])])
        route_10_0_0_1.add_vpp_config()

        tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(55)])

        #
        # An unlabeled IP route that resolves thru the binding SID
        #
        route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
                                    [VppRoutePath("0.0.0.0",
                                                  0xffffffff,
                                                  nh_via_label=999)])
        route_10_0_0_2.add_vpp_config()

        tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32)])

        self.vapi.sr_mpls_policy_del(999)

        #
        # this time the SID has many labels pushed
        #
        self.vapi.sr_mpls_policy_add(999, 1, 0, [32, 33, 34])

        tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(33),
                                          VppMplsLabel(34),
                                          VppMplsLabel(55)])
        tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(33),
                                          VppMplsLabel(34)])

        #
        # Resolve an MPLS tunnel via the SID
        #
        mpls_tun = VppMPLSTunnelInterface(
            self,
            [VppRoutePath("0.0.0.0",
                          0xffffffff,
                          nh_via_label=999,
                          labels=[VppMplsLabel(44),
                                  VppMplsLabel(46)])])
        mpls_tun.add_vpp_config()
        mpls_tun.admin_up()

        #
        # add an unlabelled route through the new tunnel
        #
        route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
                                    [VppRoutePath("0.0.0.0",
                                                  mpls_tun._sw_if_index)])
        route_10_0_0_3.add_vpp_config()
        # debug output only
        # NOTE(review): tunnel 0 and adjacency 21 are hard-coded indices -
        # confirm they still name the objects created above
        self.logger.info(self.vapi.cli("sh mpls tun 0"))
        self.logger.info(self.vapi.cli("sh adj 21"))

        tx = self.create_stream_ip4(self.pg1, "10.0.0.3")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(33),
                                          VppMplsLabel(34),
                                          VppMplsLabel(44),
                                          VppMplsLabel(46)])

        #
        # add a labelled route through the new tunnel
        #
        route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
                                    [VppRoutePath("0.0.0.0",
                                                  mpls_tun._sw_if_index,
                                                  labels=[VppMplsLabel(55)])])
        route_10_0_0_4.add_vpp_config()

        tx = self.create_stream_ip4(self.pg1, "10.0.0.4")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(33),
                                          VppMplsLabel(34),
                                          VppMplsLabel(44),
                                          VppMplsLabel(46),
                                          VppMplsLabel(55)])

        self.vapi.sr_mpls_policy_del(999)
275
276
if __name__ == '__main__':
    # executed directly: run this module's tests with the VPP test runner
    unittest.main(testRunner=VppTestRunner)