| #!/usr/bin/env python3 |
| |
| # |
| # Copyright 2019-2020 Rubicon Communications, LLC (Netgate) |
| # |
| # SPDX-License-Identifier: Apache-2.0 |
| # |
| |
| import unittest |
| import time |
| import socket |
| from socket import inet_pton, inet_ntop |
| |
| from vpp_object import VppObject |
| from vpp_papi import VppEnum |
| |
| from scapy.packet import raw |
| from scapy.layers.l2 import Ether, ARP |
| from scapy.layers.inet import IP, ICMP, icmptypes |
| from scapy.layers.inet6 import IPv6, ipv6nh, IPv6ExtHdrHopByHop, \ |
| ICMPv6MLReport2, ICMPv6ND_NA, ICMPv6ND_NS, ICMPv6NDOptDstLLAddr, \ |
| ICMPv6NDOptSrcLLAddr, ICMPv6EchoRequest, ICMPv6EchoReply |
| from scapy.contrib.igmpv3 import IGMPv3, IGMPv3mr, IGMPv3gr |
| from scapy.layers.vrrp import IPPROTO_VRRP, VRRPv3 |
| from scapy.utils6 import in6_getnsma, in6_getnsmac |
| from config import config |
| from framework import VppTestCase, VppTestRunner |
| from util import ip6_normalize |
| |
| VRRP_VR_FLAG_PREEMPT = 1 |
| VRRP_VR_FLAG_ACCEPT = 2 |
| VRRP_VR_FLAG_UNICAST = 4 |
| VRRP_VR_FLAG_IPV6 = 8 |
| |
| VRRP_VR_STATE_INIT = 0 |
| VRRP_VR_STATE_BACKUP = 1 |
| VRRP_VR_STATE_MASTER = 2 |
| VRRP_VR_STATE_INTF_DOWN = 3 |
| |
| VRRP_INDEX_INVALID = 0xffffffff |
| |
| |
| def is_non_arp(p): |
| """ Want to filter out advertisements, igmp, etc""" |
| if p.haslayer(ARP): |
| return False |
| |
| return True |
| |
| |
| def is_not_adv(p): |
| """ Filter out everything but advertisements. E.g. multicast RD/ND """ |
| if p.haslayer(VRRPv3): |
| return False |
| |
| return True |
| |
| |
| def is_not_echo_reply(p): |
| """ filter out advertisements and other while waiting for echo reply """ |
| if p.haslayer(IP) and p.haslayer(ICMP): |
| if icmptypes[p[ICMP].type] == "echo-reply": |
| return False |
| elif p.haslayer(IPv6) and p.haslayer(ICMPv6EchoReply): |
| return False |
| |
| return True |
| |
| |
| class VppVRRPVirtualRouter(VppObject): |
| |
| def __init__(self, |
| test, |
| intf, |
| vr_id, |
| prio=100, |
| intvl=100, |
| flags=VRRP_VR_FLAG_PREEMPT, |
| vips=None): |
| self._test = test |
| self._intf = intf |
| self._sw_if_index = self._intf.sw_if_index |
| self._vr_id = vr_id |
| self._prio = prio |
| self._intvl = intvl |
| self._flags = flags |
| if (flags & VRRP_VR_FLAG_IPV6): |
| self._is_ipv6 = 1 |
| self._adv_dest_mac = "33:33:00:00:00:12" |
| self._virtual_mac = "00:00:5e:00:02:%02x" % vr_id |
| self._adv_dest_ip = "ff02::12" |
| self._vips = ([intf.local_ip6] if vips is None else vips) |
| else: |
| self._is_ipv6 = 0 |
| self._adv_dest_mac = "01:00:5e:00:00:12" |
| self._virtual_mac = "00:00:5e:00:01:%02x" % vr_id |
| self._adv_dest_ip = "224.0.0.18" |
| self._vips = ([intf.local_ip4] if vips is None else vips) |
| self._tracked_ifs = [] |
| self._vrrp_index = VRRP_INDEX_INVALID |
| |
| def add_vpp_config(self): |
| self._test.vapi.vrrp_vr_add_del(is_add=1, |
| sw_if_index=self._intf.sw_if_index, |
| vr_id=self._vr_id, |
| priority=self._prio, |
| interval=self._intvl, |
| flags=self._flags, |
| n_addrs=len(self._vips), |
| addrs=self._vips) |
| |
| def update_vpp_config(self): |
| r = self._test.vapi.vrrp_vr_update(vrrp_index=self._vrrp_index, |
| sw_if_index=self._intf.sw_if_index, |
| vr_id=self._vr_id, |
| priority=self._prio, |
| interval=self._intvl, |
| flags=self._flags, |
| n_addrs=len(self._vips), |
| addrs=self._vips) |
| self._vrrp_index = r.vrrp_index |
| |
| def delete_vpp_config(self): |
| self._test.vapi.vrrp_vr_del(vrrp_index=self._vrrp_index) |
| |
| def query_vpp_config(self): |
| vrs = self._test.vapi.vrrp_vr_dump(sw_if_index=self._intf.sw_if_index) |
| for vr in vrs: |
| if vr.config.vr_id != self._vr_id: |
| continue |
| |
| is_ipv6 = (1 if (vr.config.flags & VRRP_VR_FLAG_IPV6) else 0) |
| if is_ipv6 != self._is_ipv6: |
| continue |
| |
| return vr |
| |
| return None |
| |
| def remove_vpp_config(self): |
| self._test.vapi.vrrp_vr_add_del(is_add=0, |
| sw_if_index=self._intf.sw_if_index, |
| vr_id=self._vr_id, |
| priority=self._prio, |
| interval=self._intvl, |
| flags=self._flags, |
| n_addrs=len(self._vips), |
| addrs=self._vips) |
| |
| def start_stop(self, is_start): |
| self._test.vapi.vrrp_vr_start_stop(is_start=is_start, |
| sw_if_index=self._intf.sw_if_index, |
| vr_id=self._vr_id, |
| is_ipv6=self._is_ipv6) |
| self._start_time = (time.time() if is_start else None) |
| |
| def add_del_tracked_interface(self, is_add, sw_if_index, prio): |
| args = { |
| 'sw_if_index': self._intf.sw_if_index, |
| 'is_ipv6': self._is_ipv6, |
| 'vr_id': self._vr_id, |
| 'is_add': is_add, |
| 'n_ifs': 1, |
| 'ifs': [{'sw_if_index': sw_if_index, 'priority': prio}] |
| } |
| self._test.vapi.vrrp_vr_track_if_add_del(**args) |
| self._tracked_ifs.append(args['ifs'][0]) |
| |
| def set_unicast_peers(self, addrs): |
| args = { |
| 'sw_if_index': self._intf.sw_if_index, |
| 'is_ipv6': self._is_ipv6, |
| 'vr_id': self._vr_id, |
| 'n_addrs': len(addrs), |
| 'addrs': addrs |
| } |
| self._test.vapi.vrrp_vr_set_peers(**args) |
| self._unicast_peers = addrs |
| |
| def start_time(self): |
| return self._start_time |
| |
| def virtual_mac(self): |
| return self._virtual_mac |
| |
| def virtual_ips(self): |
| return self._vips |
| |
| def adv_dest_mac(self): |
| return self._adv_dest_mac |
| |
| def adv_dest_ip(self): |
| return self._adv_dest_ip |
| |
| def priority(self): |
| return self._prio |
| |
| def vr_id(self): |
| return self._vr_id |
| |
| def adv_interval(self): |
| return self._intvl |
| |
| def interface(self): |
| return self._intf |
| |
| def assert_state_equals(self, state): |
| vr_details = self.query_vpp_config() |
| self._test.assertEqual(vr_details.runtime.state, state) |
| |
| def master_down_seconds(self): |
| vr_details = self.query_vpp_config() |
| return (vr_details.runtime.master_down_int * 0.01) |
| |
| def vrrp_adv_packet(self, prio=None, src_ip=None): |
| dst_ip = self._adv_dest_ip |
| if prio is None: |
| prio = self._prio |
| eth = Ether(dst=self._adv_dest_mac, src=self._virtual_mac) |
| vrrp = VRRPv3(vrid=self._vr_id, priority=prio, |
| ipcount=len(self._vips), adv=self._intvl) |
| if self._is_ipv6: |
| src_ip = (self._intf.local_ip6_ll if src_ip is None else src_ip) |
| ip = IPv6(src=src_ip, dst=dst_ip, nh=IPPROTO_VRRP, hlim=255) |
| vrrp.addrlist = self._vips |
| else: |
| src_ip = (self._intf.local_ip4 if src_ip is None else src_ip) |
| ip = IP(src=src_ip, dst=dst_ip, proto=IPPROTO_VRRP, ttl=255, id=0) |
| vrrp.addrlist = self._vips |
| |
| # Fill in default values & checksums |
| pkt = Ether(raw(eth / ip / vrrp)) |
| return pkt |
| |
| |
| class TestVRRP4(VppTestCase): |
| """ IPv4 VRRP Test Case """ |
| |
| @classmethod |
| def setUpClass(cls): |
| super(TestVRRP4, cls).setUpClass() |
| |
| @classmethod |
| def tearDownClass(cls): |
| super(TestVRRP4, cls).tearDownClass() |
| |
| def setUp(self): |
| super(TestVRRP4, self).setUp() |
| |
| self.create_pg_interfaces(range(2)) |
| |
| for i in self.pg_interfaces: |
| i.admin_up() |
| i.config_ip4() |
| i.generate_remote_hosts(5) |
| i.configure_ipv4_neighbors() |
| |
| self._vrs = [] |
| self._default_flags = VRRP_VR_FLAG_PREEMPT |
| self._default_adv = 100 |
| |
| def tearDown(self): |
| for vr in self._vrs: |
| try: |
| vr_api = vr.query_vpp_config() |
| if vr_api.runtime.state != VRRP_VR_STATE_INIT: |
| vr.start_stop(is_start=0) |
| vr.remove_vpp_config() |
| except: |
| self.logger.error("Error cleaning up") |
| |
| for i in self.pg_interfaces: |
| i.admin_down() |
| i.unconfig_ip4() |
| i.unconfig_ip6() |
| |
| self._vrs = [] |
| |
| super(TestVRRP4, self).tearDown() |
| |
| def verify_vrrp4_igmp(self, pkt): |
| ip = pkt[IP] |
| self.assertEqual(ip.dst, "224.0.0.22") |
| self.assertEqual(ip.proto, 2) |
| |
| igmp = pkt[IGMPv3] |
| self.assertEqual(IGMPv3.igmpv3types[igmp.type], |
| "Version 3 Membership Report") |
| |
| igmpmr = pkt[IGMPv3mr] |
| self.assertEqual(igmpmr.numgrp, 1) |
| self.assertEqual(igmpmr.records[0].maddr, "224.0.0.18") |
| |
| def verify_vrrp4_garp(self, pkt, vip, vmac): |
| arp = pkt[ARP] |
| |
| # ARP "who-has" op == 1 |
| self.assertEqual(arp.op, 1) |
| self.assertEqual(arp.pdst, arp.psrc) |
| self.assertEqual(arp.pdst, vip) |
| self.assertEqual(arp.hwsrc, vmac) |
| |
| def verify_vrrp4_adv(self, rx_pkt, vr, prio=None): |
| vips = vr.virtual_ips() |
| eth = rx_pkt[Ether] |
| ip = rx_pkt[IP] |
| vrrp = rx_pkt[VRRPv3] |
| |
| pkt = vr.vrrp_adv_packet(prio=prio) |
| |
| # Source MAC is virtual MAC, destination is multicast MAC |
| self.assertEqual(eth.src, vr.virtual_mac()) |
| self.assertEqual(eth.dst, vr.adv_dest_mac()) |
| |
| self.assertEqual(ip.dst, "224.0.0.18") |
| self.assertEqual(ip.ttl, 255) |
| self.assertEqual(ip.proto, IPPROTO_VRRP) |
| |
| self.assertEqual(vrrp.version, 3) |
| self.assertEqual(vrrp.type, 1) |
| self.assertEqual(vrrp.vrid, vr.vr_id()) |
| if prio is None: |
| prio = vr.priority() |
| self.assertEqual(vrrp.priority, prio) |
| self.assertEqual(vrrp.ipcount, len(vips)) |
| self.assertEqual(vrrp.adv, vr.adv_interval()) |
| self.assertListEqual(vrrp.addrlist, vips) |
| |
| # VR with priority 255 owns the virtual address and should |
| # become master and start advertising immediately. |
| @unittest.skipUnless(config.extended, "part of extended tests") |
| def test_vrrp4_master_adv(self): |
| """ IPv4 Master VR advertises """ |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| |
| prio = 255 |
| intvl = self._default_adv |
| vr = VppVRRPVirtualRouter(self, self.pg0, 100, |
| prio=prio, intvl=intvl, |
| flags=self._default_flags) |
| |
| vr.add_vpp_config() |
| vr.start_stop(is_start=1) |
| self.logger.info(self.vapi.cli("show vrrp vr")) |
| vr.start_stop(is_start=0) |
| self.logger.info(self.vapi.cli("show vrrp vr")) |
| |
| pkts = self.pg0.get_capture(4) |
| |
| # Init -> Master: IGMP Join, VRRP adv, gratuitous ARP are sent |
| self.verify_vrrp4_igmp(pkts[0]) |
| self.verify_vrrp4_adv(pkts[1], vr, prio=prio) |
| self.verify_vrrp4_garp(pkts[2], vr.virtual_ips()[0], vr.virtual_mac()) |
| # Master -> Init: Adv with priority 0 sent to force an election |
| self.verify_vrrp4_adv(pkts[3], vr, prio=0) |
| |
| vr.remove_vpp_config() |
| self._vrs = [] |
| |
| # Same as above but with the update API, and add a change |
| # of parameters to test that too |
| @unittest.skipUnless(config.extended, "part of extended tests") |
| def test_vrrp4_master_adv_update(self): |
| """ IPv4 Master VR adv + Update to Backup """ |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| |
| prio = 255 |
| intvl = self._default_adv |
| vr = VppVRRPVirtualRouter(self, self.pg0, 100, |
| prio=prio, intvl=intvl, |
| flags=self._default_flags) |
| |
| vr.update_vpp_config() |
| vr.start_stop(is_start=1) |
| self.logger.info(self.vapi.cli("show vrrp vr")) |
| # Update VR with lower prio and larger interval |
| # we need to keep old VR for the adv checks |
| upd_vr = VppVRRPVirtualRouter(self, self.pg0, 100, |
| prio=100, intvl=2*intvl, |
| flags=self._default_flags, |
| vips=[self.pg0.remote_ip4]) |
| upd_vr._vrrp_index = vr._vrrp_index |
| upd_vr.update_vpp_config() |
| start_time = time.time() |
| self.logger.info(self.vapi.cli("show vrrp vr")) |
| upd_vr.assert_state_equals(VRRP_VR_STATE_BACKUP) |
| self._vrs = [upd_vr] |
| |
| pkts = self.pg0.get_capture(5) |
| # Init -> Master: IGMP Join, VRRP adv, gratuitous ARP are sent |
| self.verify_vrrp4_igmp(pkts[0]) |
| self.verify_vrrp4_adv(pkts[1], vr, prio=prio) |
| self.verify_vrrp4_garp(pkts[2], vr.virtual_ips()[0], vr.virtual_mac()) |
| # Master -> Init: Adv with priority 0 sent to force an election |
| self.verify_vrrp4_adv(pkts[3], vr, prio=0) |
| # Init -> Backup: An IGMP join should be sent |
| self.verify_vrrp4_igmp(pkts[4]) |
| |
| # send higher prio advertisements, should not receive any |
| end_time = start_time + 2 * upd_vr.master_down_seconds() |
| src_ip = self.pg0.remote_ip4 |
| pkts = [upd_vr.vrrp_adv_packet(prio=110, src_ip=src_ip)] |
| while time.time() < end_time: |
| self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl*0.01) |
| self.logger.info(self.vapi.cli("show trace")) |
| |
| upd_vr.start_stop(is_start=0) |
| self.logger.info(self.vapi.cli("show vrrp vr")) |
| |
| # VR with priority < 255 enters backup state and does not advertise as |
| # long as it receives higher priority advertisements |
| @unittest.skipUnless(config.extended, "part of extended tests") |
| def test_vrrp4_backup_noadv(self): |
| """ IPv4 Backup VR does not advertise """ |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| |
| vr_id = 100 |
| prio = 100 |
| intvl = self._default_adv |
| intvl_s = intvl * 0.01 |
| vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, |
| prio=prio, intvl=intvl, |
| flags=self._default_flags, |
| vips=[self.pg0.remote_ip4]) |
| self._vrs.append(vr) |
| vr.add_vpp_config() |
| |
| vr.start_stop(is_start=1) |
| |
| vr.assert_state_equals(VRRP_VR_STATE_BACKUP) |
| # watch for advertisements for 2x the master down preemption timeout |
| end_time = vr.start_time() + 2 * vr.master_down_seconds() |
| |
| # Init -> Backup: An IGMP join should be sent |
| pkts = self.pg0.get_capture(1) |
| self.verify_vrrp4_igmp(pkts[0]) |
| |
| # send higher prio advertisements, should not receive any |
| src_ip = self.pg0.remote_ip4 |
| pkts = [vr.vrrp_adv_packet(prio=prio+10, src_ip=src_ip)] |
| while time.time() < end_time: |
| self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s) |
| self.logger.info(self.vapi.cli("show trace")) |
| |
| vr.start_stop(is_start=0) |
| self.logger.info(self.vapi.cli("show vrrp vr")) |
| vr.remove_vpp_config() |
| self._vrs = [] |
| |
| def test_vrrp4_master_arp(self): |
| """ IPv4 Master VR replies to ARP """ |
| self.pg_start() |
| |
| # VR virtual IP is the default, which is the pg local IP |
| vr_id = 100 |
| prio = 255 |
| intvl = self._default_adv |
| vr = VppVRRPVirtualRouter(self, self.pg0, 100, |
| prio=prio, intvl=intvl, |
| flags=self._default_flags) |
| self._vrs.append(vr) |
| |
| vr.add_vpp_config() |
| |
| # before the VR is up, ARP should resolve to interface MAC |
| self.pg0.resolve_arp() |
| self.assertNotEqual(self.pg0.local_mac, vr.virtual_mac()) |
| |
| # start the VR, ARP should now resolve to virtual MAC |
| vr.start_stop(is_start=1) |
| self.pg0.resolve_arp() |
| self.assertEqual(self.pg0.local_mac, vr.virtual_mac()) |
| |
| # stop the VR, ARP should resolve to interface MAC again |
| vr.start_stop(is_start=0) |
| self.pg0.resolve_arp() |
| self.assertNotEqual(self.pg0.local_mac, vr.virtual_mac()) |
| |
| vr.remove_vpp_config() |
| self._vrs = [] |
| |
| @unittest.skipUnless(config.extended, "part of extended tests") |
| def test_vrrp4_backup_noarp(self): |
| """ IPv4 Backup VR ignores ARP """ |
| # We need an address for a virtual IP that is not the IP that |
| # ARP requests will originate from |
| |
| vr_id = 100 |
| prio = 100 |
| intvl = self._default_adv |
| vip = self.pg0.remote_hosts[1].ip4 |
| vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, |
| prio=prio, intvl=intvl, |
| flags=self._default_flags, |
| vips=[vip]) |
| self._vrs.append(vr) |
| vr.add_vpp_config() |
| |
| arp_req = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / |
| ARP(op=ARP.who_has, pdst=vip, |
| psrc=self.pg0.remote_ip4, hwsrc=self.pg0.remote_mac)) |
| |
| # Before the VR is started make sure no reply to request for VIP |
| self.pg_start() |
| self.pg_enable_capture(self.pg_interfaces) |
| self.send_and_assert_no_replies(self.pg0, [arp_req], timeout=1) |
| |
| # VR should start in backup state and still should not reply to ARP |
| # send a higher priority adv to make sure it does not become master |
| adv = vr.vrrp_adv_packet(prio=prio+10, src_ip=self.pg0.remote_ip4) |
| vr.start_stop(is_start=1) |
| self.send_and_assert_no_replies(self.pg0, [adv, arp_req], timeout=1) |
| |
| vr.start_stop(is_start=0) |
| vr.remove_vpp_config() |
| self._vrs = [] |
| |
| @unittest.skipUnless(config.extended, "part of extended tests") |
| def test_vrrp4_election(self): |
| """ IPv4 Backup VR becomes master if no advertisements received """ |
| |
| vr_id = 100 |
| prio = 100 |
| intvl = self._default_adv |
| intvl_s = intvl * 0.01 |
| vip = self.pg0.remote_ip4 |
| vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, |
| prio=prio, intvl=intvl, |
| flags=self._default_flags, |
| vips=[vip]) |
| self._vrs.append(vr) |
| vr.add_vpp_config() |
| |
| # After adding the VR, it should be in the init state |
| vr.assert_state_equals(VRRP_VR_STATE_INIT) |
| |
| self.pg_start() |
| vr.start_stop(is_start=1) |
| |
| # VR should be in backup state after starting |
| vr.assert_state_equals(VRRP_VR_STATE_BACKUP) |
| end_time = vr.start_time() + vr.master_down_seconds() |
| |
| # should not receive adverts until timer expires & state transition |
| self.pg_enable_capture(self.pg_interfaces) |
| while (time.time() + intvl_s) < end_time: |
| time.sleep(intvl_s) |
| self.pg0.assert_nothing_captured(filter_out_fn=is_not_adv) |
| |
| # VR should be in master state, should send an adv |
| self.pg0.enable_capture() |
| self.pg0.wait_for_packet(intvl_s, is_not_adv) |
| vr.assert_state_equals(VRRP_VR_STATE_MASTER) |
| |
| @unittest.skipUnless(config.extended, "part of extended tests") |
| def test_vrrp4_backup_preempts(self): |
| """ IPv4 Backup VR preempts lower priority master """ |
| |
| vr_id = 100 |
| prio = 100 |
| intvl = self._default_adv |
| intvl_s = intvl * 0.01 |
| vip = self.pg0.remote_ip4 |
| vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, |
| prio=prio, intvl=intvl, |
| flags=self._default_flags, |
| vips=[vip]) |
| self._vrs.append(vr) |
| vr.add_vpp_config() |
| |
| # After adding the VR, it should be in the init state |
| vr.assert_state_equals(VRRP_VR_STATE_INIT) |
| |
| self.pg_start() |
| vr.start_stop(is_start=1) |
| |
| # VR should be in backup state after starting |
| vr.assert_state_equals(VRRP_VR_STATE_BACKUP) |
| end_time = vr.start_time() + vr.master_down_seconds() |
| |
| # send lower prio advertisements until timer expires |
| src_ip = self.pg0.remote_ip4 |
| pkts = [vr.vrrp_adv_packet(prio=prio-10, src_ip=src_ip)] |
| while time.time() + intvl_s < end_time: |
| self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s) |
| self.logger.info(self.vapi.cli("show trace")) |
| |
| # when timer expires, VR should take over as master |
| self.pg0.enable_capture() |
| self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) |
| vr.assert_state_equals(VRRP_VR_STATE_MASTER) |
| |
| @unittest.skipUnless(config.extended, "part of extended tests") |
| def test_vrrp4_master_preempted(self): |
| """ IPv4 Master VR preempted by higher priority backup """ |
| |
| # A prio 255 VR cannot be preempted so the prio has to be lower and |
| # we have to wait for it to take over |
| vr_id = 100 |
| prio = 100 |
| intvl = self._default_adv |
| vip = self.pg0.remote_ip4 |
| vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, |
| prio=prio, intvl=intvl, |
| flags=self._default_flags, |
| vips=[vip]) |
| self._vrs.append(vr) |
| vr.add_vpp_config() |
| |
| # After adding the VR, it should be in the init state |
| vr.assert_state_equals(VRRP_VR_STATE_INIT) |
| |
| # start VR |
| vr.start_stop(is_start=1) |
| vr.assert_state_equals(VRRP_VR_STATE_BACKUP) |
| |
| # wait for VR to take over as master |
| end_time = vr.start_time() + vr.master_down_seconds() |
| sleep_s = end_time - time.time() |
| time.sleep(sleep_s) |
| vr.assert_state_equals(VRRP_VR_STATE_MASTER) |
| |
| # Build advertisement packet and send it |
| pkts = [vr.vrrp_adv_packet(prio=255, src_ip=self.pg0.remote_ip4)] |
| self.pg_send(self.pg0, pkts) |
| |
| # VR should be in backup state again |
| vr.assert_state_equals(VRRP_VR_STATE_BACKUP) |
| |
| @unittest.skipUnless(config.extended, "part of extended tests") |
| def test_vrrp4_accept_mode_disabled(self): |
| """ IPv4 Master VR does not reply for VIP w/ accept mode off """ |
| |
| # accept mode only matters when prio < 255, so it will have to |
| # come up as a backup and take over as master after the timeout |
| vr_id = 100 |
| prio = 100 |
| intvl = self._default_adv |
| vip = self.pg0.remote_hosts[4].ip4 |
| vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, |
| prio=prio, intvl=intvl, |
| flags=self._default_flags, |
| vips=[vip]) |
| self._vrs.append(vr) |
| vr.add_vpp_config() |
| |
| # After adding the VR, it should be in the init state |
| vr.assert_state_equals(VRRP_VR_STATE_INIT) |
| |
| # start VR |
| vr.start_stop(is_start=1) |
| vr.assert_state_equals(VRRP_VR_STATE_BACKUP) |
| |
| # wait for VR to take over as master |
| end_time = vr.start_time() + vr.master_down_seconds() |
| sleep_s = end_time - time.time() |
| time.sleep(sleep_s) |
| vr.assert_state_equals(VRRP_VR_STATE_MASTER) |
| |
| # send an ICMP echo to the VR virtual IP address |
| echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) / |
| IP(dst=vip, src=self.pg0.remote_ip4) / |
| ICMP(seq=1, id=self.pg0.sw_if_index, type='echo-request')) |
| self.pg_send(self.pg0, [echo]) |
| |
| # wait for an echo reply. none should be received |
| time.sleep(1) |
| self.pg0.assert_nothing_captured(filter_out_fn=is_not_echo_reply) |
| |
| @unittest.skipUnless(config.extended, "part of extended tests") |
| def test_vrrp4_accept_mode_enabled(self): |
| """ IPv4 Master VR replies for VIP w/ accept mode on """ |
| |
| # A prio 255 VR cannot be preempted so the prio has to be lower and |
| # we have to wait for it to take over |
| vr_id = 100 |
| prio = 100 |
| intvl = self._default_adv |
| vip = self.pg0.remote_hosts[4].ip4 |
| flags = (VRRP_VR_FLAG_PREEMPT | VRRP_VR_FLAG_ACCEPT) |
| vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, |
| prio=prio, intvl=intvl, |
| flags=flags, |
| vips=[vip]) |
| self._vrs.append(vr) |
| vr.add_vpp_config() |
| |
| # After adding the VR, it should be in the init state |
| vr.assert_state_equals(VRRP_VR_STATE_INIT) |
| |
| # start VR |
| vr.start_stop(is_start=1) |
| vr.assert_state_equals(VRRP_VR_STATE_BACKUP) |
| |
| # wait for VR to take over as master |
| end_time = vr.start_time() + vr.master_down_seconds() |
| sleep_s = end_time - time.time() |
| time.sleep(sleep_s) |
| vr.assert_state_equals(VRRP_VR_STATE_MASTER) |
| |
| # send an ICMP echo to the VR virtual IP address |
| echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) / |
| IP(dst=vip, src=self.pg0.remote_ip4) / |
| ICMP(seq=1, id=self.pg0.sw_if_index, type='echo-request')) |
| self.pg_send(self.pg0, [echo]) |
| |
| # wait for an echo reply. |
| time.sleep(1) |
| rx_pkts = self.pg0.get_capture(expected_count=1, timeout=1, |
| filter_out_fn=is_not_echo_reply) |
| |
| self.assertEqual(rx_pkts[0][IP].src, vip) |
| self.assertEqual(rx_pkts[0][IP].dst, self.pg0.remote_ip4) |
| self.assertEqual(icmptypes[rx_pkts[0][ICMP].type], "echo-reply") |
| self.assertEqual(rx_pkts[0][ICMP].seq, 1) |
| self.assertEqual(rx_pkts[0][ICMP].id, self.pg0.sw_if_index) |
| |
| @unittest.skipUnless(config.extended, "part of extended tests") |
| def test_vrrp4_intf_tracking(self): |
| """ IPv4 Master VR adjusts priority based on tracked interface """ |
| |
| vr_id = 100 |
| prio = 255 |
| intvl = self._default_adv |
| intvl_s = intvl * 0.01 |
| vip = self.pg0.local_ip4 |
| vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, |
| prio=prio, intvl=intvl, |
| flags=self._default_flags, |
| vips=[vip]) |
| self._vrs.append(vr) |
| vr.add_vpp_config() |
| |
| # After adding the VR, it should be in the init state |
| vr.assert_state_equals(VRRP_VR_STATE_INIT) |
| |
| # add pg1 as a tracked interface and start the VR |
| adjustment = 50 |
| adjusted_prio = prio - adjustment |
| vr.add_del_tracked_interface(is_add=1, |
| sw_if_index=self.pg1.sw_if_index, |
| prio=adjustment) |
| vr.start_stop(is_start=1) |
| vr.assert_state_equals(VRRP_VR_STATE_MASTER) |
| |
| adv_configured = vr.vrrp_adv_packet(prio=prio) |
| adv_adjusted = vr.vrrp_adv_packet(prio=adjusted_prio) |
| |
| # tracked intf is up -> advertised priority == configured priority |
| self.pg0.enable_capture() |
| rx = self.pg0.wait_for_packet(timeout=intvl_s, |
| filter_out_fn=is_not_adv) |
| self.assertEqual(rx, adv_configured) |
| |
| # take down pg1, verify priority is now being adjusted |
| self.pg1.admin_down() |
| self.pg0.enable_capture() |
| rx = self.pg0.wait_for_packet(timeout=intvl_s, |
| filter_out_fn=is_not_adv) |
| self.assertEqual(rx, adv_adjusted) |
| |
| # bring up pg1, verify priority now matches configured value |
| self.pg1.admin_up() |
| self.pg0.enable_capture() |
| rx = self.pg0.wait_for_packet(timeout=intvl_s, |
| filter_out_fn=is_not_adv) |
| self.assertEqual(rx, adv_configured) |
| |
| # remove IP address from pg1, verify priority now being adjusted |
| self.pg1.unconfig_ip4() |
| self.pg0.enable_capture() |
| rx = self.pg0.wait_for_packet(timeout=intvl_s, |
| filter_out_fn=is_not_adv) |
| self.assertEqual(rx, adv_adjusted) |
| |
| # add IP address to pg1, verify priority now matches configured value |
| self.pg1.config_ip4() |
| self.pg0.enable_capture() |
| rx = self.pg0.wait_for_packet(timeout=intvl_s, |
| filter_out_fn=is_not_adv) |
| self.assertEqual(rx, adv_configured) |
| |
| @unittest.skipUnless(config.extended, "part of extended tests") |
| def test_vrrp4_master_adv_unicast(self): |
| """ IPv4 Master VR advertises (unicast) """ |
| |
| vr_id = 100 |
| prio = 255 |
| intvl = self._default_adv |
| intvl_s = intvl * 0.01 |
| vip = self.pg0.local_ip4 |
| flags = (self._default_flags | VRRP_VR_FLAG_UNICAST) |
| unicast_peer = self.pg0.remote_hosts[4] |
| vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, |
| prio=prio, intvl=intvl, |
| flags=flags, |
| vips=[vip]) |
| self._vrs.append(vr) |
| vr.add_vpp_config() |
| vr.set_unicast_peers([unicast_peer.ip4]) |
| |
| # After adding the VR, it should be in the init state |
| vr.assert_state_equals(VRRP_VR_STATE_INIT) |
| |
| # Start VR, transition to master |
| vr.start_stop(is_start=1) |
| vr.assert_state_equals(VRRP_VR_STATE_MASTER) |
| |
| self.pg0.enable_capture() |
| rx = self.pg0.wait_for_packet(timeout=intvl_s, |
| filter_out_fn=is_not_adv) |
| |
| self.assertTrue(rx.haslayer(Ether)) |
| self.assertTrue(rx.haslayer(IP)) |
| self.assertTrue(rx.haslayer(VRRPv3)) |
| self.assertEqual(rx[Ether].src, self.pg0.local_mac) |
| self.assertEqual(rx[Ether].dst, unicast_peer.mac) |
| self.assertEqual(rx[IP].src, self.pg0.local_ip4) |
| self.assertEqual(rx[IP].dst, unicast_peer.ip4) |
| self.assertEqual(rx[VRRPv3].vrid, vr_id) |
| self.assertEqual(rx[VRRPv3].priority, prio) |
| self.assertEqual(rx[VRRPv3].ipcount, 1) |
| self.assertEqual(rx[VRRPv3].addrlist, [vip]) |
| |
| |
| class TestVRRP6(VppTestCase): |
| """ IPv6 VRRP Test Case """ |
| |
| @classmethod |
| def setUpClass(cls): |
| super(TestVRRP6, cls).setUpClass() |
| |
| @classmethod |
| def tearDownClass(cls): |
| super(TestVRRP6, cls).tearDownClass() |
| |
| def setUp(self): |
| super(TestVRRP6, self).setUp() |
| |
| self.create_pg_interfaces(range(2)) |
| |
| for i in self.pg_interfaces: |
| i.admin_up() |
| i.config_ip6() |
| i.generate_remote_hosts(5) |
| i.configure_ipv6_neighbors() |
| |
| self._vrs = [] |
| self._default_flags = (VRRP_VR_FLAG_IPV6 | VRRP_VR_FLAG_PREEMPT) |
| self._default_adv = 100 |
| |
| def tearDown(self): |
| for vr in self._vrs: |
| try: |
| vr_api = vr.query_vpp_config() |
| if vr_api.runtime.state != VRRP_VR_STATE_INIT: |
| vr.start_stop(is_start=0) |
| vr.remove_vpp_config() |
| except: |
| self.logger.error("Error cleaning up") |
| |
| for i in self.pg_interfaces: |
| i.admin_down() |
| i.unconfig_ip4() |
| i.unconfig_ip6() |
| |
| self._vrs = [] |
| |
| super(TestVRRP6, self).tearDown() |
| |
| def verify_vrrp6_mlr(self, pkt, vr): |
| ip6 = pkt[IPv6] |
| self.assertEqual(ip6.dst, "ff02::16") |
| self.assertEqual(ipv6nh[ip6.nh], "Hop-by-Hop Option Header") |
| |
| hbh = pkt[IPv6ExtHdrHopByHop] |
| self.assertEqual(ipv6nh[hbh.nh], "ICMPv6") |
| |
| self.assertTrue(pkt.haslayer(ICMPv6MLReport2)) |
| mlr = pkt[ICMPv6MLReport2] |
| # should contain mc addr records for: |
| # - VRRPv3 multicast addr |
| # - solicited node mc addr record for each VR virtual IPv6 address |
| vips = vr.virtual_ips() |
| self.assertEqual(mlr.records_number, len(vips) + 1) |
| self.assertEqual(mlr.records[0].dst, vr.adv_dest_ip()) |
| |
| def verify_vrrp6_adv(self, rx_pkt, vr, prio=None): |
| self.assertTrue(rx_pkt.haslayer(Ether)) |
| self.assertTrue(rx_pkt.haslayer(IPv6)) |
| self.assertTrue(rx_pkt.haslayer(VRRPv3)) |
| |
| # generate a packet for this VR and compare it to the one received |
| pkt = vr.vrrp_adv_packet(prio=prio) |
| self.assertTrue(rx_pkt.haslayer(Ether)) |
| self.assertTrue(rx_pkt.haslayer(IPv6)) |
| self.assertTrue(rx_pkt.haslayer(VRRPv3)) |
| |
| self.assertEqual(pkt, rx_pkt) |
| |
| def verify_vrrp6_gna(self, pkt, vr): |
| self.assertTrue(pkt.haslayer(Ether)) |
| self.assertTrue(pkt.haslayer(IPv6)) |
| self.assertTrue(pkt.haslayer(ICMPv6ND_NA)) |
| self.assertTrue(pkt.haslayer(ICMPv6NDOptDstLLAddr)) |
| |
| self.assertEqual(pkt[Ether].dst, "33:33:00:00:00:01") |
| |
| self.assertEqual(pkt[IPv6].dst, "ff02::1") |
| # convert addrs to packed format since string versions could differ |
| src_addr = inet_pton(socket.AF_INET6, pkt[IPv6].src) |
| vr_ll_addr = inet_pton(socket.AF_INET6, vr.interface().local_ip6_ll) |
| self.assertEqual(src_addr, vr_ll_addr) |
| |
| self.assertTrue(pkt[ICMPv6ND_NA].tgt in vr.virtual_ips()) |
| self.assertEqual(pkt[ICMPv6NDOptDstLLAddr].lladdr, vr.virtual_mac()) |
| |
| # VR with priority 255 owns the virtual address and should |
| # become master and start advertising immediately. |
| @unittest.skipUnless(config.extended, "part of extended tests") |
| def test_vrrp6_master_adv(self): |
| """ IPv6 Master VR advertises """ |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| |
| prio = 255 |
| intvl = self._default_adv |
| vr = VppVRRPVirtualRouter(self, self.pg0, 100, |
| prio=prio, intvl=intvl, |
| flags=self._default_flags) |
| self._vrs.append(vr) |
| |
| vr.add_vpp_config() |
| self.logger.info(self.vapi.cli("show vrrp vr")) |
| vr.start_stop(is_start=1) |
| self.logger.info(self.vapi.cli("show vrrp vr")) |
| vr.start_stop(is_start=0) |
| self.logger.info(self.vapi.cli("show vrrp vr")) |
| |
| pkts = self.pg0.get_capture(4, filter_out_fn=None) |
| |
| # Init -> Master: Multicast group Join, VRRP adv, gratuitous NAs sent |
| self.verify_vrrp6_mlr(pkts[0], vr) |
| self.verify_vrrp6_adv(pkts[1], vr, prio=prio) |
| self.verify_vrrp6_gna(pkts[2], vr) |
| # Master -> Init: Adv with priority 0 sent to force an election |
| self.verify_vrrp6_adv(pkts[3], vr, prio=0) |
| |
| vr.remove_vpp_config() |
| self._vrs = [] |
| |
| # Same as above but with the update API, and add a change |
| # of parameters to test that too |
| @unittest.skipUnless(config.extended, "part of extended tests") |
| def test_vrrp6_master_adv_update(self): |
| """ IPv6 Master VR adv + Update to Backup """ |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| |
| prio = 255 |
| intvl = self._default_adv |
| vr = VppVRRPVirtualRouter(self, self.pg0, 100, |
| prio=prio, intvl=intvl, |
| flags=self._default_flags) |
| |
| vr.update_vpp_config() |
| vr.start_stop(is_start=1) |
| self.logger.info(self.vapi.cli("show vrrp vr")) |
| # Update VR with lower prio and larger interval |
| # we need to keep old VR for the adv checks |
| upd_vr = VppVRRPVirtualRouter(self, self.pg0, 100, |
| prio=100, intvl=2*intvl, |
| flags=self._default_flags, |
| vips=[self.pg0.remote_ip6]) |
| upd_vr._vrrp_index = vr._vrrp_index |
| upd_vr.update_vpp_config() |
| start_time = time.time() |
| self.logger.info(self.vapi.cli("show vrrp vr")) |
| upd_vr.assert_state_equals(VRRP_VR_STATE_BACKUP) |
| self._vrs = [upd_vr] |
| |
| pkts = self.pg0.get_capture(5, filter_out_fn=None) |
| |
| # Init -> Master: Multicast group Join, VRRP adv, gratuitous NAs sent |
| self.verify_vrrp6_mlr(pkts[0], vr) |
| self.verify_vrrp6_adv(pkts[1], vr, prio=prio) |
| self.verify_vrrp6_gna(pkts[2], vr) |
| # Master -> Init: Adv with priority 0 sent to force an election |
| self.verify_vrrp6_adv(pkts[3], vr, prio=0) |
| # Init -> Backup: A multicast listener report should be sent |
| # not actually verified in the test below, where I took this from |
| |
| # send higher prio advertisements, should not see VPP send any |
| src_ip = self.pg0.remote_ip6_ll |
| pkts = [upd_vr.vrrp_adv_packet(prio=110, src_ip=src_ip)] |
| self.logger.info(self.vapi.cli("show vlib graph")) |
| end_time = start_time + 2 * upd_vr.master_down_seconds() |
| while time.time() < end_time: |
| self.send_and_assert_no_replies( |
| self.pg0, pkts, timeout=0.01*upd_vr._intvl) |
| self.logger.info(self.vapi.cli("show trace")) |
| |
| vr.start_stop(is_start=0) |
| self.logger.info(self.vapi.cli("show vrrp vr")) |
| |
| # VR with priority < 255 enters backup state and does not advertise as |
| # long as it receives higher priority advertisements |
| @unittest.skipUnless(config.extended, "part of extended tests") |
| def test_vrrp6_backup_noadv(self): |
| """ IPv6 Backup VR does not advertise """ |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| |
| vr_id = 100 |
| prio = 100 |
| intvl = self._default_adv |
| intvl_s = intvl * 0.01 |
| vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, |
| prio=prio, intvl=intvl, |
| flags=self._default_flags, |
| vips=[self.pg0.remote_ip6]) |
| vr.add_vpp_config() |
| self._vrs.append(vr) |
| |
| vr.start_stop(is_start=1) |
| |
| vr.assert_state_equals(VRRP_VR_STATE_BACKUP) |
| # watch for advertisements for 2x the master down preemption timeout |
| end_time = vr.start_time() + 2 * vr.master_down_seconds() |
| |
| # Init -> Backup: A multicast listener report should be sent |
| pkts = self.pg0.get_capture(1, filter_out_fn=None) |
| |
| # send higher prio advertisements, should not see VPP send any |
| src_ip = self.pg0.remote_ip6_ll |
| num_advs = 5 |
| pkts = [vr.vrrp_adv_packet(prio=prio+10, src_ip=src_ip)] |
| self.logger.info(self.vapi.cli("show vlib graph")) |
| while time.time() < end_time: |
| self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s) |
| self.logger.info(self.vapi.cli("show trace")) |
| num_advs -= 1 |
| |
| vr.start_stop(is_start=0) |
| self.logger.info(self.vapi.cli("show vrrp vr")) |
| vr.remove_vpp_config() |
| self._vrs = [] |
| |
| def test_vrrp6_master_nd(self): |
| """ IPv6 Master VR replies to NDP """ |
| self.pg_start() |
| |
| # VR virtual IP is the default, which is the pg local IP |
| vr_id = 100 |
| prio = 255 |
| intvl = self._default_adv |
| vr = VppVRRPVirtualRouter(self, self.pg0, 100, |
| prio=prio, intvl=intvl, |
| flags=self._default_flags) |
| vr.add_vpp_config() |
| self._vrs.append(vr) |
| |
| # before the VR is up, NDP should resolve to interface MAC |
| self.pg0.resolve_ndp() |
| self.assertNotEqual(self.pg0.local_mac, vr.virtual_mac()) |
| |
| # start the VR, NDP should now resolve to virtual MAC |
| vr.start_stop(is_start=1) |
| self.pg0.resolve_ndp() |
| self.assertEqual(self.pg0.local_mac, vr.virtual_mac()) |
| |
| # stop the VR, ARP should resolve to interface MAC again |
| vr.start_stop(is_start=0) |
| self.pg0.resolve_ndp() |
| self.assertNotEqual(self.pg0.local_mac, vr.virtual_mac()) |
| |
| vr.remove_vpp_config() |
| self._vrs = [] |
| |
| @unittest.skipUnless(config.extended, "part of extended tests") |
| def test_vrrp6_backup_nond(self): |
| """ IPv6 Backup VR ignores NDP """ |
| # We need an address for a virtual IP that is not the IP that |
| # ARP requests will originate from |
| |
| vr_id = 100 |
| prio = 100 |
| intvl = self._default_adv |
| intvl_s = intvl * 0.01 |
| vip = self.pg0.remote_hosts[1].ip6 |
| vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, |
| prio=prio, intvl=intvl, |
| flags=self._default_flags, |
| vips=[vip]) |
| vr.add_vpp_config() |
| self._vrs.append(vr) |
| |
| nsma = in6_getnsma(inet_pton(socket.AF_INET6, vip)) |
| dmac = in6_getnsmac(nsma) |
| dst_ip = inet_ntop(socket.AF_INET6, nsma) |
| |
| ndp_req = (Ether(dst=dmac, src=self.pg0.remote_mac) / |
| IPv6(dst=dst_ip, src=self.pg0.remote_ip6) / |
| ICMPv6ND_NS(tgt=vip) / |
| ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)) |
| |
| # Before the VR is started make sure no reply to request for VIP |
| self.send_and_assert_no_replies(self.pg0, [ndp_req], timeout=1) |
| |
| # VR should start in backup state and still should not reply to NDP |
| # send a higher priority adv to make sure it does not become master |
| adv = vr.vrrp_adv_packet(prio=prio+10, src_ip=self.pg0.remote_ip6) |
| pkts = [adv, ndp_req] |
| vr.start_stop(is_start=1) |
| self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s) |
| |
| vr.start_stop(is_start=0) |
| |
| @unittest.skipUnless(config.extended, "part of extended tests") |
| def test_vrrp6_election(self): |
| """ IPv6 Backup VR becomes master if no advertisements received """ |
| |
| vr_id = 100 |
| prio = 100 |
| intvl = self._default_adv |
| intvl_s = intvl * 0.01 |
| vip = self.pg0.remote_ip6 |
| vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, |
| prio=prio, intvl=intvl, |
| flags=self._default_flags, |
| vips=[vip]) |
| self._vrs.append(vr) |
| vr.add_vpp_config() |
| |
| # After adding the VR, it should be in the init state |
| vr.assert_state_equals(VRRP_VR_STATE_INIT) |
| |
| self.pg_start() |
| vr.start_stop(is_start=1) |
| |
| # VR should be in backup state after starting |
| vr.assert_state_equals(VRRP_VR_STATE_BACKUP) |
| end_time = vr.start_time() + vr.master_down_seconds() |
| |
| # no advertisements should arrive until timer expires |
| self.pg0.enable_capture() |
| while (time.time() + intvl_s) < end_time: |
| time.sleep(intvl_s) |
| self.pg0.assert_nothing_captured(filter_out_fn=is_not_adv) |
| |
| # VR should be in master state after timer expires |
| self.pg0.enable_capture() |
| self.pg0.wait_for_packet(intvl_s, is_not_adv) |
| vr.assert_state_equals(VRRP_VR_STATE_MASTER) |
| |
| @unittest.skipUnless(config.extended, "part of extended tests") |
| def test_vrrp6_backup_preempts(self): |
| """ IPv6 Backup VR preempts lower priority master """ |
| |
| vr_id = 100 |
| prio = 100 |
| intvl = self._default_adv |
| intvl_s = intvl * 0.01 |
| vip = self.pg0.remote_ip6 |
| vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, |
| prio=prio, intvl=intvl, |
| flags=self._default_flags, |
| vips=[vip]) |
| self._vrs.append(vr) |
| vr.add_vpp_config() |
| |
| # After adding the VR, it should be in the init state |
| vr.assert_state_equals(VRRP_VR_STATE_INIT) |
| |
| self.pg_start() |
| vr.start_stop(is_start=1) |
| |
| # VR should be in backup state after starting |
| vr.assert_state_equals(VRRP_VR_STATE_BACKUP) |
| end_time = vr.start_time() + vr.master_down_seconds() |
| |
| # send lower prio advertisements until timer expires |
| src_ip = self.pg0.remote_ip6 |
| pkts = [vr.vrrp_adv_packet(prio=prio-10, src_ip=src_ip)] |
| while (time.time() + intvl_s) < end_time: |
| self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s) |
| self.logger.info(self.vapi.cli("show trace")) |
| |
| # when timer expires, VR should take over as master |
| self.pg0.enable_capture() |
| self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv) |
| vr.assert_state_equals(VRRP_VR_STATE_MASTER) |
| |
| @unittest.skipUnless(config.extended, "part of extended tests") |
| def test_vrrp6_master_preempted(self): |
| """ IPv6 Master VR preempted by higher priority backup """ |
| |
| # A prio 255 VR cannot be preempted so the prio has to be lower and |
| # we have to wait for it to take over |
| vr_id = 100 |
| prio = 100 |
| intvl = self._default_adv |
| vip = self.pg0.remote_ip6 |
| vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, |
| prio=prio, intvl=intvl, |
| flags=self._default_flags, |
| vips=[vip]) |
| self._vrs.append(vr) |
| vr.add_vpp_config() |
| |
| # After adding the VR, it should be in the init state |
| vr.assert_state_equals(VRRP_VR_STATE_INIT) |
| |
| # start VR |
| vr.start_stop(is_start=1) |
| vr.assert_state_equals(VRRP_VR_STATE_BACKUP) |
| |
| # wait for VR to take over as master |
| end_time = vr.start_time() + vr.master_down_seconds() |
| sleep_s = end_time - time.time() |
| time.sleep(sleep_s) |
| vr.assert_state_equals(VRRP_VR_STATE_MASTER) |
| |
| # Build advertisement packet and send it |
| pkts = [vr.vrrp_adv_packet(prio=255, src_ip=self.pg0.remote_ip6)] |
| self.pg_send(self.pg0, pkts) |
| |
| # VR should be in backup state again |
| vr.assert_state_equals(VRRP_VR_STATE_BACKUP) |
| |
| @unittest.skipUnless(config.extended, "part of extended tests") |
| def test_vrrp6_accept_mode_disabled(self): |
| """ IPv6 Master VR does not reply for VIP w/ accept mode off """ |
| |
| # accept mode only matters when prio < 255, so it will have to |
| # come up as a backup and take over as master after the timeout |
| vr_id = 100 |
| prio = 100 |
| intvl = self._default_adv |
| vip = self.pg0.remote_hosts[4].ip6 |
| vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, |
| prio=prio, intvl=intvl, |
| flags=self._default_flags, |
| vips=[vip]) |
| self._vrs.append(vr) |
| vr.add_vpp_config() |
| |
| # After adding the VR, it should be in the init state |
| vr.assert_state_equals(VRRP_VR_STATE_INIT) |
| |
| # start VR |
| vr.start_stop(is_start=1) |
| vr.assert_state_equals(VRRP_VR_STATE_BACKUP) |
| |
| # wait for VR to take over as master |
| end_time = vr.start_time() + vr.master_down_seconds() |
| sleep_s = end_time - time.time() |
| time.sleep(sleep_s) |
| vr.assert_state_equals(VRRP_VR_STATE_MASTER) |
| |
| # send an ICMPv6 echo to the VR virtual IP address |
| echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) / |
| IPv6(dst=vip, src=self.pg0.remote_ip6) / |
| ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index)) |
| self.pg_send(self.pg0, [echo]) |
| |
| # wait for an echo reply. none should be received |
| time.sleep(1) |
| self.pg0.assert_nothing_captured(filter_out_fn=is_not_echo_reply) |
| |
| @unittest.skipUnless(config.extended, "part of extended tests") |
| def test_vrrp6_accept_mode_enabled(self): |
| """ IPv6 Master VR replies for VIP w/ accept mode on """ |
| |
| # A prio 255 VR cannot be preempted so the prio has to be lower and |
| # we have to wait for it to take over |
| vr_id = 100 |
| prio = 100 |
| intvl = self._default_adv |
| vip = self.pg0.remote_hosts[4].ip6 |
| flags = (self._default_flags | VRRP_VR_FLAG_ACCEPT) |
| vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, |
| prio=prio, intvl=intvl, |
| flags=flags, |
| vips=[vip]) |
| self._vrs.append(vr) |
| vr.add_vpp_config() |
| |
| # After adding the VR, it should be in the init state |
| vr.assert_state_equals(VRRP_VR_STATE_INIT) |
| |
| # start VR |
| vr.start_stop(is_start=1) |
| vr.assert_state_equals(VRRP_VR_STATE_BACKUP) |
| |
| # wait for VR to take over as master |
| end_time = vr.start_time() + vr.master_down_seconds() |
| sleep_s = end_time - time.time() |
| time.sleep(sleep_s) |
| vr.assert_state_equals(VRRP_VR_STATE_MASTER) |
| |
| # send an ICMP echo to the VR virtual IP address |
| echo = (Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac) / |
| IPv6(dst=vip, src=self.pg0.remote_ip6) / |
| ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index)) |
| self.pg_send(self.pg0, [echo]) |
| |
| # wait for an echo reply. |
| time.sleep(1) |
| rx_pkts = self.pg0.get_capture(expected_count=1, timeout=1, |
| filter_out_fn=is_not_echo_reply) |
| |
| self.assertEqual(rx_pkts[0][IPv6].src, vip) |
| self.assertEqual(rx_pkts[0][IPv6].dst, self.pg0.remote_ip6) |
| self.assertEqual(rx_pkts[0][ICMPv6EchoReply].seq, 1) |
| self.assertEqual(rx_pkts[0][ICMPv6EchoReply].id, self.pg0.sw_if_index) |
| |
| @unittest.skipUnless(config.extended, "part of extended tests") |
| def test_vrrp6_intf_tracking(self): |
| """ IPv6 Master VR adjusts priority based on tracked interface """ |
| |
| vr_id = 100 |
| prio = 255 |
| intvl = self._default_adv |
| intvl_s = intvl * 0.01 |
| vip = self.pg0.local_ip6 |
| vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, |
| prio=prio, intvl=intvl, |
| flags=self._default_flags, |
| vips=[vip]) |
| self._vrs.append(vr) |
| vr.add_vpp_config() |
| |
| # After adding the VR, it should be in the init state |
| vr.assert_state_equals(VRRP_VR_STATE_INIT) |
| |
| # add pg1 as a tracked interface and start the VR |
| adjustment = 50 |
| adjusted_prio = prio - adjustment |
| vr.add_del_tracked_interface(is_add=1, |
| sw_if_index=self.pg1.sw_if_index, |
| prio=adjustment) |
| vr.start_stop(is_start=1) |
| vr.assert_state_equals(VRRP_VR_STATE_MASTER) |
| |
| adv_configured = vr.vrrp_adv_packet(prio=prio) |
| adv_adjusted = vr.vrrp_adv_packet(prio=adjusted_prio) |
| |
| # tracked intf is up -> advertised priority == configured priority |
| self.pg0.enable_capture() |
| rx = self.pg0.wait_for_packet(timeout=intvl_s, |
| filter_out_fn=is_not_adv) |
| self.assertEqual(rx, adv_configured) |
| |
| # take down pg1, verify priority is now being adjusted |
| self.pg1.admin_down() |
| self.pg0.enable_capture() |
| rx = self.pg0.wait_for_packet(timeout=intvl_s, |
| filter_out_fn=is_not_adv) |
| self.assertEqual(rx, adv_adjusted) |
| |
| # bring up pg1, verify priority now matches configured value |
| self.pg1.admin_up() |
| self.pg0.enable_capture() |
| rx = self.pg0.wait_for_packet(timeout=intvl_s, |
| filter_out_fn=is_not_adv) |
| self.assertEqual(rx, adv_configured) |
| |
| # remove IP address from pg1, verify priority now being adjusted |
| self.pg1.unconfig_ip6() |
| self.pg0.enable_capture() |
| rx = self.pg0.wait_for_packet(timeout=intvl_s, |
| filter_out_fn=is_not_adv) |
| self.assertEqual(rx, adv_adjusted) |
| |
| # add IP address to pg1, verify priority now matches configured value |
| self.pg1.config_ip6() |
| self.pg0.enable_capture() |
| rx = self.pg0.wait_for_packet(timeout=intvl_s, |
| filter_out_fn=is_not_adv) |
| self.assertEqual(rx, adv_configured) |
| |
| @unittest.skipUnless(config.extended, "part of extended tests") |
| def test_vrrp6_master_adv_unicast(self): |
| """ IPv6 Master VR advertises (unicast) """ |
| |
| vr_id = 100 |
| prio = 255 |
| intvl = self._default_adv |
| intvl_s = intvl * 0.01 |
| vip = self.pg0.local_ip6 |
| flags = (self._default_flags | VRRP_VR_FLAG_UNICAST) |
| unicast_peer = self.pg0.remote_hosts[4] |
| vr = VppVRRPVirtualRouter(self, self.pg0, vr_id, |
| prio=prio, intvl=intvl, |
| flags=flags, |
| vips=[vip]) |
| self._vrs.append(vr) |
| vr.add_vpp_config() |
| vr.set_unicast_peers([unicast_peer.ip6]) |
| |
| # After adding the VR, it should be in the init state |
| vr.assert_state_equals(VRRP_VR_STATE_INIT) |
| |
| # Start VR, transition to master |
| vr.start_stop(is_start=1) |
| vr.assert_state_equals(VRRP_VR_STATE_MASTER) |
| |
| self.pg0.enable_capture() |
| rx = self.pg0.wait_for_packet(timeout=intvl_s, |
| filter_out_fn=is_not_adv) |
| |
| self.assertTrue(rx.haslayer(Ether)) |
| self.assertTrue(rx.haslayer(IPv6)) |
| self.assertTrue(rx.haslayer(VRRPv3)) |
| self.assertEqual(rx[Ether].src, self.pg0.local_mac) |
| self.assertEqual(rx[Ether].dst, unicast_peer.mac) |
| self.assertEqual(ip6_normalize(rx[IPv6].src), |
| ip6_normalize(self.pg0.local_ip6_ll)) |
| self.assertEqual(ip6_normalize(rx[IPv6].dst), |
| ip6_normalize(unicast_peer.ip6)) |
| self.assertEqual(rx[VRRPv3].vrid, vr_id) |
| self.assertEqual(rx[VRRPv3].priority, prio) |
| self.assertEqual(rx[VRRPv3].ipcount, 1) |
| self.assertEqual(rx[VRRPv3].addrlist, [vip]) |
| |
| |
| if __name__ == '__main__': |
| unittest.main(testRunner=VppTestRunner) |