blob: 0a6e752515993fdf8cbb61213b1b206bca913566 [file] [log] [blame]
Renato Botelho do Coutoead1e532019-10-31 13:31:07 -05001#!/usr/bin/env python3
Paul Vinciguerra3bce8eb2018-11-24 21:46:05 -08002
3import abc
Filip Tehlar770e89e2017-01-31 10:39:16 +01004import unittest
5
Paul Vinciguerra4a144b42018-11-23 03:20:21 -08006from scapy.fields import BitField, ByteField, FlagsField, IntField
7from scapy.packet import bind_layers, Packet, Raw
Filip Tehlar770e89e2017-01-31 10:39:16 +01008from scapy.layers.inet import IP, UDP, Ether
Paul Vinciguerra4a144b42018-11-23 03:20:21 -08009from scapy.layers.inet6 import IPv6
Filip Tehlar770e89e2017-01-31 10:39:16 +010010
Filip Tehlar770e89e2017-01-31 10:39:16 +010011from framework import VppTestCase, VppTestRunner
Paul Vinciguerraa279d9c2019-02-28 09:00:09 -080012from lisp import VppLocalMapping, VppLispAdjacency, VppLispLocator, \
Jakub Grajciar58db6e12020-01-30 13:26:43 +010013 VppLispLocatorSet, VppRemoteMapping, LispRemoteLocator
Paul Vinciguerra1135f652020-12-04 17:50:26 -050014from util import ppp
Paul Vinciguerra4a144b42018-11-23 03:20:21 -080015
16# From py_lispnetworking.lisp.py: # GNU General Public License v2.0
17
18
19class LISP_GPE_Header(Packet):
20 name = "LISP GPE Header"
21 fields_desc = [
22 FlagsField("gpe_flags", None, 6, ["N", "L", "E", "V", "I", "P"]),
23 BitField("reserved", 0, 18),
24 ByteField("next_proto", 0),
25 IntField("iid", 0),
26 ]
27bind_layers(UDP, LISP_GPE_Header, dport=4341)
28bind_layers(UDP, LISP_GPE_Header, sport=4341)
29bind_layers(LISP_GPE_Header, IP, next_proto=1)
30bind_layers(LISP_GPE_Header, IPv6, next_proto=2)
31bind_layers(LISP_GPE_Header, Ether, next_proto=3)
Filip Tehlar770e89e2017-01-31 10:39:16 +010032
33
Paul Vinciguerra1135f652020-12-04 17:50:26 -050034class ForeignAddressFactory(object):
35 count = 0
36 prefix_len = 24
37 net_template = '10.10.10.{}'
38 net = net_template.format(0) + '/' + str(prefix_len)
39
40 def get_ip4(self):
41 if self.count > 255:
42 raise Exception("Network host address exhaustion")
43 self.count += 1
44 return self.net_template.format(self.count)
45
46
Paul Vinciguerra090096b2020-12-03 00:42:46 -050047class Driver(metaclass=abc.ABCMeta):
Filip Tehlar770e89e2017-01-31 10:39:16 +010048
49 config_order = ['locator-sets',
50 'locators',
51 'local-mappings',
52 'remote-mappings',
53 'adjacencies']
54
55 """ Basic class for data driven testing """
56 def __init__(self, test, test_cases):
57 self._test_cases = test_cases
58 self._test = test
59
60 @property
61 def test_cases(self):
62 return self._test_cases
63
64 @property
65 def test(self):
66 return self._test
67
68 def create_packet(self, src_if, dst_if, deid, payload=''):
69 """
70 Create IPv4 packet
71
72 param: src_if
73 param: dst_if
74 """
75 packet = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
76 IP(src=src_if.remote_ip4, dst=deid) /
77 Raw(payload))
78 return packet
79
Paul Vinciguerra3bce8eb2018-11-24 21:46:05 -080080 @abc.abstractmethod
Filip Tehlar770e89e2017-01-31 10:39:16 +010081 def run(self):
82 """ testing procedure """
83 pass
84
85
86class SimpleDriver(Driver):
87 """ Implements simple test procedure """
88 def __init__(self, test, test_cases):
89 super(SimpleDriver, self).__init__(test, test_cases)
90
91 def verify_capture(self, src_loc, dst_loc, capture):
92 """
93 Verify captured packet
94
95 :param src_loc: source locator address
96 :param dst_loc: destination locator address
97 :param capture: list of captured packets
98 """
99 self.test.assertEqual(len(capture), 1, "Unexpected number of "
100 "packets! Expected 1 but {} received"
101 .format(len(capture)))
102 packet = capture[0]
103 try:
104 ip_hdr = packet[IP]
105 # assert the values match
106 self.test.assertEqual(ip_hdr.src, src_loc, "IP source address")
107 self.test.assertEqual(ip_hdr.dst, dst_loc,
108 "IP destination address")
109 gpe_hdr = packet[LISP_GPE_Header]
110 self.test.assertEqual(gpe_hdr.next_proto, 1,
111 "next_proto is not ipv4!")
112 ih = gpe_hdr[IP]
113 self.test.assertEqual(ih.src, self.test.pg0.remote_ip4,
114 "unexpected source EID!")
115 self.test.assertEqual(ih.dst, self.test.deid_ip4,
116 "unexpected dest EID!")
117 except:
118 self.test.logger.error(ppp("Unexpected or invalid packet:",
119 packet))
120 raise
121
122 def configure_tc(self, tc):
123 for config_item in self.config_order:
124 for vpp_object in tc[config_item]:
125 vpp_object.add_vpp_config()
126
127 def run(self, dest):
128 """ Send traffic for each test case and verify that it
129 is encapsulated """
130 for tc in enumerate(self.test_cases):
131 self.test.logger.info('Running {}'.format(tc[1]['name']))
132 self.configure_tc(tc[1])
133
Filip Tehlar770e89e2017-01-31 10:39:16 +0100134 packet = self.create_packet(self.test.pg0, self.test.pg1, dest,
135 'data')
136 self.test.pg0.add_stream(packet)
137 self.test.pg0.enable_capture()
138 self.test.pg1.enable_capture()
139 self.test.pg_start()
140 capture = self.test.pg1.get_capture(1)
141 self.verify_capture(self.test.pg1.local_ip4,
142 self.test.pg1.remote_ip4, capture)
143 self.test.pg0.assert_nothing_captured()
144
145
146class TestLisp(VppTestCase):
147 """ Basic LISP test """
148
149 @classmethod
150 def setUpClass(cls):
151 super(TestLisp, cls).setUpClass()
152 cls.faf = ForeignAddressFactory()
153 cls.create_pg_interfaces(range(2)) # create pg0 and pg1
154 for i in cls.pg_interfaces:
155 i.admin_up() # put the interface upsrc_if
156 i.config_ip4() # configure IPv4 address on the interface
157 i.resolve_arp() # resolve ARP, so that we know VPP MAC
158
Paul Vinciguerra7f9b7f92019-03-12 19:23:27 -0700159 @classmethod
160 def tearDownClass(cls):
161 super(TestLisp, cls).tearDownClass()
162
Filip Tehlar770e89e2017-01-31 10:39:16 +0100163 def setUp(self):
164 super(TestLisp, self).setUp()
Jakub Grajciar58db6e12020-01-30 13:26:43 +0100165 self.vapi.lisp_enable_disable(is_enable=1)
Filip Tehlar770e89e2017-01-31 10:39:16 +0100166
167 def test_lisp_basic_encap(self):
168 """Test case for basic encapsulation"""
169
170 self.deid_ip4_net = self.faf.net
171 self.deid_ip4 = self.faf.get_ip4()
Paul Vinciguerraea2450f2019-03-06 08:23:58 -0800172 self.seid_ip4 = '{!s}/{!s}'.format(self.pg0.local_ip4, 32)
Jakub Grajciar58db6e12020-01-30 13:26:43 +0100173 self.rloc_ip4 = self.pg1.remote_ip4
Filip Tehlar770e89e2017-01-31 10:39:16 +0100174
175 test_cases = [
176 {
177 'name': 'basic ip4 over ip4',
Jakub Grajciar58db6e12020-01-30 13:26:43 +0100178 'locator-sets': [VppLispLocatorSet(self, 'ls-4o4')],
Filip Tehlar770e89e2017-01-31 10:39:16 +0100179 'locators': [
Jakub Grajciar58db6e12020-01-30 13:26:43 +0100180 VppLispLocator(self, self.pg1.sw_if_index, 'ls-4o4')
Filip Tehlar770e89e2017-01-31 10:39:16 +0100181 ],
182 'local-mappings': [
Jakub Grajciar58db6e12020-01-30 13:26:43 +0100183 VppLocalMapping(self, self.seid_ip4, 'ls-4o4')
Filip Tehlar770e89e2017-01-31 10:39:16 +0100184 ],
185 'remote-mappings': [
186 VppRemoteMapping(self, self.deid_ip4_net,
Jakub Grajciar58db6e12020-01-30 13:26:43 +0100187 [LispRemoteLocator(self.rloc_ip4)])
Filip Tehlar770e89e2017-01-31 10:39:16 +0100188 ],
189 'adjacencies': [
190 VppLispAdjacency(self, self.seid_ip4, self.deid_ip4_net)
191 ]
192 }
193 ]
194 self.test_driver = SimpleDriver(self, test_cases)
195 self.test_driver.run(self.deid_ip4)
196
197
Neale Rannsea93e482019-11-12 17:16:47 +0000198class TestLispUT(VppTestCase):
199 """ Lisp UT """
200
201 @classmethod
202 def setUpClass(cls):
203 super(TestLispUT, cls).setUpClass()
204
205 @classmethod
206 def tearDownClass(cls):
207 super(TestLispUT, cls).tearDownClass()
208
209 def test_fib(self):
210 """ LISP Unit Tests """
211 error = self.vapi.cli("test lisp cp")
212
213 if error:
214 self.logger.critical(error)
215 self.assertNotIn("Failed", error)
216
217
Filip Tehlar770e89e2017-01-31 10:39:16 +0100218if __name__ == '__main__':
219 unittest.main(testRunner=VppTestRunner)