Refactor PrhLibrary.py
Issue-ID: INT-1094
Signed-off-by: biniek <lukasz.biniek@nokia.com>
Change-Id: Ic946c3ce58b3d94e4d8e3c4f206be4b707c31084
diff --git a/tests/dcaegen2/prh-testcases/resources/PrhLibrary.py b/tests/dcaegen2/prh-testcases/resources/PrhLibrary.py
index 88aeb45..7453eb0 100644
--- a/tests/dcaegen2/prh-testcases/resources/PrhLibrary.py
+++ b/tests/dcaegen2/prh-testcases/resources/PrhLibrary.py
@@ -1,5 +1,5 @@
import json
-
+import re
import docker
import time
@@ -11,83 +11,72 @@
@staticmethod
def find_log_entry(search_for):
- print (type(search_for))
+ print(type(search_for))
client = docker.from_env()
container = client.containers.get('prh')
- print ("Check for log searches for pattern: ", search_for )
+ print("Check for log searches for pattern: ", search_for )
for line in container.logs(stream=True):
- print ("Check for log analysis line: ", line )
+ print("Check for log analysis line: ", line )
if search_for in line.strip():
return True
else:
return False
@staticmethod
+ def find_log_json(prefix, json_message):
+ print("Looking for:")
+ print("Prefix: " + str(prefix))
+ print("Json: " + str(json_message))
+ try:
+ decoded_message = json.loads(json_message)
+ except json.JSONDecodeError:
+ print("Could not decode given message")
+ return False
+ pattern = re.compile(prefix + "(.*)$")
+ client = docker.from_env()
+ container = client.containers.get('prh')
+ for line in container.logs(stream=True):
+ print("Check for log analysis line: ", line )
+ if PrhLibrary.__same_json_in_log(decoded_message, line, pattern):
+ return True
+ else:
+ return False
+
+ @staticmethod
def create_invalid_notification(json_file):
- event = json.loads(json_file)[0]
- correlation_id = PrhLibrary.extract_correlation_id_value(event, "correlationId")
- ipv4 = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "oamV4IpAddress", "oamV4IpAddress")
- ipv6 = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "oamV6IpAddress", "oamV6IpAddress")
- serial_number = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "serialNumber", "serialNumber")
- vendor_name = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "vendorName", "vendorName")
- model_number = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "modelNumber", "modelNumber")
- unit_type = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "unitType", "unitType")
+ output = {}
+ input = json.loads(json_file)[0]
- additional_fields = PrhLibrary.extract_additional_fields(event)
+ output["correlationId"] = PrhLibrary.__extract_correlation_id_value(input)
+ output["oamV4IpAddress"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "oamV4IpAddress")
+ output["oamV6IpAddress"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "oamV6IpAddress")
+ output["serialNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "serialNumber")
+ output["vendorName"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "vendorName")
+ output["modelNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "modelNumber")
+ output["unitType"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "unitType")
+ output['nfNamingCode'] = ''
+ output['softwareVersion'] = ''
- str_json = '{' + correlation_id + ipv4 + ipv6 + serial_number + vendor_name + model_number + unit_type + '"nfNamingCode":""' + "," + '"softwareVersion":"",' + additional_fields
- return json.dumps(str_json).replace("\\", "")[1:-1].replace("\":", "\": ").rstrip(',') + '\\n}'
+ output["additionalFields"] = PrhLibrary.__get_additional_fields_as_key_value_pairs(input)
+
+ return json.dumps(output)
@staticmethod
def create_pnf_ready_notification_as_pnf_ready(json_file):
- json_to_python = json.loads(json_file)
- correlation_id = PrhLibrary.extract_correlation_id_value(json_to_python, "correlationId")
+ output = {}
+ input = json.loads(json_file)[0]
- additional_fields = PrhLibrary.extract_additional_fields_value(json_to_python)
+ output["correlationId"] = PrhLibrary.__extract_correlation_id_value(input)
+ output["serialNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "serialNumber")
+ output["equip-vendor"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "vendorName")
+ output["equip-model"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "modelNumber")
+ output["equip-type"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "unitType")
+ output["nf-role"] = PrhLibrary.__extract_nf_role(input)
+ output["sw-version"] = ""
- str_json = '{' + correlation_id + additional_fields
+ output["additionalFields"] = PrhLibrary.__get_additional_fields_as_key_value_pairs(input)
- return json.dumps(str_json.rstrip(',') + '}').replace("\\", "")[1:-1]
-
- @staticmethod
- def extract_additional_fields_value(content):
- fields = PrhLibrary.get_additional_fields_as_key_value_pairs(content)
- if len(fields) == 0:
- return ""
- return PrhLibrary.build_additional_fields_json(fields)
-
- @staticmethod
- def extract_additional_fields(content):
- fields = PrhLibrary.get_additional_fields_as_key_value_pairs(content)
- if fields == []:
- return '"additionalFields":null'
- return PrhLibrary.build_additional_fields_json(fields)
-
- @staticmethod
- def get_additional_fields_as_key_value_pairs(content):
- return content.get("event").get("pnfRegistrationFields").get(
- "additionalFields") if "additionalFields" in content["event"]["pnfRegistrationFields"] else []
-
- @staticmethod
- def build_additional_fields_json(fields):
- res = '"additionalFields":{'
- for f in fields:
- res += '"' + f + '":"' + fields.get(f) + '",'
- return res.rstrip(',') + '},'
-
- @staticmethod
- def extract_value_from_pnfRegistrationFields(content, name, key):
- return '"' + name + '":"' + (content.get("event").get("pnfRegistrationFields").get(key) + '",' if key in content["event"]["pnfRegistrationFields"] else '",')
-
- @staticmethod
- def extract_correlation_id_value(content, name):
- return '"' + name + '":"' + (content.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in content["event"]["commonEventHeader"] else '",')
-
- @staticmethod
- def create_pnf_name(json_file):
- json_to_python = json.loads(json_file)
- correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in json_to_python["event"]["commonEventHeader"] else '",'
- return correlation_id
+ return json.dumps(output)
@staticmethod
def ensure_container_is_running(name):
@@ -127,4 +116,55 @@
@staticmethod
def is_in_status(client, name, status):
- return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1
\ No newline at end of file
+ return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1
+
+ @staticmethod
+ def create_pnf_name(json_file):
+ json_to_python = json.loads(json_file)
+ correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in json_to_python["event"]["commonEventHeader"] else '",'
+ return correlation_id
+
+ @staticmethod
+ def __get_additional_fields_as_key_value_pairs(content):
+ return content.get("event").get("pnfRegistrationFields").get(
+ "additionalFields") if "additionalFields" in content["event"]["pnfRegistrationFields"] else {}
+
+ @staticmethod
+ def __extract_value_from_pnfRegistrationFields(content, key):
+ return content["event"]["pnfRegistrationFields"][key] if key in content["event"]["pnfRegistrationFields"] else ''
+
+ @staticmethod
+ def __extract_correlation_id_value(content):
+ return content["event"]["commonEventHeader"]["sourceName"] if "sourceName" in content["event"]["commonEventHeader"] else ''
+
+ @staticmethod
+ def __extract_nf_role(content):
+ return content["event"]["commonEventHeader"]["nfNamingCode"] if "nfNamingCode" in content["event"]["commonEventHeader"] else ''
+
+ @staticmethod
+ def __same_json_in_log(decoded_message, line, pattern):
+ extracted_json = PrhLibrary.__extract_json(line, pattern)
+ if extracted_json is not None:
+ print("Found json: " + extracted_json)
+ try:
+ if json.loads(extracted_json) == decoded_message:
+ return True
+ except json.JSONDecodeError:
+ print("Could not decode")
+ return False
+
+ @staticmethod
+ def __extract_json(line, pattern):
+ full_message = PrhLibrary.__extract_full_message_from_line(line)
+ if full_message is not None:
+ match = pattern.match(full_message)
+ if match:
+ return match.group(1).replace("\\n", "\n").replace("\\t", "\t")
+ return None
+
+ @staticmethod
+ def __extract_full_message_from_line(line):
+ split = line.split("|")
+ if len(split) > 3:
+ return split[3]
+ return None