| import json |
| |
| import docker |
| import time |
| from docker.utils.json_stream import json_stream |
| from collections import OrderedDict |
| |
| |
| class BbsLibrary(object): |
| |
| def __init__(self): |
| pass |
| |
| @staticmethod |
| def check_for_log(search_for): |
| client = docker.from_env() |
| container = client.containers.get('bbs') |
| |
| alog = container.logs(stream=False, tail=1000) |
| try: |
| alog = alog.decode('utf-8').strip() |
| except AttributeError: |
| pass |
| |
| found = alog.find(search_for) |
| if found != -1: |
| return True |
| else: |
| return False |
| |
| @staticmethod |
| def create_pnf_name_from_auth(json_file): |
| json_to_python = json.loads(json_file) |
| correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName") |
| return correlation_id |
| |
| @staticmethod |
| def get_invalid_auth_elements(json_file): |
| """ |
| Get the correlationId, oldState, newState, stateInterface, macAddress, swVersion elements |
| from the invalid message and place the elements into a JSON object (string) as fields for comparision |
| """ |
| eventString = json.loads(json_file)[0] |
| json_to_python = json.loads(eventString.replace("\\", "")) |
| correlation_id = json_to_python.get("event").get("commonEventHeader").get("sourceName") |
| oldState = json_to_python.get("event").get("stateChangeFields").get("oldState") |
| newState = json_to_python.get("event").get("stateChangeFields").get("newState") |
| stateInterface = json_to_python.get("event").get("stateChangeFields").get("stateInterface") |
| macAddress = json_to_python.get("event").get("stateChangeFields").get("additionalFields").get("macAddress") |
| swVersion = json_to_python.get("event").get("stateChangeFields").get("additionalFields").get("swVersion") |
| if swVersion is None: |
| swVersion = "" |
| |
| inv_fields = OrderedDict() |
| |
| #inv_fields = dict() |
| inv_fields['correlationId'] = correlation_id |
| inv_fields['oldState'] = oldState |
| inv_fields['newState'] = newState |
| inv_fields['stateInterface'] = stateInterface |
| inv_fields['macAddress'] = macAddress |
| inv_fields['swVersion'] = swVersion |
| |
| # Transform the dictionary to JSON string |
| json_str = json.dumps(inv_fields) |
| |
| # Need to remove spaces between elements |
| json_str = json_str.replace(', ', ',') |
| return json_str |
| |
| @staticmethod |
| def get_invalid_update_elements(json_file): |
| """ |
| Get the correlationId, attachment-point, remote-id, cvlan, svlan, elements |
| from the invalid message and place the elements into a JSON object (string) as fields for comparision |
| """ |
| eventString = json.loads(json_file)[0] |
| json_to_python = json.loads(eventString.replace("\\", "")) |
| correlation_id = json_to_python.get("correlationId") |
| attachmentPoint = json_to_python.get("additionalFields").get("attachment-point") |
| remoteId = json_to_python.get("additionalFields").get("remote-id") |
| cvlan = json_to_python.get("additionalFields").get("cvlan") |
| svlan = json_to_python.get("additionalFields").get("svlan") |
| |
| inv_fields = OrderedDict() |
| #inv_fields = dict() |
| inv_fields['correlationId'] = correlation_id |
| inv_fields['attachment-point'] = attachmentPoint |
| inv_fields['remote-id'] = remoteId |
| inv_fields['cvlan'] = cvlan |
| inv_fields['svlan'] = svlan |
| |
| # Transform the dictionary to JSON string |
| json_str = json.dumps(inv_fields) |
| |
| # Need to remove spaces between elements |
| json_str = json_str.replace(', ', ',') |
| return json_str |
| |
| @staticmethod |
| def compare_policy(dmaap_policy, json_policy): |
| resp = False |
| try: |
| python_policy = json.loads(json_policy).pop() |
| except: |
| python_policy = "" |
| |
| try: |
| python_dmaap_policy = json.loads(dmaap_policy) |
| except: |
| python_dmaap_policy = "" |
| |
| try: |
| d_policy = python_dmaap_policy[0].get("policyName") |
| except: |
| d_policy = "" |
| |
| try: |
| j_policy = python_policy.get("policyName") |
| except: |
| return "False" |
| |
| resp = "False" |
| if (d_policy == j_policy): |
| resp = "True" |
| return resp |
| |
| @staticmethod |
| def create_pnf_name_from_update(json_file): |
| json_to_python = json.loads(json_file) |
| correlation_id = json_to_python.get("correlationId") |
| return correlation_id |
| |
| @staticmethod |
| def ensure_container_is_running(name): |
| |
| client = docker.from_env() |
| |
| if not BbsLibrary.is_in_status(client, name, "running"): |
| print ("starting container", name) |
| container = client.containers.get(name) |
| container.start() |
| BbsLibrary.wait_for_status(client, name, "running") |
| |
| BbsLibrary.print_status(client) |
| |
| @staticmethod |
| def ensure_container_is_exited(name): |
| |
| client = docker.from_env() |
| |
| if not BbsLibrary.is_in_status(client, name, "exited"): |
| print ("stopping container", name) |
| container = client.containers.get(name) |
| container.stop() |
| BbsLibrary.wait_for_status(client, name, "exited") |
| |
| BbsLibrary.print_status(client) |
| |
| @staticmethod |
| def print_status(client): |
| print("containers status") |
| for c in client.containers.list(all=True): |
| print(c.name, " ", c.status) |
| |
| @staticmethod |
| def wait_for_status(client, name, status): |
| while not BbsLibrary.is_in_status(client, name, status): |
| print ("waiting for container: ", name, "to be in status: ", status) |
| time.sleep(3) |
| |
| @staticmethod |
| def is_in_status(client, name, status): |
| return len(client.containers.list(all=True, filters={"name": "^/"+name+"$", "status": status})) == 1 |
| |