| from vpp_object import VppObject |
| from vpp_ip import INVALID_INDEX |
| from enum import Enum |
| |
| |
| class Dir(Enum): |
| RX = 0 |
| TX = 1 |
| |
| |
| class PolicerAction: |
| """sse2 qos action""" |
| |
| def __init__(self, type, dscp): |
| self.type = type |
| self.dscp = dscp |
| |
| def encode(self): |
| return {"type": self.type, "dscp": self.dscp} |
| |
| |
| class VppPolicer(VppObject): |
| """Policer""" |
| |
| def __init__( |
| self, |
| test, |
| name, |
| cir, |
| eir, |
| commited_burst, |
| excess_burst, |
| rate_type=0, |
| round_type=0, |
| type=0, |
| color_aware=False, |
| conform_action=PolicerAction(1, 0), |
| exceed_action=PolicerAction(0, 0), |
| violate_action=PolicerAction(0, 0), |
| ): |
| self._test = test |
| self.name = name |
| self.cir = cir |
| self.eir = eir |
| self.commited_burst = commited_burst |
| self.excess_burst = excess_burst |
| self.rate_type = rate_type |
| self.round_type = round_type |
| self.type = type |
| self.color_aware = color_aware |
| self.conform_action = conform_action |
| self.exceed_action = exceed_action |
| self.violate_action = violate_action |
| self._policer_index = INVALID_INDEX |
| |
| @property |
| def policer_index(self): |
| return self._policer_index |
| |
| def add_vpp_config(self): |
| r = self._test.vapi.policer_add_del( |
| name=self.name, |
| cir=self.cir, |
| eir=self.eir, |
| cb=self.commited_burst, |
| eb=self.excess_burst, |
| rate_type=self.rate_type, |
| round_type=self.round_type, |
| type=self.type, |
| color_aware=self.color_aware, |
| conform_action=self.conform_action.encode(), |
| exceed_action=self.exceed_action.encode(), |
| violate_action=self.violate_action.encode(), |
| ) |
| self._test.registry.register(self, self._test.logger) |
| self._policer_index = r.policer_index |
| return self |
| |
| def remove_vpp_config(self): |
| self._test.vapi.policer_add_del(is_add=False, name=self.name) |
| self._policer_index = INVALID_INDEX |
| |
| def bind_vpp_config(self, worker, bind): |
| self._test.vapi.policer_bind( |
| name=self.name, worker_index=worker, bind_enable=bind |
| ) |
| |
| def apply_vpp_config(self, if_index, dir: Dir, apply): |
| if dir == Dir.RX: |
| self._test.vapi.policer_input( |
| name=self.name, sw_if_index=if_index, apply=apply |
| ) |
| else: |
| self._test.vapi.policer_output( |
| name=self.name, sw_if_index=if_index, apply=apply |
| ) |
| |
| def query_vpp_config(self): |
| dump = self._test.vapi.policer_dump(match_name_valid=True, match_name=self.name) |
| for policer in dump: |
| if policer.name == self.name: |
| return True |
| return False |
| |
| def object_id(self): |
| return "policer-%s" % (self.name) |
| |
| def get_stats(self, worker=None): |
| conform = self._test.statistics.get_counter("/net/policer/conform") |
| exceed = self._test.statistics.get_counter("/net/policer/exceed") |
| violate = self._test.statistics.get_counter("/net/policer/violate") |
| |
| counters = {"conform": conform, "exceed": exceed, "violate": violate} |
| |
| total = {} |
| for name, c in counters.items(): |
| total[f"{name}_packets"] = 0 |
| total[f"{name}_bytes"] = 0 |
| for i in range(len(c)): |
| t = c[i] |
| if worker is not None and i != worker + 1: |
| continue |
| stat_index = self._policer_index |
| total[f"{name}_packets"] += t[stat_index]["packets"] |
| total[f"{name}_bytes"] += t[stat_index]["bytes"] |
| |
| return total |