| import json |
| |
| import docker |
| import time |
| |
| |
| class PrhLibrary(object): |
| |
| def __init__(self): |
| pass |
| |
| @staticmethod |
| def find_log_entry(search_for): |
| print (type(search_for)) |
| client = docker.from_env() |
| container = client.containers.get('prh') |
| print ("Check for log searches for pattern: ", search_for ) |
| for line in container.logs(stream=True): |
| print ("Check for log analysis line: ", line ) |
| if search_for in line.strip(): |
| return True |
| else: |
| return False |
| |
| @staticmethod |
| def create_invalid_notification(json_file): |
| event = json.loads(json_file)[0] |
| correlation_id = PrhLibrary.extract_correlation_id_value(event, "correlationId") |
| ipv4 = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "oamV4IpAddress", "oamV4IpAddress") |
| ipv6 = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "oamV6IpAddress", "oamV6IpAddress") |
| serial_number = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "serialNumber", "serialNumber") |
| vendor_name = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "vendorName", "vendorName") |
| model_number = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "modelNumber", "modelNumber") |
| unit_type = PrhLibrary.extract_value_from_pnfRegistrationFields(event, "unitType", "unitType") |
| |
| additional_fields = PrhLibrary.extract_additional_fields(event) |
| |
| str_json = '{' + correlation_id + ipv4 + ipv6 + serial_number + vendor_name + model_number + unit_type + '"nfNamingCode":""' + "," + '"softwareVersion":"",' + additional_fields |
| return json.dumps(str_json).replace("\\", "")[1:-1].replace("\":", "\": ").rstrip(',') + '\\n}' |
| |
| @staticmethod |
| def create_pnf_ready_notification_as_pnf_ready(json_file): |
| json_to_python = json.loads(json_file) |
| correlation_id = PrhLibrary.extract_correlation_id_value(json_to_python, "correlationId") |
| |
| additional_fields = PrhLibrary.extract_additional_fields_value(json_to_python) |
| |
| str_json = '{' + correlation_id + additional_fields |
| |
| return json.dumps(str_json.rstrip(',') + '}').replace("\\", "")[1:-1] |
| |
| @staticmethod |
| def extract_additional_fields_value(content): |
| fields = PrhLibrary.get_additional_fields_as_key_value_pairs(content) |
| if len(fields) == 0: |
| return "" |
| return PrhLibrary.build_additional_fields_json(fields) |
| |
| @staticmethod |
| def extract_additional_fields(content): |
| fields = PrhLibrary.get_additional_fields_as_key_value_pairs(content) |
| if fields == []: |
| return '"additionalFields":null' |
| return PrhLibrary.build_additional_fields_json(fields) |
| |
| @staticmethod |
| def get_additional_fields_as_key_value_pairs(content): |
| return content.get("event").get("pnfRegistrationFields").get( |
| "additionalFields") if "additionalFields" in content["event"]["pnfRegistrationFields"] else [] |
| |
| @staticmethod |
| def build_additional_fields_json(fields): |
| res = '"additionalFields":{' |
| for f in fields: |
| res += '"' + f + '":"' + fields.get(f) + '",' |
| return res.rstrip(',') + '},' |
| |
| @staticmethod |
| def extract_value_from_pnfRegistrationFields(content, name, key): |
| return '"' + name + '":"' + (content.get("event").get("pnfRegistrationFields").get(key) + '",' if key in content["event"]["pnfRegistrationFields"] else '",') |
| |
| @staticmethod |
| def extract_correlation_id_value(content, name): |
| return '"' + name + '":"' + (content.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in content["event"]["commonEventHeader"] else '",') |
| |
| @staticmethod |
| def create_pnf_name(json_file): |
| json_to_python = json.loads(json_file) |
| correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName") + '",' if "sourceName" in json_to_python["event"]["commonEventHeader"] else '",' |
| return correlation_id |
| |
| @staticmethod |
| def ensure_container_is_running(name): |
| client = docker.from_env() |
| |
| if not PrhLibrary.is_in_status(client, name, "running"): |
| print ("starting container", name) |
| container = client.containers.get(name) |
| container.start() |
| PrhLibrary.wait_for_status(client, name, "running") |
| |
| PrhLibrary.print_status(client) |
| |
| @staticmethod |
| def ensure_container_is_exited(name): |
| client = docker.from_env() |
| |
| if not PrhLibrary.is_in_status(client, name, "exited"): |
| print ("stopping container", name) |
| container = client.containers.get(name) |
| container.stop() |
| PrhLibrary.wait_for_status(client, name, "exited") |
| |
| PrhLibrary.print_status(client) |
| |
| @staticmethod |
| def print_status(client): |
| print("containers status") |
| for c in client.containers.list(all=True): |
| print(c.name, " ", c.status) |
| |
| @staticmethod |
| def wait_for_status(client, name, status): |
| while not PrhLibrary.is_in_status(client, name, status): |
| print ("waiting for container: ", name, "to be in status: ", status) |
| time.sleep(3) |
| |
| @staticmethod |
| def is_in_status(client, name, status): |
| return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1 |