blob: fb8958c511bacec3bd748fb18f72b5bb0600e399 [file] [log] [blame]
Renato Botelho do Coutoead1e532019-10-31 13:31:07 -05001#!/usr/bin/env python3
Dave Barach4bf88882019-08-21 09:11:41 -04002
3import unittest
4
5from framework import VppTestCase, VppTestRunner
6from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
Dave Barach4bf88882019-08-21 09:11:41 -04007from ipaddress import *
8
9import scapy.compat
10from scapy.contrib.mpls import MPLS
11from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes
12from scapy.layers.l2 import Ether
13from scapy.packet import Raw
14from scapy.layers.dns import DNSRR, DNS, DNSQR
15
16
class TestDns(VppTestCase):
    """ Dns Test Cases """

    @classmethod
    def setUpClass(cls):
        """Delegate class-level set-up to the base test case."""
        super(TestDns, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level tear-down to the base test case."""
        super(TestDns, cls).tearDownClass()

    def setUp(self):
        """Create a single pg interface and bring it up with IPv4 + ARP."""
        super(TestDns, self).setUp()

        # One packet-generator interface is enough: requests are sent on
        # it and the reply is captured on the same interface.
        self.create_pg_interfaces(range(1))

        for pg_if in self.pg_interfaces:
            pg_if.admin_up()
            pg_if.config_ip4()
            pg_if.resolve_arp()

    def tearDown(self):
        """Delegate per-test tear-down to the base test case."""
        super(TestDns, self).tearDown()

    def create_stream(self, src_if):
        """Create input packet stream for defined interface.

        Two DNS queries (UDP 1234 -> 53, recursion desired) are built, in
        this order: one for "bozo.clown.org" and one for "no.clown.org".

        :param VppInterface src_if: Interface to create packet stream for.
        :returns: List holding the two request packets.
        """
        # NOTE(review): the IP header carries no explicit destination, so
        # scapy's default is used - presumably the reason the test
        # configures 127.0.0.1/8 on the interface; confirm.
        return [
            (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
             IP(src=src_if.remote_ip4) /
             UDP(sport=1234, dport=53) /
             DNS(rd=1, qd=DNSQR(qname=qname)))
            for qname in ("bozo.clown.org", "no.clown.org")
        ]

    def verify_capture(self, dst_if, capture):
        """Verify captured input packet stream for defined interface.

        Every captured packet must carry a DNS layer whose first answer
        record resolves to 1.2.3.4.

        :param VppInterface dst_if: Interface to verify captured packet stream
                                    for.
        :param list capture: Captured packet stream.
        """
        self.logger.info("Verifying capture on interface %s" % dst_if.name)
        for packet in capture:
            dns = packet[DNS]
            self.assertEqual(dns.an[0].rdata, '1.2.3.4')

    def test_dns_unittest(self):
        """ DNS Name Resolver Basic Functional Test """

        # Set up an upstream name resolver. We won't actually go there
        self.vapi.dns_name_server_add_del(
            is_ip6=0, is_add=1, server_address=IPv4Address(u'8.8.8.8').packed)

        # Enable name resolution
        self.vapi.dns_enable_disable(enable=1)

        # Manually add a static dns cache entry
        self.logger.info(self.vapi.cli("dns cache add bozo.clown.org 1.2.3.4"))

        # Test the binary API
        rv = self.vapi.dns_resolve_name(name=b'bozo.clown.org')
        self.assertEqual(rv.ip4_address, IPv4Address(u'1.2.3.4').packed)

        # Configure 127.0.0.1/8 on the pg interface
        self.vapi.sw_interface_add_del_address(
            sw_if_index=self.pg0.sw_if_index,
            prefix="127.0.0.1/8")

        # Send a couple of DNS request packets, one for bozo.clown.org
        # and one for no.clown.org which won't resolve
        requests = self.create_stream(self.pg0)
        self.pg0.add_stream(requests)
        self.pg_enable_capture(self.pg_interfaces)

        self.pg_start()

        # Exactly one packet is expected back: only the bozo.clown.org
        # request can be answered (from the static cache entry).
        replies = self.pg0.get_capture(1)
        self.verify_capture(self.pg0, replies)

        # Make sure that the cache contents are correct
        cache_dump = self.vapi.cli("show dns cache verbose")
        self.assertIn('1.2.3.4', cache_dump)
        # NOTE(review): "[P]" presumably marks the still-pending
        # no.clown.org lookup - confirm against the CLI output format.
        self.assertIn('[P] no.clown.org:', cache_dump)
107
# Script entry point: when this module is executed directly (rather than
# imported by a test loader), run its test cases with the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)