tests: cpus awareness
Introduce MAX_CPUS parameters to control maximum number of CPUs used by
VPP(s) during testing, with default value 'auto' corresponding to all
CPUs available.
Calculate test CPU requirements by taking into account the number of
workers, so a test requires 1 (main thread) + # of worker CPUs.
When running tests, keep track of both running test jobs (controlled by
TEST_JOBS parameter) and free CPUs. This then causes two limits in the
system - to not exceed number of jobs in parallel but also to not exceed
number of CPUs available.
Skip tests which require more CPUs than are available in system (or more
than MAX_CPUS) and print a warning message.
Type: improvement
Change-Id: Ib8fda54e4c6a36179d64160bb87fbd3a0011762d
Signed-off-by: Klement Sekera <ksekera@cisco.com>
diff --git a/test/Makefile b/test/Makefile
index dc6aa09..0ee61a2 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -366,7 +366,8 @@
@echo "Arguments controlling test runs:"
@echo " V=[0|1|2] - set test verbosity level"
@echo " 0=ERROR, 1=INFO, 2=DEBUG"
- @echo " TEST_JOBS=[<n>|auto] - use <n> parallel processes for test execution or automatic discovery of maximum acceptable processes (default: 1)"
+ @echo " TEST_JOBS=[<n>|auto] - use at most <n> parallel python processes for test execution, if auto, set to number of available cpus (default: 1)"
+ @echo " MAX_VPP_CPUS=[<n>|auto]- use at most <n> cpus for running vpp main and worker threads, if auto, set to number of available cpus (default: auto)"
@echo " CACHE_OUTPUT=[0|1] - cache VPP stdout/stderr and log as one block after test finishes (default: 1)"
@echo " FAILFAST=[0|1] - fail fast if 1, complete all tests if 0"
@echo " TIMEOUT=<timeout> - fail test suite if any single test takes longer than <timeout> (in seconds) to finish (default: 600)"
diff --git a/test/cpu_config.py b/test/cpu_config.py
new file mode 100644
index 0000000..b4e5d1a
--- /dev/null
+++ b/test/cpu_config.py
@@ -0,0 +1,21 @@
+import os
+import psutil
+
+available_cpus = psutil.Process().cpu_affinity()
+num_cpus = len(available_cpus)
+
+max_vpp_cpus = os.getenv("MAX_VPP_CPUS", "auto").lower()
+
+if max_vpp_cpus == "auto":
+ max_vpp_cpus = num_cpus
+else:
+ try:
+ max_vpp_cpus = int(max_vpp_cpus)
+ except ValueError as e:
+ raise ValueError("Invalid MAX_VPP_CPUS value specified, valid "
+ "values are a positive integer or 'auto'") from e
+ if max_vpp_cpus <= 0:
+ raise ValueError("Invalid MAX_VPP_CPUS value specified, valid "
+ "values are a positive integer or 'auto'")
+ if max_vpp_cpus > num_cpus:
+ max_vpp_cpus = num_cpus
diff --git a/test/framework.py b/test/framework.py
index 67ac495..1cbd814 100644
--- a/test/framework.py
+++ b/test/framework.py
@@ -22,6 +22,7 @@
from traceback import format_exception
from logging import FileHandler, DEBUG, Formatter
from enum import Enum
+from abc import ABC, abstractmethod
import scapy.compat
from scapy.packet import Raw
@@ -42,6 +43,8 @@
from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
from scapy.layers.inet6 import ICMPv6EchoReply
+from cpu_config import available_cpus, num_cpus, max_vpp_cpus
+
logger = logging.getLogger(__name__)
# Set up an empty logger for the testcase that can be overridden as necessary
@@ -53,6 +56,7 @@
ERROR = 2
SKIP = 3
TEST_RUN = 4
+SKIP_CPU_SHORTAGE = 5
class BoolEnvironmentVariable(object):
@@ -223,6 +227,21 @@
running_gcov_tests = _running_gcov_tests()
+def get_environ_vpp_worker_count():
+ worker_config = os.getenv("VPP_WORKER_CONFIG", None)
+ if worker_config:
+ elems = worker_config.split(" ")
+ if elems[0] != "workers" or len(elems) != 2:
+ raise ValueError("Wrong VPP_WORKER_CONFIG == '%s' value." %
+ worker_config)
+ return int(elems[1])
+ else:
+ return 0
+
+
+environ_vpp_worker_count = get_environ_vpp_worker_count()
+
+
class KeepAliveReporter(object):
"""
Singleton object which reports test start to parent process
@@ -292,7 +311,21 @@
pass
-class VppTestCase(unittest.TestCase):
+class CPUInterface(ABC):
+ cpus = []
+ skipped_due_to_cpu_lack = False
+
+ @classmethod
+ @abstractmethod
+ def get_cpus_required(cls):
+ pass
+
+ @classmethod
+ def assign_cpus(cls, cpus):
+ cls.cpus = cpus
+
+
+class VppTestCase(CPUInterface, unittest.TestCase):
"""This subclass is a base class for VPP test cases that are implemented as
classes. It provides methods to create and run test case.
"""
@@ -358,34 +391,18 @@
if dl == "gdb-all" or dl == "gdbserver-all":
cls.debug_all = True
- @staticmethod
- def get_least_used_cpu():
- cpu_usage_list = [set(range(psutil.cpu_count()))]
- vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
- if 'vpp_main' == p.info['name']]
- for vpp_process in vpp_processes:
- for cpu_usage_set in cpu_usage_list:
- try:
- cpu_num = vpp_process.cpu_num()
- if cpu_num in cpu_usage_set:
- cpu_usage_set_index = cpu_usage_list.index(
- cpu_usage_set)
- if cpu_usage_set_index == len(cpu_usage_list) - 1:
- cpu_usage_list.append({cpu_num})
- else:
- cpu_usage_list[cpu_usage_set_index + 1].add(
- cpu_num)
- cpu_usage_set.remove(cpu_num)
- break
- except psutil.NoSuchProcess:
- pass
+ @classmethod
+ def get_vpp_worker_count(cls):
+ if not hasattr(cls, "vpp_worker_count"):
+ if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
+ cls.vpp_worker_count = 0
+ else:
+ cls.vpp_worker_count = environ_vpp_worker_count
+ return cls.vpp_worker_count
- for cpu_usage_set in cpu_usage_list:
- if len(cpu_usage_set) > 0:
- min_usage_set = cpu_usage_set
- break
-
- return random.choice(tuple(min_usage_set))
+ @classmethod
+ def get_cpus_required(cls):
+ return 1 + cls.get_vpp_worker_count()
@classmethod
def setUpConstants(cls):
@@ -417,20 +434,6 @@
if coredump_size is None:
coredump_size = "coredump-size unlimited"
- cpu_core_number = cls.get_least_used_cpu()
- if not hasattr(cls, "vpp_worker_count"):
- cls.vpp_worker_count = 0
- worker_config = os.getenv("VPP_WORKER_CONFIG", "")
- if worker_config:
- elems = worker_config.split(" ")
- if elems[0] != "workers" or len(elems) != 2:
- raise ValueError("Wrong VPP_WORKER_CONFIG == '%s' value." %
- worker_config)
- cls.vpp_worker_count = int(elems[1])
- if cls.vpp_worker_count > 0 and\
- cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
- cls.vpp_worker_count = 0
-
default_variant = os.getenv("VARIANT")
if default_variant is not None:
default_variant = "defaults { %s 100 }" % default_variant
@@ -447,9 +450,10 @@
coredump_size, "runtime-dir", cls.tempdir, "}",
"api-trace", "{", "on", "}",
"api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}",
- "cpu", "{", "main-core", str(cpu_core_number), ]
- if cls.vpp_worker_count:
- cls.vpp_cmdline.extend(["workers", str(cls.vpp_worker_count)])
+ "cpu", "{", "main-core", str(cls.cpus[0]), ]
+ if cls.get_vpp_worker_count():
+ cls.vpp_cmdline.extend([
+ "corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])])
cls.vpp_cmdline.extend([
"}",
"physmem", "{", "max-size", "32m", "}",
@@ -509,11 +513,12 @@
@classmethod
def run_vpp(cls):
+ cls.logger.debug(f"Assigned cpus: {cls.cpus}")
cmdline = cls.vpp_cmdline
if cls.debug_gdbserver:
gdbserver = '/usr/bin/gdbserver'
- if not os.path.isfile(gdbserver) or \
+ if not os.path.isfile(gdbserver) or\
not os.access(gdbserver, os.X_OK):
raise Exception("gdbserver binary '%s' does not exist or is "
"not executable" % gdbserver)
@@ -1349,6 +1354,7 @@
self.verbosity = verbosity
self.result_string = None
self.runner = runner
+ self.printed = []
def addSuccess(self, test):
"""
@@ -1383,7 +1389,10 @@
unittest.TestResult.addSkip(self, test, reason)
self.result_string = colorize("SKIP", YELLOW)
- self.send_result_through_pipe(test, SKIP)
+ if reason == "not enough cpus":
+ self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
+ else:
+ self.send_result_through_pipe(test, SKIP)
def symlink_failed(self):
if self.current_test_case_info:
@@ -1501,28 +1510,34 @@
"""
def print_header(test):
+ if test.__class__ in self.printed:
+ return
+
test_doc = getdoc(test)
if not test_doc:
raise Exception("No doc string for test '%s'" % test.id())
+
test_title = test_doc.splitlines()[0]
- test_title_colored = colorize(test_title, GREEN)
+ test_title = colorize(test_title, GREEN)
if test.is_tagged_run_solo():
- # long live PEP-8 and 80 char width limitation...
- c = YELLOW
- test_title_colored = colorize("SOLO RUN: " + test_title, c)
+ test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
# This block may overwrite the colorized title above,
# but we want this to stand out and be fixed
if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
- c = RED
- w = "FIXME with VPP workers: "
- test_title_colored = colorize(w + test_title, c)
+ test_title = colorize(
+ f"FIXME with VPP workers: {test_title}", RED)
- if not hasattr(test.__class__, '_header_printed'):
- print(double_line_delim)
- print(test_title_colored)
- print(double_line_delim)
- test.__class__._header_printed = True
+ if test.__class__.skipped_due_to_cpu_lack:
+ test_title = colorize(
+ f"{test_title} [skipped - not enough cpus, "
+ f"required={test.__class__.get_cpus_required()}, "
+ f"available={max_vpp_cpus}]", YELLOW)
+
+ print(double_line_delim)
+ print(test_title)
+ print(double_line_delim)
+ self.printed.append(test.__class__)
print_header(test)
self.start_test = time.time()
diff --git a/test/log.py b/test/log.py
index b0fe037..5fd91c7 100644
--- a/test/log.py
+++ b/test/log.py
@@ -11,7 +11,7 @@
def colorize(msg, color):
- return color + msg + COLOR_RESET
+ return f"{color}{msg}{COLOR_RESET}"
class ColorFormatter(logging.Formatter):
diff --git a/test/run_tests.py b/test/run_tests.py
index 008828c..5d091ad 100644
--- a/test/run_tests.py
+++ b/test/run_tests.py
@@ -9,22 +9,21 @@
import time
import threading
import signal
-import psutil
import re
-import multiprocessing
-from multiprocessing import Process, Pipe, cpu_count
+from multiprocessing import Process, Pipe, get_context
from multiprocessing.queues import Queue
from multiprocessing.managers import BaseManager
import framework
from framework import VppTestRunner, running_extended_tests, VppTestCase, \
get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
- TEST_RUN
+ TEST_RUN, SKIP_CPU_SHORTAGE
from debug import spawn_gdb, start_vpp_in_gdb
from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
colorize, single_line_delim
from discover_tests import discover_tests
from subprocess import check_output, CalledProcessError
from util import check_core_path, get_core_path, is_core_present
+from cpu_config import num_cpus, max_vpp_cpus, available_cpus
# timeout which controls how long the child has to finish after seeing
# a core dump in test temporary directory. If this is exceeded, parent assumes
@@ -59,6 +58,7 @@
self[FAIL] = []
self[ERROR] = []
self[SKIP] = []
+ self[SKIP_CPU_SHORTAGE] = []
self[TEST_RUN] = []
self.crashed = False
self.testcase_suite = testcase_suite
@@ -67,8 +67,8 @@
def was_successful(self):
return 0 == len(self[FAIL]) == len(self[ERROR]) \
- and len(self[PASS] + self[SKIP]) \
- == self.testcase_suite.countTestCases() == len(self[TEST_RUN])
+ and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]) \
+ == self.testcase_suite.countTestCases()
def no_tests_run(self):
return 0 == len(self[TEST_RUN])
@@ -80,7 +80,7 @@
rerun_ids = set([])
for testcase in self.testcase_suite:
tc_id = testcase.id()
- if tc_id not in self[PASS] and tc_id not in self[SKIP]:
+ if tc_id not in self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]:
rerun_ids.add(tc_id)
if rerun_ids:
return suite_from_failed(self.testcase_suite, rerun_ids)
@@ -142,11 +142,7 @@
self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
self.result_parent_end, self.result_child_end = Pipe(duplex=False)
self.testcase_suite = testcase_suite
- if sys.version[0] == '2':
- self.stdouterr_queue = manager.StreamQueue()
- else:
- from multiprocessing import get_context
- self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
+ self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
self.logger = get_parallel_logger(self.stdouterr_queue)
self.child = Process(target=test_runner_wrapper,
args=(testcase_suite,
@@ -213,6 +209,13 @@
def was_successful(self):
return self.result.was_successful()
+ @property
+ def cpus_used(self):
+ return self.testcase_suite.cpus_used
+
+ def get_assigned_cpus(self):
+ return self.testcase_suite.get_assigned_cpus()
+
def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
read_testcases):
@@ -324,7 +327,6 @@
def run_forked(testcase_suites):
wrapped_testcase_suites = set()
solo_testcase_suites = []
- total_test_runners = 0
# suites are unhashable, need to use list
results = []
@@ -332,31 +334,53 @@
finished_unread_testcases = set()
manager = StreamQueueManager()
manager.start()
- total_test_runners = 0
- while total_test_runners < concurrent_tests:
- if testcase_suites:
+ tests_running = 0
+ free_cpus = list(available_cpus)
+
+ def on_suite_start(tc):
+ nonlocal tests_running
+ nonlocal free_cpus
+ tests_running = tests_running + 1
+
+ def on_suite_finish(tc):
+ nonlocal tests_running
+ nonlocal free_cpus
+ tests_running = tests_running - 1
+ assert tests_running >= 0
+ free_cpus.extend(tc.get_assigned_cpus())
+
+ def run_suite(suite):
+ nonlocal manager
+ nonlocal wrapped_testcase_suites
+ nonlocal unread_testcases
+ nonlocal free_cpus
+ suite.assign_cpus(free_cpus[:suite.cpus_used])
+ free_cpus = free_cpus[suite.cpus_used:]
+ wrapper = TestCaseWrapper(suite, manager)
+ wrapped_testcase_suites.add(wrapper)
+ unread_testcases.add(wrapper)
+ on_suite_start(suite)
+
+ def can_run_suite(suite):
+ return (tests_running < max_concurrent_tests and
+ (suite.cpus_used <= len(free_cpus) or
+ suite.cpus_used > max_vpp_cpus))
+
+ while free_cpus and testcase_suites:
+ a_suite = testcase_suites[0]
+ if a_suite.is_tagged_run_solo:
a_suite = testcase_suites.pop(0)
- if a_suite.is_tagged_run_solo:
- solo_testcase_suites.append(a_suite)
- continue
- wrapped_testcase_suite = TestCaseWrapper(a_suite,
- manager)
- wrapped_testcase_suites.add(wrapped_testcase_suite)
- unread_testcases.add(wrapped_testcase_suite)
- total_test_runners = total_test_runners + 1
+ solo_testcase_suites.append(a_suite)
+ continue
+ if can_run_suite(a_suite):
+ a_suite = testcase_suites.pop(0)
+ run_suite(a_suite)
else:
break
- while total_test_runners < 1 and solo_testcase_suites:
- if solo_testcase_suites:
- a_suite = solo_testcase_suites.pop(0)
- wrapped_testcase_suite = TestCaseWrapper(a_suite,
- manager)
- wrapped_testcase_suites.add(wrapped_testcase_suite)
- unread_testcases.add(wrapped_testcase_suite)
- total_test_runners = total_test_runners + 1
- else:
- break
+ if tests_running == 0 and solo_testcase_suites:
+ a_suite = solo_testcase_suites.pop(0)
+ run_suite(a_suite)
read_from_testcases = threading.Event()
read_from_testcases.set()
@@ -466,31 +490,23 @@
wrapped_testcase_suites.remove(finished_testcase)
finished_unread_testcases.add(finished_testcase)
finished_testcase.stdouterr_queue.put(None)
- total_test_runners = total_test_runners - 1
+ on_suite_finish(finished_testcase)
if stop_run:
while testcase_suites:
results.append(TestResult(testcase_suites.pop(0)))
elif testcase_suites:
- a_testcase = testcase_suites.pop(0)
- while a_testcase and a_testcase.is_tagged_run_solo:
- solo_testcase_suites.append(a_testcase)
+ a_suite = testcase_suites.pop(0)
+ while a_suite and a_suite.is_tagged_run_solo:
+ solo_testcase_suites.append(a_suite)
if testcase_suites:
- a_testcase = testcase_suites.pop(0)
+ a_suite = testcase_suites.pop(0)
else:
- a_testcase = None
- if a_testcase:
- new_testcase = TestCaseWrapper(a_testcase,
- manager)
- wrapped_testcase_suites.add(new_testcase)
- total_test_runners = total_test_runners + 1
- unread_testcases.add(new_testcase)
- if solo_testcase_suites and total_test_runners == 0:
- a_testcase = solo_testcase_suites.pop(0)
- new_testcase = TestCaseWrapper(a_testcase,
- manager)
- wrapped_testcase_suites.add(new_testcase)
- total_test_runners = total_test_runners + 1
- unread_testcases.add(new_testcase)
+ a_suite = None
+ if a_suite and can_run_suite(a_suite):
+ run_suite(a_suite)
+ if solo_testcase_suites and tests_running == 0:
+ a_suite = solo_testcase_suites.pop(0)
+ run_suite(a_suite)
time.sleep(0.1)
except Exception:
for wrapped_testcase_suite in wrapped_testcase_suites:
@@ -506,19 +522,41 @@
return results
+class TestSuiteWrapper(unittest.TestSuite):
+ cpus_used = 0
+
+ def __init__(self):
+ return super().__init__()
+
+ def addTest(self, test):
+ self.cpus_used = max(self.cpus_used, test.get_cpus_required())
+ super().addTest(test)
+
+ def assign_cpus(self, cpus):
+ self.cpus = cpus
+
+ def _handleClassSetUp(self, test, result):
+ if not test.__class__.skipped_due_to_cpu_lack:
+ test.assign_cpus(self.cpus)
+ super()._handleClassSetUp(test, result)
+
+ def get_assigned_cpus(self):
+ return self.cpus
+
+
class SplitToSuitesCallback:
def __init__(self, filter_callback):
self.suites = {}
self.suite_name = 'default'
self.filter_callback = filter_callback
- self.filtered = unittest.TestSuite()
+ self.filtered = TestSuiteWrapper()
def __call__(self, file_name, cls, method):
test_method = cls(method)
if self.filter_callback(file_name, cls.__name__, method):
self.suite_name = file_name + cls.__name__
if self.suite_name not in self.suites:
- self.suites[self.suite_name] = unittest.TestSuite()
+ self.suites[self.suite_name] = TestSuiteWrapper()
self.suites[self.suite_name].is_tagged_run_solo = False
self.suites[self.suite_name].addTest(test_method)
if test_method.is_tagged_run_solo():
@@ -563,7 +601,7 @@
def filter_tests(tests, filter_cb):
- result = unittest.suite.TestSuite()
+ result = TestSuiteWrapper()
for t in tests:
if isinstance(t, unittest.suite.TestSuite):
# this is a bunch of tests, recursively filter...
@@ -628,13 +666,14 @@
self[FAIL] = 0
self[ERROR] = 0
self[SKIP] = 0
+ self[SKIP_CPU_SHORTAGE] = 0
self[TEST_RUN] = 0
self.rerun = []
self.testsuites_no_tests_run = []
def add_results(self, result):
self.results_per_suite.append(result)
- result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN]
+ result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN, SKIP_CPU_SHORTAGE]
for result_type in result_types:
self[result_type] += len(result[result_type])
@@ -661,22 +700,29 @@
print('')
print(double_line_delim)
print('TEST RESULTS:')
- print(' Scheduled tests: {}'.format(self.all_testcases))
- print(' Executed tests: {}'.format(self[TEST_RUN]))
- print(' Passed tests: {}'.format(
- colorize(str(self[PASS]), GREEN)))
- if self[SKIP] > 0:
- print(' Skipped tests: {}'.format(
- colorize(str(self[SKIP]), YELLOW)))
- if self.not_executed > 0:
- print(' Not Executed tests: {}'.format(
- colorize(str(self.not_executed), RED)))
- if self[FAIL] > 0:
- print(' Failures: {}'.format(
- colorize(str(self[FAIL]), RED)))
- if self[ERROR] > 0:
- print(' Errors: {}'.format(
- colorize(str(self[ERROR]), RED)))
+
+ def indent_results(lines):
+ lines = list(filter(None, lines))
+ maximum = max(lines, key=lambda x: x.index(":"))
+ maximum = 4 + maximum.index(":")
+ for l in lines:
+ padding = " " * (maximum - l.index(":"))
+ print(f"{padding}{l}")
+
+ indent_results([
+ f'Scheduled tests: {self.all_testcases}',
+ f'Executed tests: {self[TEST_RUN]}',
+ f'Passed tests: {colorize(self[PASS], GREEN)}',
+ f'Skipped tests: {colorize(self[SKIP], YELLOW)}'
+ if self[SKIP] else None,
+ f'Not Executed tests: {colorize(self.not_executed, RED)}'
+ if self.not_executed else None,
+ f'Failures: {colorize(self[FAIL], RED)}' if self[FAIL] else None,
+ f'Errors: {colorize(self[ERROR], RED)}' if self[ERROR] else None,
+ 'Tests skipped due to lack of CPUS: '
+ f'{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}'
+ if self[SKIP_CPU_SHORTAGE] else None
+ ])
if self.all_failed > 0:
print('FAILURES AND ERRORS IN TESTS:')
@@ -713,6 +759,10 @@
for tc_class in tc_classes:
print(' {}'.format(colorize(tc_class, RED)))
+ if self[SKIP_CPU_SHORTAGE]:
+ print()
+ print(colorize(' SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
+ ' ENOUGH CPUS AVAILABLE', YELLOW))
print(double_line_delim)
print('')
@@ -793,31 +843,34 @@
run_interactive = debug or step or force_foreground
- try:
- num_cpus = len(os.sched_getaffinity(0))
- except AttributeError:
- num_cpus = multiprocessing.cpu_count()
-
- print("OS reports %s available cpu(s)." % num_cpus)
+ max_concurrent_tests = 0
+ print(f"OS reports {num_cpus} available cpu(s).")
test_jobs = os.getenv("TEST_JOBS", "1").lower() # default = 1 process
if test_jobs == 'auto':
if run_interactive:
- concurrent_tests = 1
- print('Interactive mode required, running on one core')
+ max_concurrent_tests = 1
+ print('Interactive mode required, running tests consecutively.')
else:
- concurrent_tests = num_cpus
- print('Found enough resources to run tests with %s cores'
- % concurrent_tests)
- elif test_jobs.isdigit():
- concurrent_tests = int(test_jobs)
- print("Running on %s core(s) as set by 'TEST_JOBS'." %
- concurrent_tests)
+ max_concurrent_tests = num_cpus
+ print(f"Running at most {max_concurrent_tests} python test "
+ "processes concurrently.")
else:
- concurrent_tests = 1
- print('Running on one core.')
+ try:
+ test_jobs = int(test_jobs)
+ except ValueError as e:
+ raise ValueError("Invalid TEST_JOBS value specified, valid "
+ "values are a positive integer or 'auto'") from e
+ if test_jobs <= 0:
+ raise ValueError("Invalid TEST_JOBS value specified, valid "
+ "values are a positive integer or 'auto'")
+ max_concurrent_tests = int(test_jobs)
+ print(f"Running at most {max_concurrent_tests} python test processes "
+ "concurrently as set by 'TEST_JOBS'.")
- if run_interactive and concurrent_tests > 1:
+ print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")
+
+ if run_interactive and max_concurrent_tests > 1:
raise NotImplementedError(
'Running tests interactively (DEBUG is gdb[server] or ATTACH or '
'STEP is set) in parallel (TEST_JOBS is more than 1) is not '
@@ -833,7 +886,7 @@
failfast = args.failfast
descriptions = True
- print("Running tests using custom test runner") # debug message
+ print("Running tests using custom test runner.")
filter_file, filter_class, filter_func = parse_test_option()
print("Active filters: file=%s, class=%s, function=%s" % (
@@ -852,6 +905,21 @@
tests_amount = 0
for testcase_suite in cb.suites.values():
tests_amount += testcase_suite.countTestCases()
+ if testcase_suite.cpus_used > max_vpp_cpus:
+ # here we replace test functions with lambdas to just skip them
+ # but we also replace setUp/tearDown functions to do nothing
+ # so that the test can be "started" and "stopped", so that we can
+ # still keep those prints (test description - SKIP), which are done
+ # in stopTest() (for that to trigger, test function must run)
+ for t in testcase_suite:
+ for m in dir(t):
+ if m.startswith('test_'):
+ setattr(t, m, lambda: t.skipTest("not enough cpus"))
+ setattr(t.__class__, 'setUpClass', lambda: None)
+ setattr(t.__class__, 'tearDownClass', lambda: None)
+ setattr(t, 'setUp', lambda: None)
+ setattr(t, 'tearDown', lambda: None)
+ t.__class__.skipped_due_to_cpu_lack = True
suites.append(testcase_suite)
print("%s out of %s tests match specified filters" % (
@@ -868,6 +936,14 @@
# don't fork if requiring interactive terminal
print('Running tests in foreground in the current process')
full_suite = unittest.TestSuite()
+ free_cpus = list(available_cpus)
+ cpu_shortage = False
+ for suite in suites:
+ if suite.cpus_used <= max_vpp_cpus:
+ suite.assign_cpus(free_cpus[:suite.cpus_used])
+ else:
+ suite.assign_cpus([])
+ cpu_shortage = True
full_suite.addTests(suites)
result = VppTestRunner(verbosity=verbose,
failfast=failfast,
@@ -883,10 +959,16 @@
test_case_info.tempdir,
test_case_info.core_crash_test)
+ if cpu_shortage:
+ print()
+ print(colorize('SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
+ ' ENOUGH CPUS AVAILABLE', YELLOW))
+ print()
sys.exit(not was_successful)
else:
print('Running each VPPTestCase in a separate background process'
- ' with {} parallel process(es)'.format(concurrent_tests))
+ f' with at most {max_concurrent_tests} parallel python test '
+ 'process(es)')
exit_code = 0
while suites and attempts > 0:
results = run_forked(suites)
diff --git a/test/sanity_run_vpp.py b/test/sanity_run_vpp.py
index f91dc83..4b21de1 100644
--- a/test/sanity_run_vpp.py
+++ b/test/sanity_run_vpp.py
@@ -9,7 +9,7 @@
class SanityTestCase(VppTestCase):
""" Sanity test case - verify whether VPP is able to start """
- pass
+ cpus = [0]
# don't ask to debug SanityTestCase
@classmethod
diff --git a/test/test_util.py b/test/test_util.py
index 421afce..3a61d64 100755
--- a/test/test_util.py
+++ b/test/test_util.py
@@ -2,11 +2,11 @@
"""Test framework utility functions tests"""
import unittest
-from framework import VppTestRunner
+from framework import VppTestRunner, CPUInterface
from vpp_papi import mac_pton, mac_ntop
-class TestUtil (unittest.TestCase):
+class TestUtil (CPUInterface, unittest.TestCase):
""" Test framework utility tests """
@classmethod
@@ -23,6 +23,10 @@
pass
return False
+ @classmethod
+ def get_cpus_required(cls):
+ return 0
+
def test_mac_to_binary(self):
""" MAC to binary and back """
mac = 'aa:bb:cc:dd:ee:ff'