blob: f3d1a4c1fc8a0536b7aefb747016916d6d964771 [file] [log] [blame]
# ============LICENSE_START=======================================================
# Copyright (c) 2019-2022 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
import datetime
import glob
import io
import json
import os
from pathlib import Path
import sys
import tempfile
import unittest
from unittest.mock import patch
import snmptrapd
import trapd_settings as tds
import trapd_runtime_pid
import trapd_io
class test_trapd_io(unittest.TestCase):
"""
Test the save_pid mod
"""
class PseudoFile():
""" test file-like object that does nothing """
def write(self):
pass
def close(self):
pass
class WriteThrows():
""" test file-like object that throws on a write """
def write(self):
raise RuntimeError("close() throws")
@classmethod
def setUpClass(cls):
tds.init()
snmptrap_dir = "/tmp/opt/app/snmptrap"
try:
Path(snmptrap_dir + "/logs").mkdir(parents=True, exist_ok=True)
Path(snmptrap_dir + "/tmp").mkdir(parents=True, exist_ok=True)
Path(snmptrap_dir + "/etc").mkdir(parents=True, exist_ok=True)
except Exception as e:
print("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror)))
sys.exit(1)
# fmt: off
tds.c_config = json.loads(
'{ "snmptrapd": { '
' "version": "1.4.0", '
' "title": "ONAP SNMP Trap Receiver" }, '
'"protocols": { '
' "transport": "udp", '
' "ipv4_interface": "0.0.0.0", '
' "ipv4_port": 6162, '
' "ipv6_interface": "::1", '
' "ipv6_port": 6162 }, '
'"cache": { '
' "dns_cache_ttl_seconds": 60 }, '
'"publisher": { '
' "http_timeout_milliseconds": 1500, '
' "http_retries": 3, '
' "http_milliseconds_between_retries": 750, '
' "http_primary_publisher": "true", '
' "http_peer_publisher": "unavailable", '
' "max_traps_between_publishes": 10, '
' "max_milliseconds_between_publishes": 10000 }, '
'"streams_publishes": { '
' "sec_fault_unsecure": { '
' "type": "message_router", '
' "aaf_password": null, '
' "dmaap_info": { '
' "location": "mtl5", '
' "client_id": null, '
' "client_role": null, '
' "topic_url": "http://localhost:3904/events/ONAP-COLLECTOR-SNMPTRAP" }, '
' "aaf_username": null } }, '
'"files": { '
' "runtime_base_dir": "/tmp/opt/app/snmptrap", '
' "log_dir": "logs", '
' "data_dir": "data", '
' "pid_dir": "tmp", '
' "arriving_traps_log": "snmptrapd_arriving_traps.log", '
' "snmptrapd_diag": "snmptrapd_prog_diag.log", '
' "traps_stats_log": "snmptrapd_stats.csv", '
' "perm_status_file": "snmptrapd_status.log", '
' "eelf_base_dir": "/tmp/opt/app/snmptrap/logs", '
' "eelf_error": "error.log", '
' "eelf_debug": "debug.log", '
' "eelf_audit": "audit.log", '
' "eelf_metrics": "metrics.log", '
' "roll_frequency": "day", '
' "minimum_severity_to_log": 2 }, '
'"trap_config": { '
' "sw_interval_in_seconds": 60, '
' "notify_oids": { '
' ".1.3.6.1.4.1.9.0.1": { '
' "sw_high_water_in_interval": 102, '
' "sw_low_water_in_interval": 7, '
' "category": "logonly" }, '
' ".1.3.6.1.4.1.9.0.2": { '
' "sw_high_water_in_interval": 101, '
' "sw_low_water_in_interval": 7, '
' "category": "logonly" }, '
' ".1.3.6.1.4.1.9.0.3": { '
' "sw_high_water_in_interval": 102, '
' "sw_low_water_in_interval": 7, '
' "category": "logonly" }, '
' ".1.3.6.1.4.1.9.0.4": { '
' "sw_high_water_in_interval": 10, '
' "sw_low_water_in_interval": 3, '
' "category": "logonly" } } }, '
'"snmpv3_config": { '
' "usm_users": [ { '
' "user": "usr-sha-aes256", '
' "engineId": "8000000001020304", '
' "usmHMACSHAAuth": "authkey1", '
' "usmAesCfb256": "privkey1" }, '
' { "user": "user1", '
' "engineId": "8000000000000001", '
' "usmHMACMD5Auth": "authkey1", '
' "usmDESPriv": "privkey1" }, '
' { "user": "user2", '
' "engineId": "8000000000000002", '
' "usmHMACSHAAuth": "authkey2", '
' "usmAesCfb128": "privkey2" }, '
' { "user": "user3", '
' "engineId": "8000000000000003", '
' "usmHMACSHAAuth": "authkey3", '
' "usmAesCfb256": "privkey3" } '
'] } }'
)
# fmt: on
tds.json_traps_filename = (
tds.c_config["files"]["runtime_base_dir"] + "/json_traps.json"
)
tds.arriving_traps_filename = (
tds.c_config["files"]["runtime_base_dir"] + "/arriving_traps.log"
)
def test_open_eelf_error_file(self):
"""
Test bad error file location
"""
with patch.dict(tds.c_config["files"]):
# open eelf error logs
tds.c_config["files"]["eelf_error"] = "/bad_dir/error.log"
# try to open file in non-existent dir
with self.assertRaises(SystemExit):
result = trapd_io.open_eelf_logs()
def test_open_eelf_debug_file(self):
"""
Test bad debug file location
"""
# open eelf debug logs
with patch.dict(tds.c_config["files"]):
tds.c_config["files"]["eelf_debug"] = "/bad_dir/debug.log"
# try to open file in non-existent dir
with self.assertRaises(SystemExit):
result = trapd_io.open_eelf_logs()
def test_open_eelf_audit_file(self):
"""
Test bad audit file location
"""
with patch.dict(tds.c_config["files"]):
# open eelf debug logs
tds.c_config["files"]["eelf_audit"] = "/bad_dir/audit.log"
# try to open file in non-existent dir
with self.assertRaises(SystemExit):
result = trapd_io.open_eelf_logs()
def test_open_eelf_metrics_file(self):
"""
Test bad metrics file location
"""
with patch.dict(tds.c_config["files"]):
# open eelf debug logs
tds.c_config["files"]["eelf_metrics"] = "/bad_dir/metrics.log"
# try to open file in non-existent dir
with self.assertRaises(SystemExit):
result = trapd_io.open_eelf_logs()
def test_open_eelf_error_file_missing_name(self):
"""
Test bad error file location
"""
with patch.dict(tds.c_config["files"]):
# open eelf error logs
del tds.c_config["files"]["eelf_error"]
# try to open file in non-existent dir
with self.assertRaises(SystemExit):
result = trapd_io.open_eelf_logs()
def test_open_eelf_debug_file_missing_name(self):
"""
Test bad debug file location
"""
# open eelf debug logs
with patch.dict(tds.c_config["files"]):
del tds.c_config["files"]["eelf_debug"]
# try to open file in non-existent dir
with self.assertRaises(SystemExit):
result = trapd_io.open_eelf_logs()
def test_open_eelf_audit_file_missing_name(self):
"""
Test bad audit file location
"""
with patch.dict(tds.c_config["files"]):
# open eelf debug logs
del tds.c_config["files"]["eelf_audit"]
# try to open file in non-existent dir
with self.assertRaises(SystemExit):
result = trapd_io.open_eelf_logs()
def test_open_eelf_metrics_file_missing_name(self):
"""
Test bad metrics file location
"""
with patch.dict(tds.c_config["files"]):
# open eelf debug logs
del tds.c_config["files"]["eelf_metrics"]
# try to open file in non-existent dir
with self.assertRaises(SystemExit):
result = trapd_io.open_eelf_logs()
def test_roll_all_logs_not_open(self):
"""
Test roll of logs when not open
"""
# try to roll logs when not open. Shouldn't fail
trapd_io.roll_all_logs()
self.assertIsNotNone(tds.eelf_error_fd)
def test_roll_all_logs(self):
"""
Test rolling files that they are open
"""
trapd_io.open_eelf_logs()
# try to roll logs
trapd_io.roll_all_logs()
self.assertIsNotNone(tds.eelf_error_fd)
def test_roll_all_logs_roll_file_throws(self):
"""
Test rolling files that they are open
but roll_file throws an exception
"""
trapd_io.open_eelf_logs()
# try to roll logs
with patch('trapd_io.roll_file') as roll_file_throws:
roll_file_throws.side_effect = RuntimeError("roll_file() throws")
with self.assertRaises(SystemExit):
trapd_io.roll_all_logs()
self.assertIsNotNone(tds.eelf_error_fd)
def test_roll_all_logs_open_eelf_logs_returns_false(self):
"""
Test rolling files that they are open
but open_eelf_logs returns false
"""
trapd_io.open_eelf_logs()
# try to roll logs
with patch('trapd_io.open_eelf_logs') as open_eelf_logs_throws:
open_eelf_logs_throws.return_value = False
with self.assertRaises(SystemExit):
trapd_io.roll_all_logs()
self.assertIsNotNone(tds.eelf_error_fd)
def test_roll_all_logs_open_file_json_traps_throws(self):
"""
Test rolling files that they are open
but open_file(json_traps_filename) throws an exception
"""
def tmp_func(nm):
if nm == tds.json_traps_filename:
raise RuntimeError("json_traps_filename throws")
return test_trapd_io.PseudoFile()
trapd_io.open_eelf_logs()
# try to roll logs
with patch('trapd_io.open_file') as open_file_throws:
open_file_throws.side_effect = tmp_func
with self.assertRaises(SystemExit):
trapd_io.roll_all_logs()
self.assertIsNotNone(tds.eelf_error_fd)
def test_roll_all_logs_open_file_arriving_traps_throws(self):
"""
Test rolling files that they are open
but open_file(arriving_traps_filename) throws an exception
"""
def tmp_func(nm):
if nm == tds.arriving_traps_filename:
raise RuntimeError("arriving_traps_filename throws")
return test_trapd_io.PseudoFile()
trapd_io.open_eelf_logs()
# try to roll logs
with patch('trapd_io.open_file') as open_file_throws:
open_file_throws.side_effect = tmp_func
with self.assertRaises(SystemExit):
trapd_io.roll_all_logs()
self.assertIsNotNone(tds.eelf_error_fd)
def test_roll_file(self):
"""
Test roll of individual file when not present
"""
# try to roll a valid log file
with tempfile.TemporaryDirectory() as ntd:
fn = ntd + "/test.log"
with open(fn, "w") as ofp:
self.assertTrue(trapd_io.roll_file(fn))
# The file will be renamed to something like
# test.log.2022-08-17T20:28:32
self.assertFalse(os.path.exists(fn))
# We could also add a test to see if there is a file
# with a name like that.
files = list(glob.glob(f"{ntd}/*"))
print(f"files={files}")
self.assertEqual(len(files), 1)
self.assertTrue(files[0].startswith(fn + "."))
def test_roll_file_not_present(self):
"""
Test roll of individual file when not present
"""
# try to roll logs when not open
self.assertFalse(trapd_io.roll_file("/file/not/present"))
def test_roll_file_no_write_perms(self):
"""
try to roll logs when not enough perms
"""
with tempfile.TemporaryDirectory() as no_perms_dir:
# no_perms_dir = "/tmp/opt/app/snmptrap/no_perms"
no_perms_file = "test.dat"
no_perms_fp = no_perms_dir + "/" + no_perms_file
# required directory tree
#try:
# Path(no_perms_dir).mkdir(parents=True, exist_ok=True)
# os.chmod(no_perms_dir, 0o700)
#except Exception as e:
# self.fail("Error while running %s : %s" % (os.path.basename(__file__), str(e.strerror)))
# create empty file
open(no_perms_fp, "w").close()
os.chmod(no_perms_dir, 0o555)
# try to roll file in dir with no write perms
self.assertFalse(trapd_io.roll_file(no_perms_fp))
# the file should still be there
open(no_perms_fp).close()
# allow the directory to be removed
os.chmod(no_perms_dir, 0o700)
def test_open_file_exists(self):
"""
Test file open in directory present
"""
# create copy of snmptrapd.json for pytest
test_file = "/tmp/snmptrap_pytest"
# try to roll logs when not open
result = trapd_io.open_file(test_file)
self.assertTrue(str(result).startswith("<_io.TextIOWrapper name="))
self.assertIsInstance(result, io.TextIOWrapper)
def test_open_file_exists_does_not_exist(self):
"""
Test file open in directory present
"""
# create copy of snmptrapd.json for pytest
test_file = "/tmp/no_such_dir/snmptrap_pytest"
# try to open file when dir not present
with self.assertRaises(SystemExit):
result = trapd_io.open_file(test_file)
def test_close_file_exists(self):
"""
Test closing a file that's present
"""
# create copy of snmptrapd.json for pytest
test_file_name = "/tmp/snmptrap_pytest"
test_file = trapd_io.open_file(test_file_name)
# close active file
self.assertTrue(trapd_io.close_file(test_file, test_file_name))
def test_close_file_does_not_exist(self):
"""
Test closing non-existent file
"""
# try to roll logs when not open
self.assertFalse(trapd_io.close_file(None, None))
def test_ecomp_logger_type_error(self):
"""
test trapd_io.ecomp_logger
"""
trapd_io.open_eelf_logs()
msg = "this is a test"
self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
def test_ecomp_logger_type_error_bad_fd(self):
"""
test trapd_io.ecomp_logger, but write() throws
"""
trapd_io.open_eelf_logs()
msg = "this is a test"
# the following SHOULD be done with a context patch
sv_eelf_error_fd = tds.eelf_error_fd
tds.eelf_error_fd = test_trapd_io.WriteThrows()
self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_ERROR, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
tds.eelf_error_fd = sv_eelf_error_fd
def test_ecomp_logger_type_unknown_bad_fd(self):
"""
test trapd_io.ecomp_logger, unknown type, but write() throws
"""
trapd_io.open_eelf_logs()
msg = "this is a test"
# the following SHOULD be done with a context patch
sv_eelf_error_fd = tds.eelf_error_fd
tds.eelf_error_fd = test_trapd_io.WriteThrows()
self.assertFalse(trapd_io.ecomp_logger(-1, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
tds.eelf_error_fd = sv_eelf_error_fd
def test_ecomp_logger_type_metrics(self):
"""
test trapd_io.ecomp_logger to metrics
"""
trapd_io.open_eelf_logs()
msg = "this is a test"
self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_METRICS, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
def test_ecomp_logger_type_metrics_bad_fd(self):
"""
test trapd_io.ecomp_logger to metrics, but write() throws
"""
trapd_io.open_eelf_logs()
msg = "this is a test"
# the following SHOULD be done with a context patch
sv_eelf_metrics_fd = tds.eelf_metrics_fd
tds.eelf_metrics_fd = test_trapd_io.WriteThrows()
self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_METRICS, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
tds.eelf_metrics_fd = sv_eelf_metrics_fd
def test_ecomp_logger_type_audit(self):
"""
test trapd_io.ecomp_logger to audit log
"""
trapd_io.open_eelf_logs()
msg = "this is a test"
self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_AUDIT, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
def test_ecomp_logger_type_audit_bad_fd(self):
"""
test trapd_io.ecomp_logger to audit log, but write() throws
"""
trapd_io.open_eelf_logs()
msg = "this is a test"
# the following SHOULD be done with a context patch
sv_eelf_audit_fd = tds.eelf_audit_fd
tds.eelf_audit_fd = test_trapd_io.WriteThrows()
self.assertTrue(trapd_io.ecomp_logger(tds.LOG_TYPE_AUDIT, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
tds.eelf_audit_fd = sv_eelf_audit_fd
def test_ecomp_logger_type_unknown(self):
"""
test trapd_io.ecomp_logger
"""
trapd_io.open_eelf_logs()
msg = "this is a test"
self.assertFalse(trapd_io.ecomp_logger(-1, tds.SEV_ERROR, tds.CODE_GENERAL, msg))
if __name__ == "__main__": # pragma: no cover
unittest.main()