blob: d78cb58be0bd8e7776d032c2e3bff322929c61dd [file] [log] [blame]
Matej Klotton8d8a1da2016-12-22 11:06:56 +01001#!/usr/bin/env python
2"""CRUD tests of APIs (Create, Read, Update, Delete) HLD:
3
4- interface up/down/add/delete - interface type:
5 - pg (TBD)
6 - loopback
7 - vhostuser (TBD)
8 - af_packet (TBD)
9 - netmap (TBD)
10 - tuntap (root privileges needed)
11 - vxlan (TBD)
12"""
13
14import unittest
15
16from scapy.layers.inet import IP, ICMP
17from scapy.layers.l2 import Ether
18
19from framework import VppTestCase, VppTestRunner
20
21
22class TestLoopbackInterfaceCRUD(VppTestCase):
23 """CRUD Loopback
24
25 """
26
27 @classmethod
28 def setUpClass(cls):
29 super(TestLoopbackInterfaceCRUD, cls).setUpClass()
30 try:
31 cls.create_pg_interfaces(range(1))
32 for i in cls.pg_interfaces:
33 i.config_ip4()
34 i.resolve_arp()
35 except:
36 cls.tearDownClass()
37 raise
38
39 @staticmethod
40 def create_icmp_stream(src_if, dst_ifs):
41 """
42
43 :param VppInterface src_if: Packets are send to this interface,
44 using this interfaces remote host.
45 :param list dst_ifs: IPv4 ICMP requests are send to interfaces
46 addresses.
47 :return: List of generated packets.
48 """
49 pkts = []
50 for i in dst_ifs:
51 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
52 IP(src=src_if.remote_ip4, dst=i.local_ip4) /
53 ICMP(id=i.sw_if_index, type='echo-request'))
54 pkts.append(p)
55 return pkts
56
57 def verify_icmp(self, capture, request_src_if, dst_ifs):
58 """
59
60 :param capture: Capture to verify.
61 :param VppInterface request_src_if: Interface where was send packets.
62 :param list dst_ifs: Interfaces where was generated IPv4 ICMP requests.
63 """
64 rcvd_icmp_pkts = []
65 for pkt in capture:
66 try:
67 ip = pkt[IP]
68 icmp = pkt[ICMP]
69 except IndexError:
70 pass
71 else:
72 info = (ip.src, ip.dst, icmp.type, icmp.id)
73 rcvd_icmp_pkts.append(info)
74
75 for i in dst_ifs:
76 # 0 - icmp echo response
77 info = (i.local_ip4, request_src_if.remote_ip4, 0, i.sw_if_index)
78 self.assertIn(info, rcvd_icmp_pkts)
79
80 def test_crud(self):
81 # create
Klement Sekerab9ef2732018-06-24 22:49:33 +020082 loopbacks = self.create_loopback_interfaces(20)
Matej Klotton8d8a1da2016-12-22 11:06:56 +010083 for i in loopbacks:
Ole Trøan3b0d7e42019-03-15 16:14:41 +000084 i.local_ip4_prefix_len = 32
Matej Klotton8d8a1da2016-12-22 11:06:56 +010085 i.config_ip4()
86 i.admin_up()
87
88 # read (check sw if dump, ip4 fib, ip6 fib)
89 if_dump = self.vapi.sw_interface_dump()
90 fib4_dump = self.vapi.ip_fib_dump()
91 for i in loopbacks:
92 self.assertTrue(i.is_interface_config_in_dump(if_dump))
93 self.assertTrue(i.is_ip4_entry_in_fib_dump(fib4_dump))
94
95 # check ping
96 stream = self.create_icmp_stream(self.pg0, loopbacks)
97 self.pg0.add_stream(stream)
98 self.pg_enable_capture(self.pg_interfaces)
99 self.pg_start()
100 capture = self.pg0.get_capture(expected_count=len(stream))
101
102 self.verify_icmp(capture, self.pg0, loopbacks)
103
104 # delete
105 for i in loopbacks:
106 i.remove_vpp_config()
107
108 # read (check not in sw if dump, ip4 fib, ip6 fib)
109 if_dump = self.vapi.sw_interface_dump()
110 fib4_dump = self.vapi.ip_fib_dump()
111 for i in loopbacks:
112 self.assertFalse(i.is_interface_config_in_dump(if_dump))
113 self.assertFalse(i.is_ip4_entry_in_fib_dump(fib4_dump))
114
115 # check not ping
116 stream = self.create_icmp_stream(self.pg0, loopbacks)
117 self.pg0.add_stream(stream)
118 self.pg_enable_capture(self.pg_interfaces)
119 self.pg_start()
120 self.pg0.assert_nothing_captured()
121
122 def test_down(self):
123 # create
Klement Sekerab9ef2732018-06-24 22:49:33 +0200124 loopbacks = self.create_loopback_interfaces(20)
Matej Klotton8d8a1da2016-12-22 11:06:56 +0100125 for i in loopbacks:
Ole Trøan3b0d7e42019-03-15 16:14:41 +0000126 i.local_ip4_prefix_len = 32
Matej Klotton8d8a1da2016-12-22 11:06:56 +0100127 i.config_ip4()
128 i.admin_up()
129
130 # disable
131 for i in loopbacks:
132 i.admin_down()
133 i.unconfig_ip4()
134
135 # read (check not in sw if dump, ip4 fib, ip6 fib)
136 if_dump = self.vapi.sw_interface_dump()
137 fib4_dump = self.vapi.ip_fib_dump()
138 for i in loopbacks:
139 self.assertTrue(i.is_interface_config_in_dump(if_dump))
140 self.assertFalse(i.is_ip4_entry_in_fib_dump(fib4_dump))
141
142 # check not ping
143 stream = self.create_icmp_stream(self.pg0, loopbacks)
144 self.pg0.add_stream(stream)
145 self.pg_enable_capture(self.pg_interfaces)
146 self.pg_start()
147 self.pg0.assert_nothing_captured()
148
149
150if __name__ == '__main__':
151 unittest.main(testRunner=VppTestRunner)