blob: d3fed3ad285f11f9890672346eda97567b7fb537 [file] [log] [blame]
# ============LICENSE_START===============================================
# Copyright (C) 2020 Nordix Foundation. All rights reserved.
# ========================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=================================================
#
import argparse
from datetime import datetime
from pygments.util import xrange
from requests import ConnectionError
import requests
import sys
import threading
import time
SERVICE_NAME = 'HealthCheck'
BASE_URL = 'http://localhost:8081'
RIC_CHUNK_SIZE = 10
TIME_BETWEEN_CHECKS = 60
type_to_use = ''
policy_body = ''
class Ric:
def __init__(self, name, supported_types, state):
self.name = name
self.supports_type_to_use = self.policy_type_supported(supported_types)
self.state = state
self.no_of_created_policies = 0
self.no_of_read_policies = 0
self.no_of_updated_policies = 0
self.no_of_deleted_policies = 0
def update_supported_types(self, supported_types):
self.supports_type_to_use = self.policy_type_supported(supported_types)
def policy_type_supported(self, supported_policy_types):
for supported_type in supported_policy_types:
if type_to_use == supported_type:
return True
return False
class PolicyCheckThread (threading.Thread):
def __init__(self, thread_id, ric):
threading.Thread.__init__(self)
self.thread_id = thread_id
self.ric = ric
def run(self):
verboseprint(f'Checking ric: {self.ric.name}')
if put_policy(self.thread_id, self.ric.name):
verboseprint(f'Created policy: {self.thread_id} in ric: {self.ric.name}')
self.ric.no_of_created_policies += 1
time.sleep(0.5)
if get_policy(self.thread_id):
verboseprint(f'Read policy: {self.thread_id} from ric: {self.ric.name}')
self.ric.no_of_read_policies += 1
if put_policy(self.thread_id, self.ric.name, update_value=1):
verboseprint(f'Updated policy: {self.thread_id} in ric: {self.ric.name}')
self.ric.no_of_updated_policies += 1
if delete_policy(self.thread_id):
verboseprint(f'Deleted policy: {self.thread_id} from ric: {self.ric.name}')
self.ric.no_of_deleted_policies += 1
def get_rics_from_agent():
resp = requests.get(BASE_URL + '/rics')
if not resp.ok:
verboseprint(f'Unable to get Rics {resp.status_code}')
return {}
return resp.json()
def create_ric_dict(rics_as_json):
rics = {}
for ric_info in rics_as_json:
rics[ric_info["ricName"]] = (Ric(ric_info["ricName"], ric_info["policyTypes"], ric_info['state']))
verboseprint(f'Adding ric: {rics[ric_info["ricName"]]}')
return rics
def update_rics():
added_rics = {}
for ric_info in get_rics_from_agent():
if ric_info["ricName"] in rics:
rics[ric_info["ricName"]].update_supported_types(ric_info["policyTypes"])
rics[ric_info["ricName"]].state = ric_info['state']
else:
added_rics[ric_info["ricName"]] = (Ric(ric_info["ricName"], ric_info["policyTypes"]))
verboseprint(f'Adding ric: {rics[ric_info["ricName"]]}')
rics.update(added_rics)
def put_policy(thread_id, ric_name, update_value=0):
policy_id = f'thread_{thread_id}'
complete_url = f'{BASE_URL}/policy?type={type_to_use}&id={policy_id}&ric={ric_name}&service={SERVICE_NAME}'
headers = {'content-type': 'application/json'}
resp = requests.put(complete_url, policy_body.replace('XXX', str(thread_id + update_value)), headers=headers, verify=False)
if not resp.ok:
verboseprint(f'Unable to create policy {resp}')
return False
else:
return True
def get_policy(thread_id):
policy_id = f'thread_{thread_id}'
complete_url = f'{BASE_URL}/policy?id={policy_id}'
resp = requests.get(complete_url)
if not resp.ok:
verboseprint(f'Unable to get policy {resp}')
return False
else:
return True
def delete_policy(thread_id):
policy_id = f'thread_{thread_id}'
complete_url = f'{BASE_URL}/policy?id={policy_id}'
resp = requests.delete(complete_url)
if not resp.ok:
verboseprint(f'Unable to delete policy for policy ID {policy_id}')
return False
return True
def statistics(duration):
no_of_unavailable_rics = 0
no_of_rics_not_supporting_type = 0
no_of_rics_supporting_type = 0
no_of_created_policies = 0
no_of_read_policies = 0
no_of_updated_policies = 0
no_of_deleted_policies = 0
for ric in rics.values():
if not (ric.state == 'AVAILABLE' or ric.state == 'CONSISTENCY_CHECK'):
no_of_unavailable_rics += 1
elif ric.supports_type_to_use:
no_of_rics_supporting_type += 1
no_of_created_policies += ric.no_of_created_policies
no_of_read_policies += ric.no_of_read_policies
no_of_updated_policies += ric.no_of_updated_policies
no_of_deleted_policies += ric.no_of_deleted_policies
else:
no_of_rics_not_supporting_type += 1
print(f'*********** Statistics {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} ***********')
print(f'Duration of check: {duration.total_seconds()} seconds')
print(f'Number of checks: {no_of_checks}')
print(f'Number of unavailable rics: {no_of_unavailable_rics}')
print(f'Number of rics not supporting type: {no_of_rics_not_supporting_type}')
print(f'Number of rics supporting type: {no_of_rics_supporting_type}')
print(f'Number of created policies: {no_of_created_policies}')
print(f'Number of read policies: {no_of_read_policies}')
print(f'Number of updated policies: {no_of_updated_policies}')
print(f'Number of deleted policies: {no_of_deleted_policies}')
print('******************************************************')
def run_check_threads(rics):
thread_id = 1
threads = []
for ric in rics.values():
if ric.supports_type_to_use and (ric.state == 'AVAILABLE' or ric.state == 'CONSISTENCY_CHECK'): #or ric.name == 'ric_not_working':
policy_checker = PolicyCheckThread(thread_id, ric)
policy_checker.start()
thread_id += 1
threads.append(policy_checker)
for checker in threads:
checker.join()
def split_rics_equally(chunks):
# prep with empty dicts
return_list = [dict() for _ in xrange(chunks)]
idx = 0
for k,v in rics.items():
return_list[idx][k] = v
if idx < chunks-1: # indexes start at 0
idx += 1
else:
idx = 0
return return_list
def get_no_of_chunks(size_of_chunks, size_to_chunk):
(q, _) = divmod(size_to_chunk, size_of_chunks)
return q
if __name__ == '__main__':
parser = argparse.ArgumentParser(prog='PROG')
parser.add_argument('policyTypeId', help='The ID of the policy type to use')
parser.add_argument('policyBodyPath', help='The path to the JSON body of the policy to create')
parser.add_argument('-v', '--verbose', action='store_true', help='Turn on verbose printing')
parser.add_argument('--version', action='version', version='%(prog)s 1.0')
args = vars(parser.parse_args())
if args['verbose']:
def verboseprint(*args, **kwargs):
print(*args, **kwargs)
else:
verboseprint = lambda *a, **k: None # do-nothing function
verboseprint(f'Using policy type {args["policyTypeId"]}')
verboseprint(f'Using policy file {args["policyBodyPath"]}')
type_to_use = args["policyTypeId"]
with open(args["policyBodyPath"]) as json_file:
policy_body = json_file.read()
verboseprint(f'Policy body: {policy_body}')
try:
rics_from_agent = get_rics_from_agent()
except ConnectionError:
print(f'Policy Agent is not answering on {BASE_URL}, cannot start!')
sys.exit(1)
rics = create_ric_dict(rics_from_agent)
no_of_checks = 0
while True:
start_time = datetime.now()
chunked_rics = split_rics_equally(get_no_of_chunks(RIC_CHUNK_SIZE, rics.__len__()))
for ric_chunk in chunked_rics:
run_check_threads(ric_chunk)
no_of_checks += 1
finish_time = datetime.now()
duration = finish_time - start_time
statistics(duration)
sleep_time = TIME_BETWEEN_CHECKS - duration.total_seconds()
verboseprint(f'Sleeping {sleep_time} seconds')
time.sleep(sleep_time)
update_rics()
verboseprint('Exiting main')