| import json |
| |
| import docker |
| import time |
| |
| |
| class PrhLibrary(object): |
| |
| def __init__(self): |
| pass |
| |
| @staticmethod |
| def check_for_log(search_for): |
| client = docker.from_env() |
| container = client.containers.get('prh') |
| #print ("Check for log searches for pattern: ", search_for ) |
| for line in container.logs(stream=True): |
| #print ("Check for log analysis line: ", line ) |
| if search_for in line.strip(): |
| return True |
| else: |
| return False |
| |
| @staticmethod |
| def create_pnf_ready_notification_from_ves(json_file): |
| json_to_python = json.loads(json_file) |
| ipv4 = PrhLibrary.extract_ip_v4(json_to_python) |
| ipv6 = PrhLibrary.extract_ip_v6(json_to_python) |
| correlation_id = PrhLibrary.extract_correlation_id(json_to_python) |
| serial_number = PrhLibrary.extract_serial_number(json_to_python) |
| vendor_name = PrhLibrary.extract_vendor_name(json_to_python) |
| model_number = PrhLibrary.extract_model_number(json_to_python) |
| unit_type = PrhLibrary.extract_unit_type(json_to_python) |
| |
| str_json = '{"correlationId":"' + correlation_id + '","ipaddress-v4-oam":"' + ipv4 + '","ipaddress-v6-oam":"' + ipv6 + '","serialNumber":"' + serial_number + '","vendorName":"' + vendor_name + '","modelNumber":"' + model_number + '","unitType":"' + unit_type + '"}' |
| python_to_json = json.dumps(str_json) |
| return python_to_json.replace("\\", "")[1:-1] |
| |
| @staticmethod |
| def create_pnf_ready_notification_as_pnf_ready(json_file): |
| json_to_python = json.loads(json_file) |
| ipv4 = PrhLibrary.extract_ip_v4(json_to_python) |
| ipv6 = PrhLibrary.extract_ip_v6(json_to_python) |
| correlation_id = PrhLibrary.extract_correlation_id(json_to_python) |
| serial_number = PrhLibrary.extract_serial_number(json_to_python) |
| vendor_name = PrhLibrary.extract_vendor_name(json_to_python) |
| model_number = PrhLibrary.extract_model_number(json_to_python) |
| unit_type = PrhLibrary.extract_unit_type(json_to_python) |
| |
| nf_role = json_to_python.get("event").get("commonEventHeader").get("nfNamingCode") if "nfNamingCode" in json_to_python["event"]["commonEventHeader"] else "" |
| |
| str_json = '{"correlationId":"' + correlation_id + '","ipaddress-v4-oam":"' + ipv4 + '","ipaddress-v6-oam":"' + ipv6 + '","serial-number":"' + serial_number + '","equip-vendor":"' + vendor_name + '","equip-model":"' + model_number + '","equip-type":"' + unit_type + '","nf-role":"' + nf_role + '","sw-version":""}' |
| python_to_json = json.dumps(str_json) |
| return python_to_json.replace("\\", "")[1:-1] |
| |
| @staticmethod |
| def extract_ip_v4(content): |
| return content.get("event").get("pnfRegistrationFields").get("oamV4IpAddress") if "oamV4IpAddress" in content["event"]["pnfRegistrationFields"] else "" |
| |
| @staticmethod |
| def extract_ip_v6(content): |
| return content.get("event").get("pnfRegistrationFields").get("oamV6IpAddress") if "oamV6IpAddress" in content["event"]["pnfRegistrationFields"] else "" |
| |
| @staticmethod |
| def extract_correlation_id(content): |
| return content.get("event").get("commonEventHeader").get("sourceName") if "sourceName" in content["event"]["commonEventHeader"] else "" |
| |
| @staticmethod |
| def extract_serial_number(content): |
| return content.get("event").get("pnfRegistrationFields").get("serialNumber") if "serialNumber" in content["event"]["pnfRegistrationFields"] else "" |
| |
| @staticmethod |
| def extract_vendor_name(content): |
| return content.get("event").get("pnfRegistrationFields").get("vendorName") if "vendorName" in content["event"]["pnfRegistrationFields"] else "" |
| |
| @staticmethod |
| def extract_model_number(content): |
| return content.get("event").get("pnfRegistrationFields").get("modelNumber") if "modelNumber" in content["event"]["pnfRegistrationFields"] else "" |
| |
| @staticmethod |
| def extract_unit_type(content): |
| return content.get("event").get("pnfRegistrationFields").get("unitType") if "unitType" in content["event"]["pnfRegistrationFields"] else "" |
| |
| @staticmethod |
| def create_pnf_name(json_file): |
| json_to_python = json.loads(json_file) |
| correlation_id = json_to_python.get("sourceName") |
| return correlation_id |
| |
| @staticmethod |
| def ensure_container_is_running(name): |
| client = docker.from_env() |
| |
| if not PrhLibrary.is_in_status(client, name, "running"): |
| print ("starting container", name) |
| container = client.containers.get(name) |
| container.start() |
| PrhLibrary.wait_for_status(client, name, "running") |
| |
| PrhLibrary.print_status(client) |
| |
| @staticmethod |
| def ensure_container_is_exited(name): |
| client = docker.from_env() |
| |
| if not PrhLibrary.is_in_status(client, name, "exited"): |
| print ("stopping container", name) |
| container = client.containers.get(name) |
| container.stop() |
| PrhLibrary.wait_for_status(client, name, "exited") |
| |
| PrhLibrary.print_status(client) |
| |
| @staticmethod |
| def print_status(client): |
| print("containers status") |
| for c in client.containers.list(all=True): |
| print(c.name, " ", c.status) |
| |
| @staticmethod |
| def wait_for_status(client, name, status): |
| while not PrhLibrary.is_in_status(client, name, status): |
| print ("waiting for container: ", name, "to be in status: ", status) |
| time.sleep(3) |
| |
| @staticmethod |
| def is_in_status(client, name, status): |
| return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1 |
| |
| |
| def create_invalid_notification(self, json_file): |
| invalidate_input = self.create_pnf_ready_notification_from_ves(json_file).replace("\":", "\": ") \ |
| .replace("ipaddress-v4-oam", "oamV4IpAddress").replace("ipaddress-v6-oam","oamV6IpAddress").replace("}",'') |
| invalidate_input__and_add_additional_fields = invalidate_input + ',"nfNamingCode": ""' + "," + '"softwareVersion": ""' +"\\n" |
| return invalidate_input__and_add_additional_fields |