blob: 756405ab6d7b2444917ebedfd1502e6dad669f3f [file] [log] [blame]
Neale Ranns7c922dc2018-08-30 06:12:27 -07001#!/usr/bin/env python
2
3import unittest
4import socket
5
6from framework import VppTestCase, VppTestRunner
Neale Rannsc0a93142018-09-05 15:42:26 -07007from vpp_ip import DpoProto
Neale Ranns7c922dc2018-08-30 06:12:27 -07008from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
Neale Rannsc0a93142018-09-05 15:42:26 -07009 VppIpTable, VppMplsTable, VppMplsLabel
Neale Ranns7c922dc2018-08-30 06:12:27 -070010from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
11
12from scapy.packet import Raw
13from scapy.layers.l2 import Ether
14from scapy.layers.inet import IP, UDP, ICMP
15from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
16from scapy.contrib.mpls import MPLS
17
18
def verify_filter(capture, sent):
    """Strip IPv6 packets from a capture whose size differs from what was sent.

    When ``capture`` and ``sent`` have the same length the capture is
    returned untouched.  Otherwise every packet for which
    ``haslayer(IPv6)`` is true (e.g. unsolicited IPv6 RAs) is removed
    from ``capture`` in place.

    :param capture: mutable sequence of received packets; modified in place.
    :param sent: sequence of transmitted packets; only its length is used.
    :returns: the same ``capture`` object that was passed in.
    """
    if len(capture) != len(sent):
        # filter out any IPv6 RAs from the capture.
        # Iterate over a snapshot: removing from the sequence that is being
        # iterated skips the element following each removal, which left
        # back-to-back IPv6 packets in the capture.
        for p in list(capture):
            if p.haslayer(IPv6):
                capture.remove(p)
    return capture
26
27
def verify_mpls_stack(tst, rx, mpls_labels):
    """Assert that the MPLS label stack on ``rx`` matches ``mpls_labels``.

    :param tst: object providing ``assertEqual`` (the running test case).
    :param rx: received packet, indexable by the Ether and MPLS layers.
    :param mpls_labels: expected labels, outermost first; each entry
        exposes ``value``, ``exp`` and ``ttl``.
    """
    # the frame must carry the MPLS ethertype
    tst.assertEqual(rx[Ether].type, 0x8847)

    hdr = rx[MPLS]
    bottom = len(mpls_labels) - 1

    for depth, expected in enumerate(mpls_labels):
        tst.assertEqual(hdr.label, expected.value)
        tst.assertEqual(hdr.cos, expected.exp)
        tst.assertEqual(hdr.ttl, expected.ttl)

        if depth != bottom:
            # more labels follow: bottom-of-stack bit must be clear,
            # then step to the next header in the stack
            tst.assertEqual(hdr.s, 0)
            hdr = hdr[MPLS].payload
            continue

        # innermost expected label: bottom-of-stack bit must be set
        tst.assertEqual(hdr.s, 1)
47
48
class TestSRMPLS(VppTestCase):
    """ SR-MPLS Test Case """

    def setUp(self):
        """Create four pg interfaces with IPv4, IPv6 and MPLS enabled."""
        super(TestSRMPLS, self).setUp()

        # create 4 pg interfaces
        self.create_pg_interfaces(range(4))

        # create MPLS table 0 and keep a reference to it
        self.tables = []

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()
        self.tables.append(tbl)

        # bring every interface up with IPv4, IPv6 and MPLS configured
        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()
            i.enable_mpls()

    def tearDown(self):
        """Undo the per-interface configuration applied in setUp."""
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.ip6_disable()
            i.disable_mpls()
            i.admin_down()
        super(TestSRMPLS, self).tearDown()

    def create_stream_ip4(self, src_if, dst_ip, ip_ttl=64, ip_dscp=0):
        """Build a stream of 257 IPv4/UDP packets to inject on ``src_if``.

        :param src_if: interface the packets will be sent from; supplies
            the Ethernet addresses and the IPv4 source address.
        :param dst_ip: IPv4 destination address.
        :param ip_ttl: TTL placed in the IPv4 header.
        :param ip_dscp: value placed in the IPv4 ``tos`` field.
        :returns: list of the generated packets.
        """
        self.reset_packet_infos()
        pkts = []
        for _ in range(0, 257):
            info = self.create_packet_info(src_if, src_if)
            payload = self.info_to_payload(info)
            p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                 IP(src=src_if.remote_ip4, dst=dst_ip,
                    ttl=ip_ttl, tos=ip_dscp) /
                 UDP(sport=1234, dport=1234) /
                 Raw(payload))
            info.data = p.copy()
            pkts.append(p)
        return pkts

    def verify_capture_labelled_ip4(self, src_if, capture, sent,
                                    mpls_labels, ip_ttl=None):
        """Check that each captured packet is its sent packet plus labels.

        :param src_if: unused; kept for interface compatibility.
        :param capture: received packets (IPv6 packets are filtered out
            when the count does not match ``sent``).
        :param sent: transmitted packets, in the same order.
        :param mpls_labels: expected MPLS label stack, outermost first.
        :param ip_ttl: expected IPv4 TTL on receive; when ``None`` the TTL
            is expected to be the sent TTL decremented by one.
        """
        capture = verify_filter(capture, sent)

        self.assertEqual(len(capture), len(sent))

        for tx, rx in zip(sent, capture):
            tx_ip = tx[IP]
            rx_ip = rx[IP]

            verify_mpls_stack(self, rx, mpls_labels)

            self.assertEqual(rx_ip.src, tx_ip.src)
            self.assertEqual(rx_ip.dst, tx_ip.dst)
            # compare against None so an explicit expected TTL of 0 is
            # honoured rather than treated as "not specified"
            if ip_ttl is None:
                # IP forwarding has decremented the TTL
                self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
            else:
                self.assertEqual(rx_ip.ttl, ip_ttl)

    def verify_capture_tunneled_ip4(self, src_if, capture, sent, mpls_labels):
        """Check packets sent through the MPLS tunnel.

        Same checks as ``verify_capture_labelled_ip4`` with the received
        TTL expected to be the sent TTL decremented by one.
        """
        self.verify_capture_labelled_ip4(src_if, capture, sent, mpls_labels)

    def test_sr_mpls(self):
        """ SR MPLS """

        #
        # A simple MPLS xconnect - neos label in label out
        #
        route_32_neos = VppMplsRoute(self, 32, 0,
                                     [VppRoutePath(self.pg0.remote_ip4,
                                                   self.pg0.sw_if_index,
                                                   labels=[VppMplsLabel(32)])])
        route_32_neos.add_vpp_config()

        #
        # A binding SID with only one label
        #
        self.vapi.sr_mpls_policy_add(999, 1, 0, [32])

        #
        # A labeled IP route that resolves thru the binding SID
        #
        ip_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
                                 [VppRoutePath("0.0.0.0",
                                               0xffffffff,
                                               nh_via_label=999,
                                               labels=[VppMplsLabel(55)])])
        ip_10_0_0_1.add_vpp_config()

        tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(55)])

        #
        # An unlabeled IP route that resolves thru the binding SID
        #
        ip_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
                                 [VppRoutePath("0.0.0.0",
                                               0xffffffff,
                                               nh_via_label=999)])
        ip_10_0_0_2.add_vpp_config()

        tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32)])

        self.vapi.sr_mpls_policy_del(999)

        #
        # this time the SID has many labels pushed
        #
        self.vapi.sr_mpls_policy_add(999, 1, 0, [32, 33, 34])

        tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(33),
                                          VppMplsLabel(34),
                                          VppMplsLabel(55)])
        tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_labelled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(33),
                                          VppMplsLabel(34)])

        #
        # Resolve an MPLS tunnel via the SID
        #
        mpls_tun = VppMPLSTunnelInterface(
            self,
            [VppRoutePath("0.0.0.0",
                          0xffffffff,
                          nh_via_label=999,
                          labels=[VppMplsLabel(44),
                                  VppMplsLabel(46)])])
        mpls_tun.add_vpp_config()
        mpls_tun.admin_up()

        #
        # add an unlabelled route through the new tunnel
        #
        route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
                                    [VppRoutePath("0.0.0.0",
                                                  mpls_tun._sw_if_index)])
        route_10_0_0_3.add_vpp_config()
        # debug output only; the tunnel and adjacency indices are hard-coded
        self.logger.info(self.vapi.cli("sh mpls tun 0"))
        self.logger.info(self.vapi.cli("sh adj 21"))

        tx = self.create_stream_ip4(self.pg1, "10.0.0.3")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(33),
                                          VppMplsLabel(34),
                                          VppMplsLabel(44),
                                          VppMplsLabel(46)])

        #
        # add a labelled route through the new tunnel
        #
        route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
                                    [VppRoutePath("0.0.0.0",
                                                  mpls_tun._sw_if_index,
                                                  labels=[VppMplsLabel(55)])])
        route_10_0_0_4.add_vpp_config()

        tx = self.create_stream_ip4(self.pg1, "10.0.0.4")
        rx = self.send_and_expect(self.pg1, tx, self.pg0)
        self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
                                         [VppMplsLabel(32),
                                          VppMplsLabel(33),
                                          VppMplsLabel(34),
                                          VppMplsLabel(44),
                                          VppMplsLabel(46),
                                          VppMplsLabel(55)])

        self.vapi.sr_mpls_policy_del(999)
267
268
# When executed directly (rather than imported), run this module's tests
# through unittest using the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)