blob: c501c126783423d17e84d1a0fe458ac1d8f0747a [file] [log] [blame]
# ============LICENSE_START=======================================================
# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
"""test of the package for policy-handler of DCAE-Controller"""
import base64
import copy
import json
import re
import time
import uuid
import cherrypy
from cherrypy.test.helper import CPWebCase
import pytest
from policyhandler.config import Config
from policyhandler.deploy_handler import DeployHandler
from policyhandler.discovery import DiscoveryClient
from policyhandler.onap.audit import (REQUEST_X_ECOMP_REQUESTID, Audit,
AuditHttpCode)
from policyhandler.policy_consts import (LATEST_POLICIES, POLICY_BODY,
POLICY_CONFIG, POLICY_ID, POLICY_NAME,
POLICY_VERSION, POLICY_VERSIONS)
from policyhandler.policy_receiver import (LOADED_POLICIES, POLICY_VER,
REMOVED_POLICIES, PolicyReceiver)
from policyhandler.policy_rest import PolicyRest
from policyhandler.policy_utils import PolicyUtils, Utils
from policyhandler.web_server import _PolicyWeb
from .mock_settings import MonkeyedResponse, Settings
Settings.init()
class MonkeyPolicyBody(object):
"""policy body that policy-engine returns"""
@staticmethod
def create_policy_body(policy_id, policy_version=1):
"""returns a fake policy-body"""
prev_ver = str(policy_version - 1)
this_ver = str(policy_version)
config = {
"policy_updated_from_ver": prev_ver,
"policy_updated_to_ver": this_ver,
"policy_hello": "world!",
"policy_updated_ts": Settings.RUN_TS,
"updated_policy_id": policy_id
}
return {
"policyConfigMessage": "Config Retrieved! ",
"policyConfigStatus": "CONFIG_RETRIEVED",
"type": "JSON",
POLICY_NAME: "{0}.{1}.xml".format(policy_id, this_ver),
POLICY_VERSION: this_ver,
POLICY_CONFIG: json.dumps(config),
"matchingConditions": {
"ONAPName": "DCAE",
"ConfigName": "alex_config_name"
},
"responseAttributes": {},
"property": None
}
class MockPolicyEngine(object):
"""pretend this is the policy-engine"""
scope_prefix = "test_scope_prefix.Config_"
LOREM_IPSUM = """Lorem ipsum dolor sit amet consectetur ametist""".split()
LONG_TEXT = "0123456789" * 100
_policies = []
@staticmethod
def init():
"""init static vars"""
MockPolicyEngine._policies = [
MonkeyPolicyBody.create_policy_body(
MockPolicyEngine.scope_prefix + policy_id, policy_index + 1)
for policy_id in MockPolicyEngine.LOREM_IPSUM
for policy_index in range(1 + MockPolicyEngine.LOREM_IPSUM.index(policy_id))]
Settings.logger.info("MockPolicyEngine._policies: %s",
json.dumps(MockPolicyEngine._policies))
@staticmethod
def get_config(policy_name):
"""find policy the way the policy-engine finds"""
if not policy_name:
return []
return [copy.deepcopy(policy)
for policy in MockPolicyEngine._policies
if re.match(policy_name, policy[POLICY_NAME])]
@staticmethod
def get_configs_all():
"""get all policies the way the policy-engine finds"""
policies = [copy.deepcopy(policy)
for policy in MockPolicyEngine._policies]
for policy in policies:
policy["config"] = MockPolicyEngine.LONG_TEXT
return policies
@staticmethod
def get_policy_id(policy_index):
"""get the policy_id by index"""
return (MockPolicyEngine.scope_prefix
+ MockPolicyEngine.LOREM_IPSUM[
policy_index % len(MockPolicyEngine.LOREM_IPSUM)])
@staticmethod
def gen_policy_latest(policy_index, version_offset=0):
"""generate the policy response by policy_index = version - 1"""
policy_id = MockPolicyEngine.get_policy_id(policy_index)
policy = {
POLICY_ID: policy_id,
POLICY_BODY: MonkeyPolicyBody.create_policy_body(
policy_id, policy_index + 1 - version_offset)
}
return policy_id, PolicyUtils.parse_policy_config(policy)
@staticmethod
def gen_all_policies_latest(version_offset=0):
"""generate all latest policies"""
return dict(MockPolicyEngine.gen_policy_latest(policy_index, version_offset=version_offset)
for policy_index in range(len(MockPolicyEngine.LOREM_IPSUM)))
@staticmethod
def gen_policies_latest(match_to_policy_name):
"""generate all latest policies"""
return dict((k, v)
for k, v in MockPolicyEngine.gen_all_policies_latest().items()
if re.match(match_to_policy_name, k))
MockPolicyEngine.init()
class MockDeploymentHandler(object):
"""pretend this is the deployment-handler"""
@staticmethod
def default_response():
"""generate the deployed policies message"""
return {"server_instance_uuid": Settings.deploy_handler_instance_uuid}
@staticmethod
def get_deployed_policies():
"""generate the deployed policies message"""
response = MockDeploymentHandler.default_response()
policies = dict(
(policy_id, {
POLICY_ID: policy_id,
POLICY_VERSIONS: {policy.get(POLICY_BODY, {}).get(POLICY_VERSION, "999"): True},
"pending_update": False})
for policy_id, policy in (MockPolicyEngine.gen_all_policies_latest(version_offset=1)
.items()))
response["policies"] = policies
return response
@pytest.fixture()
def fix_pdp_post(monkeypatch):
"""monkeyed request /getConfig to PDP"""
def monkeyed_policy_rest_post(full_path, json=None, headers=None, **custom_kwargs):
"""monkeypatch for the POST to policy-engine"""
res_json = MockPolicyEngine.get_config(json.get(POLICY_NAME))
return MonkeyedResponse(full_path, res_json, json, headers)
Settings.logger.info("setup fix_pdp_post")
PolicyRest._lazy_init()
monkeypatch.setattr('policyhandler.policy_rest.PolicyRest._requests_session.post',
monkeyed_policy_rest_post)
yield fix_pdp_post # provide the fixture value
Settings.logger.info("teardown fix_pdp_post")
@pytest.fixture()
def fix_pdp_post_big(monkeypatch):
"""monkeyed request /getConfig to PDP"""
def monkeyed_policy_rest_post(full_path, json=None, headers=None, **custom_kwargs):
"""monkeypatch for the POST to policy-engine"""
res_json = MockPolicyEngine.get_configs_all()
return MonkeyedResponse(full_path, res_json, json, headers)
Settings.logger.info("setup fix_pdp_post_big")
PolicyRest._lazy_init()
monkeypatch.setattr('policyhandler.policy_rest.PolicyRest._requests_session.post',
monkeyed_policy_rest_post)
yield fix_pdp_post_big # provide the fixture value
Settings.logger.info("teardown fix_pdp_post_big")
class MockException(Exception):
"""mock exception"""
pass
@pytest.fixture()
def fix_pdp_post_boom(monkeypatch):
"""monkeyed request /getConfig to PDP - exception"""
def monkeyed_policy_rest_post_boom(full_path, json=None, headers=None, **custom_kwargs):
"""monkeypatch for the POST to policy-engine"""
raise MockException("fix_pdp_post_boom")
Settings.logger.info("setup fix_pdp_post_boom")
PolicyRest._lazy_init()
monkeypatch.setattr('policyhandler.policy_rest.PolicyRest._requests_session.post',
monkeyed_policy_rest_post_boom)
yield fix_pdp_post_boom
Settings.logger.info("teardown fix_pdp_post_boom")
@staticmethod
def monkeyed_boom(*args, **kwargs):
"""monkeypatch for the select_latest_policies"""
raise MockException("monkeyed_boom")
@pytest.fixture()
def fix_select_latest_policies_boom(monkeypatch):
"""monkeyed exception"""
Settings.logger.info("setup fix_select_latest_policies_boom")
monkeypatch.setattr('policyhandler.policy_utils.PolicyUtils.select_latest_policies',
monkeyed_boom)
monkeypatch.setattr('policyhandler.policy_utils.PolicyUtils.select_latest_policy',
monkeyed_boom)
monkeypatch.setattr('policyhandler.policy_utils.PolicyUtils.extract_policy_id',
monkeyed_boom)
yield fix_select_latest_policies_boom
Settings.logger.info("teardown fix_select_latest_policies_boom")
@pytest.fixture()
def fix_discovery(monkeypatch):
"""monkeyed discovery request.get"""
def monkeyed_discovery(full_path):
"""monkeypatch for get from consul"""
res_json = {}
if full_path == DiscoveryClient.CONSUL_SERVICE_MASK.format(
Config.discovered_config.get_by_key(Config.DEPLOY_HANDLER)):
res_json = [{
"ServiceAddress": "1.1.1.1",
"ServicePort": "123"
}]
elif full_path == DiscoveryClient.CONSUL_KV_MASK.format(Config.system_name):
res_json = [{"Value": base64.b64encode(
json.dumps(Settings.mock_config).encode()).decode("utf-8")}]
return MonkeyedResponse(full_path, res_json)
Settings.logger.info("setup fix_discovery")
monkeypatch.setattr('policyhandler.discovery.requests.get', monkeyed_discovery)
yield fix_discovery # provide the fixture value
Settings.logger.info("teardown fix_discovery")
@pytest.fixture()
def fix_deploy_handler(monkeypatch):
"""monkeyed requests to deployment-handler"""
def monkeyed_deploy_handler_put(full_path, json=None, headers=None,
params=None, **custom_kwargs):
"""monkeypatch for policy-update request.put to deploy_handler"""
return MonkeyedResponse(full_path, MockDeploymentHandler.default_response(),
json, headers)
def monkeyed_deploy_handler_get(full_path, headers=None, params=None, **custom_kwargs):
"""monkeypatch policy-update request.get to deploy_handler"""
return MonkeyedResponse(full_path, MockDeploymentHandler.get_deployed_policies(),
None, headers)
Settings.logger.info("setup fix_deploy_handler")
audit = Audit(req_message="fix_deploy_handler")
DeployHandler._lazy_init(audit)
monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.put',
monkeyed_deploy_handler_put)
monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.get',
monkeyed_deploy_handler_get)
yield fix_deploy_handler # provide the fixture value
audit.audit_done("teardown")
Settings.logger.info("teardown fix_deploy_handler")
@pytest.fixture()
def fix_deploy_handler_fail(monkeypatch):
"""monkeyed failed discovery request.get"""
def monkeyed_deploy_handler_put(full_path, json=None, headers=None,
params=None, **custom_kwargs):
"""monkeypatch for deploy_handler"""
res = MonkeyedResponse(
full_path,
{"server_instance_uuid": Settings.deploy_handler_instance_uuid},
json, headers
)
res.status_code = 413
return res
def monkeyed_deploy_handler_get(full_path, headers=None, params=None, **custom_kwargs):
"""monkeypatch policy-update request.get to deploy_handler"""
return MonkeyedResponse(full_path, MockDeploymentHandler.default_response(),
None, headers)
@staticmethod
def monkeyed_deploy_handler_init(audit_ignore):
"""monkeypatch for deploy_handler init"""
DeployHandler._url = None
Settings.logger.info("setup fix_deploy_handler_fail")
config_catch_up = Config.discovered_config.get_by_key("catch_up")
Config.discovered_config.update("catch_up", {"interval": 1})
audit = Audit(req_message="fix_deploy_handler_fail")
DeployHandler._lazy_init(audit)
monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._lazy_init',
monkeyed_deploy_handler_init)
monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.put',
monkeyed_deploy_handler_put)
monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.get',
monkeyed_deploy_handler_get)
yield fix_deploy_handler_fail
audit.audit_done("teardown")
Settings.logger.info("teardown fix_deploy_handler_fail")
Config.discovered_config.update("catch_up", config_catch_up)
@pytest.fixture()
def fix_cherrypy_engine_exit(monkeypatch):
"""monkeyed cherrypy.engine.exit()"""
Settings.logger.info("setup fix_cherrypy_engine_exit")
def monkeyed_cherrypy_engine_exit():
"""monkeypatch for deploy_handler"""
Settings.logger.info("cherrypy_engine_exit()")
monkeypatch.setattr('policyhandler.web_server.cherrypy.engine.exit',
monkeyed_cherrypy_engine_exit)
yield fix_cherrypy_engine_exit # provide the fixture value
Settings.logger.info("teardown fix_cherrypy_engine_exit")
class MonkeyedWebSocket(object):
"""Monkey websocket"""
on_message = None
@staticmethod
def send_notification(updated_indexes):
"""fake notification through the web-socket"""
if not MonkeyedWebSocket.on_message:
return
message = {
LOADED_POLICIES: [
{POLICY_NAME: "{0}.{1}.xml".format(
MockPolicyEngine.get_policy_id(policy_index), policy_index + 1),
POLICY_VER: str(policy_index + 1)}
for policy_index in updated_indexes or []
],
REMOVED_POLICIES : []
}
message = json.dumps(message)
Settings.logger.info("send_notification: %s", message)
MonkeyedWebSocket.on_message(None, message)
@staticmethod
def enableTrace(yes_no):
"""ignore"""
pass
class MonkeyedSocket(object):
"""Monkey websocket"""
def __init__(self):
self.connected = True
class WebSocketApp(object):
"""Monkeyed WebSocketApp"""
def __init__(self, web_socket_url, on_message=None, on_close=None, on_error=None):
self.web_socket_url = web_socket_url
self.on_message = MonkeyedWebSocket.on_message = on_message
self.on_close = on_close
self.on_error = on_error
self.sock = MonkeyedWebSocket.MonkeyedSocket()
Settings.logger.info("MonkeyedWebSocket for: %s", self.web_socket_url)
def run_forever(self):
"""forever in the loop"""
counter = 0
while self.sock.connected:
counter += 1
Settings.logger.info("MonkeyedWebSocket sleep %s...", counter)
time.sleep(5)
Settings.logger.info("MonkeyedWebSocket exit %s", counter)
def close(self):
"""close socket"""
self.sock.connected = False
@pytest.fixture()
def fix_policy_receiver_websocket(monkeypatch):
"""monkeyed websocket for policy_receiver"""
Settings.logger.info("setup fix_policy_receiver_websocket")
monkeypatch.setattr('policyhandler.policy_receiver.websocket', MonkeyedWebSocket)
yield fix_policy_receiver_websocket # provide the fixture value
Settings.logger.info("teardown fix_policy_receiver_websocket")
def test_get_policy_latest(fix_pdp_post, fix_discovery):
"""test /policy_latest/<policy-id>"""
policy_id, expected_policy = MockPolicyEngine.gen_policy_latest(3)
audit = Audit(job_name="test_get_policy_latest",
req_message="get /policy_latest/{0}".format(policy_id or ""))
policy_latest = PolicyRest.get_latest_policy((audit, policy_id, None, None)) or {}
audit.audit_done(result=json.dumps(policy_latest))
Settings.logger.info("expected_policy: %s", json.dumps(expected_policy))
Settings.logger.info("policy_latest: %s", json.dumps(policy_latest))
assert Utils.are_the_same(policy_latest, expected_policy)
@pytest.mark.usefixtures("fix_pdp_post", "fix_discovery")
class WebServerTest(CPWebCase):
"""testing the web-server - runs tests in alphabetical order of method names"""
def setup_server():
"""setup the web-server"""
cherrypy.tree.mount(_PolicyWeb(), '/')
setup_server = staticmethod(setup_server)
def test_web_healthcheck(self):
"""test /healthcheck"""
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
Settings.logger.info("got healthcheck: %s", self.body)
self.assertStatus('200 OK')
def test_web_policy_latest(self):
"""test /policy_latest/<policy-id>"""
policy_id, expected_policy = MockPolicyEngine.gen_policy_latest(3)
self.getPage("/policy_latest/{0}".format(policy_id or ""))
self.assertStatus('200 OK')
policy_latest = json.loads(self.body)
Settings.logger.info("policy_latest: %s", self.body)
Settings.logger.info("expected_policy: %s", json.dumps(expected_policy))
assert Utils.are_the_same(policy_latest, expected_policy)
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
@pytest.mark.usefixtures("fix_deploy_handler")
def test_web_all_policies_latest(self):
"""test GET /policies_latest"""
expected_policies = MockPolicyEngine.gen_all_policies_latest()
result = self.getPage("/policies_latest")
Settings.logger.info("result: %s", result)
Settings.logger.info("body: %s", self.body)
self.assertStatus('200 OK')
policies_latest = json.loads(self.body)
self.assertIn(LATEST_POLICIES, policies_latest)
policies_latest = policies_latest[LATEST_POLICIES]
Settings.logger.info("policies_latest: %s", json.dumps(policies_latest))
Settings.logger.info("expected_policies: %s", json.dumps(expected_policies))
assert Utils.are_the_same(policies_latest, expected_policies)
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
def test_web_policies_latest(self):
"""test POST /policies_latest with policyName"""
match_to_policy_name = MockPolicyEngine.scope_prefix + "amet.*"
expected_policies = MockPolicyEngine.gen_policies_latest(match_to_policy_name)
body = json.dumps({POLICY_NAME: match_to_policy_name})
result = self.getPage("/policies_latest", method='POST',
body=body,
headers=[
(REQUEST_X_ECOMP_REQUESTID, str(uuid.uuid4())),
("Content-Type", "application/json"),
('Content-Length', str(len(body)))
])
Settings.logger.info("result: %s", result)
Settings.logger.info("body: %s", self.body)
self.assertStatus('200 OK')
policies_latest = json.loads(self.body)[LATEST_POLICIES]
Settings.logger.info("policies_latest: %s", json.dumps(policies_latest))
Settings.logger.info("expected_policies: %s", json.dumps(expected_policies))
assert Utils.are_the_same(policies_latest, expected_policies)
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
@pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket")
def test_zzz_policy_updates_and_catch_ups(self):
"""test run policy handler with policy updates and catchups"""
Settings.logger.info("start policy_updates_and_catch_ups")
audit = Audit(job_name="test_zzz_policy_updates_and_catch_ups",
req_message="start policy_updates_and_catch_ups")
PolicyReceiver.run(audit)
Settings.logger.info("sleep before send_notification...")
time.sleep(2)
MonkeyedWebSocket.send_notification([1, 3, 5])
Settings.logger.info("sleep after send_notification...")
time.sleep(3)
Settings.logger.info("sleep 30 before shutdown...")
time.sleep(30)
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
PolicyReceiver.shutdown(audit)
time.sleep(1)
@pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket")
def test_zzz_catch_up_on_deploy_handler_changed(self):
"""test run policy handler with deployment-handler changed underneath"""
Settings.logger.info("start zzz_catch_up_on_deploy_handler_changed")
audit = Audit(job_name="test_zzz_catch_up_on_deploy_handler_changed",
req_message="start zzz_catch_up_on_deploy_handler_changed")
PolicyReceiver.run(audit)
Settings.logger.info("sleep before send_notification...")
time.sleep(2)
MonkeyedWebSocket.send_notification([1])
Settings.logger.info("sleep after send_notification...")
time.sleep(3)
Settings.deploy_handler_instance_uuid = str(uuid.uuid4())
Settings.logger.info("new deploy-handler uuid=%s", Settings.deploy_handler_instance_uuid)
MonkeyedWebSocket.send_notification([2, 4])
Settings.logger.info("sleep after send_notification...")
time.sleep(3)
Settings.logger.info("sleep 5 before shutdown...")
time.sleep(5)
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
PolicyReceiver.shutdown(audit)
time.sleep(1)
@pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket")
def test_zzz_get_catch_up(self):
"""test /catch_up"""
Settings.logger.info("start /catch_up")
audit = Audit(job_name="test_zzz_get_catch_up", req_message="start /catch_up")
PolicyReceiver.run(audit)
time.sleep(5)
result = self.getPage("/catch_up")
Settings.logger.info("catch_up result: %s", result)
self.assertStatus('200 OK')
Settings.logger.info("got catch_up: %s", self.body)
Settings.logger.info("sleep 5 before shutdown...")
time.sleep(5)
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
PolicyReceiver.shutdown(audit)
time.sleep(1)
@pytest.mark.usefixtures(
"fix_deploy_handler",
"fix_policy_receiver_websocket",
"fix_cherrypy_engine_exit")
def test_zzzzz_shutdown(self):
"""test shutdown"""
Settings.logger.info("start shutdown")
audit = Audit(job_name="test_zzzzz_shutdown", req_message="start shutdown")
PolicyReceiver.run(audit)
Settings.logger.info("sleep before send_notification...")
time.sleep(2)
MonkeyedWebSocket.send_notification([1, 3, 5])
Settings.logger.info("sleep after send_notification...")
time.sleep(3)
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
WebServerTest.do_gc_test = False
Settings.logger.info("shutdown...")
audit.audit_done("shutdown")
result = self.getPage("/shutdown")
Settings.logger.info("shutdown result: %s", result)
self.assertStatus('200 OK')
Settings.logger.info("got shutdown: %s", self.body)
time.sleep(1)
@pytest.mark.usefixtures("fix_pdp_post_boom", "fix_discovery")
class WebServerPDPBoomTest(CPWebCase):
"""testing the web-server - runs tests in alphabetical order of method names"""
def setup_server():
"""setup the web-server"""
cherrypy.tree.mount(_PolicyWeb(), '/')
setup_server = staticmethod(setup_server)
def test_web_healthcheck(self):
"""test /healthcheck"""
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
Settings.logger.info("got healthcheck: %s", self.body)
self.assertStatus('200 OK')
def test_web_policy_latest(self):
"""test /policy_latest/<policy-id>"""
policy_id, _ = MockPolicyEngine.gen_policy_latest(3)
self.getPage("/policy_latest/{0}".format(policy_id or ""))
self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
@pytest.mark.usefixtures("fix_deploy_handler")
def test_web_all_policies_latest(self):
"""test GET /policies_latest"""
result = self.getPage("/policies_latest")
Settings.logger.info("result: %s", result)
Settings.logger.info("body: %s", self.body)
self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
def test_web_policies_latest(self):
"""test POST /policies_latest with policyName"""
match_to_policy_name = MockPolicyEngine.scope_prefix + "amet.*"
body = json.dumps({POLICY_NAME: match_to_policy_name})
result = self.getPage("/policies_latest", method='POST',
body=body,
headers=[
(REQUEST_X_ECOMP_REQUESTID, str(uuid.uuid4())),
("Content-Type", "application/json"),
('Content-Length', str(len(body)))
])
Settings.logger.info("result: %s", result)
Settings.logger.info("body: %s", self.body)
self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
@pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket")
def test_zzz_policy_updates_and_catch_ups(self):
"""test run policy handler with policy updates and catchups"""
Settings.logger.info("start policy_updates_and_catch_ups")
audit = Audit(job_name="test_zzz_policy_updates_and_catch_ups",
req_message="start policy_updates_and_catch_ups")
PolicyReceiver.run(audit)
Settings.logger.info("sleep before send_notification...")
time.sleep(2)
MonkeyedWebSocket.send_notification([1, 3, 5])
Settings.logger.info("sleep after send_notification...")
time.sleep(3)
Settings.logger.info("sleep 30 before shutdown...")
time.sleep(30)
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
PolicyReceiver.shutdown(audit)
time.sleep(1)
@pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket")
def test_zzz_catch_up_on_deploy_handler_changed(self):
"""test run policy handler with deployment-handler changed underneath"""
Settings.logger.info("start zzz_catch_up_on_deploy_handler_changed")
audit = Audit(job_name="test_zzz_catch_up_on_deploy_handler_changed",
req_message="start zzz_catch_up_on_deploy_handler_changed")
PolicyReceiver.run(audit)
Settings.logger.info("sleep before send_notification...")
time.sleep(2)
MonkeyedWebSocket.send_notification([1])
Settings.logger.info("sleep after send_notification...")
time.sleep(3)
Settings.deploy_handler_instance_uuid = str(uuid.uuid4())
Settings.logger.info("new deploy-handler uuid=%s", Settings.deploy_handler_instance_uuid)
MonkeyedWebSocket.send_notification([2, 4])
Settings.logger.info("sleep after send_notification...")
time.sleep(3)
Settings.logger.info("sleep 5 before shutdown...")
time.sleep(5)
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
PolicyReceiver.shutdown(audit)
time.sleep(1)
@pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket")
def test_zzz_get_catch_up(self):
"""test /catch_up"""
Settings.logger.info("start /catch_up")
audit = Audit(job_name="test_zzz_get_catch_up", req_message="start /catch_up")
PolicyReceiver.run(audit)
time.sleep(5)
result = self.getPage("/catch_up")
Settings.logger.info("catch_up result: %s", result)
self.assertStatus('200 OK')
Settings.logger.info("got catch_up: %s", self.body)
Settings.logger.info("sleep 5 before shutdown...")
time.sleep(5)
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
PolicyReceiver.shutdown(audit)
time.sleep(1)
@pytest.mark.usefixtures(
"fix_deploy_handler",
"fix_policy_receiver_websocket",
"fix_cherrypy_engine_exit")
def test_zzzzz_shutdown(self):
"""test shutdown"""
Settings.logger.info("start shutdown")
audit = Audit(job_name="test_zzzzz_shutdown", req_message="start shutdown")
PolicyReceiver.run(audit)
Settings.logger.info("sleep before send_notification...")
time.sleep(2)
MonkeyedWebSocket.send_notification([1, 3, 5])
Settings.logger.info("sleep after send_notification...")
time.sleep(3)
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
WebServerPDPBoomTest.do_gc_test = False
Settings.logger.info("shutdown...")
audit.audit_done("shutdown")
result = self.getPage("/shutdown")
Settings.logger.info("shutdown result: %s", result)
self.assertStatus('200 OK')
Settings.logger.info("got shutdown: %s", self.body)
time.sleep(1)
@pytest.mark.usefixtures("fix_pdp_post", "fix_select_latest_policies_boom", "fix_discovery")
class WebServerInternalBoomTest(CPWebCase):
"""testing the web-server - runs tests in alphabetical order of method names"""
def setup_server():
"""setup the web-server"""
cherrypy.tree.mount(_PolicyWeb(), '/')
setup_server = staticmethod(setup_server)
def test_web_healthcheck(self):
"""test /healthcheck"""
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
Settings.logger.info("got healthcheck: %s", self.body)
self.assertStatus('200 OK')
def test_web_policy_latest(self):
"""test /policy_latest/<policy-id>"""
policy_id, _ = MockPolicyEngine.gen_policy_latest(3)
self.getPage("/policy_latest/{0}".format(policy_id or ""))
self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
@pytest.mark.usefixtures("fix_deploy_handler")
def test_web_all_policies_latest(self):
"""test GET /policies_latest"""
result = self.getPage("/policies_latest")
Settings.logger.info("result: %s", result)
Settings.logger.info("body: %s", self.body)
self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
def test_web_policies_latest(self):
"""test POST /policies_latest with policyName"""
match_to_policy_name = MockPolicyEngine.scope_prefix + "amet.*"
body = json.dumps({POLICY_NAME: match_to_policy_name})
result = self.getPage("/policies_latest", method='POST',
body=body,
headers=[
(REQUEST_X_ECOMP_REQUESTID, str(uuid.uuid4())),
("Content-Type", "application/json"),
('Content-Length', str(len(body)))
])
Settings.logger.info("result: %s", result)
Settings.logger.info("body: %s", self.body)
self.assertStatus(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
@pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket")
def test_zzz_policy_updates_and_catch_ups(self):
"""test run policy handler with policy updates and catchups"""
Settings.logger.info("start policy_updates_and_catch_ups")
audit = Audit(job_name="test_zzz_policy_updates_and_catch_ups",
req_message="start policy_updates_and_catch_ups")
PolicyReceiver.run(audit)
Settings.logger.info("sleep before send_notification...")
time.sleep(2)
MonkeyedWebSocket.send_notification([1, 3, 5])
Settings.logger.info("sleep after send_notification...")
time.sleep(3)
Settings.logger.info("sleep 30 before shutdown...")
time.sleep(30)
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
PolicyReceiver.shutdown(audit)
time.sleep(1)
@pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket")
def test_zzz_catch_up_on_deploy_handler_changed(self):
"""test run policy handler with deployment-handler changed underneath"""
Settings.logger.info("start zzz_catch_up_on_deploy_handler_changed")
audit = Audit(job_name="test_zzz_catch_up_on_deploy_handler_changed",
req_message="start zzz_catch_up_on_deploy_handler_changed")
PolicyReceiver.run(audit)
Settings.logger.info("sleep before send_notification...")
time.sleep(2)
MonkeyedWebSocket.send_notification([1])
Settings.logger.info("sleep after send_notification...")
time.sleep(3)
Settings.deploy_handler_instance_uuid = str(uuid.uuid4())
Settings.logger.info("new deploy-handler uuid=%s", Settings.deploy_handler_instance_uuid)
MonkeyedWebSocket.send_notification([2, 4])
Settings.logger.info("sleep after send_notification...")
time.sleep(3)
Settings.logger.info("sleep 5 before shutdown...")
time.sleep(5)
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
PolicyReceiver.shutdown(audit)
time.sleep(1)
@pytest.mark.usefixtures("fix_deploy_handler", "fix_policy_receiver_websocket")
def test_zzz_get_catch_up(self):
"""test /catch_up"""
Settings.logger.info("start /catch_up")
audit = Audit(job_name="test_zzz_get_catch_up", req_message="start /catch_up")
PolicyReceiver.run(audit)
time.sleep(5)
result = self.getPage("/catch_up")
Settings.logger.info("catch_up result: %s", result)
self.assertStatus('200 OK')
Settings.logger.info("got catch_up: %s", self.body)
Settings.logger.info("sleep 5 before shutdown...")
time.sleep(5)
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
PolicyReceiver.shutdown(audit)
time.sleep(1)
@pytest.mark.usefixtures(
"fix_deploy_handler",
"fix_policy_receiver_websocket",
"fix_cherrypy_engine_exit")
def test_zzzzz_shutdown(self):
"""test shutdown"""
Settings.logger.info("start shutdown")
audit = Audit(job_name="test_zzzzz_shutdown", req_message="start shutdown")
PolicyReceiver.run(audit)
Settings.logger.info("sleep before send_notification...")
time.sleep(2)
MonkeyedWebSocket.send_notification([1, 3, 5])
Settings.logger.info("sleep after send_notification...")
time.sleep(3)
result = self.getPage("/healthcheck")
Settings.logger.info("healthcheck result: %s", result)
WebServerInternalBoomTest.do_gc_test = False
Settings.logger.info("shutdown...")
audit.audit_done("shutdown")
result = self.getPage("/shutdown")
Settings.logger.info("shutdown result: %s", result)
self.assertStatus('200 OK')
Settings.logger.info("got shutdown: %s", self.body)
time.sleep(1)
@pytest.mark.usefixtures(
"fix_pdp_post_big",
"fix_deploy_handler_fail",
"fix_policy_receiver_websocket",
"fix_discovery"
)
def test_catch_ups_failed_dh():
"""test run policy handler with catchups and failed deployment-handler"""
Settings.logger.info("start test_catch_ups_failed_dh")
audit = Audit(job_name="test_catch_ups_failed_dh",
req_message="start test_catch_ups_failed_dh")
PolicyReceiver.run(audit)
Settings.logger.info("sleep 50 before shutdown...")
time.sleep(50)
health = audit.health(full=True)
audit.audit_done(result=json.dumps(health))
Settings.logger.info("healthcheck: %s", json.dumps(health))
assert bool(health)
PolicyReceiver.shutdown(audit)
time.sleep(1)
health = audit.health(full=True)
Settings.logger.info("healthcheck: %s", json.dumps(health))