blob: ded4a71fa4093162217496012e4bc8b45ccb099a [file] [log] [blame]
Neale Ranns7c922dc2018-08-30 06:12:27 -07001#!/usr/bin/env python
2
3import unittest
4import socket
5
6from framework import VppTestCase, VppTestRunner
7from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
8 VppMplsIpBind, VppIpMRoute, VppMRoutePath, \
9 MRouteItfFlags, MRouteEntryFlags, DpoProto, VppIpTable, VppMplsTable, \
10 VppMplsLabel, MplsLspMode
11from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
12
13from scapy.packet import Raw
14from scapy.layers.l2 import Ether
15from scapy.layers.inet import IP, UDP, ICMP
16from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
17from scapy.contrib.mpls import MPLS
18
19
20def verify_filter(capture, sent):
21 if not len(capture) == len(sent):
22 # filter out any IPv6 RAs from the capture
23 for p in capture:
24 if p.haslayer(IPv6):
25 capture.remove(p)
26 return capture
27
28
29def verify_mpls_stack(tst, rx, mpls_labels):
30 # the rx'd packet has the MPLS label popped
31 eth = rx[Ether]
32 tst.assertEqual(eth.type, 0x8847)
33
34 rx_mpls = rx[MPLS]
35
36 for ii in range(len(mpls_labels)):
37 tst.assertEqual(rx_mpls.label, mpls_labels[ii].value)
38 tst.assertEqual(rx_mpls.cos, mpls_labels[ii].exp)
39 tst.assertEqual(rx_mpls.ttl, mpls_labels[ii].ttl)
40
41 if ii == len(mpls_labels) - 1:
42 tst.assertEqual(rx_mpls.s, 1)
43 else:
44 # not end of stack
45 tst.assertEqual(rx_mpls.s, 0)
46 # pop the label to expose the next
47 rx_mpls = rx_mpls[MPLS].payload
48
49
50class TestSRMPLS(VppTestCase):
51 """ SR-MPLS Test Case """
52
53 def setUp(self):
54 super(TestSRMPLS, self).setUp()
55
56 # create 2 pg interfaces
57 self.create_pg_interfaces(range(4))
58
59 # setup both interfaces
60 # assign them different tables.
61 table_id = 0
62 self.tables = []
63
64 tbl = VppMplsTable(self, 0)
65 tbl.add_vpp_config()
66 self.tables.append(tbl)
67
68 for i in self.pg_interfaces:
69 i.admin_up()
70 i.config_ip4()
71 i.resolve_arp()
72 i.config_ip6()
73 i.resolve_ndp()
74 i.enable_mpls()
75
76 def tearDown(self):
77 for i in self.pg_interfaces:
78 i.unconfig_ip4()
79 i.unconfig_ip6()
80 i.ip6_disable()
81 i.disable_mpls()
82 i.admin_down()
83 super(TestSRMPLS, self).tearDown()
84
85 def create_stream_ip4(self, src_if, dst_ip, ip_ttl=64, ip_dscp=0):
86 self.reset_packet_infos()
87 pkts = []
88 for i in range(0, 257):
89 info = self.create_packet_info(src_if, src_if)
90 payload = self.info_to_payload(info)
91 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
92 IP(src=src_if.remote_ip4, dst=dst_ip,
93 ttl=ip_ttl, tos=ip_dscp) /
94 UDP(sport=1234, dport=1234) /
95 Raw(payload))
96 info.data = p.copy()
97 pkts.append(p)
98 return pkts
99
100 def verify_capture_labelled_ip4(self, src_if, capture, sent,
101 mpls_labels, ip_ttl=None):
102 try:
103 capture = verify_filter(capture, sent)
104
105 self.assertEqual(len(capture), len(sent))
106
107 for i in range(len(capture)):
108 tx = sent[i]
109 rx = capture[i]
110 tx_ip = tx[IP]
111 rx_ip = rx[IP]
112
113 verify_mpls_stack(self, rx, mpls_labels)
114
115 self.assertEqual(rx_ip.src, tx_ip.src)
116 self.assertEqual(rx_ip.dst, tx_ip.dst)
117 if not ip_ttl:
118 # IP processing post pop has decremented the TTL
119 self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
120 else:
121 self.assertEqual(rx_ip.ttl, ip_ttl)
122
123 except:
124 raise
125
126 def verify_capture_tunneled_ip4(self, src_if, capture, sent, mpls_labels):
127 try:
128 capture = verify_filter(capture, sent)
129
130 self.assertEqual(len(capture), len(sent))
131
132 for i in range(len(capture)):
133 tx = sent[i]
134 rx = capture[i]
135 tx_ip = tx[IP]
136 rx_ip = rx[IP]
137
138 verify_mpls_stack(self, rx, mpls_labels)
139
140 self.assertEqual(rx_ip.src, tx_ip.src)
141 self.assertEqual(rx_ip.dst, tx_ip.dst)
142 # IP processing post pop has decremented the TTL
143 self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
144
145 except:
146 raise
147
148 def test_sr_mpls(self):
149 """ SR MPLS """
150
151 #
152 # A simple MPLS xconnect - neos label in label out
153 #
154 route_32_eos = VppMplsRoute(self, 32, 0,
155 [VppRoutePath(self.pg0.remote_ip4,
156 self.pg0.sw_if_index,
157 labels=[VppMplsLabel(32)])])
158 route_32_eos.add_vpp_config()
159
160 #
161 # A binding SID with only one label
162 #
163 self.vapi.sr_mpls_policy_add(999, 1, 0, [32])
164
165 #
166 # A labeled IP route that resolves thru the binding SID
167 #
168 ip_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
169 [VppRoutePath("0.0.0.0",
170 0xffffffff,
171 nh_via_label=999,
172 labels=[VppMplsLabel(55)])])
173 ip_10_0_0_1.add_vpp_config()
174
175 tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
176 rx = self.send_and_expect(self.pg1, tx, self.pg0)
177 self.verify_capture_labelled_ip4(self.pg0, rx, tx,
178 [VppMplsLabel(32),
179 VppMplsLabel(55)])
180
181 #
182 # An unlabeled IP route that resolves thru the binding SID
183 #
184 ip_10_0_0_1 = VppIpRoute(self, "10.0.0.2", 32,
185 [VppRoutePath("0.0.0.0",
186 0xffffffff,
187 nh_via_label=999)])
188 ip_10_0_0_1.add_vpp_config()
189
190 tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
191 rx = self.send_and_expect(self.pg1, tx, self.pg0)
192 self.verify_capture_labelled_ip4(self.pg0, rx, tx,
193 [VppMplsLabel(32)])
194
195 self.vapi.sr_mpls_policy_del(999)
196
197 #
198 # this time the SID has many labels pushed
199 #
200 self.vapi.sr_mpls_policy_add(999, 1, 0, [32, 33, 34])
201
202 tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
203 rx = self.send_and_expect(self.pg1, tx, self.pg0)
204 self.verify_capture_labelled_ip4(self.pg0, rx, tx,
205 [VppMplsLabel(32),
206 VppMplsLabel(33),
207 VppMplsLabel(34),
208 VppMplsLabel(55)])
209 tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
210 rx = self.send_and_expect(self.pg1, tx, self.pg0)
211 self.verify_capture_labelled_ip4(self.pg0, rx, tx,
212 [VppMplsLabel(32),
213 VppMplsLabel(33),
214 VppMplsLabel(34)])
215
216 #
217 # Resolve an MPLS tunnel via the SID
218 #
219 mpls_tun = VppMPLSTunnelInterface(
220 self,
221 [VppRoutePath("0.0.0.0",
222 0xffffffff,
223 nh_via_label=999,
224 labels=[VppMplsLabel(44),
225 VppMplsLabel(46)])])
226 mpls_tun.add_vpp_config()
227 mpls_tun.admin_up()
228
229 #
230 # add an unlabelled route through the new tunnel
231 #
232 route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
233 [VppRoutePath("0.0.0.0",
234 mpls_tun._sw_if_index)])
235 route_10_0_0_3.add_vpp_config()
236 self.logger.info(self.vapi.cli("sh mpls tun 0"))
237 self.logger.info(self.vapi.cli("sh adj 21"))
238
239 tx = self.create_stream_ip4(self.pg1, "10.0.0.3")
240 rx = self.send_and_expect(self.pg1, tx, self.pg0)
241 self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
242 [VppMplsLabel(32),
243 VppMplsLabel(33),
244 VppMplsLabel(34),
245 VppMplsLabel(44),
246 VppMplsLabel(46)])
247
248 #
249 # add a labelled route through the new tunnel
250 #
251 route_10_0_0_3 = VppIpRoute(self, "10.0.0.4", 32,
252 [VppRoutePath("0.0.0.0",
253 mpls_tun._sw_if_index,
254 labels=[VppMplsLabel(55)])])
255 route_10_0_0_3.add_vpp_config()
256
257 tx = self.create_stream_ip4(self.pg1, "10.0.0.4")
258 rx = self.send_and_expect(self.pg1, tx, self.pg0)
259 self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
260 [VppMplsLabel(32),
261 VppMplsLabel(33),
262 VppMplsLabel(34),
263 VppMplsLabel(44),
264 VppMplsLabel(46),
265 VppMplsLabel(55)])
266
267 self.vapi.sr_mpls_policy_del(999)
268
269
270if __name__ == '__main__':
271 unittest.main(testRunner=VppTestRunner)