| #!/usr/bin/env python3 |
| |
| import unittest |
| from framework import VppTestCase, VppTestRunner |
| from util import ppp |
| from scapy.packet import Raw |
| from scapy.layers.inet import IP, UDP |
| from syslog_rfc5424_parser import SyslogMessage, ParseError |
| from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity |
| from vpp_papi import VppEnum |
| |
| |
class TestSyslog(VppTestCase):
    """Syslog Protocol Test Cases"""

    # CLI keyword for each facility, indexed by the numeric facility code that
    # syslog_generate() receives. The empty strings are placeholders that keep
    # the following entries at their index.
    _FACILITY_NAMES = (
        "kernel",
        "user-level",
        "mail-system",
        "system-daemons",
        "security-authorization",
        "syslogd",
        "line-printer",
        "network-news",
        "uucp",
        "clock-daemon",
        "",
        "ftp-daemon",
        "ntp-subsystem",
        "log-audit",
        "log-alert",
        "",
        "local0",
        "local1",
        "local2",
        "local3",
        "local4",
        "local5",
        "local6",
        "local7",
    )

    # CLI keyword for each severity, indexed by the numeric severity level.
    _SEVERITY_NAMES = (
        "emergency",
        "alert",
        "critical",
        "error",
        "warning",
        "notice",
        "informational",
        "debug",
    )

    @property
    def SYSLOG_SEVERITY(self):
        """Return the ``vl_api_syslog_severity_t`` enum exposed by VppEnum."""
        return VppEnum.vl_api_syslog_severity_t

    @classmethod
    def setUpClass(cls):
        """Create the single pg interface used by the test and configure it.

        pg0 is brought admin-up, given an IPv4 configuration and has ARP
        resolved. Any failure during that configuration triggers the base
        class teardown before the exception is re-raised.
        """
        super().setUpClass()

        try:
            (cls.pg0,) = cls.create_pg_interfaces(range(1))
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()
        except Exception:
            # unittest does not run tearDownClass when setUpClass raises, so
            # invoke the base teardown explicitly before propagating.
            super().tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class teardown to the base test case."""
        super().tearDownClass()

    def syslog_generate(self, facility, severity, appname, msgid, sd=None, msg=None):
        """
        Generate syslog message

        Builds a ``test syslog ...`` CLI command from the arguments and runs
        it through ``self.vapi.cli``.

        :param facility: facility value (used as index into the facility
                         keyword table)
        :param severity: severity level (used as index into the severity
                         keyword table)
        :param appname: application name that originate message
        :param msgid: message identifier
        :param sd: structured data (optional), a dict mapping each SD-ID to a
                   dict of parameter name/value pairs
        :param msg: free-form message (optional)
        """
        parts = [
            "test syslog",
            self._FACILITY_NAMES[facility],
            self._SEVERITY_NAMES[severity],
            appname,
            msgid,
        ]
        if sd is not None:
            for sd_id, sd_params in sd.items():
                parts += ["sd-id", sd_id]
                for name, value in sd_params.items():
                    parts += ["sd-param", name, value]
        if msg is not None:
            parts.append(msg)
        # Single-space join yields the same command string as the piecewise
        # "%s" formatting this replaces; str() mirrors the %s conversion.
        self.vapi.cli(" ".join(str(part) for part in parts))

    def syslog_verify(
        self, data, facility, severity, appname, msgid, sd=None, msg=None
    ):
        """
        Verify syslog message

        Decodes ``data`` as UTF-8, parses it with ``SyslogMessage.parse`` and
        compares the parsed fields with the expected values. Version is
        expected to be 1 and hostname to equal pg0's local IPv4 address.

        :param data: syslog message (bytes)
        :param facility: facility value
        :param severity: severity level
        :param appname: application name that originate message
        :param msgid: message identifier
        :param sd: structured data (optional); None is compared as an empty
                   dict
        :param msg: free-form message (optional)
        :raises ParseError: if the payload cannot be parsed (logged first)
        """
        text = data.decode("utf-8")
        expected_sd = {} if sd is None else sd
        try:
            parsed = SyslogMessage.parse(text)
        except ParseError as e:
            self.logger.error(e)
            raise

        self.assertEqual(parsed.facility, facility)
        self.assertEqual(parsed.severity, severity)
        self.assertEqual(parsed.appname, appname)
        self.assertEqual(parsed.msgid, msgid)
        self.assertEqual(parsed.msg, msg)
        self.assertEqual(parsed.sd, expected_sd)
        self.assertEqual(parsed.version, 1)
        self.assertEqual(parsed.hostname, self.pg0.local_ip4)

    def test_syslog(self):
        """Syslog Protocol test"""
        # Configure the sender with addresses only; the remaining settings
        # are expected to read back as port 514, VRF 0 and max size 480.
        self.vapi.syslog_set_sender(
            src_address=self.pg0.local_ip4, collector_address=self.pg0.remote_ip4
        )
        config = self.vapi.syslog_get_sender()
        self.assertEqual(str(config.collector_address), self.pg0.remote_ip4)
        self.assertEqual(config.collector_port, 514)
        self.assertEqual(str(config.src_address), self.pg0.local_ip4)
        self.assertEqual(config.vrf_id, 0)
        self.assertEqual(config.max_msg_size, 480)

        appname = "test"
        msgid = "testMsg"
        msg = "this is message"
        sd1 = {
            "exampleSDID@32473": {"iut": "3", "eventSource": "App", "eventID": "1011"}
        }
        sd2 = {
            "exampleSDID@32473": {"iut": "3", "eventSource": "App", "eventID": "1011"},
            "examplePriority@32473": {"class": "high"},
        }

        # 1) Message without structured data: check IP/UDP headers and payload.
        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(
            SyslogFacility.local7, SyslogSeverity.info, appname, msgid, None, msg
        )
        capture = self.pg0.get_capture(1)
        try:
            self.assertEqual(capture[0][IP].src, self.pg0.local_ip4)
            self.assertEqual(capture[0][IP].dst, self.pg0.remote_ip4)
            self.assertEqual(capture[0][UDP].dport, 514)
            self.assert_packet_checksums_valid(capture[0], False)
        except Exception:
            # Dump the offending packet to the log, then let the failure
            # propagate unchanged.
            self.logger.error(ppp("invalid packet:", capture[0]))
            raise
        self.syslog_verify(
            capture[0][Raw].load,
            SyslogFacility.local7,
            SyslogSeverity.info,
            appname,
            msgid,
            None,
            msg,
        )

        # 2) With the filter set to WARN, an info message must not be sent.
        self.pg_enable_capture(self.pg_interfaces)
        self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_WARN)
        severity_filter = self.vapi.syslog_get_filter()
        self.assertEqual(
            severity_filter.severity, self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_WARN
        )
        self.syslog_generate(
            SyslogFacility.local7, SyslogSeverity.info, appname, msgid, None, msg
        )
        self.pg0.assert_nothing_captured()

        # 3) A warning message passes the filter; carries one SD element.
        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(
            SyslogFacility.local6, SyslogSeverity.warning, appname, msgid, sd1, msg
        )
        capture = self.pg0.get_capture(1)
        self.syslog_verify(
            capture[0][Raw].load,
            SyslogFacility.local6,
            SyslogSeverity.warning,
            appname,
            msgid,
            sd1,
            msg,
        )

        # 4) Change the collector port and verify it is applied to both the
        #    reported configuration and the emitted packet.
        self.vapi.syslog_set_sender(
            src_address=self.pg0.local_ip4,
            collector_address=self.pg0.remote_ip4,
            collector_port=12345,
        )
        config = self.vapi.syslog_get_sender()
        self.assertEqual(config.collector_port, 12345)

        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(
            SyslogFacility.local5, SyslogSeverity.err, appname, msgid, sd2, None
        )
        capture = self.pg0.get_capture(1)
        try:
            self.assertEqual(capture[0][UDP].dport, 12345)
        except Exception:
            self.logger.error(ppp("invalid packet:", capture[0]))
            raise
        self.syslog_verify(
            capture[0][Raw].load,
            SyslogFacility.local5,
            SyslogSeverity.err,
            appname,
            msgid,
            sd2,
            None,
        )
| |
| |
if __name__ == "__main__":
    # Executed directly as a script: hand the module's tests to unittest,
    # using the VPP test runner instead of the default text runner.
    unittest.main(testRunner=VppTestRunner)