CSIT-1139: Implement parallel test execution
The implementation of parallel test execution in VPP Test Framework.
- VPPTestCase test methods are grouped together
- tests are running in separate processes
- VPP instances spawned by tests are assigned to different cores
- output from these processes is redirected through pipes and printed
out testcase by testcase
- TEST_JOBS env var is used to specify the number of parallel processes
- improved test summary
- a bit of code cleanup
Change-Id: I9ca93904d9fe2c3daf980500c64a8611838ae28c
Signed-off-by: juraj.linkes <juraj.linkes@pantheon.tech>
diff --git a/src/vpp-api/python/vpp_papi/vpp_papi.py b/src/vpp-api/python/vpp_papi/vpp_papi.py
index 4afb616..d5ad837 100644
--- a/src/vpp-api/python/vpp_papi/vpp_papi.py
+++ b/src/vpp-api/python/vpp_papi/vpp_papi.py
@@ -63,7 +63,7 @@
def vpp_atexit(vpp_weakref):
"""Clean up VPP connection on shutdown."""
vpp_instance = vpp_weakref()
- if vpp_instance.connected:
+ if vpp_instance and vpp_instance.connected:
vpp_instance.logger.debug('Cleaning up VPP on exit')
vpp_instance.disconnect()
diff --git a/test/Makefile b/test/Makefile
index 8af94f9..0338062 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -21,6 +21,12 @@
FORCE_FOREGROUND=1
else ifeq ($(DEBUG),core)
FORCE_FOREGROUND=1
+else ifeq ($(STEP),yes)
+FORCE_FOREGROUND=1
+else ifeq ($(STEP),y)
+FORCE_FOREGROUND=1
+else ifeq ($(STEP),1)
+FORCE_FOREGROUND=1
else
FORCE_FOREGROUND=0
endif
@@ -121,8 +127,25 @@
SANITY_RUN_VPP_CMD=source $(PYTHON_VENV_PATH)/bin/activate && python sanity_run_vpp.py
endif
+ifndef TEST_JOBS
+PARALLEL_ILLEGAL=0
+else ifeq ($(FORCE_FOREGROUND),0)
+PARALLEL_ILLEGAL=0
+else ifeq ($(TEST_JOBS),auto)
+PARALLEL_ILLEGAL=0
+else ifeq ($(TEST_JOBS),1)
+PARALLEL_ILLEGAL=0
+else
+PARALLEL_ILLEGAL=1
+endif
+
sanity: verify-no-running-vpp
@sys_req/dev_shm_size.sh
+ @bash -c "test $(PARALLEL_ILLEGAL) -eq 0 ||\
+ (echo \"*******************************************************************\" &&\
+ echo \"* Sanity check failed, TEST_JOBS is not 1 or 'auto' and DEBUG, STEP or PROFILE is set\" &&\
+ echo \"*******************************************************************\" &&\
+ false)"
@bash -c "$(SANITY_IMPORT_VPP_PAPI_CMD) ||\
(echo \"*******************************************************************\" &&\
echo \"* Sanity check failed, cannot import vpp_papi\" &&\
@@ -236,18 +259,19 @@
@echo ""
@echo "Arguments controlling test runs:"
@echo " V=[0|1|2] - set test verbosity level"
+ @echo " TEST_JOBS=[<n>|auto] - use <n> parallel processes for test execution or automatic discovery of maximum acceptable processes (default: 1)"
@echo " CACHE_OUTPUT=[0|1] - cache VPP stdout/stderr and log as one block after test finishes (default: 1)"
@echo " FAILFAST=[0|1] - fail fast if 1, complete all tests if 0"
- @echo " TIMEOUT=<timeout> - fail test suite if any single test takes longer than <timeout> (in seconds) to finish"
+ @echo " TIMEOUT=<timeout> - fail test suite if any single test takes longer than <timeout> (in seconds) to finish (default: 600)"
@echo " RETRIES=<n> - retry failed tests <n> times"
@echo " DEBUG=<type> - set VPP debugging kind"
@echo " DEBUG=core - detect coredump and load it in gdb on crash"
- @echo " DEBUG=gdb - allow easy debugging by printing VPP PID "
- @echo " and waiting for user input before running "
+ @echo " DEBUG=gdb - allow easy debugging by printing VPP PID"
+ @echo " and waiting for user input before running"
@echo " and tearing down a testcase"
- @echo " DEBUG=gdbserver - run gdb inside a gdb server, otherwise "
+ @echo " DEBUG=gdbserver - run gdb inside a gdb server, otherwise"
@echo " same as above"
- @echo " STEP=[yes|no] - ease debugging by stepping through a testcase "
+ @echo " STEP=[yes|no] - ease debugging by stepping through a testcase"
@echo " SANITY=[yes|no] - perform sanity import of vpp-api/sanity vpp run before running tests (default: yes)"
@echo " EXTENDED_TESTS=[1|y] - run extended tests"
@echo " TEST=<filter> - filter the set of tests:"
diff --git a/test/framework.py b/test/framework.py
index 4f7c76a..6a5477d 100644
--- a/test/framework.py
+++ b/test/framework.py
@@ -11,6 +11,7 @@
import faulthandler
import random
import copy
+import psutil
from collections import deque
from threading import Thread, Event
from inspect import getdoc, isclass
@@ -212,6 +213,35 @@
raise Exception("Unrecognized DEBUG option: '%s'" % d)
@classmethod
+ def get_least_used_cpu(self):
+ cpu_usage_list = [set(range(psutil.cpu_count()))]
+ vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
+ if 'vpp_main' == p.info['name']]
+ for vpp_process in vpp_processes:
+ for cpu_usage_set in cpu_usage_list:
+ try:
+ cpu_num = vpp_process.cpu_num()
+ if cpu_num in cpu_usage_set:
+ cpu_usage_set_index = cpu_usage_list.index(
+ cpu_usage_set)
+ if cpu_usage_set_index == len(cpu_usage_list) - 1:
+ cpu_usage_list.append({cpu_num})
+ else:
+ cpu_usage_list[cpu_usage_set_index + 1].add(
+ cpu_num)
+ cpu_usage_set.remove(cpu_num)
+ break
+ except psutil.NoSuchProcess:
+ pass
+
+ for cpu_usage_set in cpu_usage_list:
+ if len(cpu_usage_set) > 0:
+ min_usage_set = cpu_usage_set
+ break
+
+ return random.choice(tuple(min_usage_set))
+
+ @classmethod
def setUpConstants(cls):
""" Set-up the test case class based on environment variables """
s = os.getenv("STEP", "n")
@@ -241,10 +271,14 @@
coredump_size = "coredump-size %s" % size
if coredump_size is None:
coredump_size = "coredump-size unlimited"
+
+ cpu_core_number = cls.get_least_used_cpu()
+
cls.vpp_cmdline = [cls.vpp_bin, "unix",
"{", "nodaemon", debug_cli, "full-coredump",
coredump_size, "}", "api-trace", "{", "on", "}",
"api-segment", "{", "prefix", cls.shm_prefix, "}",
+ "cpu", "{", "main-core", str(cpu_core_number), "}",
"plugins", "{", "plugin", "dpdk_plugin.so", "{",
"disable", "}", "plugin", "unittest_plugin.so",
"{", "enable", "}", "}", ]
@@ -310,7 +344,10 @@
"""
gc.collect() # run garbage collection first
random.seed()
- cls.logger = getLogger(cls.__name__)
+ if not hasattr(cls, 'logger'):
+ cls.logger = getLogger(cls.__name__)
+ else:
+ cls.logger.name = cls.__name__
cls.tempdir = tempfile.mkdtemp(
prefix='vpp-unittest-%s-' % cls.__name__)
cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir)
@@ -319,7 +356,7 @@
datefmt="%H:%M:%S"))
cls.file_handler.setLevel(DEBUG)
cls.logger.addHandler(cls.file_handler)
- cls.shm_prefix = cls.tempdir.split("/")[-1]
+ cls.shm_prefix = os.path.basename(cls.tempdir)
os.chdir(cls.tempdir)
cls.logger.info("Temporary dir is %s, shm prefix is %s",
cls.tempdir, cls.shm_prefix)
@@ -392,8 +429,11 @@
raw_input("When done debugging, press ENTER to kill the "
"process and finish running the testcase...")
- cls.pump_thread_stop_flag.set()
- os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
+ # first signal that we want to stop the pump thread, then wake it up
+ if hasattr(cls, 'pump_thread_stop_flag'):
+ cls.pump_thread_stop_flag.set()
+ if hasattr(cls, 'pump_thread_wakeup_pipe'):
+ os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
if hasattr(cls, 'pump_thread'):
cls.logger.debug("Waiting for pump thread to stop")
cls.pump_thread.join()
@@ -859,6 +899,19 @@
return rx
+def get_testcase_doc_name(test):
+ return getdoc(test.__class__).splitlines()[0]
+
+
+def get_test_description(descriptions, test):
+ # TODO: if none print warning not raise exception
+ short_description = test.shortDescription()
+ if descriptions and short_description:
+ return short_description
+ else:
+ return str(test)
+
+
class TestCasePrinter(object):
_shared_state = {}
@@ -870,7 +923,7 @@
def print_test_case_heading_if_first_time(self, case):
if case.__class__ not in self._test_case_set:
print(double_line_delim)
- print(colorize(getdoc(case.__class__).splitlines()[0], YELLOW))
+ print(colorize(get_testcase_doc_name(case), GREEN))
print(double_line_delim)
self._test_case_set.add(case.__class__)
@@ -944,41 +997,22 @@
if hasattr(test, 'tempdir'):
try:
failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
- link_path = '%s/%s-FAILED' % (failed_dir,
- test.tempdir.split("/")[-1])
+ link_path = os.path.join(failed_dir, '%s-FAILED' %
+ os.path.basename(test.tempdir))
if logger:
logger.debug("creating a link to the failed test")
logger.debug("os.symlink(%s, %s)" %
(test.tempdir, link_path))
- os.symlink(test.tempdir, link_path)
+ if os.path.exists(link_path):
+ if logger:
+ logger.debug('symlink already exists')
+ else:
+ os.symlink(test.tempdir, link_path)
+
except Exception as e:
if logger:
logger.error(e)
- def send_failure_through_pipe(self, test):
- if hasattr(self, 'test_framework_failed_pipe'):
- pipe = self.test_framework_failed_pipe
- if pipe:
- if test.__class__.__name__ == "_ErrorHolder":
- x = str(test)
- if x.startswith("setUpClass"):
- # x looks like setUpClass (test_function.test_class)
- cls = x.split(".")[1].split(")")[0]
- for t in self.test_suite:
- if t.__class__.__name__ == cls:
- pipe.send(t.__class__)
- break
- else:
- raise Exception("Can't find class name `%s' "
- "(from ErrorHolder) in test suite "
- "`%s'" % (cls, self.test_suite))
- else:
- raise Exception("FIXME: unexpected special case - "
- "ErrorHolder description is `%s'" %
- str(test))
- else:
- pipe.send(test.__class__)
-
def addFailure(self, test, err):
"""
Record a test failed result
@@ -1002,8 +1036,6 @@
else:
self.result_string = colorize("FAIL", RED) + ' [no temp dir]'
- self.send_failure_through_pipe(test)
-
def addError(self, test, err):
"""
Record a test error result
@@ -1027,8 +1059,6 @@
else:
self.result_string = colorize("ERROR", RED) + ' [no temp dir]'
- self.send_failure_through_pipe(test)
-
def getDescription(self, test):
"""
Get test description
@@ -1037,12 +1067,7 @@
:returns: test description
"""
- # TODO: if none print warning not raise exception
- short_description = test.shortDescription()
- if self.descriptions and short_description:
- return short_description
- else:
- return str(test)
+ return get_test_description(self.descriptions, test)
def startTest(self, test):
"""
@@ -1100,22 +1125,6 @@
self.stream.writeln("%s" % err)
-class Filter_by_test_option:
- def __init__(self, filter_file_name, filter_class_name, filter_func_name):
- self.filter_file_name = filter_file_name
- self.filter_class_name = filter_class_name
- self.filter_func_name = filter_func_name
-
- def __call__(self, file_name, class_name, func_name):
- if self.filter_file_name and file_name != self.filter_file_name:
- return False
- if self.filter_class_name and class_name != self.filter_class_name:
- return False
- if self.filter_func_name and func_name != self.filter_func_name:
- return False
- return True
-
-
class VppTestRunner(unittest.TextTestRunner):
"""
A basic test runner implementation which prints results to standard error.
@@ -1125,9 +1134,8 @@
"""Class maintaining the results of the tests"""
return VppTestResult
- def __init__(self, keep_alive_pipe=None, failed_pipe=None,
- stream=sys.stderr, descriptions=True,
- verbosity=1, failfast=False, buffer=False, resultclass=None):
+ def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
+ failfast=False, buffer=False, resultclass=None):
# ignore stream setting here, use hard-coded stdout to be in sync
# with prints from VppTestCase methods ...
super(VppTestRunner, self).__init__(sys.stdout, descriptions,
@@ -1135,63 +1143,6 @@
resultclass)
reporter = KeepAliveReporter()
reporter.pipe = keep_alive_pipe
- # this is super-ugly, but very simple to implement and works as long
- # as we run only one test at the same time
- VppTestResult.test_framework_failed_pipe = failed_pipe
-
- test_option = "TEST"
-
- def parse_test_option(self):
- f = os.getenv(self.test_option, None)
- filter_file_name = None
- filter_class_name = None
- filter_func_name = None
- if f:
- if '.' in f:
- parts = f.split('.')
- if len(parts) > 3:
- raise Exception("Unrecognized %s option: %s" %
- (self.test_option, f))
- if len(parts) > 2:
- if parts[2] not in ('*', ''):
- filter_func_name = parts[2]
- if parts[1] not in ('*', ''):
- filter_class_name = parts[1]
- if parts[0] not in ('*', ''):
- if parts[0].startswith('test_'):
- filter_file_name = parts[0]
- else:
- filter_file_name = 'test_%s' % parts[0]
- else:
- if f.startswith('test_'):
- filter_file_name = f
- else:
- filter_file_name = 'test_%s' % f
- return filter_file_name, filter_class_name, filter_func_name
-
- @staticmethod
- def filter_tests(tests, filter_cb):
- result = unittest.suite.TestSuite()
- for t in tests:
- if isinstance(t, unittest.suite.TestSuite):
- # this is a bunch of tests, recursively filter...
- x = VppTestRunner.filter_tests(t, filter_cb)
- if x.countTestCases() > 0:
- result.addTest(x)
- elif isinstance(t, unittest.TestCase):
- # this is a single test
- parts = t.id().split('.')
- # t.id() for common cases like this:
- # test_classifier.TestClassifier.test_acl_ip
- # apply filtering only if it is so
- if len(parts) == 3:
- if not filter_cb(parts[0], parts[1], parts[2]):
- continue
- result.addTest(t)
- else:
- # unexpected object, don't touch it
- result.addTest(t)
- return result
def run(self, test):
"""
@@ -1201,20 +1152,9 @@
"""
faulthandler.enable() # emit stack trace to stderr if killed by signal
- print("Running tests using custom test runner") # debug message
- filter_file, filter_class, filter_func = self.parse_test_option()
- print("Active filters: file=%s, class=%s, function=%s" % (
- filter_file, filter_class, filter_func))
- filter_cb = Filter_by_test_option(
- filter_file, filter_class, filter_func)
- filtered = self.filter_tests(test, filter_cb)
- print("%s out of %s tests match specified filters" % (
- filtered.countTestCases(), test.countTestCases()))
- if not running_extended_tests():
- print("Not running extended tests (some tests will be skipped)")
- # super-ugly hack #2
- VppTestResult.test_suite = filtered
- return super(VppTestRunner, self).run(filtered)
+
+ result = super(VppTestRunner, self).run(test)
+ return result
class Worker(Thread):
diff --git a/test/hook.py b/test/hook.py
index 0e94dcd..d7bfef6 100644
--- a/test/hook.py
+++ b/test/hook.py
@@ -176,19 +176,19 @@
print('%02d.\t%s\t%s:%d\t[%s]' % (counter, e[2], e[0], e[1], e[3]))
counter += 1
print(single_line_delim)
- print("You can enter a number of stack frame chosen from above")
+ print("You may enter a number of stack frame chosen from above")
print("Calls in/below that stack frame will be not be stepped anymore")
print(single_line_delim)
while True:
- choice = sys.stdin.readline(
- "Enter your choice, if any, and press ENTER to continue "
- "running the testcase...")
+ print("Enter your choice, if any, and press ENTER to continue "
+ "running the testcase...")
+ choice = sys.stdin.readline()
if choice == "":
choice = None
try:
if choice is not None:
num = int(choice)
- except TypeError:
+ except ValueError:
print("Invalid input")
continue
if choice is not None and (num < 0 or num >= len(stack)):
diff --git a/test/log.py b/test/log.py
index 1e541d3..aff182e 100644
--- a/test/log.py
+++ b/test/log.py
@@ -38,8 +38,9 @@
log_level = 40
handler = logging.StreamHandler(sys.stdout)
-handler.setFormatter(ColorFormatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
- datefmt="%H:%M:%S"))
+color_formatter = ColorFormatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
+ datefmt="%H:%M:%S")
+handler.setFormatter(color_formatter)
handler.setLevel(log_level)
global_logger = logging.getLogger()
@@ -54,6 +55,16 @@
logger.setLevel(logging.DEBUG)
return logger
+
+def get_parallel_logger(stream):
+ logger = logging.getLogger('parallel_logger_{}'.format(stream))
+ logger.propagate = False
+ handler = logging.StreamHandler(stream)
+ handler.setFormatter(color_formatter)
+ handler.setLevel(log_level)
+ logger.addHandler(handler)
+ return logger
+
# Static variables to store color formatting strings.
#
# These variables (RED, GREEN, YELLOW and LPURPLE) are used to configure
diff --git a/test/run_tests.py b/test/run_tests.py
index 02e4738..aac28d1 100644
--- a/test/run_tests.py
+++ b/test/run_tests.py
@@ -7,10 +7,17 @@
import unittest
import argparse
import time
-from multiprocessing import Process, Pipe
-from framework import VppTestRunner
+import threading
+import signal
+import psutil
+from multiprocessing import Process, Pipe, cpu_count
+from multiprocessing.queues import Queue
+from multiprocessing.managers import BaseManager
+from framework import VppTestRunner, running_extended_tests, VppTestCase, \
+ get_testcase_doc_name, get_test_description
from debug import spawn_gdb
-from log import global_logger
+from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
+ colorize
from discover_tests import discover_tests
from subprocess import check_output, CalledProcessError
from util import check_core_path
@@ -20,29 +27,349 @@
# that child process is stuck (e.g. waiting for shm mutex, which will never
# get unlocked) and kill the child
core_timeout = 3
+min_req_shm = 536870912 # min 512MB shm required
+# 128MB per extra process
+shm_per_process = 134217728
-def test_runner_wrapper(suite, keep_alive_pipe, result_pipe, failed_pipe):
- result = not VppTestRunner(
- keep_alive_pipe=keep_alive_pipe,
- failed_pipe=failed_pipe,
- verbosity=verbose,
- failfast=failfast).run(suite).wasSuccessful()
+class StreamQueue(Queue):
+ def write(self, msg):
+ self.put(msg)
+
+ def flush(self):
+ sys.__stdout__.flush()
+ sys.__stderr__.flush()
+
+ def fileno(self):
+ return self._writer.fileno()
+
+
+class StreamQueueManager(BaseManager):
+ pass
+
+
+StreamQueueManager.register('Queue', StreamQueue)
+
+
+def test_runner_wrapper(suite, keep_alive_pipe, result_pipe, stdouterr_queue,
+ logger):
+ sys.stdout = stdouterr_queue
+ sys.stderr = stdouterr_queue
+ VppTestCase.logger = logger
+ result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
+ descriptions=descriptions,
+ verbosity=verbose,
+ failfast=failfast).run(suite)
result_pipe.send(result)
result_pipe.close()
keep_alive_pipe.close()
- failed_pipe.close()
-class add_to_suite_callback:
- def __init__(self, suite):
- self.suite = suite
+class TestCaseWrapper(object):
+ def __init__(self, testcase_suite, manager):
+ self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
+ duplex=False)
+ self.result_parent_end, self.result_child_end = Pipe(duplex=False)
+ self.testcase_suite = testcase_suite
+ self.stdouterr_queue = manager.Queue()
+ self.logger = get_parallel_logger(self.stdouterr_queue)
+ self.child = Process(target=test_runner_wrapper,
+ args=(testcase_suite, self.keep_alive_child_end,
+ self.result_child_end, self.stdouterr_queue,
+ self.logger)
+ )
+ self.child.start()
+ self.pid = self.child.pid
+ self.last_test_temp_dir = None
+ self.last_test_vpp_binary = None
+ self.last_test = None
+ self.result = None
+ self.last_heard = time.time()
+ self.core_detected_at = None
+ self.failed_tests = []
+
+ def close_pipes(self):
+ self.keep_alive_child_end.close()
+ self.result_child_end.close()
+ self.keep_alive_parent_end.close()
+ self.result_parent_end.close()
+
+
+def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
+ read_testcases):
+ read_testcase = None
+ while read_testcases.is_set() or len(unread_testcases) > 0:
+ if not read_testcase:
+ if len(finished_unread_testcases) > 0:
+ read_testcase = finished_unread_testcases.pop()
+ unread_testcases.remove(read_testcase)
+ elif len(unread_testcases) > 0:
+ read_testcase = unread_testcases.pop()
+ if read_testcase:
+ data = ''
+ while data is not None:
+ sys.stdout.write(data)
+ data = read_testcase.stdouterr_queue.get()
+
+ read_testcase.stdouterr_queue.close()
+ finished_unread_testcases.discard(read_testcase)
+ read_testcase = None
+
+
+def run_forked(testcases):
+ wrapped_testcase_suites = set()
+
+ # suites are unhashable, need to use list
+ results = []
+ debug_core = os.getenv("DEBUG", "").lower() == "core"
+ unread_testcases = set()
+ finished_unread_testcases = set()
+ manager = StreamQueueManager()
+ manager.start()
+ for i in range(concurrent_tests):
+ if len(testcases) > 0:
+ wrapped_testcase_suite = TestCaseWrapper(testcases.pop(0), manager)
+ wrapped_testcase_suites.add(wrapped_testcase_suite)
+ unread_testcases.add(wrapped_testcase_suite)
+ # time.sleep(1)
+ else:
+ break
+
+ read_from_testcases = threading.Event()
+ read_from_testcases.set()
+ stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
+ args=(unread_testcases,
+ finished_unread_testcases,
+ read_from_testcases))
+ stdouterr_thread.start()
+
+ while len(wrapped_testcase_suites) > 0:
+ finished_testcase_suites = set()
+ for wrapped_testcase_suite in wrapped_testcase_suites:
+ readable = select.select(
+ [wrapped_testcase_suite.keep_alive_parent_end.fileno(),
+ wrapped_testcase_suite.result_parent_end.fileno()],
+ [], [], 1)[0]
+ if wrapped_testcase_suite.result_parent_end.fileno() in readable:
+ results.append(
+ (wrapped_testcase_suite.testcase_suite,
+ wrapped_testcase_suite.result_parent_end.recv()))
+ finished_testcase_suites.add(wrapped_testcase_suite)
+ continue
+
+ if wrapped_testcase_suite.keep_alive_parent_end.fileno() \
+ in readable:
+ while wrapped_testcase_suite.keep_alive_parent_end.poll():
+ wrapped_testcase_suite.last_test, \
+ wrapped_testcase_suite.last_test_vpp_binary, \
+ wrapped_testcase_suite.last_test_temp_dir, \
+ wrapped_testcase_suite.vpp_pid = \
+ wrapped_testcase_suite.keep_alive_parent_end.recv()
+ wrapped_testcase_suite.last_heard = time.time()
+
+ fail = False
+ if wrapped_testcase_suite.last_heard + test_timeout < time.time() \
+ and not os.path.isfile(
+ "%s/_core_handled" %
+ wrapped_testcase_suite.last_test_temp_dir):
+ fail = True
+ wrapped_testcase_suite.logger.critical(
+ "Timeout while waiting for child test "
+ "runner process (last test running was "
+ "`%s' in `%s')!" %
+ (wrapped_testcase_suite.last_test,
+ wrapped_testcase_suite.last_test_temp_dir))
+ elif not wrapped_testcase_suite.child.is_alive():
+ fail = True
+ wrapped_testcase_suite.logger.critical(
+ "Child python process unexpectedly died "
+ "(last test running was `%s' in `%s')!" %
+ (wrapped_testcase_suite.last_test,
+ wrapped_testcase_suite.last_test_temp_dir))
+ elif wrapped_testcase_suite.last_test_temp_dir and \
+ wrapped_testcase_suite.last_test_vpp_binary:
+ core_path = "%s/core" % \
+ wrapped_testcase_suite.last_test_temp_dir
+ if os.path.isfile(core_path):
+ if wrapped_testcase_suite.core_detected_at is None:
+ wrapped_testcase_suite.core_detected_at = time.time()
+ elif wrapped_testcase_suite.core_detected_at + \
+ core_timeout < time.time():
+ if not os.path.isfile(
+ "%s/_core_handled" %
+ wrapped_testcase_suite.
+ last_test_temp_dir):
+ wrapped_testcase_suite.logger.critical(
+ "Child python process unresponsive and core-"
+ "file exists in test temporary directory!")
+ fail = True
+
+ if fail:
+ failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
+ lttd = os.path.basename(
+ wrapped_testcase_suite.last_test_temp_dir)
+ link_path = '%s%s-FAILED' % (failed_dir, lttd)
+ wrapped_testcase_suite.logger.error(
+ "Creating a link to the failed test: %s -> %s" %
+ (link_path, lttd))
+ if not os.path.exists(link_path):
+ os.symlink(wrapped_testcase_suite.last_test_temp_dir,
+ link_path)
+ api_post_mortem_path = "/tmp/api_post_mortem.%d" % \
+ wrapped_testcase_suite.vpp_pid
+ if os.path.isfile(api_post_mortem_path):
+ wrapped_testcase_suite.logger.error(
+ "Copying api_post_mortem.%d to %s" %
+ (wrapped_testcase_suite.vpp_pid,
+ wrapped_testcase_suite.last_test_temp_dir))
+ shutil.copy2(api_post_mortem_path,
+ wrapped_testcase_suite.last_test_temp_dir)
+ if wrapped_testcase_suite.last_test_temp_dir and \
+ wrapped_testcase_suite.last_test_vpp_binary:
+ core_path = "%s/core" % \
+ wrapped_testcase_suite.last_test_temp_dir
+ if os.path.isfile(core_path):
+ wrapped_testcase_suite.logger.error(
+ "Core-file exists in test temporary directory: %s!"
+ % core_path)
+ check_core_path(wrapped_testcase_suite.logger,
+ core_path)
+ wrapped_testcase_suite.logger.debug(
+ "Running `file %s':" % core_path)
+ try:
+ info = check_output(["file", core_path])
+ wrapped_testcase_suite.logger.debug(info)
+ except CalledProcessError as e:
+ wrapped_testcase_suite.logger.error(
+ "Could not run `file' utility on core-file, "
+ "rc=%s" % e.returncode)
+ pass
+ if debug_core:
+ spawn_gdb(
+ wrapped_testcase_suite.last_test_vpp_binary,
+ core_path, wrapped_testcase_suite.logger)
+ wrapped_testcase_suite.child.terminate()
+ try:
+ # terminating the child process tends to leave orphan
+ # VPP process around
+ os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
+ except OSError:
+ # already dead
+ pass
+ results.append((wrapped_testcase_suite.testcase_suite, None))
+ finished_testcase_suites.add(wrapped_testcase_suite)
+
+ for finished_testcase in finished_testcase_suites:
+ finished_testcase.child.join()
+ finished_testcase.close_pipes()
+ wrapped_testcase_suites.remove(finished_testcase)
+ finished_unread_testcases.add(finished_testcase)
+ finished_testcase.stdouterr_queue.put(None)
+ if len(testcases) > 0:
+ new_testcase = TestCaseWrapper(testcases.pop(0), manager)
+ wrapped_testcase_suites.add(new_testcase)
+ unread_testcases.add(new_testcase)
+
+ read_from_testcases.clear()
+ stdouterr_thread.join(test_timeout)
+ manager.shutdown()
+ return results
+
+
+class SplitToSuitesCallback:
+ def __init__(self, filter_callback):
+ self.suites = {}
+ self.suite_name = 'default'
+ self.filter_callback = filter_callback
+ self.filtered = unittest.TestSuite()
def __call__(self, file_name, cls, method):
- suite.addTest(cls(method))
+ test_method = cls(method)
+ if self.filter_callback(file_name, cls.__name__, method):
+ self.suite_name = file_name + cls.__name__
+ if self.suite_name not in self.suites:
+ self.suites[self.suite_name] = unittest.TestSuite()
+ self.suites[self.suite_name].addTest(test_method)
+
+ else:
+ self.filtered.addTest(test_method)
-class Filter_by_class_list:
+test_option = "TEST"
+
+
+def parse_test_option():
+ f = os.getenv(test_option, None)
+ filter_file_name = None
+ filter_class_name = None
+ filter_func_name = None
+ if f:
+ if '.' in f:
+ parts = f.split('.')
+ if len(parts) > 3:
+ raise Exception("Unrecognized %s option: %s" %
+ (test_option, f))
+ if len(parts) > 2:
+ if parts[2] not in ('*', ''):
+ filter_func_name = parts[2]
+ if parts[1] not in ('*', ''):
+ filter_class_name = parts[1]
+ if parts[0] not in ('*', ''):
+ if parts[0].startswith('test_'):
+ filter_file_name = parts[0]
+ else:
+ filter_file_name = 'test_%s' % parts[0]
+ else:
+ if f.startswith('test_'):
+ filter_file_name = f
+ else:
+ filter_file_name = 'test_%s' % f
+ if filter_file_name:
+ filter_file_name = '%s.py' % filter_file_name
+ return filter_file_name, filter_class_name, filter_func_name
+
+
+def filter_tests(tests, filter_cb):
+ result = unittest.suite.TestSuite()
+ for t in tests:
+ if isinstance(t, unittest.suite.TestSuite):
+ # this is a bunch of tests, recursively filter...
+ x = filter_tests(t, filter_cb)
+ if x.countTestCases() > 0:
+ result.addTest(x)
+ elif isinstance(t, unittest.TestCase):
+ # this is a single test
+ parts = t.id().split('.')
+ # t.id() for common cases like this:
+ # test_classifier.TestClassifier.test_acl_ip
+ # apply filtering only if it is so
+ if len(parts) == 3:
+ if not filter_cb(parts[0], parts[1], parts[2]):
+ continue
+ result.addTest(t)
+ else:
+ # unexpected object, don't touch it
+ result.addTest(t)
+ return result
+
+
+class FilterByTestOption:
+ def __init__(self, filter_file_name, filter_class_name, filter_func_name):
+ self.filter_file_name = filter_file_name
+ self.filter_class_name = filter_class_name
+ self.filter_func_name = filter_func_name
+
+ def __call__(self, file_name, class_name, func_name):
+ if self.filter_file_name and file_name != self.filter_file_name:
+ return False
+ if self.filter_class_name and class_name != self.filter_class_name:
+ return False
+ if self.filter_func_name and func_name != self.filter_func_name:
+ return False
+ return True
+
+
+class FilterByClassList:
def __init__(self, class_list):
self.class_list = class_list
@@ -51,174 +378,320 @@
def suite_from_failed(suite, failed):
- filter_cb = Filter_by_class_list(failed)
- suite = VppTestRunner.filter_tests(suite, filter_cb)
- if 0 == suite.countTestCases():
- raise Exception("Suite is empty after filtering out the failed tests!")
+ filter_cb = FilterByClassList(failed)
+ suite = filter_tests(suite, filter_cb)
return suite
-def run_forked(suite):
- keep_alive_parent_end, keep_alive_child_end = Pipe(duplex=False)
- result_parent_end, result_child_end = Pipe(duplex=False)
- failed_parent_end, failed_child_end = Pipe(duplex=False)
+class NonPassedResults(dict):
+ def __init__(self):
+ super(NonPassedResults, self).__init__()
+ self.all_testcases = 0
+ self.results_per_suite = {}
+ self.failures_id = 'failures'
+ self.errors_id = 'errors'
+ self.crashes_id = 'crashes'
+ self.skipped_id = 'skipped'
+ self.expectedFailures_id = 'expectedFailures'
+ self.unexpectedSuccesses_id = 'unexpectedSuccesses'
+ self.rerun = []
+ self[self.failures_id] = 0
+ self[self.errors_id] = 0
+ self[self.crashes_id] = 0
+ self[self.skipped_id] = 0
+ self[self.expectedFailures_id] = 0
+ self[self.unexpectedSuccesses_id] = 0
- child = Process(target=test_runner_wrapper,
- args=(suite, keep_alive_child_end, result_child_end,
- failed_child_end))
- child.start()
- last_test_temp_dir = None
- last_test_vpp_binary = None
- last_test = None
- result = None
- failed = set()
- last_heard = time.time()
- core_detected_at = None
- debug_core = os.getenv("DEBUG", "").lower() == "core"
- while True:
- readable = select.select([keep_alive_parent_end.fileno(),
- result_parent_end.fileno(),
- failed_parent_end.fileno(),
- ],
- [], [], 1)[0]
- if result_parent_end.fileno() in readable:
- result = result_parent_end.recv()
- break
- if keep_alive_parent_end.fileno() in readable:
- while keep_alive_parent_end.poll():
- last_test, last_test_vpp_binary,\
- last_test_temp_dir, vpp_pid = keep_alive_parent_end.recv()
- last_heard = time.time()
- if failed_parent_end.fileno() in readable:
- while failed_parent_end.poll():
- failed_test = failed_parent_end.recv()
- failed.add(failed_test.__name__)
- last_heard = time.time()
- fail = False
- if last_heard + test_timeout < time.time() and \
- not os.path.isfile("%s/_core_handled" % last_test_temp_dir):
- fail = True
- global_logger.critical("Timeout while waiting for child test "
- "runner process (last test running was "
- "`%s' in `%s')!" %
- (last_test, last_test_temp_dir))
- elif not child.is_alive():
- fail = True
- global_logger.critical("Child python process unexpectedly died "
- "(last test running was `%s' in `%s')!" %
- (last_test, last_test_temp_dir))
- elif last_test_temp_dir and last_test_vpp_binary:
- core_path = "%s/core" % last_test_temp_dir
- if os.path.isfile(core_path):
- if core_detected_at is None:
- core_detected_at = time.time()
- elif core_detected_at + core_timeout < time.time():
- if not os.path.isfile(
- "%s/_core_handled" % last_test_temp_dir):
- global_logger.critical(
- "Child python process unresponsive and core-file "
- "exists in test temporary directory!")
- fail = True
+ def _add_result(self, test, result_id):
+ if isinstance(test, VppTestCase):
+ parts = test.id().split('.')
+ if len(parts) == 3:
+ tc_class = get_testcase_doc_name(test)
+ if tc_class not in self.results_per_suite:
+ # failed, errored, skipped, expectedly failed,
+ # unexpectedly passed
+ self.results_per_suite[tc_class] = \
+ {self.failures_id: [],
+ self.errors_id: [],
+ self.crashes_id: [],
+ self.skipped_id: [],
+ self.expectedFailures_id: [],
+ self.unexpectedSuccesses_id: []}
+ self.results_per_suite[tc_class][result_id].append(test)
+ return True
+ return False
- if fail:
- failed_dir = os.getenv('VPP_TEST_FAILED_DIR')
- lttd = last_test_temp_dir.split("/")[-1]
- link_path = '%s%s-FAILED' % (failed_dir, lttd)
- global_logger.error("Creating a link to the failed " +
- "test: %s -> %s" % (link_path, lttd))
- try:
- os.symlink(last_test_temp_dir, link_path)
- except Exception:
- pass
- api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
- if os.path.isfile(api_post_mortem_path):
- global_logger.error("Copying api_post_mortem.%d to %s" %
- (vpp_pid, last_test_temp_dir))
- shutil.copy2(api_post_mortem_path, last_test_temp_dir)
- if last_test_temp_dir and last_test_vpp_binary:
- core_path = "%s/core" % last_test_temp_dir
- if os.path.isfile(core_path):
- global_logger.error("Core-file exists in test temporary "
- "directory: %s!" % core_path)
- check_core_path(global_logger, core_path)
- global_logger.debug("Running `file %s':" % core_path)
- try:
- info = check_output(["file", core_path])
- global_logger.debug(info)
- except CalledProcessError as e:
- global_logger.error(
- "Could not run `file' utility on core-file, "
- "rc=%s" % e.returncode)
- pass
- if debug_core:
- spawn_gdb(last_test_vpp_binary, core_path,
- global_logger)
- child.terminate()
- result = -1
- break
- keep_alive_parent_end.close()
- result_parent_end.close()
- failed_parent_end.close()
- return result, failed
+ def add_results(self, testcases, testcase_result,
+ duplicates=None):
+ for failed_testcase, _ in testcases:
+ if self._add_result(failed_testcase, testcase_result):
+ if duplicates:
+ if failed_testcase not in duplicates:
+ self[testcase_result] += 1
+ else:
+ self[testcase_result] += 1
+
+ def add_result(self, testcase_suite, result):
+ retval = 0
+ self.all_testcases += testcase_suite.countTestCases()
+ if result:
+ # suite finished properly
+ if not result.wasSuccessful():
+ retval = 1
+
+ self.add_results(result.failures, self.failures_id)
+ self.add_results(result.errors, self.errors_id,
+ result.failures + result.errors)
+ self.add_results(result.skipped, self.skipped_id)
+ self.add_results(result.expectedFailures,
+ self.expectedFailures_id)
+ self.add_results(result.unexpectedSuccesses,
+ self.unexpectedSuccesses_id)
+
+ else:
+ # suite crashed
+ retval = -1
+ self.add_results([(x, None) for x in testcase_suite],
+ self.crashes_id)
+
+ if retval != 0:
+ if concurrent_tests == 1:
+ if result:
+ rerun_classes = {x[0].__class__.__name__ for
+ x in result.errors}
+ rerun_classes.update({x[0].__class__.__name__ for
+ x in result.failures})
+ self.rerun.append(suite_from_failed(testcase_suite,
+ rerun_classes))
+ else:
+ self.rerun.append(testcase_suite)
+ else:
+ self.rerun.append(testcase_suite)
+
+ return retval
+
+ def print_results(self):
+ print('')
+ print(double_line_delim)
+ print('TEST RESULTS:')
+ print(' Executed tests: {}'.format(self.all_testcases))
+ print(' Passed tests: {}'.format(
+ colorize(str(self.all_testcases -
+ self.all_nonpassed), GREEN)))
+ if self[self.failures_id] > 0:
+ print(' Failed tests: {}'.format(
+ colorize(str(self[self.failures_id]), RED)))
+ if self[self.errors_id] > 0:
+ print(' Errored tests: {}'.format(
+ colorize(str(self[self.errors_id]), RED)))
+ if self[self.crashes_id] > 0:
+ print(' Crashed tests: {}'.format(
+ colorize(str(self[self.crashes_id]), RED)))
+ if self[self.skipped_id] > 0:
+ print(' Skipped tests: {}'.format(
+ colorize(str(self[self.skipped_id]), YELLOW)))
+ if self[self.expectedFailures_id] > 0:
+ print(' Expected failures: {}'.format(
+ colorize(str(self[self.expectedFailures_id]), GREEN)))
+ if self[self.unexpectedSuccesses_id] > 0:
+ print(' Unexpected successes: {}'.format(
+ colorize(str(self[self.unexpectedSuccesses_id]), YELLOW)))
+
+ if self.all_failed > 0:
+ print('FAILED TESTS:')
+ for testcase_class, suite_results in \
+ self.results_per_suite.items():
+ failed_testcases = suite_results[
+ self.failures_id]
+ errored_testcases = suite_results[
+ self.errors_id]
+ crashed_testcases = suite_results[
+ self.crashes_id]
+ if len(failed_testcases) or len(errored_testcases) \
+ or len(crashed_testcases):
+ print(' Testcase name: {}'.format(
+ colorize(testcase_class, RED)))
+ for failed_test in failed_testcases:
+ print(' FAILED: {}'.format(
+ colorize(get_test_description(
+ descriptions, failed_test), RED)))
+ for failed_test in errored_testcases:
+ print(' ERRORED: {}'.format(
+ colorize(get_test_description(
+ descriptions, failed_test), RED)))
+ for failed_test in crashed_testcases:
+ print(' CRASHED: {}'.format(
+ colorize(get_test_description(
+ descriptions, failed_test), RED)))
+
+ print(double_line_delim)
+ print('')
+
+ @property
+ def all_nonpassed(self):
+ return self[self.failures_id] + self[self.errors_id] + \
+ self[self.crashes_id] + self[self.skipped_id] + \
+ self[self.expectedFailures_id] + \
+ self[self.unexpectedSuccesses_id]
+
+ @property
+ def all_failed(self):
+ return self[self.failures_id] + self[self.errors_id] + \
+ self[self.crashes_id]
+
+
+def parse_results(results):
+ """
+ Prints the number of executed, passed, failed, errored, skipped,
+ expectedly failed and unexpectedly passed tests and details about
+ failed, errored, expectedly failed and unexpectedly passed tests.
+
+ Also returns any suites where any test failed.
+
+ :param results:
+ :return:
+ """
+
+ results_per_suite = NonPassedResults()
+ crashed = False
+ failed = False
+ for testcase_suite, result in results:
+ result_code = results_per_suite.add_result(testcase_suite, result)
+ if result_code == 1:
+ failed = True
+ elif result_code == -1:
+ crashed = True
+
+ results_per_suite.print_results()
+
+ if crashed:
+ return_code = -1
+ elif failed:
+ return_code = 1
+ else:
+ return_code = 0
+ return return_code, results_per_suite.rerun
+
+
+def parse_digit_env(env_var, default):
+ value = os.getenv(env_var, default)
+ if value != default:
+ if value.isdigit():
+ value = int(value)
+ else:
+ print('WARNING: unsupported value "%s" for env var "%s",'
+ 'defaulting to %s' % (value, env_var, default))
+ value = default
+ return value
if __name__ == '__main__':
- try:
- verbose = int(os.getenv("V", 0))
- except ValueError:
- verbose = 0
+ verbose = parse_digit_env("V", 0)
- default_test_timeout = 600 # 10 minutes
- try:
- test_timeout = int(os.getenv("TIMEOUT", default_test_timeout))
- except ValueError:
- test_timeout = default_test_timeout
+ test_timeout = parse_digit_env("TIMEOUT", 600) # default = 10 minutes
- debug = os.getenv("DEBUG")
+ retries = parse_digit_env("RETRIES", 0)
- s = os.getenv("STEP", "n")
- step = True if s.lower() in ("y", "yes", "1") else False
+ debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver"]
+
+ step = os.getenv("STEP", "n").lower() in ("y", "yes", "1")
+
+ force_foreground = \
+ os.getenv("FORCE_FOREGROUND", "n").lower() in ("y", "yes", "1")
+
+ run_interactive = debug or step or force_foreground
+
+ test_jobs = os.getenv("TEST_JOBS", "1").lower() # default = 1 process
+ if test_jobs == 'auto':
+ if run_interactive:
+ concurrent_tests = 1
+ print('Interactive mode required, running on one core')
+ else:
+ shm_free = psutil.disk_usage('/dev/shm').free
+ shm_max_processes = 1
+ if shm_free < min_req_shm:
+ raise Exception('Not enough free space in /dev/shm. Required '
+ 'free space is at least %sM.'
+ % (min_req_shm >> 20))
+ else:
+ extra_shm = shm_free - min_req_shm
+ shm_max_processes += extra_shm / shm_per_process
+ concurrent_tests = max(cpu_count(), shm_max_processes)
+ print('Found enough resources to run tests with %s cores'
+ % concurrent_tests)
+ elif test_jobs.isdigit():
+ concurrent_tests = int(test_jobs)
+ else:
+ concurrent_tests = 1
+
+ if run_interactive and concurrent_tests > 1:
+ raise NotImplementedError(
+ 'Running tests interactively (DEBUG, STEP or FORCE_FOREGROUND is '
+ 'set) in parallel (TEST_JOBS is more than 1) is not '
+ 'supported')
parser = argparse.ArgumentParser(description="VPP unit tests")
- parser.add_argument("-f", "--failfast", action='count',
+ parser.add_argument("-f", "--failfast", action='store_true',
help="fast failure flag")
parser.add_argument("-d", "--dir", action='append', type=str,
help="directory containing test files "
"(may be specified multiple times)")
args = parser.parse_args()
- failfast = True if args.failfast == 1 else False
+ failfast = args.failfast
+ descriptions = True
- suite = unittest.TestSuite()
- cb = add_to_suite_callback(suite)
+ print("Running tests using custom test runner") # debug message
+ filter_file, filter_class, filter_func = parse_test_option()
+
+ print("Active filters: file=%s, class=%s, function=%s" % (
+ filter_file, filter_class, filter_func))
+
+ filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
+
+ cb = SplitToSuitesCallback(filter_cb)
for d in args.dir:
print("Adding tests from directory tree %s" % d)
discover_tests(d, cb)
- try:
- retries = int(os.getenv("RETRIES", 0))
- except ValueError:
- retries = 0
+ # suites are not hashable, need to use list
+ suites = []
+ tests_amount = 0
+ for testcase_suite in cb.suites.values():
+ tests_amount += testcase_suite.countTestCases()
+ suites.append(testcase_suite)
- try:
- force_foreground = int(os.getenv("FORCE_FOREGROUND", 0))
- except ValueError:
- force_foreground = 0
+ if concurrent_tests == 1:
+ new_suite = unittest.TestSuite()
+ for suite in suites:
+ new_suite.addTest(suite)
+
+ suites = [new_suite]
+
+ print("%s out of %s tests match specified filters" % (
+ tests_amount, tests_amount + cb.filtered.countTestCases()))
+
+ if not running_extended_tests():
+ print("Not running extended tests (some tests will be skipped)")
+
attempts = retries + 1
if attempts > 1:
print("Perform %s attempts to pass the suite..." % attempts)
- if (debug is not None and debug.lower() in ["gdb", "gdbserver"]) or step\
- or force_foreground:
- # don't fork if requiring interactive terminal..
+
+ if run_interactive:
+ # don't fork if requiring interactive terminal
sys.exit(not VppTestRunner(
- verbosity=verbose, failfast=failfast).run(suite).wasSuccessful())
+ verbosity=verbose, failfast=failfast)
+ .run(suites[0]).wasSuccessful())
else:
- while True:
- result, failed = run_forked(suite)
- attempts = attempts - 1
- print("%s test(s) failed, %s attempt(s) left" %
- (len(failed), attempts))
- if len(failed) > 0 and attempts > 0:
- suite = suite_from_failed(suite, failed)
- continue
- sys.exit(result)
+ exit_code = 0
+ while len(suites) > 0 and attempts > 0:
+ tests_amount = sum([x.countTestCases() for x in suites])
+ results = run_forked(suites)
+ exit_code, suites = parse_results(results)
+ attempts -= 1
+ if exit_code == 0:
+ print('Test run was successful')
+ else:
+ print('%s attempt(s) left.' % attempts)
+ sys.exit(exit_code)