| # ============LICENSE_START======================================================= |
| # Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved. |
| # ================================================================================ |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # ============LICENSE_END========================================================= |
| |
| import datetime |
| import glob |
| import io |
| import json |
| import os |
| from pathlib import Path |
| import sys |
| import tempfile |
| import unittest |
| from unittest.mock import patch |
| |
| import snmptrapd |
| import trapd_settings as tds |
| import trapd_runtime_pid |
| import trapd_io |
| |
| |
| class test_trapd_io(unittest.TestCase): |
| """ |
| Test the save_pid mod |
| """ |
| |
| class PseudoFile(): |
| """ test file-like object that does nothing """ |
| def write(self): |
| pass |
| def close(self): |
| pass |
| |
| class WriteThrows(): |
| """ test file-like object that throws on a write """ |
| def write(self): |
| raise RuntimeError("close() throws") |
| |
| |
| @classmethod |
| def setUpClass(cls): |
| |
| tds.init() |
| |
| snmptrap_dir = "/tmp/opt/app/snmptrap" |
| try: |
| Path(snmptrap_dir + "/logs").mkdir(parents=True, exist_ok=True) |
| Path(snmptrap_dir + "/tmp").mkdir(parents=True, exist_ok=True) |
| Path(snmptrap_dir + "/etc").mkdir(parents=True, exist_ok=True) |
| except Exception as e: |
| print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror))) |
| sys.exit(1) |
| |
| # fmt: off |
| tds.c_config = json.loads( |
| '{ "snmptrapd": { ' |
| ' "version": "1.4.0", ' |
| ' "title": "ONAP SNMP Trap Receiver" }, ' |
| '"protocols": { ' |
| ' "transport": "udp", ' |
| ' "ipv4_interface": "0.0.0.0", ' |
| ' "ipv4_port": 6162, ' |
| ' "ipv6_interface": "::1", ' |
| ' "ipv6_port": 6162 }, ' |
| '"cache": { ' |
| ' "dns_cache_ttl_seconds": 60 }, ' |
| '"publisher": { ' |
| ' "http_timeout_milliseconds": 1500, ' |
| ' "http_retries": 3, ' |
| ' "http_milliseconds_between_retries": 750, ' |
| ' "http_primary_publisher": "true", ' |
| ' "http_peer_publisher": "unavailable", ' |
| ' "max_traps_between_publishes": 10, ' |
| ' "max_milliseconds_between_publishes": 10000 }, ' |
| '"streams_publishes": { ' |
| ' "sec_fault_unsecure": { ' |
| ' "type": "message_router", ' |
| ' "aaf_password": null, ' |
| ' "dmaap_info": { ' |
| ' "location": "mtl5", ' |
| ' "client_id": null, ' |
| ' "client_role": null, ' |
| ' "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, ' |
| ' "aaf_username": null } }, ' |
| '"files": { ' |
| ' "runtime_base_dir": "/tmp/opt/app/snmptrap", ' |
| ' "log_dir": "logs", ' |
| ' "data_dir": "data", ' |
| ' "pid_dir": "tmp", ' |
| ' "arriving_traps_log": "snmptrapd_arriving_traps.log", ' |
| ' "snmptrapd_diag": "snmptrapd_prog_diag.log", ' |
| ' "traps_stats_log": "snmptrapd_stats.csv", ' |
| ' "perm_status_file": "snmptrapd_status.log", ' |
| ' "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", ' |
| ' "eelf_error": "error.log", ' |
| ' "eelf_debug": "debug.log", ' |
| ' "eelf_audit": "audit.log", ' |
| ' "eelf_metrics": "metrics.log", ' |
| ' "roll_frequency": "day", ' |
| ' "minimum_severity_to_log": 2 }, ' |
| '"trap_config": { ' |
| ' "sw_interval_in_seconds": 60, ' |
| ' "notify_oids": { ' |
| ' ".1.3.6.1.4.1.9.0.1": { ' |
| ' "sw_high_water_in_interval": 102, ' |
| ' "sw_low_water_in_interval": 7, ' |
| ' "category": "logonly" }, ' |
| ' ".1.3.6.1.4.1.9.0.2": { ' |
| ' "sw_high_water_in_interval": 101, ' |
| ' "sw_low_water_in_interval": 7, ' |
| ' "category": "logonly" }, ' |
| ' ".1.3.6.1.4.1.9.0.3": { ' |
| ' "sw_high_water_in_interval": 102, ' |
| ' "sw_low_water_in_interval": 7, ' |
| ' "category": "logonly" }, ' |
| ' ".1.3.6.1.4.1.9.0.4": { ' |
| ' "sw_high_water_in_interval": 10, ' |
| ' "sw_low_water_in_interval": 3, ' |
| ' "category": "logonly" } } }, ' |
| '"snmpv3_config": { ' |
| ' "usm_users": [ { ' |
| ' "user": "usr-sha-aes256", ' |
| ' "engineId": "8000000001020304", ' |
| ' "usmHMACSHAAuth": "authkey1", ' |
| ' "usmAesCfb256": "privkey1" }, ' |
| ' { "user": "user1", ' |
| ' "engineId": "8000000000000001", ' |
| ' "usmHMACMD5Auth": "authkey1", ' |
| ' "usmDESPriv": "privkey1" }, ' |
| ' { "user": "user2", ' |
| ' "engineId": "8000000000000002", ' |
| ' "usmHMACSHAAuth": "authkey2", ' |
| ' "usmAesCfb128": "privkey2" }, ' |
| ' { "user": "user3", ' |
| ' "engineId": "8000000000000003", ' |
| ' "usmHMACSHAAuth": "authkey3", ' |
| ' "usmAesCfb256": "privkey3" } ' |
| '] } }' |
| ) |
| # fmt: on |
| tds.json_traps_filename = ( |
| tds.c_config["files"]["runtime_base_dir"] + "/json_traps.json" |
| ) |
| tds.arriving_traps_filename = ( |
| tds.c_config["files"]["runtime_base_dir"] + "/arriving_traps.log" |
| ) |
| |
| |
| def test_open_eelf_error_file(self): |
| """ |
| Test bad error file location |
| """ |
| |
| with patch.dict(tds.c_config["files"]): |
| # open eelf error logs |
| tds.c_config["files"]["eelf_error"] = "/bad_dir/error.log" |
| |
| # try to open file in non-existent dir |
| with self.assertRaises(SystemExit): |
| result = trapd_io.open_eelf_logs() |
| |
| |
| def test_open_eelf_debug_file(self): |
| """ |
| Test bad debug file location |
| """ |
| |
| # open eelf debug logs |
| with patch.dict(tds.c_config["files"]): |
| tds.c_config["files"]["eelf_debug"] = "/bad_dir/debug.log" |
| |
| # try to open file in non-existent dir |
| with self.assertRaises(SystemExit): |
| result = trapd_io.open_eelf_logs() |
| |
| |
| def test_open_eelf_audit_file(self): |
| """ |
| Test bad audit file location |
| """ |
| |
| with patch.dict(tds.c_config["files"]): |
| # open eelf debug logs |
| tds.c_config["files"]["eelf_audit"] = "/bad_dir/audit.log" |
| |
| # try to open file in non-existent dir |
| with self.assertRaises(SystemExit): |
| result = trapd_io.open_eelf_logs() |
| |
| |
| def test_open_eelf_metrics_file(self): |
| """ |
| Test bad metrics file location |
| """ |
| |
| with patch.dict(tds.c_config["files"]): |
| # open eelf debug logs |
| tds.c_config["files"]["eelf_metrics"] = "/bad_dir/metrics.log" |
| |
| # try to open file in non-existent dir |
| with self.assertRaises(SystemExit): |
| result = trapd_io.open_eelf_logs() |
| |
| |
| def test_open_eelf_error_file_missing_name(self): |
| """ |
| Test bad error file location |
| """ |
| |
| with patch.dict(tds.c_config["files"]): |
| # open eelf error logs |
| del tds.c_config["files"]["eelf_error"] |
| |
| # try to open file in non-existent dir |
| with self.assertRaises(SystemExit): |
| result = trapd_io.open_eelf_logs() |
| |
| |
| def test_open_eelf_debug_file_missing_name(self): |
| """ |
| Test bad debug file location |
| """ |
| |
| # open eelf debug logs |
| with patch.dict(tds.c_config["files"]): |
| del tds.c_config["files"]["eelf_debug"] |
| |
| # try to open file in non-existent dir |
| with self.assertRaises(SystemExit): |
| result = trapd_io.open_eelf_logs() |
| |
| |
| def test_open_eelf_audit_file_missing_name(self): |
| """ |
| Test bad audit file location |
| """ |
| |
| with patch.dict(tds.c_config["files"]): |
| # open eelf debug logs |
| del tds.c_config["files"]["eelf_audit"] |
| |
| # try to open file in non-existent dir |
| with self.assertRaises(SystemExit): |
| result = trapd_io.open_eelf_logs() |
| |
| |
| def test_open_eelf_metrics_file_missing_name(self): |
| """ |
| Test bad metrics file location |
| """ |
| |
| with patch.dict(tds.c_config["files"]): |
| # open eelf debug logs |
| del tds.c_config["files"]["eelf_metrics"] |
| |
| # try to open file in non-existent dir |
| with self.assertRaises(SystemExit): |
| result = trapd_io.open_eelf_logs() |
| |
| |
| def test_roll_all_logs_not_open(self): |
| """ |
| Test roll of logs when not open |
| """ |
| |
| # try to roll logs when not open. Shouldn't fail |
| trapd_io.roll_all_logs() |
| self.assertIsNotNone(tds.eelf_error_fd) |
| |
| def test_roll_all_logs(self): |
| """ |
| Test rolling files that they are open |
| """ |
| |
| trapd_io.open_eelf_logs() |
| # try to roll logs |
| trapd_io.roll_all_logs() |
| self.assertIsNotNone(tds.eelf_error_fd) |
| |
| |
| def test_roll_all_logs_roll_file_throws(self): |
| """ |
| Test rolling files that they are open |
| but roll_file throws an exception |
| """ |
| |
| trapd_io.open_eelf_logs() |
| # try to roll logs |
| with patch('trapd_io.roll_file') as roll_file_throws: |
| roll_file_throws.side_effect = RuntimeError("roll_file() throws") |
| with self.assertRaises(SystemExit): |
| trapd_io.roll_all_logs() |
| self.assertIsNotNone(tds.eelf_error_fd) |
| |
| |
| def test_roll_all_logs_open_eelf_logs_returns_false(self): |
| """ |
| Test rolling files that they are open |
| but open_eelf_logs returns false |
| """ |
| |
| trapd_io.open_eelf_logs() |
| # try to roll logs |
| with patch('trapd_io.open_eelf_logs') as open_eelf_logs_throws: |
| open_eelf_logs_throws.return_value = False |
| with self.assertRaises(SystemExit): |
| trapd_io.roll_all_logs() |
| self.assertIsNotNone(tds.eelf_error_fd) |
| |
| |
| def test_roll_all_logs_open_file_json_traps_throws(self): |
| """ |
| Test rolling files that they are open |
| but open_file(json_traps_filename) throws an exception |
| """ |
| |
| def tmp_func(nm): |
| if nm == tds.json_traps_filename: |
| raise RuntimeError("json_traps_filename throws") |
| return test_trapd_io.PseudoFile() |
| |
| trapd_io.open_eelf_logs() |
| # try to roll logs |
| with patch('trapd_io.open_file') as open_file_throws: |
| open_file_throws.side_effect = tmp_func |
| with self.assertRaises(SystemExit): |
| trapd_io.roll_all_logs() |
| self.assertIsNotNone(tds.eelf_error_fd) |
| |
| |
| def test_roll_all_logs_open_file_arriving_traps_throws(self): |
| """ |
| Test rolling files that they are open |
| but open_file(arriving_traps_filename) throws an exception |
| """ |
| |
| def tmp_func(nm): |
| if nm == tds.arriving_traps_filename: |
| raise RuntimeError("arriving_traps_filename throws") |
| return test_trapd_io.PseudoFile() |
| |
| trapd_io.open_eelf_logs() |
| # try to roll logs |
| with patch('trapd_io.open_file') as open_file_throws: |
| open_file_throws.side_effect = tmp_func |
| with self.assertRaises(SystemExit): |
| trapd_io.roll_all_logs() |
| self.assertIsNotNone(tds.eelf_error_fd) |
| |
| |
| def test_roll_file(self): |
| """ |
| Test roll of individual file when not present |
| """ |
| |
| # try to roll a valid log file |
| with tempfile.TemporaryDirectory() as ntd: |
| fn = ntd + "/test.log" |
| with open(fn, "w") as ofp: |
| self.assertTrue(trapd_io.roll_file(fn)) |
| # The file will be renamed to something like |
| # test.log.2022-08-17T20:28:32 |
| self.assertFalse(os.path.exists(fn)) |
| # We could also add a test to see if there is a file |
| # with a name like that. |
| files = list(glob.glob(f"{ntd}/*")) |
| print(f"files={files}") |
| self.assertEqual(len(files), 1) |
| self.assertTrue(files[0].startswith(fn + ".")) |
| |
| |
| def test_roll_file_not_present(self): |
| """ |
| Test roll of individual file when not present |
| """ |
| |
| # try to roll logs when not open |
| self.assertFalse(trapd_io.roll_file("/file/not/present")) |
| |
| |
| def test_roll_file_no_write_perms(self): |
| """ |
| try to roll logs when not enough perms |
| """ |
| |
| with tempfile.TemporaryDirectory() as no_perms_dir: |
| # no_perms_dir = "/tmp/opt/app/snmptrap/no_perms" |
| no_perms_file = "test.dat" |
| no_perms_fp = no_perms_dir + "/" + no_perms_file |
| |
| # required directory tree |
| #try: |
| # Path(no_perms_dir).mkdir(parents=True, exist_ok=True) |
| # os.chmod(no_perms_dir, 0o700) |
| #except Exception as e: |
| # self.fail("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror))) |
| |
| # create empty file |
| open(no_perms_fp, "w").close() |
| os.chmod(no_perms_dir, 0o555) |
| |
| # try to roll file in dir with no write perms |
| self.assertFalse(trapd_io.roll_file(no_perms_fp)) |
| |
| # the file should still be there |
| open(no_perms_fp).close() |
| |
| # allow the directory to be removed |
| os.chmod(no_perms_dir, 0o700) |
| |
| |
| def test_open_file_exists(self): |
| """ |
| Test file open in directory present |
| """ |
| |
| # create copy of snmptrapd.json for pytest |
| test_file = "/tmp/snmptrap_pytest" |
| |
| # try to roll logs when not open |
| result = trapd_io.open_file(test_file) |
| self.assertTrue(str(result).startswith("<_io.TextIOWrapper name=")) |
| self.assertIsInstance(result, io.TextIOWrapper) |
| |
| |
| def test_open_file_exists_does_not_exist(self): |
| """ |
| Test file open in directory present |
| """ |
| |
| # create copy of snmptrapd.json for pytest |
| test_file = "/tmp/no_such_dir/snmptrap_pytest" |
| |
| # try to open file when dir not present |
| with self.assertRaises(SystemExit): |
| result = trapd_io.open_file(test_file) |
| |
| |
| def test_close_file_exists(self): |
| """ |
| Test closing a file that's present |
| """ |
| |
| # create copy of snmptrapd.json for pytest |
| test_file_name = "/tmp/snmptrap_pytest" |
| test_file = trapd_io.open_file(test_file_name) |
| |
| # close active file |
| self.assertTrue(trapd_io.close_file(test_file, test_file_name)) |
| |
| |
| def test_close_file_does_not_exist(self): |
| """ |
| Test closing non-existent file |
| """ |
| |
| # try to roll logs when not open |
| self.assertFalse(trapd_io.close_file(None, None)) |
| |
| |
| def test_ecomp_logger_type_error(self): |
| """ |
| test trapd_io.ecomp_logger |
| """ |
| |
| trapd_io.open_eelf_logs() |
| msg = "this is a test" |
| self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) |
| |
| |
| def test_ecomp_logger_type_error_bad_fd(self): |
| """ |
| test trapd_io.ecomp_logger, but write() throws |
| """ |
| |
| trapd_io.open_eelf_logs() |
| msg = "this is a test" |
| # the following SHOULD be done with a context patch |
| sv_eelf_error_fd = tds.eelf_error_fd |
| tds.eelf_error_fd = test_trapd_io.WriteThrows() |
| self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) |
| tds.eelf_error_fd = sv_eelf_error_fd |
| |
| |
| def test_ecomp_logger_type_unknown_bad_fd(self): |
| """ |
| test trapd_io.ecomp_logger, unknown type, but write() throws |
| """ |
| |
| trapd_io.open_eelf_logs() |
| msg = "this is a test" |
| # the following SHOULD be done with a context patch |
| sv_eelf_error_fd = tds.eelf_error_fd |
| tds.eelf_error_fd = test_trapd_io.WriteThrows() |
| self.assertFalse(trapd_io.ecomp_logger(-1, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) |
| tds.eelf_error_fd = sv_eelf_error_fd |
| |
| |
| def test_ecomp_logger_type_metrics(self): |
| """ |
| test trapd_io.ecomp_logger to metrics |
| """ |
| |
| trapd_io.open_eelf_logs() |
| msg = "this is a test" |
| self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_METRICS, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) |
| |
| |
| def test_ecomp_logger_type_metrics_bad_fd(self): |
| """ |
| test trapd_io.ecomp_logger to metrics, but write() throws |
| """ |
| |
| trapd_io.open_eelf_logs() |
| msg = "this is a test" |
| # the following SHOULD be done with a context patch |
| sv_eelf_metrics_fd = tds.eelf_metrics_fd |
| tds.eelf_metrics_fd = test_trapd_io.WriteThrows() |
| self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_METRICS, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) |
| tds.eelf_metrics_fd = sv_eelf_metrics_fd |
| |
| |
| def test_ecomp_logger_type_audit(self): |
| """ |
| test trapd_io.ecomp_logger to audit log |
| """ |
| |
| trapd_io.open_eelf_logs() |
| msg = "this is a test" |
| self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_AUDIT, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) |
| |
| |
| def test_ecomp_logger_type_audit_bad_fd(self): |
| """ |
| test trapd_io.ecomp_logger to audit log, but write() throws |
| """ |
| |
| trapd_io.open_eelf_logs() |
| msg = "this is a test" |
| # the following SHOULD be done with a context patch |
| sv_eelf_audit_fd = tds.eelf_audit_fd |
| tds.eelf_audit_fd = test_trapd_io.WriteThrows() |
| self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_AUDIT, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) |
| tds.eelf_audit_fd = sv_eelf_audit_fd |
| |
| |
| def test_ecomp_logger_type_unknown(self): |
| """ |
| test trapd_io.ecomp_logger |
| """ |
| |
| trapd_io.open_eelf_logs() |
| msg = "this is a test" |
| self.assertFalse(trapd_io.ecomp_logger(-1, tds.SEV_ERROR, tds.CODE_GENERAL, msg)) |
| |
| |
| if __name__ == "__main__": # pragma: no cover |
| unittest.main() |