| # ============LICENSE_START======================================================= |
| # Copyright (c) 2018-2022 AT&T Intellectual Property. All rights reserved. |
| # ================================================================================ |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # ============LICENSE_END========================================================= |
| |
| import copy |
| import datetime |
| import os |
| import unittest |
| from pathlib import Path |
| import time |
| from unittest.mock import patch, Mock |
| |
| import requests |
| |
| import snmptrapd |
| |
| import trapd_settings as tds |
| import trapd_stormwatch_settings as sws |
| import trapd_stormwatch as sw |
| import trapd_http_session |
| import trapd_runtime_pid |
| import trapd_io |
| import trapd_get_cbs_config |
| |
| from pysnmp.hlapi import * |
| from pysnmp import debug |
| |
| |
| class test_snmptrapd(unittest.TestCase): |
| """ |
| Test the save_pid mod |
| """ |
| |
| class WriteThrows(): |
| def write(): |
| raise RuntimeError("write() throws") |
| |
| |
| @classmethod |
| def setUpClass(cls): |
| |
| # init vars |
| tds.init() |
| sw.sw_init() |
| |
| # fmt: off |
| test_snmptrapd.pytest_empty_data = "{}" |
| test_snmptrapd.pytest_json_data = ( |
| '{ "snmptrapd": { ' |
| ' "version": "2.0.3", ' |
| ' "title": "ONAP SNMP Trap Receiver" }, ' |
| ' "protocols": { ' |
| ' "transport": "udp", ' |
| ' "ipv4_interface": "0.0.0.0", ' |
| ' "ipv4_port": 6162, ' |
| ' "ipv6_interface": "::1", ' |
| ' "ipv6_port": 6162 }, ' |
| ' "cache": { ' |
| ' "dns_cache_ttl_seconds": 60 }, ' |
| ' "publisher": { ' |
| ' "http_timeout_milliseconds": 1500, ' |
| ' "http_retries": 3, ' |
| ' "http_milliseconds_between_retries": 750, ' |
| ' "http_primary_publisher": "true", ' |
| ' "http_peer_publisher": "unavailable", ' |
| ' "max_traps_between_publishes": 10, ' |
| ' "max_milliseconds_between_publishes": 10000 }, ' |
| ' "streams_publishes": { ' |
| ' "sec_fault_unsecure": { ' |
| ' "type": "message_router", ' |
| ' "aaf_password": null, ' |
| ' "dmaap_info": { ' |
| ' "location": "mtl5", ' |
| ' "client_id": null, ' |
| ' "client_role": null, ' |
| ' "topic_url": "http://uebsb91kcdc.it.att.com:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, ' |
| ' "aaf_username": null } }, ' |
| ' "files": { ' |
| ' "runtime_base_dir": "/tmp/opt/app/snmptrap", ' |
| ' "log_dir": "logs", ' |
| ' "data_dir": "data", ' |
| ' "pid_dir": "tmp", ' |
| ' "arriving_traps_log": "snmptrapd_arriving_traps.log", ' |
| ' "snmptrapd_diag": "snmptrapd_prog_diag.log", ' |
| ' "traps_stats_log": "snmptrapd_stats.csv", ' |
| ' "perm_status_file": "snmptrapd_status.log", ' |
| ' "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", ' |
| ' "eelf_error": "error.log", ' |
| ' "eelf_debug": "debug.log", ' |
| ' "eelf_audit": "audit.log", ' |
| ' "eelf_metrics": "metrics.log", ' |
| ' "roll_frequency": "day", ' |
| ' "minimum_severity_to_log": 2 }, ' |
| ' "trap_config": { ' |
| ' "sw_interval_in_seconds": 60, ' |
| ' "notify_oids": { ' |
| ' ".1.3.6.1.4.1.9.0.1": { ' |
| ' "sw_high_water_in_interval": 102, ' |
| ' "sw_low_water_in_interval": 7, ' |
| ' "category": "logonly" }, ' |
| ' ".1.3.6.1.4.1.9.0.2": { ' |
| ' "sw_high_water_in_interval": 101, ' |
| ' "sw_low_water_in_interval": 7, ' |
| ' "category": "logonly" }, ' |
| ' ".1.3.6.1.4.1.9.0.3": { ' |
| ' "sw_high_water_in_interval": 102, ' |
| ' "sw_low_water_in_interval": 7, ' |
| ' "category": "logonly" }, ' |
| ' ".1.3.6.1.4.1.9.0.4": { ' |
| ' "sw_high_water_in_interval": 10, ' |
| ' "sw_low_water_in_interval": 3, ' |
| ' "category": "logonly" } } }, ' |
| ' "snmpv3_config": { ' |
| ' "usm_users": [ { ' |
| ' "user": "usr-sha-aes256", ' |
| ' "engineId": "8000000001020304", ' |
| ' "usmHMACSHAAuth": "authkey1", ' |
| ' "usmAesCfb256": "privkey1" }, ' |
| ' { "user": "user1", ' |
| ' "engineId": "8000000000000001", ' |
| ' "usmHMACMD5Auth": "authkey1", ' |
| ' "usmDESPriv": "privkey1" }, ' |
| ' { "user": "user2", ' |
| ' "engineId": "8000000000000002", ' |
| ' "usmHMACSHAAuth": "authkey2", ' |
| ' "usmAesCfb128": "privkey2" }, ' |
| ' { "user": "user3", ' |
| ' "engineId": "8000000000000003", ' |
| ' "usmHMACSHAAuth": "authkey3", ' |
| ' "usmAesCfb256": "privkey3" } ' |
| ' ] } }' |
| ) |
| # fmt: off |
| |
| test_snmptrapd.trap_dict_info = { |
| "uuid": "06f6e91c-3236-11e8-9953-005056865aac", |
| "agent address": "1.2.3.4", |
| "agent name": "test-agent.nodomain.com", |
| "cambria.partition": "test-agent.nodomain.com", |
| "community": "", |
| "community len": 0, |
| "epoch_serno": 15222068260000, |
| "protocol version": "v2c", |
| "time received": 1522206826.2938566, |
| "trap category": "ONAP-COLLECTOR-SNMPTRAP", |
| "sysUptime": "218567736", |
| "notify OID": "1.3.6.1.4.1.9999.9.9.999", |
| "notify OID len": 10, |
| } |
| |
| snmptrap_dir = "/tmp/opt/app/snmptrap" |
| try: |
| Path(snmptrap_dir + "/logs").mkdir(parents=True, exist_ok=True) |
| Path(snmptrap_dir + "/tmp").mkdir(parents=True, exist_ok=True) |
| Path(snmptrap_dir + "/etc").mkdir(parents=True, exist_ok=True) |
| except Exception as e: |
| print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror))) |
| sys.exit(1) |
| |
| # create copy of snmptrapd.json for pytest |
| test_snmptrapd.pytest_json_config = "/tmp/opt/app/snmptrap/etc/snmptrapd.json" |
| with open(test_snmptrapd.pytest_json_config, "w") as outfile: |
| outfile.write(test_snmptrapd.pytest_json_data) |
| |
| test_snmptrapd.pytest_empty_config = "/tmp/opt/app/snmptrap/etc/empty.json" |
| with open(test_snmptrapd.pytest_empty_config, "w") as outfile: |
| outfile.write(test_snmptrapd.pytest_empty_data) |
| |
| |
| def test_usage_err(self): |
| """ |
| Test usage error |
| """ |
| |
| with self.assertRaises(SystemExit) as exc: |
| snmptrapd.usage_err() |
| self.assertEqual(str(exc.exception), "1") |
| |
| |
| def test_load_all_configs(self): |
| """ |
| Test load of all configs |
| """ |
| # request load of CBS data |
| with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): |
| self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) |
| |
| result = trapd_get_cbs_config.get_cbs_config() |
| self.assertEqual(result, True) |
| |
| # request load of CBS data |
| self.assertEqual(snmptrapd.load_all_configs(0, 1), True) |
| |
| |
| def test_resolve_ip(self): |
| """ Test resolve_ip """ |
| with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): |
| self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) |
| |
| time_base = 1000000 |
| time_offset = 10000 |
| with patch('time.time', return_value=time_base): |
| self.assertEqual(time.time(), time_base) |
| |
| fqdn = "foo.example" |
| ip = "1.2.3.4" |
| with patch.dict(tds.dns_cache_ip_to_name): |
| tds.dns_cache_ip_to_name = { } |
| # DOUBLE EXCEPTION - nothing in tds.dns_cache_ip_expires |
| # and gethostbyaddr() fails |
| with patch('socket.gethostbyaddr', side_effect=RuntimeError("gethostbyaddr raises")): |
| self.assertEqual(snmptrapd.resolve_ip(ip), ip) |
| self.assertEqual(tds.dns_cache_ip_to_name[ip], ip) |
| self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + 60) |
| |
| tds.dns_cache_ip_to_name = { ip: fqdn } |
| with patch('socket.gethostbyaddr', return_value=(fqdn,fqdn,[ip])): |
| # EXCEPTION - nothing in tds.dns_cache_ip_expires |
| del tds.dns_cache_ip_expires[ip] |
| self.assertEqual(snmptrapd.resolve_ip(ip), fqdn) |
| self.assertEqual(tds.dns_cache_ip_to_name[ip], fqdn) |
| self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + 60) |
| |
| with patch.dict(tds.dns_cache_ip_expires, {ip: time.time() - 10000}): |
| self.assertEqual(snmptrapd.resolve_ip(ip), fqdn) |
| self.assertEqual(tds.dns_cache_ip_to_name[ip], fqdn) |
| self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + 60) |
| |
| |
| with patch.dict(tds.dns_cache_ip_expires, {ip: time.time() + 10000}): |
| self.assertEqual(snmptrapd.resolve_ip(ip), fqdn) |
| self.assertEqual(tds.dns_cache_ip_to_name[ip], fqdn) |
| self.assertEqual(tds.dns_cache_ip_expires[ip], time_base + time_offset) |
| |
| |
| def test_load_all_configs_signal(self): |
| """ |
| Test load of all configs via runtime signal |
| """ |
| |
| # init vars |
| tds.init() |
| |
| # request load of CBS data |
| with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): |
| self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) |
| |
| self.assertTrue(trapd_get_cbs_config.get_cbs_config()) |
| |
| # request load of CBS data |
| self.assertTrue(snmptrapd.load_all_configs(1, 1)) |
| |
| with patch('snmptrapd.get_cbs_config', return_value=False): |
| with self.assertRaises(SystemExit): |
| snmptrapd.load_all_configs(1, 1) |
| |
| |
| def test_log_all_arriving_traps(self): |
| """ |
| Test logging of traps |
| """ |
| # init vars |
| tds.init() |
| |
| # don't open files, but try to log - should raise exception |
| with self.assertRaises(Exception) as exc: |
| snmptrapd.log_all_arriving_traps() |
| self.assertIsInstance(exc.exception, TypeError) |
| |
| # request load of CBS data |
| with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): |
| # trap dict for logging |
| with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)): |
| self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) |
| |
| result = trapd_get_cbs_config.get_cbs_config() |
| |
| # set last day to current |
| tds.last_day = datetime.datetime.now().day |
| |
| |
| # open eelf logs |
| trapd_io.open_eelf_logs() |
| |
| # open trap logs |
| tds.arriving_traps_filename = ( |
| tds.c_config["files"]["runtime_base_dir"] |
| + "/" |
| + tds.c_config["files"]["log_dir"] |
| + "/" |
| + (tds.c_config["files"]["arriving_traps_log"]) |
| ) |
| tds.arriving_traps_fd = trapd_io.open_file(tds.arriving_traps_filename) |
| |
| # name and open json trap log |
| tds.json_traps_filename = ( |
| tds.c_config["files"]["runtime_base_dir"] |
| + "/" |
| + tds.c_config["files"]["log_dir"] |
| + "/" |
| + "DMAAP_" |
| + (tds.c_config["streams_publishes"]["sec_fault_unsecure"]["dmaap_info"]["topic_url"].split("/")[-1]) |
| + ".json" |
| ) |
| tds.json_traps_fd = trapd_io.open_file(tds.json_traps_filename) |
| msg = "published traps logged to: %s" % tds.json_traps_filename |
| trapd_io.stdout_logger(msg) |
| trapd_io.ecomp_logger(tds.LOG_TYPE_DEBUG, tds.SEV_INFO, tds.CODE_GENERAL, msg) |
| # also force it to daily roll |
| snmptrapd.log_all_arriving_traps() |
| |
| # try again, but with day rolling |
| tds.last_day = datetime.datetime.now().day - 1 |
| snmptrapd.log_all_arriving_traps() |
| |
| # try again, with roll_frequency set to minute |
| tds.last_minute = datetime.datetime.now().minute - 1 |
| tds.c_config["files"]["roll_frequency"] = "minute" |
| snmptrapd.log_all_arriving_traps() |
| |
| # try again, with roll_frequency set to hour |
| tds.last_hour = datetime.datetime.now().hour - 1 |
| tds.c_config["files"]["roll_frequency"] = "hour" |
| snmptrapd.log_all_arriving_traps() |
| |
| # try again, with a bad trap_dict[time_received] |
| tds.trap_dict["time received"] = "bad_value" |
| snmptrapd.log_all_arriving_traps() |
| |
| # also test log_published_messages() |
| snmptrapd.log_published_messages("data #1") |
| |
| # work even if there is an exception |
| # this SHOULD be done with a context |
| sv_json_traps_fd = tds.json_traps_fd |
| tds.json_traps_fd = test_snmptrapd.WriteThrows() |
| snmptrapd.log_published_messages("data #2") |
| tds.json_traps_fd = sv_json_traps_fd |
| |
| |
| def test_log_all_incorrect_log_type(self): |
| """ |
| Test logging of traps |
| """ |
| |
| # init vars |
| tds.init() |
| |
| # request load of CBS data |
| with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): |
| self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) |
| |
| trapd_get_cbs_config.get_cbs_config() |
| |
| # open eelf logs |
| trapd_io.open_eelf_logs() |
| |
| |
| def test_v1_trap_receipt(self): |
| """ |
| Test receiving traps |
| """ |
| |
| # init vars |
| tds.init() |
| |
| # request load of CBS data |
| with patch.dict(os.environ, {'CBS_SIM_JSON':test_snmptrapd.pytest_json_config}): |
| self.assertEqual(os.getenv('CBS_SIM_JSON'), test_snmptrapd.pytest_json_config) |
| |
| trapd_get_cbs_config.get_cbs_config() |
| |
| errorIndication, errorStatus, errorIndex, varbinds = next( |
| sendNotification( |
| SnmpEngine(), |
| CommunityData("not_public"), |
| UdpTransportTarget(("localhost", 6162)), |
| ContextData(), |
| "trap", |
| [ |
| ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.1"), OctetString("test trap - ignore")), |
| ObjectType(ObjectIdentity(".1.3.6.1.4.1.999.2"), OctetString("ONAP pytest trap")), |
| ], |
| ) |
| ) |
| |
| result = errorIndication |
| self.assertEqual(result, None) |
| |
| |
| def test_post_dmaap(self): |
| """ |
| Test post_dmaap() |
| """ |
| |
| # trap dict for logging |
| with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)): |
| with patch('snmptrapd.ecomp_logger') as magic_ecomp_logger: |
| with patch('requests.Session.post') as magic_session_post: |
| fake_post_resp = Mock() |
| magic_session_post.return_value = fake_post_resp |
| |
| fake_post_resp.status_code = requests.codes.moved_permanently |
| snmptrapd.post_dmaap() |
| self.assertEqual(magic_ecomp_logger.call_count, 11) |
| |
| fake_post_resp.status_code = requests.codes.ok |
| snmptrapd.post_dmaap() |
| self.assertEqual(magic_ecomp_logger.call_count, 16) |
| |
| magic_ecomp_logger.call_count = 0 |
| tds.traps_since_last_publish = 1 |
| snmptrapd.post_dmaap() |
| self.assertEqual(magic_ecomp_logger.call_count, 5) |
| |
| magic_ecomp_logger.call_count = 0 |
| tds.c_config["streams_publishes"]["sec_fault_unsecure"]["aaf_username"] = "aaf_username" |
| tds.c_config["streams_publishes"]["sec_fault_unsecure"]["aaf_password"] = "aaf_password" |
| snmptrapd.post_dmaap() |
| self.assertEqual(magic_ecomp_logger.call_count, 5) |
| |
| # for some reason this exception is being seen as an OSError ???? |
| magic_ecomp_logger.call_count = 0 |
| magic_session_post.side_effect = requests.exceptions.RequestException("test throw") |
| snmptrapd.post_dmaap() |
| self.assertEqual(magic_ecomp_logger.call_count, 10) |
| |
| magic_ecomp_logger.call_count = 0 |
| magic_session_post.side_effect = OSError() |
| snmptrapd.post_dmaap() |
| self.assertEqual(magic_ecomp_logger.call_count, 10) |
| |
| |
| @unittest.skip("do not know what to pass in for vars. Need an object with a clone() method") |
| def test_comm_string_rewrite_observer(self): |
| """ |
| test comm_string_rewrite_observer() |
| """ |
| vars = { "communityName": ["name"] } |
| snmptrapd.comm_string_rewrite_observer("snmpEngine", "execpoint", vars, "cbCtx") |
| assertEqual(vars["communitName"], "public") |
| |
| vars = { "communityName": [] } |
| snmptrapd.comm_string_rewrite_observer("snmpEngine", "execpoint", vars, "cbCtx") |
| assertEqual(vars["communitName"], "") |
| |
| |
| def test_snmp_engine_observer_cb(self): |
| """ |
| test snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx): |
| """ |
| |
| snmp_engine = "snmp_engine" |
| execpoint = "execpoint" |
| cbCtx = "cbCtx" |
| variables = { |
| 'transportDomain': [ 1, 2, 3 ], |
| 'transportAddress': [ "a", "b", "c" ] |
| } |
| for secmodel,ret in [ (1, "v1"), (2, "v2c"), (3, "v3"), (4, "unknown") ]: |
| variables["securityModel"] = secmodel |
| snmptrapd.snmp_engine_observer_cb(snmp_engine, execpoint, variables, cbCtx) |
| self.assertEqual(tds.trap_dict["protocol version"], ret) |
| |
| |
| def test_add_varbind_to_log_string(self): |
| """ |
| test add_varbind_to_log_string(vb_idx, vb_oid, vb_type, vb_val) |
| """ |
| vb_oid = "vb_oid" |
| vb_type = "vb_type" |
| |
| class TempPP(): |
| def prettyPrint(self): |
| return "pp ret" |
| |
| vb_val = TempPP() |
| |
| self.assertEqual(tds.all_vb_str, "") |
| |
| snmptrapd.add_varbind_to_log_string(0, vb_oid, vb_type, vb_val) |
| self.assertEqual(tds.all_vb_str, |
| 'varbinds: [0] vb_oid {vb_type} pp ret') |
| |
| snmptrapd.add_varbind_to_log_string(1, vb_oid, vb_type, vb_val) |
| self.assertEqual(tds.all_vb_str, |
| 'varbinds: [0] vb_oid {vb_type} pp ret [1] vb_oid {vb_type} pp ret') |
| |
| |
| def test_add_varbind_to_json(self): |
| """ |
| test add_varbind_to_json(vb_idx, vb_oid, vb_type, vb_val) |
| """ |
| |
| class TempPP(): |
| def __init__(self, ret): |
| self.ret = ret |
| def prettyPrint(self): |
| return self.ret |
| |
| with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)): |
| vb_oid = TempPP("1.2.3") |
| override_vb_oid = TempPP("1.3.6.1.6.3.18.1.3.0") |
| |
| vb_type = "vb_type" |
| |
| vb_val = TempPP("foo.example") |
| |
| self.assertEqual(snmptrapd.add_varbind_to_json(0, vb_oid, vb_type, vb_val), 0) |
| self.assertEqual(snmptrapd.add_varbind_to_json(1, vb_oid, vb_type, vb_val), 0) |
| self.assertEqual(tds.trap_dict["notify OID"], ".foo.example") |
| self.assertEqual(tds.trap_dict["notify OID len"], 2) |
| |
| with patch('snmptrapd.resolve_ip') as magic_resolve_ip: |
| magic_resolve_ip.return_value = 'foo.example' |
| self.assertEqual(snmptrapd.add_varbind_to_json(2, override_vb_oid, vb_type, vb_val), 0) |
| self.assertEqual(tds.trap_dict["agent address"], "foo.example") |
| self.assertEqual(tds.trap_dict["agent name"], "foo.example") |
| |
| sv_protocol_version = tds.trap_dict["protocol version"] |
| tds.trap_dict["protocol version"] = "v1" |
| self.assertEqual(snmptrapd.add_varbind_to_json(4, vb_oid, vb_type, vb_val), 0) |
| self.assertEqual(snmptrapd.add_varbind_to_json(5, vb_oid, vb_type, vb_val), 1) |
| |
| tds.trap_dict["protocol version"] = sv_protocol_version |
| self.assertEqual(snmptrapd.add_varbind_to_json(6, vb_oid, vb_type, vb_val), 1) |
| self.assertEqual(tds.all_vb_json_str, |
| ', "varbinds": [{"varbind_oid": ".1.2.3", ' |
| '"varbind_type": "octet", "varbind_value": ' |
| '"foo.example"} ,{"varbind_oid": ".1.2.3", ' |
| '"varbind_type": "octet", "varbind_value": ' |
| '"foo.example"}') |
| |
| self.assertEqual(snmptrapd.add_varbind_to_json(7, vb_oid, vb_type, vb_val), 1) |
| self.assertEqual(tds.all_vb_json_str, |
| ', "varbinds": [{"varbind_oid": ".1.2.3", ' |
| '"varbind_type": "octet", "varbind_value": ' |
| '"foo.example"} ,{"varbind_oid": ".1.2.3", ' |
| '"varbind_type": "octet", "varbind_value": ' |
| '"foo.example"} ,{"varbind_oid": ".1.2.3", ' |
| '"varbind_type": "octet", "varbind_value": ' |
| '"foo.example"}') |
| |
| |
| @patch('snmptrapd.log_all_arriving_traps') |
| @patch('snmptrapd.post_dmaap', return_value = 0) |
| @patch('snmptrapd.ecomp_logger', return_value = 0) |
| @patch('snmptrapd.add_varbind_to_json', return_value = 1) |
| @patch('snmptrapd.add_varbind_to_log_string', return_value = 0) |
| def test_notif_receiver_cb(self, magic_add_varbind_to_log_string, magic_add_varbind_to_json, |
| magic_ecomp_logger, magic_port_dmaap, magic_lost_all_arriving_traps): |
| """ notif_receiver_cb(snmp_engine, stateReference, contextEngineId, contextName, varBinds, cbCtx) """ |
| with patch.dict(tds.trap_dict, copy.deepcopy(test_snmptrapd.trap_dict_info)): |
| with patch('trapd_stormwatch.sw_storm_active', return_value=True): |
| snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") |
| with patch('trapd_stormwatch.sw_storm_active', return_value=False): |
| snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") |
| self.assertFalse(tds.first_trap) |
| self.assertEqual(magic_ecomp_logger.call_count, 8) |
| |
| magic_ecomp_logger.call_count = 0 |
| with patch('trapd_stormwatch.sw_storm_active', return_value=False): |
| snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") |
| self.assertEqual(magic_ecomp_logger.call_count, 4) |
| |
| magic_ecomp_logger.call_count = 0 |
| tds.c_config["publisher"]["max_traps_between_publishes"] = 1 |
| with patch('trapd_stormwatch.sw_storm_active', return_value=False): |
| snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") |
| self.assertEqual(magic_ecomp_logger.call_count, 4) |
| |
| magic_ecomp_logger.call_count = 0 |
| tds.c_config["publisher"]["max_traps_between_publishes"] = 100 |
| tds.c_config["publisher"]["max_milliseconds_between_publishes"] = 0 |
| tds.last_pub_time = 0 |
| with patch('time.time', return_value=0): |
| with patch('trapd_stormwatch.sw_storm_active', return_value=False): |
| snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") |
| self.assertEqual(magic_ecomp_logger.call_count, 4) |
| |
| magic_ecomp_logger.call_count = 0 |
| tds.last_pub_time = 100000 |
| tds.c_config["publisher"]["max_milliseconds_between_publishes"] = 1 |
| with patch('time.time', return_value=10): |
| with patch('trapd_stormwatch.sw_storm_active', return_value=False): |
| snmptrapd.notif_receiver_cb("snmp_engine", "stateReference", "contextEngineId", "contextName", [("varBinds1", "varbinds2")], "cbCtx") |
| self.assertEqual(magic_ecomp_logger.call_count, 4) |
| |
| |
| if __name__ == "__main__": # pragma: no cover |
| unittest.main() |