# blob: 1a3875b9a14a5b0d07d892eb89ccf5fa367d5088 [file] [log] [blame]
import json
import docker
import time
class PrhLibrary(object):
    """Test helpers for the ``prh`` container and PNF notification strings.

    The helpers fall into two groups:

    * Docker helpers that start, stop and inspect containers through the
      ``docker`` SDK client created from the environment.
    * JSON helpers that read a VES registration event (given as JSON text)
      and build notification strings from selected fields of that event.
    """

    def __init__(self):
        pass

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _to_text(value):
        """Return *value* decoded as UTF-8 if it is bytes, else unchanged."""
        if isinstance(value, bytes):
            # Undecodable bytes are replaced rather than raising, because a
            # single odd log chunk must not abort the whole search.
            return value.decode("utf-8", "replace")
        return value

    @staticmethod
    def _to_compact_json(pairs):
        """Serialise ``(key, value)`` pairs as a JSON object without spaces.

        The pairs are emitted in the given order.  Keys and values are
        escaped by ``json.dumps`` so that quotes, backslashes and non-ASCII
        characters in a value still yield valid JSON.
        """
        members = [
            json.dumps(key) + ":" + json.dumps(value, separators=(",", ":"))
            for key, value in pairs
        ]
        return "{" + ",".join(members) + "}"

    @staticmethod
    def _extract_field(content, section, field):
        """Return ``content["event"][section][field]`` or ``""`` if absent.

        Only a missing *field* is tolerated; a missing ``"event"`` or
        *section* key raises ``KeyError``.
        """
        return content["event"][section].get(field, "")

    # ------------------------------------------------------------------
    # log inspection
    # ------------------------------------------------------------------

    @staticmethod
    def check_for_log(search_for):
        """Return True if the ``prh`` container log contains *search_for*.

        The log is consumed as a stream; every chunk is stripped and
        searched for the pattern.  The call returns True at the first match
        and False only once the stream is exhausted, so it may block for as
        long as the stream stays open.

        :param search_for: text (or bytes) to look for in the log output.
        :returns: True if a chunk contains the pattern, otherwise False.
        """
        client = docker.from_env()
        container = client.containers.get('prh')
        # The docker SDK yields raw bytes; normalise both sides to text so
        # the ``in`` test does not fail with a bytes/str mismatch.
        pattern = PrhLibrary._to_text(search_for)
        for chunk in container.logs(stream=True):
            if pattern in PrhLibrary._to_text(chunk).strip():
                return True
        return False

    # ------------------------------------------------------------------
    # notification builders
    # ------------------------------------------------------------------

    @staticmethod
    def create_pnf_ready_notification_from_ves(json_file):
        """Build the PNF-ready notification using the VES field names.

        :param json_file: JSON text of the VES event.
        :returns: a compact JSON object string with the keys
            ``correlationId``, ``ipaddress-v4-oam``, ``ipaddress-v6-oam``,
            ``serialNumber``, ``vendorName``, ``modelNumber`` and
            ``unitType``; fields missing from the event become ``""``.
        :raises KeyError: if the event lacks ``event``,
            ``commonEventHeader`` or ``pnfRegistrationFields``.
        """
        event = json.loads(json_file)
        return PrhLibrary._to_compact_json([
            ("correlationId", PrhLibrary.extract_correlation_id(event)),
            ("ipaddress-v4-oam", PrhLibrary.extract_ip_v4(event)),
            ("ipaddress-v6-oam", PrhLibrary.extract_ip_v6(event)),
            ("serialNumber", PrhLibrary.extract_serial_number(event)),
            ("vendorName", PrhLibrary.extract_vendor_name(event)),
            ("modelNumber", PrhLibrary.extract_model_number(event)),
            ("unitType", PrhLibrary.extract_unit_type(event)),
        ])

    @staticmethod
    def create_pnf_ready_notification_as_pnf_ready(json_file):
        """Build the PNF-ready notification using the hyphenated field names.

        :param json_file: JSON text of the VES event.
        :returns: a compact JSON object string with the keys
            ``correlationId``, ``ipaddress-v4-oam``, ``ipaddress-v6-oam``,
            ``serial-number``, ``equip-vendor``, ``equip-model``,
            ``equip-type``, ``nf-role`` and ``sw-version``.  ``nf-role`` is
            taken from ``commonEventHeader.nfNamingCode`` and ``sw-version``
            is always ``""``; fields missing from the event become ``""``.
        :raises KeyError: if the event lacks ``event``,
            ``commonEventHeader`` or ``pnfRegistrationFields``.
        """
        event = json.loads(json_file)
        nf_role = PrhLibrary._extract_field(
            event, "commonEventHeader", "nfNamingCode")
        return PrhLibrary._to_compact_json([
            ("correlationId", PrhLibrary.extract_correlation_id(event)),
            ("ipaddress-v4-oam", PrhLibrary.extract_ip_v4(event)),
            ("ipaddress-v6-oam", PrhLibrary.extract_ip_v6(event)),
            ("serial-number", PrhLibrary.extract_serial_number(event)),
            ("equip-vendor", PrhLibrary.extract_vendor_name(event)),
            ("equip-model", PrhLibrary.extract_model_number(event)),
            ("equip-type", PrhLibrary.extract_unit_type(event)),
            ("nf-role", nf_role),
            ("sw-version", ""),
        ])

    # ------------------------------------------------------------------
    # field extractors (each returns "" when the field is absent)
    # ------------------------------------------------------------------

    @staticmethod
    def extract_ip_v4(content):
        """Return ``pnfRegistrationFields.oamV4IpAddress`` or ``""``."""
        return PrhLibrary._extract_field(
            content, "pnfRegistrationFields", "oamV4IpAddress")

    @staticmethod
    def extract_ip_v6(content):
        """Return ``pnfRegistrationFields.oamV6IpAddress`` or ``""``."""
        return PrhLibrary._extract_field(
            content, "pnfRegistrationFields", "oamV6IpAddress")

    @staticmethod
    def extract_correlation_id(content):
        """Return ``commonEventHeader.sourceName`` or ``""``."""
        return PrhLibrary._extract_field(
            content, "commonEventHeader", "sourceName")

    @staticmethod
    def extract_serial_number(content):
        """Return ``pnfRegistrationFields.serialNumber`` or ``""``."""
        return PrhLibrary._extract_field(
            content, "pnfRegistrationFields", "serialNumber")

    @staticmethod
    def extract_vendor_name(content):
        """Return ``pnfRegistrationFields.vendorName`` or ``""``."""
        return PrhLibrary._extract_field(
            content, "pnfRegistrationFields", "vendorName")

    @staticmethod
    def extract_model_number(content):
        """Return ``pnfRegistrationFields.modelNumber`` or ``""``."""
        return PrhLibrary._extract_field(
            content, "pnfRegistrationFields", "modelNumber")

    @staticmethod
    def extract_unit_type(content):
        """Return ``pnfRegistrationFields.unitType`` or ``""``."""
        return PrhLibrary._extract_field(
            content, "pnfRegistrationFields", "unitType")

    @staticmethod
    def create_pnf_name(json_file):
        """Return the top-level ``sourceName`` of the given JSON text.

        :returns: the value of ``sourceName``, or None when the key is absent.
        """
        return json.loads(json_file).get("sourceName")

    # ------------------------------------------------------------------
    # container life cycle
    # ------------------------------------------------------------------

    @staticmethod
    def ensure_container_is_running(name):
        """Start container *name* unless it is already running.

        Waits until the container reports the ``running`` status and then
        prints the status of all containers.
        """
        client = docker.from_env()
        if not PrhLibrary.is_in_status(client, name, "running"):
            print("starting container", name)
            client.containers.get(name).start()
            PrhLibrary.wait_for_status(client, name, "running")
        PrhLibrary.print_status(client)

    @staticmethod
    def ensure_container_is_exited(name):
        """Stop container *name* unless it has already exited.

        Waits until the container reports the ``exited`` status and then
        prints the status of all containers.
        """
        client = docker.from_env()
        if not PrhLibrary.is_in_status(client, name, "exited"):
            print("stopping container", name)
            client.containers.get(name).stop()
            PrhLibrary.wait_for_status(client, name, "exited")
        PrhLibrary.print_status(client)

    @staticmethod
    def print_status(client):
        """Print name and status of every container known to *client*."""
        print("containers status")
        for container in client.containers.list(all=True):
            print(container.name, " ", container.status)

    @staticmethod
    def wait_for_status(client, name, status, poll_interval=3, timeout=None):
        """Poll until container *name* is in *status*.

        :param client: docker client used for the status query.
        :param name: container name.
        :param status: status string to wait for, e.g. ``"running"``.
        :param poll_interval: seconds to sleep between two polls.
        :param timeout: maximum number of seconds to wait; None (the
            default) waits indefinitely, as the original implementation did.
        :raises RuntimeError: if *timeout* elapses before the status is
            reached.
        """
        deadline = None if timeout is None else time.time() + timeout
        while not PrhLibrary.is_in_status(client, name, status):
            if deadline is not None and time.time() >= deadline:
                raise RuntimeError(
                    "container {0} did not reach status {1} within {2} "
                    "seconds".format(name, status, timeout))
            print("waiting for container: ", name,
                  "to be in status: ", status)
            time.sleep(poll_interval)

    @staticmethod
    def is_in_status(client, name, status):
        """Return True if exactly one container matches *name* and *status*.

        The name filter is anchored as ``^/<name>$`` so that containers whose
        name merely contains *name* are not counted.
        NOTE(review): *name* is inserted into the pattern unescaped -
        confirm that callers never pass names with pattern metacharacters.
        """
        matching = client.containers.list(
            all=True,
            filters={"name": "^/" + name + "$", "status": status},
        )
        return len(matching) == 1

    # ------------------------------------------------------------------
    # negative test data
    # ------------------------------------------------------------------

    def create_invalid_notification(self, json_file):
        """Build a deliberately malformed notification string.

        Starts from :meth:`create_pnf_ready_notification_from_ves`, inserts a
        space after every ``":``, renames the two IP address keys to
        ``oamV4IpAddress`` / ``oamV6IpAddress``, removes every ``}`` and
        appends empty ``nfNamingCode`` and ``softwareVersion`` members
        followed by a literal backslash and ``n`` (two characters, not a
        newline).

        :param json_file: JSON text of the VES event.
        :returns: the resulting string, which is not a complete JSON object.
        """
        notification = self.create_pnf_ready_notification_from_ves(json_file)
        invalidate_input = (
            notification
            .replace("\":", "\": ")
            .replace("ipaddress-v4-oam", "oamV4IpAddress")
            .replace("ipaddress-v6-oam", "oamV6IpAddress")
            .replace("}", '')
        )
        additional_fields = ',"nfNamingCode": ""' + "," + '"softwareVersion": ""'
        return invalidate_input + additional_fields + "\\n"