Add validation abilities to test cases
Change-Id: I76b28e6170d6e91836b195d58c0b882168c11a67
Issue-ID: VNFSDK-275
Signed-off-by: Moshe <moshehoa@amdocs.com>
Add unit tests
Issue-ID: VNFSDK-275
Change-Id: I34bc9a11e16e4092fdad3b4a1733c7219e624f5f
Signed-off-by: Moshe <moshehoa@amdocs.com>
add unit tests
Issue-ID: VNFSDK-275
Change-Id: Ib99c3521438b002e0d8aaff9870224673e34899f
Signed-off-by: Moshe <moshehoa@amdocs.com>
add unit tests
Issue-ID: VNFSDK-275
Change-Id: I1ac560dfb40df5f346b0db8f40b8c52a2fb6b350
Signed-off-by: Moshe <moshehoa@amdocs.com>
diff --git a/vnftest/common/exceptions.py b/vnftest/common/exceptions.py
index 85cb203..9e29c52 100644
--- a/vnftest/common/exceptions.py
+++ b/vnftest/common/exceptions.py
@@ -43,3 +43,7 @@
class InputParameterMissing(VnftestException):
message_tmplate = 'No value found for parameter "{param_name}" in "{source}"'
+
+
+# Raised by resource_as_string() and load_resource() in
+# vnftest/common/utils.py when pkg_resources cannot find the requested
+# resource; the offending path is passed as the ``resource`` keyword.
+class ResourceNotFound(VnftestException):
+ # NOTE(review): the attribute is spelled "message_tmplate", matching the
+ # sibling exception above; presumably VnftestException formats it with the
+ # constructor keyword arguments - confirm against the base class.
+ message_tmplate = 'Resource not found "{resource}"'
diff --git a/vnftest/common/openstack_utils.py b/vnftest/common/openstack_utils.py
index c97c1c2..829b916 100644
--- a/vnftest/common/openstack_utils.py
+++ b/vnftest/common/openstack_utils.py
@@ -27,6 +27,7 @@
from novaclient import client as novaclient
from glanceclient import client as glanceclient
from neutronclient.neutron import client as neutronclient
+from heatclient.client import Client as heatclient
log = logging.getLogger(__name__)
@@ -159,6 +160,11 @@
return neutronclient.Client(get_neutron_client_version(), session=sess)
+def get_heat_client():    # pragma: no cover
+    """Build a Heat client.
+
+    The client is created for the API version reported by
+    get_heat_api_version() and uses the session returned by get_session().
+    """
+    session = get_session()
+    api_version = get_heat_api_version()
+    return heatclient(api_version, session=session)
+
+
def get_glance_client_version(): # pragma: no cover
try:
api_version = os.environ['OS_IMAGE_API_VERSION']
@@ -199,6 +205,14 @@
instance_name)
+def get_instance_by_id(instance_id):    # pragma: no cover
+    """Look up a Nova server by its id.
+
+    :param instance_id: id handed to ``servers.find``
+    :return: the matching server, or ``None`` when the lookup raises
+             (the error is logged, not propagated)
+    """
+    try:
+        nova = get_nova_client()
+        server = nova.servers.find(id=instance_id)
+    except Exception:
+        log.exception("Error [get_instance_by_id(nova_client, '%s')]",
+                      instance_id)
+        return None
+    return server
+
+
def get_aggregates(nova_client): # pragma: no cover
try:
return nova_client.aggregates.list()
@@ -440,6 +454,15 @@
# *********************************************
# NEUTRON
# *********************************************
+def get_network_by_name(network_name):    # pragma: no cover
+    """Return the first Neutron network whose name matches.
+
+    :param network_name: value compared against each network's ``name``
+    :return: the network dict, or ``None`` when no network matches or the
+             Neutron call raises (the error is logged, not propagated)
+    """
+    try:
+        networks = get_neutron_client().list_networks()['networks']
+        return next((n for n in networks if n['name'] == network_name), None)
+    except Exception:
+        # Name this function in the log; the message used to point at
+        # get_instance_by_id, which sent readers to the wrong helper.
+        log.exception("Error [get_network_by_name('%s')]", network_name)
+        return None
+
+
def get_network_id(neutron_client, network_name): # pragma: no cover
networks = neutron_client.list_networks()['networks']
return next((n['id'] for n in networks if n['name'] == network_name), None)
@@ -760,3 +783,31 @@
log.exception("Error [detach_server_volume(nova_client, '%s', '%s')]",
server_id, volume_id)
return False
+# *********************************************
+# HEAT
+# *********************************************
+
+
+def get_stack(heat_stack_id):    # pragma: no cover
+    """Fetch a Heat stack by id.
+
+    :param heat_stack_id: identifier handed to ``stacks.get``
+    :return: the stack, or ``None`` when the request raises (the error is
+             logged, not propagated)
+    """
+    try:
+        client = get_heat_client()
+        return client.stacks.get(heat_stack_id)
+    except Exception:
+        # log.exception() records the traceback on its own; positional
+        # arguments are only for %-placeholders in the message, so log the
+        # stack id rather than the exception object.
+        log.exception("Error [get_stack('%s')]", heat_stack_id)
+        return None
+
+
+def get_stack_resources(heat_stack_id):    # pragma: no cover
+    """List the resources of a Heat stack.
+
+    :param heat_stack_id: identifier handed to ``resources.list``
+    :return: whatever ``resources.list`` returns, or ``None`` when the
+             request raises (the error is logged, not propagated)
+    """
+    try:
+        client = get_heat_client()
+        return client.resources.list(heat_stack_id)
+    except Exception:
+        # log.exception() records the traceback on its own; positional
+        # arguments are only for %-placeholders in the message, so log the
+        # stack id rather than the exception object.
+        log.exception("Error [get_stack_resources('%s')]", heat_stack_id)
+        return None
+
+
+def get_stack_vms(heat_stack_id):    # pragma: no cover
+    """List the Nova server resources of a Heat stack.
+
+    :param heat_stack_id: identifier of the stack to inspect
+    :return: list of the stack resources whose ``resource_type`` is
+             ``OS::Nova::Server``; empty when there are none or when the
+             resources could not be fetched
+    """
+    # get_stack_resources() logs a failure and returns None, so fall back to
+    # an empty list instead of raising TypeError on iteration.
+    resources = get_stack_resources(heat_stack_id) or []
+    return [resource for resource in resources
+            if resource.resource_type == "OS::Nova::Server"]
diff --git a/vnftest/common/rest_client.py b/vnftest/common/rest_client.py
index 23a108c..051f5dd 100644
--- a/vnftest/common/rest_client.py
+++ b/vnftest/common/rest_client.py
@@ -14,9 +14,17 @@
##############################################################################
import json
+
+import logging
+import os
import urllib2
import requests
+from vnftest.common import utils
+
+# Module-level logger for this REST helper.
+logger = logging.getLogger(__name__)
+# NOTE(review): this looks intended to switch off HTTPS certificate
+# verification. The Python documentation states that os.putenv() does not
+# update os.environ, so confirm whether the setting takes effect inside this
+# process at all, and whether disabling verification globally is acceptable
+# for these REST calls.
+os.putenv('PYTHONHTTPSVERIFY', "0")
+
def post(url, headers, data, logger):
return call(url, 'POST', headers, data, logger)
@@ -31,10 +39,15 @@
f = urllib2.urlopen(req)
return_code = f.code
response_body = f.read()
+ headers = f.headers
+ content_type = headers.dict['content-type'] if 'content-type' in headers.dict else 'application/json'
f.close()
if len(str(response_body)) == 0:
response_body = "{}"
- response_body = json.loads(response_body)
+ if 'application/xml' in content_type:
+ response_body = utils.xml_to_dict(response_body)
+ else:
+ response_body = json.loads(response_body)
result = {'return_code': return_code, 'body': response_body}
return result
diff --git a/vnftest/common/utils.py b/vnftest/common/utils.py
index ad1b2f3..406796d 100644
--- a/vnftest/common/utils.py
+++ b/vnftest/common/utils.py
@@ -15,10 +15,13 @@
# yardstick/common/utils.py
import collections
+import formatter
from contextlib import closing
import datetime
import errno
import importlib
+from string import Formatter
+
import ipaddress
import logging
import os
@@ -27,16 +30,21 @@
import subprocess
import sys
+import pkg_resources
import six
from flask import jsonify
from six.moves import configparser
from oslo_serialization import jsonutils
-
+import xml.etree.ElementTree
import vnftest
+from vnftest.common.exceptions import ResourceNotFound
+
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
+# Cache filled by findsubclasses(): maps a class name (``cls.__name__``) to
+# the list of subclasses collected for that class.
+class_implementations = {}
+
# Decorator for cli-args
def cliargs(*args, **kwargs):
@@ -46,23 +54,24 @@
return _decorator
-def itersubclasses(cls, _seen=None):
- """Generator over all subclasses of a given class in depth first order."""
+def findsubclasses(cls):
+    """Return every direct and indirect subclass of ``cls``.
+
+    The result is cached in ``class_implementations`` under ``cls.__name__``,
+    so later calls for a class with the same name get the cached list back
+    (subclasses defined after the first call are not picked up). On the very
+    first call, while the cache is still empty, the ``vnftest.extension``
+    entry points are loaded so that extension classes are registered as
+    subclasses before the walk starts.
+
+    :param cls: base class whose subclass tree is walked depth first
+    :return: list of subclasses, each class listed once
+    """
+    if cls.__name__ not in class_implementations:
+        # Load entrypoint classes just once.
+        if not class_implementations:
+            for entrypoint in pkg_resources.iter_entry_points(
+                    group='vnftest.extension'):
+                loaded_type = entrypoint.load()
+                logger.info("Loaded: %s", loaded_type)
- if not isinstance(cls, type):
- raise TypeError("itersubclasses must be called with "
- "new-style classes, not %.100r" % cls)
- _seen = _seen or set()
- try:
- subs = cls.__subclasses__()
- except TypeError: # fails only when cls is type
- subs = cls.__subclasses__(cls)
- for sub in subs:
- if sub not in _seen:
- _seen.add(sub)
- yield sub
- for sub in itersubclasses(sub, _seen):
- yield sub
+
+        subclasses = []
+        # A class reachable through several bases (diamond inheritance) must
+        # be reported once, as the replaced generator did with ``_seen``.
+        seen = set()
+
+        def getallnativesubclasses(clazz):
+            for subclass in clazz.__subclasses__():
+                if subclass in seen:
+                    continue
+                seen.add(subclass)
+                subclasses.append(subclass)
+                getallnativesubclasses(subclass)
+
+        getallnativesubclasses(cls)
+        # Publish the list only after the walk finished, so a failure does
+        # not leave a partial result in the cache.
+        class_implementations[cls.__name__] = subclasses
+    return class_implementations[cls.__name__]
def import_modules_from_package(package):
@@ -431,3 +440,91 @@
__getattr__ = dict.get
__setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__
+
+
+def normalize_data_struct(obj):
+    """Recursively convert ``obj`` into plain lists and dicts.
+
+    * a string is wrapped in a single-element list
+    * a list is rebuilt with every element normalized
+    * a dict is rebuilt with every value normalized (keys are kept as is)
+    * anything else is handed to change_obj_to_dict()
+
+    :param obj: string, list, dict or arbitrary object
+    :return: the normalized structure
+    """
+    if isinstance(obj, basestring):
+        return [obj]
+    if isinstance(obj, list):
+        return [normalize_data_struct(element) for element in obj]
+    if isinstance(obj, dict):
+        # Iterate key/value pairs explicitly: looping over the dict itself
+        # yields only keys, which cannot be unpacked into (k, v).
+        return {k: normalize_data_struct(v) for k, v in obj.items()}
+    return change_obj_to_dict(obj)
+
+
+def xml_to_dict(xml_str):
+    """Parse an XML string and convert it with element_tree_to_dict()."""
+    root = xml.etree.ElementTree.fromstring(xml_str)
+    return element_tree_to_dict(root)
+
+
+# Convert an ElementTree element into nested dicts keyed by tag name.
+# An element with children maps its tag to a dict built from those children;
+# an element without children maps its tag to its text. When a child tag
+# repeats under the same parent, the values are gathered into a list. XML
+# attributes are copied in as additional keys.
+# NOTE(review): relies on Element.getchildren() and dict.iteritems(); both look
+# Python 2 specific - confirm before running this under Python 3.
+def element_tree_to_dict(element_tree):
+ # Recursive worker: fills ``accum`` with the entry for ``tree`` and returns it.
+ def internal_iter(tree, accum):
+ if tree is None:
+ return accum
+ # Dict that will receive the XML attributes of ``tree``.
+ attribute_target = None
+ if tree.getchildren():
+ accum[tree.tag] = {}
+ # Attributes of an element with children go into the element's own dict.
+ attribute_target = accum[tree.tag]
+ for each in tree.getchildren():
+ result = internal_iter(each, {})
+ if each.tag in accum[tree.tag]:
+ # Tag already seen under this parent: turn the stored value into a
+ # list (once) and append the new child's value.
+ if not isinstance(accum[tree.tag][each.tag], list):
+ accum[tree.tag][each.tag] = [
+ accum[tree.tag][each.tag]
+ ]
+ accum[tree.tag][each.tag].append(result[each.tag])
+ else:
+ accum[tree.tag].update(result)
+ else:
+ # Childless element: its attributes are stored in ``accum`` itself,
+ # next to the tag -> text entry.
+ attribute_target = accum
+ accum[tree.tag] = tree.text
+ # Add attributes
+ attributes = tree.attrib or {}
+ for att_name, att_value in attributes.iteritems():
+ attribute_target[att_name] = att_value
+
+ return accum
+
+ return internal_iter(element_tree, {})
+
+
+def resource_as_string(path):
+    """Return the content of a packaged resource.
+
+    ``path`` is split into a directory part, which is turned into a dotted
+    package name, and a file name that is looked up inside that package.
+
+    :param path: slash-separated resource path
+    :return: result of ``pkg_resources.resource_string``
+    :raises ResourceNotFound: if pkg_resources reports no such resource
+    """
+    directory, resource_name = os.path.split(path)
+    package = directory.replace("/", ".")
+    if pkg_resources.resource_exists(package, resource_name):
+        return pkg_resources.resource_string(package, resource_name)
+    raise ResourceNotFound(resource=path)
+
+
+def load_resource(path):
+    """Open a packaged resource as a stream.
+
+    ``path`` is split into a directory part, which is turned into a dotted
+    package name, and a file name that is looked up inside that package.
+
+    :param path: slash-separated resource path
+    :return: result of ``pkg_resources.resource_stream``
+    :raises ResourceNotFound: if pkg_resources reports no such resource
+    """
+    directory, resource_name = os.path.split(path)
+    package = directory.replace("/", ".")
+    if pkg_resources.resource_exists(package, resource_name):
+        return pkg_resources.resource_stream(package, resource_name)
+    raise ResourceNotFound(resource=path)
+
+
+def format(st, params):
+    """Substitute ``{field}`` placeholders in ``st`` with values from ``params``.
+
+    Non-string input is returned untouched. String values are spliced into
+    the surrounding text. A non-string value is kept as an object: it becomes
+    the return value, and while it is truthy, later fields are looked up in
+    it instead of in ``params``.
+
+    :param st: template string (any other object is returned as is)
+    :param params: mapping used to resolve field names
+    :return: the last non-string value found, otherwise the resulting string
+    :raises KeyError: if a field name is missing from the mapping
+    """
+    if not isinstance(st, basestring):
+        return st
+    pieces = []
+    ret_obj = None
+    for literal_text, field_name, _format_spec, _conversion in \
+            Formatter().parse(st):
+        # parse() delivers the text preceding a field in the same tuple as
+        # the field, so the literal part must be kept in both cases.
+        pieces.append(literal_text)
+        if field_name is None:
+            continue
+        source = ret_obj or params
+        value = source[field_name]
+        if isinstance(value, basestring):
+            pieces.append(value)
+        else:
+            ret_obj = value
+    # Compare with None so falsy values (0, False, [], {}) are not replaced
+    # by the accumulated string.
+    if ret_obj is not None:
+        return ret_obj
+    return "".join(pieces)