blob: 640b33b0cef4d0d30d2c1646c6bb3839bad0fae1 [file] [log] [blame]
import json
import docker
import time
from docker.utils.json_stream import json_stream
from collections import OrderedDict
class BbsLibrary(object):
    """Static helpers for inspecting BBS JSON messages and managing Docker containers.

    Every public method is a ``@staticmethod``; the class holds no state.
    """

    def __init__(self):
        pass

    @staticmethod
    def check_for_log(search_for):
        """Return True if ``search_for`` occurs in the recent logs of the 'bbs' container.

        Only the last 1000 log lines are fetched (``tail=1000``).

        :param search_for: text to look for in the log output.
        :return: ``True`` when the text is present, ``False`` otherwise.
        """
        client = docker.from_env()
        container = client.containers.get('bbs')
        alog = container.logs(stream=False, tail=1000)
        try:
            # Undecodable bytes are replaced instead of aborting the search.
            alog = alog.decode('utf-8', 'replace').strip()
        except AttributeError:
            # The log object has no decode(); use it as returned.
            pass
        return search_for in alog

    @staticmethod
    def create_pnf_name_from_auth(json_file):
        """Return ``event.commonEventHeader.sourceName`` from a JSON message string.

        :param json_file: JSON text of the message.
        :return: the ``sourceName`` value (``None`` if that last key is absent).
        :raises AttributeError: if ``event`` or ``commonEventHeader`` is missing.
        """
        message = json.loads(json_file)
        return message.get("event").get("commonEventHeader").get("sourceName")

    @staticmethod
    def _load_first_event(json_file):
        """Parse the first element of a JSON array whose items are JSON strings."""
        event_string = json.loads(json_file)[0]
        # The embedded message may carry leftover escape backslashes; drop
        # them all before parsing it as JSON.
        return json.loads(event_string.replace("\\", ""))

    @staticmethod
    def _to_compact_json(fields):
        """Serialise ``fields`` with no space after commas ("key": value,"key": value)."""
        # Using separators (rather than str.replace on the result) keeps any
        # ", " sequence that occurs inside a field value intact.
        return json.dumps(fields, separators=(',', ': '))

    @staticmethod
    def get_invalid_auth_elements(json_file):
        """
        Get the correlationId, oldState, newState, stateInterface, macAddress, swVersion elements
        from the invalid message and place the elements into a JSON object (string) as fields for comparison

        :param json_file: JSON array text; its first element is the message as a JSON string.
        :return: JSON object string with the six fields in the order listed above and
                 no space after the separating commas. A missing ``swVersion`` becomes "".
        :raises AttributeError: if an intermediate object of the message is missing.
        """
        event = BbsLibrary._load_first_event(json_file).get("event")
        header = event.get("commonEventHeader")
        state_fields = event.get("stateChangeFields")
        additional = state_fields.get("additionalFields")

        sw_version = additional.get("swVersion")
        if sw_version is None:
            sw_version = ""

        inv_fields = OrderedDict()
        inv_fields['correlationId'] = header.get("sourceName")
        inv_fields['oldState'] = state_fields.get("oldState")
        inv_fields['newState'] = state_fields.get("newState")
        inv_fields['stateInterface'] = state_fields.get("stateInterface")
        inv_fields['macAddress'] = additional.get("macAddress")
        inv_fields['swVersion'] = sw_version
        return BbsLibrary._to_compact_json(inv_fields)

    @staticmethod
    def get_invalid_update_elements(json_file):
        """
        Get the correlationId, attachment-point, remote-id, cvlan, svlan, elements
        from the invalid message and place the elements into a JSON object (string) as fields for comparison

        :param json_file: JSON array text; its first element is the message as a JSON string.
        :return: JSON object string with the five fields in the order listed above and
                 no space after the separating commas.
        :raises AttributeError: if ``additionalFields`` is missing from the message.
        """
        message = BbsLibrary._load_first_event(json_file)
        additional = message.get("additionalFields")

        inv_fields = OrderedDict()
        inv_fields['correlationId'] = message.get("correlationId")
        inv_fields['attachment-point'] = additional.get("attachment-point")
        inv_fields['remote-id'] = additional.get("remote-id")
        inv_fields['cvlan'] = additional.get("cvlan")
        inv_fields['svlan'] = additional.get("svlan")
        return BbsLibrary._to_compact_json(inv_fields)

    @staticmethod
    def compare_policy(dmaap_policy, json_policy):
        """Compare the ``policyName`` of two JSON policy lists.

        The first entry of ``dmaap_policy`` is compared with the last entry of
        ``json_policy``.

        :param dmaap_policy: JSON array text; entry 0 supplies the policy name.
        :param json_policy: JSON array text; the last entry supplies the policy name.
        :return: the *string* "True" when both names are equal, otherwise "False".
                 An unusable ``json_policy`` always yields "False"; an unusable
                 ``dmaap_policy`` is treated as having the name "".
        """
        try:
            python_policy = json.loads(json_policy).pop()
        except (ValueError, TypeError, AttributeError, IndexError, KeyError):
            # Not JSON, not a list, or an empty list.
            python_policy = ""
        try:
            python_dmaap_policy = json.loads(dmaap_policy)
        except (ValueError, TypeError):
            python_dmaap_policy = ""
        try:
            d_policy = python_dmaap_policy[0].get("policyName")
        except (AttributeError, IndexError, KeyError, TypeError):
            d_policy = ""
        try:
            j_policy = python_policy.get("policyName")
        except AttributeError:
            # The last json_policy entry is not an object: nothing to compare.
            return "False"
        return "True" if d_policy == j_policy else "False"

    @staticmethod
    def create_pnf_name_from_update(json_file):
        """Return the top-level ``correlationId`` from a JSON message string.

        :param json_file: JSON text of the message.
        :return: the ``correlationId`` value, or ``None`` when the key is absent.
        """
        return json.loads(json_file).get("correlationId")

    @staticmethod
    def ensure_container_is_running(name):
        """Start container ``name`` unless it is already running, then print all statuses.

        Blocks until the container reports the "running" status.
        """
        client = docker.from_env()
        if not BbsLibrary.is_in_status(client, name, "running"):
            print("starting container", name)
            container = client.containers.get(name)
            container.start()
            BbsLibrary.wait_for_status(client, name, "running")
        BbsLibrary.print_status(client)

    @staticmethod
    def ensure_container_is_exited(name):
        """Stop container ``name`` unless it has already exited, then print all statuses.

        Blocks until the container reports the "exited" status.
        """
        client = docker.from_env()
        if not BbsLibrary.is_in_status(client, name, "exited"):
            print("stopping container", name)
            container = client.containers.get(name)
            container.stop()
            BbsLibrary.wait_for_status(client, name, "exited")
        BbsLibrary.print_status(client)

    @staticmethod
    def print_status(client):
        """Print the name and status of every container known to ``client``."""
        print("containers status")
        for c in client.containers.list(all=True):
            print(c.name, " ", c.status)

    @staticmethod
    def wait_for_status(client, name, status):
        """Poll every 3 seconds until container ``name`` is in ``status``.

        NOTE: there is no timeout; this loops for as long as the status is
        not reached.
        """
        while not BbsLibrary.is_in_status(client, name, status):
            print("waiting for container: ", name, "to be in status: ", status)
            time.sleep(3)

    @staticmethod
    def is_in_status(client, name, status):
        """Return True when exactly one container named ``name`` has ``status``.

        The name filter is anchored ("^/" + name + "$") so that only the exact
        container name matches rather than any name containing it.
        """
        matches = client.containers.list(
            all=True, filters={"name": "^/"+name+"$", "status": status})
        return len(matches) == 1