blob: b99426113d6eddd72bd2e788ccb209edd882373f [file] [log] [blame]
import json
import re
import docker
import time
class PrhLibrary(object):
def __init__(self):
pass
@staticmethod
def find_one_of_log_entryies(searched_entries):
print(type(searched_entries))
client = docker.from_env()
container = client.containers.get('prh')
print("Check for log searches for pattern: ", searched_entries)
for line in container.logs(stream=True):
print("Check for log analysis line: ", line )
for searched_entry in searched_entries:
if searched_entry in line.strip():
return True
else:
return False
@staticmethod
def find_log_json(prefix, json_message):
print("Looking for:")
print("Prefix: " + str(prefix))
print("Json: " + str(json_message))
try:
decoded_message = json.loads(json_message)
except json.JSONDecodeError:
print("Could not decode given message")
return False
pattern = re.compile(prefix + "(.*)$")
client = docker.from_env()
container = client.containers.get('prh')
for line in container.logs(stream=True):
print("Check for log analysis line: ", line )
if PrhLibrary.__same_json_in_log(decoded_message, line, pattern):
return True
else:
return False
@staticmethod
def create_invalid_notification(json_file):
output = {}
input = json.loads(json_file)[0]
output["correlationId"] = PrhLibrary.__extract_correlation_id_value(input)
output["oamV4IpAddress"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "oamV4IpAddress")
output["oamV6IpAddress"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "oamV6IpAddress")
output["serialNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "serialNumber")
output["vendorName"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "vendorName")
output["modelNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "modelNumber")
output["unitType"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "unitType")
output['nfNamingCode'] = ''
output['softwareVersion'] = ''
output["additionalFields"] = PrhLibrary.__get_additional_fields_as_key_value_pairs(input)
return json.dumps(output)
@staticmethod
def create_pnf_ready_notification_as_pnf_ready(json_file):
output = {}
input = json.loads(json_file)[0]
output["correlationId"] = PrhLibrary.__extract_correlation_id_value(input)
output["serialNumber"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "serialNumber")
output["equip-vendor"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "vendorName")
output["equip-model"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "modelNumber")
output["equip-type"] = PrhLibrary.__extract_value_from_pnfRegistrationFields(input, "unitType")
output["nf-role"] = PrhLibrary.__extract_nf_role(input)
output["sw-version"] = ""
output["additionalFields"] = PrhLibrary.__get_additional_fields_as_key_value_pairs(input)
return json.dumps(output)
@staticmethod
def ensure_container_is_running(name):
client = docker.from_env()
if not PrhLibrary.is_in_status(client, name, "running"):
print ("starting container", name)
container = client.containers.get(name)
container.start()
PrhLibrary.wait_for_status(client, name, "running")
PrhLibrary.print_status(client)
@staticmethod
def ensure_container_is_exited(name):
client = docker.from_env()
if not PrhLibrary.is_in_status(client, name, "exited"):
print ("stopping container", name)
container = client.containers.get(name)
container.stop()
PrhLibrary.wait_for_status(client, name, "exited")
PrhLibrary.print_status(client)
@staticmethod
def print_status(client):
print("containers status")
for c in client.containers.list(all=True):
print(c.name, " ", c.status)
@staticmethod
def wait_for_status(client, name, status):
while not PrhLibrary.is_in_status(client, name, status):
print ("waiting for container: ", name, "to be in status: ", status)
time.sleep(3)
@staticmethod
def is_in_status(client, name, status):
return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1
@staticmethod
def create_pnf_name(json_file):
json_to_python = json.loads(json_file)
correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in json_to_python["event"]["commonEventHeader"] else '",'
return correlation_id
@staticmethod
def __get_additional_fields_as_key_value_pairs(content):
return content.get("event").get("pnfRegistrationFields").get(
"additionalFields") if "additionalFields" in content["event"]["pnfRegistrationFields"] else {}
@staticmethod
def __extract_value_from_pnfRegistrationFields(content, key):
return content["event"]["pnfRegistrationFields"][key] if key in content["event"]["pnfRegistrationFields"] else ''
@staticmethod
def __extract_correlation_id_value(content):
return content["event"]["commonEventHeader"]["sourceName"] if "sourceName" in content["event"]["commonEventHeader"] else ''
@staticmethod
def __extract_nf_role(content):
return content["event"]["commonEventHeader"]["nfNamingCode"] if "nfNamingCode" in content["event"]["commonEventHeader"] else ''
@staticmethod
def __same_json_in_log(decoded_message, line, pattern):
extracted_json = PrhLibrary.__extract_json(line, pattern)
if extracted_json is not None:
print("Found json: " + extracted_json)
try:
if json.loads(extracted_json) == decoded_message:
return True
except json.JSONDecodeError:
print("Could not decode")
return False
@staticmethod
def __extract_json(line, pattern):
full_message = PrhLibrary.__extract_full_message_from_line(line)
if full_message is not None:
match = pattern.match(full_message)
if match:
return match.group(1).replace("\\n", "\n").replace("\\t", "\t")
return None
@staticmethod
def __extract_full_message_from_line(line):
split = line.split("|")
if len(split) > 3:
return split[3]
return None