| # ============LICENSE_START======================================================= |
| # Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved. |
| # ================================================================================ |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # ============LICENSE_END========================================================= |
| # |
| # ECOMP is a trademark and service mark of AT&T Intellectual Property. |
| |
| """tests of decorators around the cloudify operations to handle policy actions""" |
| |
| import copy |
| import json |
| import logging |
| from datetime import datetime, timedelta |
| from functools import wraps |
| |
| import pytest |
| import requests |
| from cloudify import ctx |
| from cloudify.exceptions import NonRecoverableError |
| from cloudify.state import current_ctx |
| |
| from onap_dcae_dcaepolicy_lib import dcae_policy |
| from onap_dcae_dcaepolicy_lib.dcae_policy import Policies |
| from onap_dcae_dcaepolicy_lib.policies_output import PoliciesOutput |
| from tests.log_ctx import CtxLogger |
| from tests.mock_cloudify_ctx import (TARGET_NODE_ID, TARGET_NODE_NAME, |
| MockCloudifyContextFull) |
| |
# keys of the fake policy object and of the policy_body built by MonkeyedPolicyBody
POLICY_ID = 'policy_id'
POLICY_VERSION = "policyVersion"
POLICY_NAME = "policyName"
POLICY_BODY = 'policy_body'
POLICY_CONFIG = 'config'
CONFIG_NAME = "ConfigName"

# fake policy ids used by the mocked policy nodes and by the tests
MONKEYED_POLICY_ID = 'monkeyed.Config_peach'
MONKEYED_POLICY_ID_2 = 'monkeyed.Config_peach_2'
MONKEYED_POLICY_ID_M = 'monkeyed.Config_multi'
MONKEYED_POLICY_ID_M_2 = 'monkeyed.Config_multi_2'
MONKEYED_POLICY_ID_M_3 = 'monkeyed.Config_multi_3'
MONKEYED_POLICY_ID_B = 'monkeyed.Config_both'
# key of the application config in the node properties and in runtime_properties
APPLICATION_CONFIG = "application_config"
# file the shared test log handler (MonkeyedLogHandler) writes to
LOG_FILE = 'logs/test_onap_dcae_dcaepolicy_lib.log'
# words picked by version number for the "policy_hello" field of the fake policy config
LOREM_IPSUM = """Lorem ipsum dolor sit amet consectetur ametist""".split()

# time of the module import - base of the "policy_updated_ts" in the fake policy config
RUN_TS = datetime.utcnow()
| |
class MonkeyedLogHandler(object):
    """holder of the single file log handler that is shared by the test loggers"""
    _log_handler = None

    @staticmethod
    def add_handler_to(logger):
        """attaches the shared file handler to the logger

        The handler on LOG_FILE is created and configured on the first call only.
        """
        handler = MonkeyedLogHandler._log_handler
        if not handler:
            handler = logging.FileHandler(LOG_FILE)
            MonkeyedLogHandler._log_handler = handler
            handler.setLevel(logging.DEBUG)
            handler.setFormatter(logging.Formatter(
                fmt=('%(asctime)s.%(msecs)03d %(levelname)+8s '
                     '%(threadName)s %(name)s.%(funcName)s: %(message)s'),
                datefmt='%Y%m%d_%H%M%S'))
        logger.addHandler(handler)
| |
class MonkeyedPolicyBody(object):
    """factory of the fake policy objects shaped like the ones policy-engine returns"""
    @staticmethod
    def _format_timestamp(timestamp):
        """returns the timestamp as YYYY-MM-DDTHH:MM:SS.mmmZ

        strftime with %f always renders all six digits of the microseconds,
        unlike isoformat() that drops the fraction when microseconds are zero -
        so cutting the last three characters never eats into the seconds.
        """
        return timestamp.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'

    @staticmethod
    def create_policy_body(policy_id, policy_version=1, priority=None, **kwargs):
        """returns a fake policy-body

        :policy_id: goes into the policyName and into config["updated_policy_id"]
        :policy_version: int version - drives the versions, hello-word and timestamp in config
        :priority: when not None, is put into matchingConditions["priority"]
        :kwargs: extra fields that are deep-copied into the config
        """
        prev_ver = policy_version - 1
        # each version is stamped one hour later than the previous one
        timestamp = RUN_TS + timedelta(hours=prev_ver)

        this_ver = str(policy_version)
        config = {
            "policy_updated_from_ver": str(prev_ver),
            "policy_updated_to_ver": this_ver,
            "policy_hello": LOREM_IPSUM[prev_ver % len(LOREM_IPSUM)],
            "policy_updated_ts": MonkeyedPolicyBody._format_timestamp(timestamp),
            "updated_policy_id": policy_id
        }
        # deepcopy - the caller's values must not be shared with the fake policy
        config.update(copy.deepcopy(kwargs))

        matching_conditions = {
            "ONAPName": "DCAE",
            CONFIG_NAME: "alex_config_name"
        }
        if priority is not None:
            matching_conditions["priority"] = priority

        return {
            "policyConfigMessage": "Config Retrieved! ",
            "policyConfigStatus": "CONFIG_RETRIEVED",
            "type": "JSON",
            POLICY_NAME: "{0}.{1}.xml".format(policy_id, this_ver),
            POLICY_VERSION: this_ver,
            POLICY_CONFIG: config,
            "matchingConditions": matching_conditions,
            "responseAttributes": {},
            "property": None
        }

    @staticmethod
    def create_policy_bare(policy_id, policy_version=1, priority=None, **kwargs):
        """returns the policy object {policy_id, policy_body} without the persistence flag"""
        return {
            POLICY_ID: policy_id,
            POLICY_BODY: MonkeyedPolicyBody.create_policy_body(
                policy_id, policy_version, priority, **kwargs)
        }

    @staticmethod
    def create_policy(policy_id, policy_version=1, policy_persistent=True, priority=None, **kwargs):
        """returns the whole policy object for policy_id and policy_version

        It is the bare policy plus the dcae_policy.POLICY_PERSISTENT flag
        set to bool(policy_persistent).
        """
        policy = MonkeyedPolicyBody.create_policy_bare(
            policy_id, policy_version, priority, **kwargs)
        policy[dcae_policy.POLICY_PERSISTENT] = bool(policy_persistent)
        return policy

    @staticmethod
    def is_the_same_dict(policy_body_1, policy_body_2):
        """check whether every key of policy_body_1 is in policy_body_2 with an equal value

        The check is one-directional on the top level - the extra keys of
        policy_body_2 are ignored - hence the callers invoke it twice with
        the swapped arguments.  Returns False when either argument is not a dict.
        """
        if not isinstance(policy_body_1, dict) or not isinstance(policy_body_2, dict):
            return False
        for key in policy_body_1:
            if key not in policy_body_2:
                return False

            val_1 = policy_body_1[key]
            val_2 = policy_body_2[key]
            if (val_1 is None) != (val_2 is None):
                return False
            if isinstance(val_1, dict) \
            and not MonkeyedPolicyBody.is_the_same_dict(val_1, val_2):
                return False
            # nested values have to be fully equal
            if val_1 != val_2:
                return False
        return True
| |
class MonkeyedNode(object):
    """fake cloudify node that carries its own mocked cloudify context"""
    BLUEPRINT_ID = 'test_dcae_policy_bp_id'
    DEPLOYMENT_ID = 'test_dcae_policy_dpl_id'
    EXECUTION_ID = 'test_dcae_policy_exe_id'

    def __init__(self, node_id, node_name, node_type, properties=None,
                 relationships=None, runtime_properties=None):
        """keeps the node id and name and builds the MockCloudifyContextFull for the node

        All the nodes share the same blueprint, deployment and execution ids.
        """
        self.node_id = node_id
        self.node_name = node_name

        ctx_params = dict(
            node_id=node_id,
            node_name=node_name,
            node_type=node_type,
            blueprint_id=MonkeyedNode.BLUEPRINT_ID,
            deployment_id=MonkeyedNode.DEPLOYMENT_ID,
            execution_id=MonkeyedNode.EXECUTION_ID,
            properties=properties,
            relationships=relationships,
            runtime_properties=runtime_properties
        )
        self.ctx = MockCloudifyContextFull(**ctx_params)
| |
class MonkeyedResponse(object):
    """stand-in for the http response object - the status is always 200"""
    def __init__(self, full_path, headers=None, resp_json=None):
        """remembers the requested path, the headers and the json payload of the response"""
        self.full_path = full_path
        self.status_code = 200
        self.headers = headers if headers else {}
        self.resp_json = resp_json
        # text is the serialized payload - empty json object when there is no payload
        payload = resp_json if resp_json else {}
        self.text = json.dumps(payload)

    def json(self):
        """returns the payload as it was passed to the constructor"""
        return self.resp_json

    def raise_for_status(self):
        """never raises - the monkeyed response is always a success"""
        return None
| |
def get_app_config():
    """returns a deep copy of the application_config of the current node

    The runtime_properties are looked up first; when application_config is
    missing or empty there, the one from the node properties is used.
    """
    from_runtime = ctx.instance.runtime_properties.get(APPLICATION_CONFIG, {})
    config = copy.deepcopy(dict(from_runtime))
    if config:
        return config

    from_properties = ctx.node.properties.get(APPLICATION_CONFIG, {})
    return copy.deepcopy(dict(from_properties))
| |
def operation_node_configure(**kwargs):
    """body of the node-configure operation

    Stores the application_config and the fixed service_component_name
    "unit_test_scn" into the runtime_properties of the current node instance.
    """
    ctx.logger.info("operation_node_configure kwargs: {0}".format(kwargs))

    current_config = get_app_config()
    runtime_properties = ctx.instance.runtime_properties
    runtime_properties[APPLICATION_CONFIG] = current_config
    runtime_properties[PoliciesOutput.SERVICE_COMPONENT_NAME] = "unit_test_scn"

    config_dump = json.dumps(current_config)
    ctx.logger.info("property app_config: {0}".format(config_dump))
| |
@CtxLogger.log_ctx(pre_log=True, after_log=True, exe_task='exe_task')
@Policies.gather_policies_to_node()
def node_configure(**kwargs):
    """decorate with @Policies.gather_policies_to_node on policy consumer node to
    bring all policies to runtime_properties["policies"]

    The body itself only delegates to operation_node_configure with the same kwargs.
    """
    operation_node_configure(**kwargs)
| |
@CtxLogger.log_ctx(pre_log=True, after_log=True, exe_task='execute_operation')
@Policies.update_policies_on_node()
def policy_update(updated_policies, removed_policies=None, **kwargs):
    """policy-update operation wrapped by @Policies.update_policies_on_node()

    The decorator is the part under test; the body only logs the current
    application_config and saves it back into the runtime_properties.

    :updated_policies: policies passed by the caller - not used by the body
    :removed_policies: policy ids passed by the caller - not used by the body
    """
    current_config = get_app_config()
    message = "app_config {0}".format(json.dumps(current_config))
    ctx.logger.info(message)

    ctx.instance.runtime_properties[APPLICATION_CONFIG] = current_config
| |
@CtxLogger.log_ctx(pre_log=True, after_log=True, exe_task='execute_operation')
@Policies.update_policies_on_node()
def policy_update_not_only_config(updated_policies, removed_policies=None, **kwargs):
    """second policy-update operation wrapped by @Policies.update_policies_on_node()

    It is decorated with the same arguments as policy_update; the body only
    logs the current application_config and saves it back into the
    runtime_properties.

    :updated_policies: policies passed by the caller - not used by the body
    :removed_policies: policy ids passed by the caller - not used by the body
    """
    current_config = get_app_config()
    message = "app_config {0}".format(json.dumps(current_config))
    ctx.logger.info(message)

    ctx.instance.runtime_properties[APPLICATION_CONFIG] = current_config
| |
@CtxLogger.log_ctx(pre_log=True, after_log=True, exe_task='execute_operation')
@Policies.update_policies_on_node()
def policy_update_many_calcs(updated_policies, removed_policies=None, policies=None, **kwargs):
    """policy-update operation that also accepts the policies argument

    It is wrapped by @Policies.update_policies_on_node(); the body only logs
    the current application_config and saves it back into the runtime_properties.

    :updated_policies: policies passed by the caller - not used by the body
    :removed_policies: policy ids passed by the caller - not used by the body
    :policies: accepted for the signature only - not used by the body
    """
    current_config = get_app_config()
    message = "app_config {0}".format(json.dumps(current_config))
    ctx.logger.info(message)

    ctx.instance.runtime_properties[APPLICATION_CONFIG] = current_config
| |
| |
@CtxLogger.log_ctx(pre_log=True, after_log=True, exe_task='exe_task')
@Policies.cleanup_policies_on_node
def node_delete(**kwargs):
    """delete <service_component_name> records in consul-kv

    The body only delegates to operation_node_configure; the cleanup itself is
    presumably done by the @Policies.cleanup_policies_on_node decorator - see dcae_policy.
    """
    operation_node_configure(**kwargs)
| |
| |
class CurrentCtx(object):
    """cloudify context"""
    # MonkeyedNode of the microservice under test - its ctx is made the current ctx
    _node_ms = None

    @staticmethod
    def set_current_ctx(include_bad=True, include_good=True):
        """set up the nodes for cloudify

        Builds the policy nodes, links them as the relationship targets of the
        microservice node 'test_ms_id' and makes the ctx of that node current.

        :include_bad: add the policy nodes with missing or inconsistent data
        :include_good: add the policy nodes with valid policy data

        NOTE(review): the indentation of this file was lost in the source under
        review, so the grouping of the nodes under the include_* flags below is
        reconstructed - confirm against the repository.  With both flags True
        every grouping produces the same nodes in the same order.
        """
        # presumably resets the state kept by Policies between the tests - see dcae_policy
        Policies._init()

        def add_target_to_relationships(relationships, node):
            """adds the node as the target of relationships"""
            relationships.append({TARGET_NODE_ID: node.node_id, TARGET_NODE_NAME: node.node_name})

        relationships = []
        if include_good:
            # single policy with the body of version 1
            node_policy = MonkeyedNode(
                'dcae_policy_node_id',
                'dcae_policy_node_name',
                dcae_policy.DCAE_POLICY_TYPE,
                {POLICY_ID: MONKEYED_POLICY_ID},
                None,
                {POLICY_BODY: MonkeyedPolicyBody.create_policy_body(
                    MONKEYED_POLICY_ID, priority="1")}
            )
            add_target_to_relationships(relationships, node_policy)

        if include_bad:
            # policy_id without any policy_body in runtime_properties
            bad_policy_2 = MonkeyedNode(
                'bad_policy_2_node_id',
                'bad_policy_2_node_name',
                dcae_policy.DCAE_POLICY_TYPE,
                {POLICY_ID: MONKEYED_POLICY_ID_2},
                None,
                None
            )
            add_target_to_relationships(relationships, bad_policy_2)

        if include_good:
            node_policy_2 = MonkeyedNode(
                'dcae_policy_node_id_2',
                'dcae_policy_node_name_2',
                dcae_policy.DCAE_POLICY_TYPE,
                {POLICY_ID: MONKEYED_POLICY_ID_2},
                None,
                {POLICY_BODY: MonkeyedPolicyBody.create_policy_body(
                    MONKEYED_POLICY_ID_2, 4, priority="2")}
            )
            add_target_to_relationships(relationships, node_policy_2)

        if include_bad:
            bad_policy_3 = MonkeyedNode(
                'bad_policy_3_node_id',
                'bad_policy_3_node_name',
                dcae_policy.DCAE_POLICY_TYPE,
                {POLICY_ID: MONKEYED_POLICY_ID_2},
                None,
                None
            )
            add_target_to_relationships(relationships, bad_policy_3)

            # neither properties nor runtime_properties
            bad_policy_4 = MonkeyedNode(
                'bad_policy_4_node_id',
                'bad_policy_4_node_name',
                dcae_policy.DCAE_POLICY_TYPE,
                None,
                None,
                None
            )
            add_target_to_relationships(relationships, bad_policy_4)

            # policy_body in runtime_properties but no policy_id in properties
            weird_policy_5 = MonkeyedNode(
                'weird_policy_5_node_id',
                'weird_policy_5_node_name',
                dcae_policy.DCAE_POLICY_TYPE,
                None,
                None,
                {POLICY_BODY: MonkeyedPolicyBody.create_policy_body(
                    MONKEYED_POLICY_ID, 3, priority="2")}
            )
            add_target_to_relationships(relationships, weird_policy_5)

        if include_good:
            # policies found by policy_filter - versions 3 and 2 of the same policy_id
            node_policies = MonkeyedNode(
                'dcae_policies_node_id',
                'dcae_policies_node_name',
                dcae_policy.DCAE_POLICIES_TYPE,
                {dcae_policy.POLICY_FILTER: {
                    POLICY_NAME: MONKEYED_POLICY_ID_M + ".*",
                    dcae_policy.CONFIG_ATTRIBUTES: json.dumps({
                        CONFIG_NAME: "alex_config_name"
                    })
                }},
                None,
                {dcae_policy.POLICIES_FILTERED: {
                    MONKEYED_POLICY_ID_M:
                        MonkeyedPolicyBody.create_policy_bare(MONKEYED_POLICY_ID_M, 3)}}
            )
            add_target_to_relationships(relationships, node_policies)

            node_policies_2 = MonkeyedNode(
                'dcae_policies_2_node_id',
                'dcae_policies_2_node_name',
                dcae_policy.DCAE_POLICIES_TYPE,
                {dcae_policy.POLICY_FILTER: {POLICY_NAME: MONKEYED_POLICY_ID_M + ".*"}},
                None,
                {dcae_policy.POLICIES_FILTERED: {
                    MONKEYED_POLICY_ID_M:
                        MonkeyedPolicyBody.create_policy_bare(MONKEYED_POLICY_ID_M, 2)}}
            )
            add_target_to_relationships(relationships, node_policies_2)

        # NOTE(review): the next three nodes are placed outside of the include_* flags -
        # they might belong to the preceding "if include_good" - confirm
        policies_empty = MonkeyedNode(
            'dcae_policies_empty_node_id',
            'dcae_policies_empty_node_name',
            dcae_policy.DCAE_POLICIES_TYPE,
            {dcae_policy.POLICY_FILTER: {"empty": None}},
            None,
            None
        )
        add_target_to_relationships(relationships, policies_empty)

        policies_empty_2 = MonkeyedNode(
            'dcae_policies_empty_2_node_id',
            'dcae_policies_empty_2_node_name',
            dcae_policy.DCAE_POLICIES_TYPE,
            None,
            None,
            None
        )
        add_target_to_relationships(relationships, policies_empty_2)

        # node of a type that is not a policy type
        non_policies = MonkeyedNode(
            'non_policies_node_id',
            'non_policies_node_name',
            "non.policy.type"
        )
        add_target_to_relationships(relationships, non_policies)

        if include_good:
            # the same policy_id MONKEYED_POLICY_ID_B comes from the policy filters
            # (versions 1, 2, 5) and from the single policy node (version 4)
            node_policies_b = MonkeyedNode(
                'dcae_policies_b_node_id',
                'dcae_policies_b_node_name',
                dcae_policy.DCAE_POLICIES_TYPE,
                {dcae_policy.POLICY_FILTER: {POLICY_NAME: MONKEYED_POLICY_ID_M + ".*"}},
                None,
                {dcae_policy.POLICIES_FILTERED: {
                    MONKEYED_POLICY_ID_B:
                        MonkeyedPolicyBody.create_policy_bare(MONKEYED_POLICY_ID_B, 1)}}
            )
            add_target_to_relationships(relationships, node_policies_b)

            node_policies_b_2 = MonkeyedNode(
                'dcae_policies_b_2_node_id',
                'dcae_policies_b_2_node_name',
                dcae_policy.DCAE_POLICIES_TYPE,
                {dcae_policy.POLICY_FILTER: {POLICY_NAME: MONKEYED_POLICY_ID_M + ".*"}},
                None,
                {dcae_policy.POLICIES_FILTERED: {
                    MONKEYED_POLICY_ID_B:
                        MonkeyedPolicyBody.create_policy_bare(MONKEYED_POLICY_ID_B, 2)}}
            )
            add_target_to_relationships(relationships, node_policies_b_2)

        if include_good:
            node_policy_b = MonkeyedNode(
                'dcae_policy_b_node_id',
                'dcae_policy_b_node_name',
                dcae_policy.DCAE_POLICY_TYPE,
                {POLICY_ID: MONKEYED_POLICY_ID_B},
                None,
                {POLICY_BODY: MonkeyedPolicyBody.create_policy_body(
                    MONKEYED_POLICY_ID_B, 4, priority="1.5")}
            )
            add_target_to_relationships(relationships, node_policy_b)

            node_policies_b_5 = MonkeyedNode(
                'dcae_policies_b_5_node_id',
                'dcae_policies_b_5_node_name',
                dcae_policy.DCAE_POLICIES_TYPE,
                {dcae_policy.POLICY_FILTER: {POLICY_NAME: MONKEYED_POLICY_ID_M + ".*"}},
                None,
                {dcae_policy.POLICIES_FILTERED: {
                    MONKEYED_POLICY_ID_B:
                        MonkeyedPolicyBody.create_policy_bare(MONKEYED_POLICY_ID_B, 5)}}
            )
            add_target_to_relationships(relationships, node_policies_b_5)


        # the microservice node - its default application_config is the config part
        # of a fake policy body with the extra db_port and weather fields
        CurrentCtx._node_ms = MonkeyedNode(
            'test_ms_id', 'test_ms_name', "ms.nodes.type",
            {APPLICATION_CONFIG: MonkeyedPolicyBody.create_policy_body(
                "no_policy", db_port="123", weather="snow")[POLICY_CONFIG]
            },
            relationships
        )
        current_ctx.set(CurrentCtx._node_ms.ctx)
        MonkeyedLogHandler.add_handler_to(CurrentCtx._node_ms.ctx.logger)

    @staticmethod
    def reset():
        """reset context"""
        # makes the ctx of the microservice node current again
        current_ctx.set(CurrentCtx._node_ms.ctx)
| |
| |
def monkeyed_consul_boom(full_path, json):
    """boom on monkeypatch for the put to consul

    Replaces requests.put in the fix_consul_boom fixture and always raises
    requests.ConnectionError; the arguments are ignored.
    """
    raise requests.ConnectionError("monkey-boom")
| |
| |
@pytest.fixture()
def fix_consul_boom(monkeypatch):
    """fixture: requests.put always fails with the connection error

    PoliciesOutput._lazy_inited is set to False before and after the test.
    """
    PoliciesOutput._lazy_inited = False
    monkeypatch.setattr(requests, 'put', monkeyed_consul_boom)
    yield fix_consul_boom
    PoliciesOutput._lazy_inited = False
| |
| |
def monkeyed_consul_put(full_path, json):
    """monkeypatch for the put to consul

    Replaces requests.put in the fix_consul fixture; the json payload is
    ignored and an always successful MonkeyedResponse for full_path is returned.
    """
    return MonkeyedResponse(full_path)
| |
| |
@pytest.fixture()
def fix_consul(monkeypatch):
    """fixture: requests.put is replaced with the always successful monkeyed_consul_put

    PoliciesOutput._lazy_inited is set to False before and after the test.
    """
    PoliciesOutput._lazy_inited = False
    monkeypatch.setattr(requests, 'put', monkeyed_consul_put)
    yield fix_consul
    PoliciesOutput._lazy_inited = False
| |
| |
def cfy_ctx(include_bad=True, include_good=True):
    """decorator factory that runs the test inside of the mocked cloudify context

    :include_bad: passed to CurrentCtx.set_current_ctx
    :include_good: passed to CurrentCtx.set_current_ctx
    """
    def cfy_ctx_decorator(func):
        """wraps func with the setup and the guaranteed cleanup of the context"""
        if not func:
            return None

        @wraps(func)
        def ctx_wrapper(*args, **kwargs):
            """sets the context up, runs the test, always clears the context"""
            try:
                CurrentCtx.set_current_ctx(include_bad, include_good)
                func(*args, **kwargs)
            finally:
                # cleanup happens even when the setup or the test fails
                ctx.logger.info("MockCloudifyContextFull.clear")
                MockCloudifyContextFull.clear()
                current_ctx.clear()

        return ctx_wrapper

    return cfy_ctx_decorator
| |
@pytest.mark.usefixtures("fix_consul")
@cfy_ctx(include_bad=True)
def test_gather_policies_to_node():
    """node_configure has to put the policies into the runtime_properties"""
    node_configure()

    props = ctx.instance.runtime_properties
    ctx.logger.info("runtime_properties: {0}".format(json.dumps(props)))

    assert dcae_policy.POLICIES in props
    gathered = props[dcae_policy.POLICIES]
    ctx.logger.info("policies: {0}".format(json.dumps(gathered)))
| |
@pytest.mark.usefixtures("fix_consul")
@cfy_ctx(include_bad=True)
def test_policies_to_node():
    """node_configure has to gather the expected version of each policy

    Checks the policy for each of the four policy ids against the freshly
    created expected policy - in both directions of is_the_same_dict.
    """
    node_configure()

    props = ctx.instance.runtime_properties
    ctx.logger.info("runtime_properties: {0}".format(json.dumps(props)))

    assert dcae_policy.POLICIES in props
    policies = props[dcae_policy.POLICIES]
    ctx.logger.info("policies: {0}".format(json.dumps(policies)))

    # (policy_id, policy_version, policy_persistent, priority) of the expected policies
    expectations = (
        (MONKEYED_POLICY_ID, 1, True, "1"),
        (MONKEYED_POLICY_ID_B, 4, True, "1.5"),
        (MONKEYED_POLICY_ID_2, 4, True, "2"),
        (MONKEYED_POLICY_ID_M, 2, False, None),
    )
    for policy_id, version, persistent, priority in expectations:
        assert policy_id in policies
        expected = MonkeyedPolicyBody.create_policy(
            policy_id, version, persistent, priority=priority)
        policy = policies[policy_id]
        ctx.logger.info("expected[{0}]: {1}".format(policy_id, json.dumps(expected)))
        ctx.logger.info("policy[{0}]: {1}".format(policy_id, json.dumps(policy)))
        assert MonkeyedPolicyBody.is_the_same_dict(policy, expected)
        assert MonkeyedPolicyBody.is_the_same_dict(expected, policy)
| |
@pytest.mark.usefixtures("fix_consul")
@cfy_ctx(include_bad=True)
def test_update_policies():
    """test policy_update: one policy updated, one added through a policy filter, one removed

    The policies that are not mentioned in the update are expected to stay
    the same as they were gathered by node_configure.
    """
    node_configure()

    runtime_properties = ctx.instance.runtime_properties
    ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))

    assert dcae_policy.POLICIES in runtime_properties
    # the same dict object is inspected after the update - the update is observed in place
    policies = runtime_properties[dcae_policy.POLICIES]
    ctx.logger.info("policies: {0}".format(json.dumps(policies)))

    updated_policy = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_2, 2, priority="aa20")
    added_policy = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_M_2, 2,
                                                    False, priority="1")
    ctx.logger.info("policy_update: [{0}]".format(json.dumps(updated_policy)))

    ctx.logger.info("policy[{0}]: not yet in policies".format(MONKEYED_POLICY_ID_M_2))
    assert MONKEYED_POLICY_ID_M_2 not in policies

    # "--" is the placeholder filter id when the node has no policy filters
    policy_filter_ids = list(runtime_properties.get(dcae_policy.POLICY_FILTERS) or ["--"])

    policy_update(updated_policies=[updated_policy],
                  added_policies={
                      policy_filter_ids[0]: {
                          dcae_policy.POLICIES: {MONKEYED_POLICY_ID_M_2: added_policy}}
                  },
                  removed_policies=[MONKEYED_POLICY_ID_M])

    ctx.logger.info("policy[{0}]: removed".format(MONKEYED_POLICY_ID_M))
    assert MONKEYED_POLICY_ID_M not in policies

    assert MONKEYED_POLICY_ID_M_2 in policies
    policy = policies[MONKEYED_POLICY_ID_M_2]
    ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_M_2, json.dumps(policy)))
    assert MonkeyedPolicyBody.is_the_same_dict(policy, added_policy)
    assert MonkeyedPolicyBody.is_the_same_dict(added_policy, policy)

    assert MONKEYED_POLICY_ID_2 in policies
    policy = policies[MONKEYED_POLICY_ID_2]
    ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_2, json.dumps(policy)))
    assert MonkeyedPolicyBody.is_the_same_dict(policy, updated_policy)
    assert MonkeyedPolicyBody.is_the_same_dict(updated_policy, policy)

    assert MONKEYED_POLICY_ID in policies
    policy = policies[MONKEYED_POLICY_ID]
    expected_1 = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID, priority="1")
    ctx.logger.info("expected[{0}]: {1}".format(MONKEYED_POLICY_ID, json.dumps(expected_1)))
    ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID, json.dumps(policy)))
    assert MonkeyedPolicyBody.is_the_same_dict(policy, expected_1)
    assert MonkeyedPolicyBody.is_the_same_dict(expected_1, policy)

    assert MONKEYED_POLICY_ID_B in policies
    policy = policies[MONKEYED_POLICY_ID_B]
    expected_b = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_B, 4, priority="1.5")
    # log the policy that is really expected for MONKEYED_POLICY_ID_B
    ctx.logger.info("expected[{0}]: {1}".format(MONKEYED_POLICY_ID_B, json.dumps(expected_b)))
    ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_B, json.dumps(policy)))
    assert MonkeyedPolicyBody.is_the_same_dict(policy, expected_b)
    assert MonkeyedPolicyBody.is_the_same_dict(expected_b, policy)
| |
@pytest.mark.usefixtures("fix_consul")
@cfy_ctx(include_bad=True)
def test_update_not_only_config():
    """test policy_update_not_only_config: one policy updated, one added, one removed

    The policies that are not mentioned in the update are expected to stay
    the same as they were gathered by node_configure.
    """
    node_configure()

    runtime_properties = ctx.instance.runtime_properties
    ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))

    assert dcae_policy.POLICIES in runtime_properties
    # the same dict object is inspected after the update - the update is observed in place
    policies = runtime_properties[dcae_policy.POLICIES]
    ctx.logger.info("policies: {0}".format(json.dumps(policies)))

    updated_policy = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_2, 2, priority="aa20")
    added_policy = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_M_2, 2,
                                                    False, priority="1")
    ctx.logger.info("policy_update: [{0}]".format(json.dumps(updated_policy)))

    ctx.logger.info("policy[{0}]: not yet in policies".format(MONKEYED_POLICY_ID_M_2))
    assert MONKEYED_POLICY_ID_M_2 not in policies

    # "--" is the placeholder filter id when the node has no policy filters
    policy_filter_ids = list(runtime_properties.get(dcae_policy.POLICY_FILTERS) or ["--"])

    policy_update_not_only_config(updated_policies=[updated_policy],
                                  added_policies={
                                      policy_filter_ids[0]: {
                                          dcae_policy.POLICIES: {
                                              MONKEYED_POLICY_ID_M_2: added_policy}}
                                  },
                                  removed_policies=[MONKEYED_POLICY_ID_M])

    ctx.logger.info("policy[{0}]: removed".format(MONKEYED_POLICY_ID_M))
    assert MONKEYED_POLICY_ID_M not in policies

    assert MONKEYED_POLICY_ID_M_2 in policies
    policy = policies[MONKEYED_POLICY_ID_M_2]
    ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_M_2, json.dumps(policy)))
    assert MonkeyedPolicyBody.is_the_same_dict(policy, added_policy)
    assert MonkeyedPolicyBody.is_the_same_dict(added_policy, policy)

    assert MONKEYED_POLICY_ID_2 in policies
    policy = policies[MONKEYED_POLICY_ID_2]
    ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_2, json.dumps(policy)))
    assert MonkeyedPolicyBody.is_the_same_dict(policy, updated_policy)
    assert MonkeyedPolicyBody.is_the_same_dict(updated_policy, policy)

    assert MONKEYED_POLICY_ID in policies
    policy = policies[MONKEYED_POLICY_ID]
    expected_1 = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID, priority="1")
    ctx.logger.info("expected[{0}]: {1}".format(MONKEYED_POLICY_ID, json.dumps(expected_1)))
    ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID, json.dumps(policy)))
    assert MonkeyedPolicyBody.is_the_same_dict(policy, expected_1)
    assert MonkeyedPolicyBody.is_the_same_dict(expected_1, policy)

    assert MONKEYED_POLICY_ID_B in policies
    policy = policies[MONKEYED_POLICY_ID_B]
    expected_b = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_B, 4, priority="1.5")
    # log the policy that is really expected for MONKEYED_POLICY_ID_B
    ctx.logger.info("expected[{0}]: {1}".format(MONKEYED_POLICY_ID_B, json.dumps(expected_b)))
    ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_B, json.dumps(policy)))
    assert MonkeyedPolicyBody.is_the_same_dict(policy, expected_b)
    assert MonkeyedPolicyBody.is_the_same_dict(expected_b, policy)
| |
@pytest.mark.usefixtures("fix_consul")
@cfy_ctx(include_bad=True)
def test_update_policies_not():
    """policy_update with junk params has to leave the policies and app_config untouched

    The update carries an unchanged policy, a policy without the config,
    a policy that is not on the node, an unknown policy filter id
    and an unknown policy id to remove.
    """
    node_configure()

    props = ctx.instance.runtime_properties
    ctx.logger.info("runtime_properties: {0}".format(json.dumps(props)))

    assert dcae_policy.POLICIES in props
    assert APPLICATION_CONFIG in props
    policies = props[dcae_policy.POLICIES]
    app_config = props[APPLICATION_CONFIG]
    ctx.logger.info("policies: {0}".format(json.dumps(policies)))
    ctx.logger.info("app_config: {0}".format(json.dumps(app_config)))

    # snapshots to compare with after the update
    policies_before = copy.deepcopy(policies)
    app_config_before = copy.deepcopy(app_config)

    existing_policy = MonkeyedPolicyBody.create_policy_bare(MONKEYED_POLICY_ID, priority="1")
    damaged_policy = MonkeyedPolicyBody.create_policy_bare(MONKEYED_POLICY_ID_2)
    del damaged_policy[POLICY_BODY][POLICY_CONFIG]
    updated_policy = MonkeyedPolicyBody.create_policy_bare(MONKEYED_POLICY_ID_M_3, 3)
    added_policy = MonkeyedPolicyBody.create_policy_bare(MONKEYED_POLICY_ID_M_2, 4)
    junk_policy_filter_id = "<<<junk_removed_policy_id>>>"
    unexpected_removed_policy_id = "<<<junk_removed_policy_id>>>"
    ctx.logger.info("policy_update: [{0}]".format(json.dumps(updated_policy)))

    for present_id in (MONKEYED_POLICY_ID, MONKEYED_POLICY_ID_2):
        assert present_id in policies
    for absent_id in (MONKEYED_POLICY_ID_M_2, MONKEYED_POLICY_ID_M_3,
                      unexpected_removed_policy_id):
        assert absent_id not in policies
    assert junk_policy_filter_id not in props.get(dcae_policy.POLICY_FILTERS, {})

    policy_filter_ids = list(props.get(dcae_policy.POLICY_FILTERS) or ["--"])

    added_policies = {
        junk_policy_filter_id: {
            dcae_policy.POLICIES: {MONKEYED_POLICY_ID_M_2: added_policy}},
        policy_filter_ids[0]: {
            dcae_policy.POLICIES: {MONKEYED_POLICY_ID_2: damaged_policy}}
    }
    policy_update(updated_policies=[existing_policy, damaged_policy, updated_policy],
                  added_policies=added_policies,
                  removed_policies=[unexpected_removed_policy_id])

    ctx.logger.info("runtime_properties: {0}".format(json.dumps(props)))

    ctx.logger.info("policies not changed: {0}".format(json.dumps(policies)))
    assert MonkeyedPolicyBody.is_the_same_dict(policies, policies_before)
    assert MonkeyedPolicyBody.is_the_same_dict(policies_before, policies)

    ctx.logger.info("app_config not changed: {0}".format(json.dumps(app_config)))
    assert MonkeyedPolicyBody.is_the_same_dict(app_config, app_config_before)
    assert MonkeyedPolicyBody.is_the_same_dict(app_config_before, app_config)
| |
@pytest.mark.usefixtures("fix_consul")
@cfy_ctx(include_bad=True)
def test_update_many_calcs():
    """test policy_update_many_calcs: one policy updated, one added, one removed

    The policies that are not mentioned in the update are expected to stay
    the same as they were gathered by node_configure.
    """
    node_configure()

    runtime_properties = ctx.instance.runtime_properties
    ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))

    assert dcae_policy.POLICIES in runtime_properties
    # the same dict object is inspected after the update - the update is observed in place
    policies = runtime_properties[dcae_policy.POLICIES]
    ctx.logger.info("policies: {0}".format(json.dumps(policies)))

    updated_policy = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_2, 2, priority="aa20")
    added_policy = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_M_2, 2,
                                                    False, priority="1")
    ctx.logger.info("policy_update: [{0}]".format(json.dumps(updated_policy)))

    ctx.logger.info("policy[{0}]: not yet in policies".format(MONKEYED_POLICY_ID_M_2))
    assert MONKEYED_POLICY_ID_M_2 not in policies

    # "--" is the placeholder filter id when the node has no policy filters
    policy_filter_ids = list(runtime_properties.get(dcae_policy.POLICY_FILTERS) or ["--"])

    policy_update_many_calcs(updated_policies=[updated_policy],
                             added_policies={
                                 policy_filter_ids[0]: {
                                     dcae_policy.POLICIES: {MONKEYED_POLICY_ID_M_2: added_policy}}
                             },
                             removed_policies=[MONKEYED_POLICY_ID_M])

    ctx.logger.info("policy[{0}]: removed".format(MONKEYED_POLICY_ID_M))
    assert MONKEYED_POLICY_ID_M not in policies

    assert MONKEYED_POLICY_ID_M_2 in policies
    policy = policies[MONKEYED_POLICY_ID_M_2]
    ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_M_2, json.dumps(policy)))
    assert MonkeyedPolicyBody.is_the_same_dict(policy, added_policy)
    assert MonkeyedPolicyBody.is_the_same_dict(added_policy, policy)

    assert MONKEYED_POLICY_ID_2 in policies
    policy = policies[MONKEYED_POLICY_ID_2]
    ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_2, json.dumps(policy)))
    assert MonkeyedPolicyBody.is_the_same_dict(policy, updated_policy)
    assert MonkeyedPolicyBody.is_the_same_dict(updated_policy, policy)

    assert MONKEYED_POLICY_ID in policies
    policy = policies[MONKEYED_POLICY_ID]
    expected_1 = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID, priority="1")
    ctx.logger.info("expected[{0}]: {1}".format(MONKEYED_POLICY_ID, json.dumps(expected_1)))
    ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID, json.dumps(policy)))
    assert MonkeyedPolicyBody.is_the_same_dict(policy, expected_1)
    assert MonkeyedPolicyBody.is_the_same_dict(expected_1, policy)

    assert MONKEYED_POLICY_ID_B in policies
    policy = policies[MONKEYED_POLICY_ID_B]
    expected_b = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_B, 4, priority="1.5")
    # log the policy that is really expected for MONKEYED_POLICY_ID_B
    ctx.logger.info("expected[{0}]: {1}".format(MONKEYED_POLICY_ID_B, json.dumps(expected_b)))
    ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_B, json.dumps(policy)))
    assert MonkeyedPolicyBody.is_the_same_dict(policy, expected_b)
    assert MonkeyedPolicyBody.is_the_same_dict(expected_b, policy)
| |
@pytest.mark.usefixtures("fix_consul")
@cfy_ctx(include_bad=True)
def test_remove_all_policies():
    """test policy_update - every gathered policy is removed in a single call"""
    node_configure()

    props = ctx.instance.runtime_properties
    ctx.logger.info("runtime_properties: {0}".format(json.dumps(props)))

    assert dcae_policy.POLICIES in props
    gathered = props[dcae_policy.POLICIES]
    ctx.logger.info("policies: {0}".format(json.dumps(gathered)))

    # snapshot the ids before the update - all of them are requested for removal
    ids_to_remove = list(gathered)

    policy_update(updated_policies=None, added_policies=None, removed_policies=ids_to_remove)

    ctx.logger.info("removed: {0}".format(ids_to_remove))
    ctx.logger.info("runtime_properties: {0}".format(json.dumps(props)))

    # the policies key is still present, but no policy bodies are left
    assert dcae_policy.POLICIES in props
    assert Policies.get_policy_bodies() == []

    # the runtime application_config must match the one in the node properties
    assert APPLICATION_CONFIG in props
    assert APPLICATION_CONFIG in ctx.node.properties
    actual_config = props[APPLICATION_CONFIG]
    default_config = dict(ctx.node.properties[APPLICATION_CONFIG])
    dumped_config = json.dumps(actual_config)
    ctx.logger.info("expected = default application_config: {0}".format(dumped_config))
    assert MonkeyedPolicyBody.is_the_same_dict(actual_config, default_config)
    assert MonkeyedPolicyBody.is_the_same_dict(default_config, actual_config)
| |
@pytest.mark.usefixtures("fix_consul")
@cfy_ctx(include_bad=True)
def test_remove_all_policies_twice():
    """test policy_update - the same remove-all request is sent two times in a row"""
    node_configure()

    props = ctx.instance.runtime_properties
    ctx.logger.info("runtime_properties: {0}".format(json.dumps(props)))

    assert dcae_policy.POLICIES in props
    gathered = props[dcae_policy.POLICIES]
    ctx.logger.info("policies: {0}".format(json.dumps(gathered)))

    ids_to_remove = list(gathered)

    # second pass repeats the removal of ids that the first pass was asked to remove
    for _ in range(2):
        policy_update(updated_policies=None, added_policies=None,
                      removed_policies=ids_to_remove)

    ctx.logger.info("removed: {0}".format(ids_to_remove))
    ctx.logger.info("runtime_properties: {0}".format(json.dumps(props)))
    assert dcae_policy.POLICIES in props
    assert Policies.get_policy_bodies() == []

    # the runtime application_config must match the one in the node properties
    assert APPLICATION_CONFIG in props
    assert APPLICATION_CONFIG in ctx.node.properties
    actual_config = props[APPLICATION_CONFIG]
    default_config = dict(ctx.node.properties[APPLICATION_CONFIG])
    dumped_config = json.dumps(actual_config)
    ctx.logger.info("expected = default application_config: {0}".format(dumped_config))
    assert MonkeyedPolicyBody.is_the_same_dict(actual_config, default_config)
    assert MonkeyedPolicyBody.is_the_same_dict(default_config, actual_config)
| |
@pytest.mark.usefixtures("fix_consul")
@cfy_ctx(include_bad=True)
def test_remove_then_update():
    """test policy_update - remove all policies, then update/add/remove in a second call"""
    node_configure()

    props = ctx.instance.runtime_properties
    ctx.logger.info("runtime_properties: {0}".format(json.dumps(props)))

    assert dcae_policy.POLICIES in props
    policies = props[dcae_policy.POLICIES]
    ctx.logger.info("policies: {0}".format(json.dumps(policies)))

    # first call: request removal of everything gathered by node_configure
    policy_update(updated_policies=None, added_policies=None,
                  removed_policies=list(policies))

    changed = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_2, 2, priority="aa20")
    brand_new = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_M_2, 2,
                                                 False, priority="1")
    ctx.logger.info("policy_update: [{0}]".format(json.dumps(changed)))

    ctx.logger.info("policy[{0}]: not yet in policies".format(MONKEYED_POLICY_ID_M_2))
    assert MONKEYED_POLICY_ID_M_2 not in policies

    # added policies are keyed by policy-filter id; "--" is the fallback when none exist
    filter_ids = list(props.get(dcae_policy.POLICY_FILTERS) or ["--"])
    additions = {
        filter_ids[0]: {dcae_policy.POLICIES: {MONKEYED_POLICY_ID_M_2: brand_new}}
    }

    # second call: one update, one addition, one removal
    policy_update(updated_policies=[changed],
                  added_policies=additions,
                  removed_policies=[MONKEYED_POLICY_ID_M])

    ctx.logger.info("policy[{0}]: removed".format(MONKEYED_POLICY_ID_M))
    assert MONKEYED_POLICY_ID_M not in policies

    # added policy first, updated policy second - each must match what was sent
    for policy_id, sent in ((MONKEYED_POLICY_ID_M_2, brand_new),
                            (MONKEYED_POLICY_ID_2, changed)):
        assert policy_id in policies
        stored = policies[policy_id]
        ctx.logger.info("policy[{0}]: {1}".format(policy_id, json.dumps(stored)))
        assert MonkeyedPolicyBody.is_the_same_dict(stored, sent)
        assert MonkeyedPolicyBody.is_the_same_dict(sent, stored)

    assert MONKEYED_POLICY_ID in policies
    assert MONKEYED_POLICY_ID_B in policies
| |
@pytest.mark.usefixtures("fix_consul")
@cfy_ctx(include_bad=True)
def test_remove_update_many_calcs():
    """test policy_update_many_calcs - remove all policies, then update/add/remove"""
    node_configure()

    props = ctx.instance.runtime_properties
    ctx.logger.info("runtime_properties: {0}".format(json.dumps(props)))

    assert dcae_policy.POLICIES in props
    policies = props[dcae_policy.POLICIES]
    ctx.logger.info("policies: {0}".format(json.dumps(policies)))

    # first call: request removal of everything gathered by node_configure
    policy_update_many_calcs(updated_policies=None,
                             added_policies=None,
                             removed_policies=list(policies))

    # re-read the policies from the runtime properties after the first call
    assert dcae_policy.POLICIES in props
    policies = props[dcae_policy.POLICIES]
    ctx.logger.info("policies: {0}".format(json.dumps(policies)))

    changed = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_2, 2, priority="aa20")
    brand_new = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_M_2, 2,
                                                 False, priority="1")
    ctx.logger.info("policy_update: [{0}]".format(json.dumps(changed)))

    ctx.logger.info("policy[{0}]: not yet in policies".format(MONKEYED_POLICY_ID_M_2))
    assert MONKEYED_POLICY_ID_M_2 not in policies

    # added policies are keyed by policy-filter id; "--" is the fallback when none exist
    filter_ids = list(props.get(dcae_policy.POLICY_FILTERS) or ["--"])
    additions = {
        filter_ids[0]: {dcae_policy.POLICIES: {MONKEYED_POLICY_ID_M_2: brand_new}}
    }

    # second call: one update, one addition, one removal
    policy_update_many_calcs(updated_policies=[changed],
                             added_policies=additions,
                             removed_policies=[MONKEYED_POLICY_ID_M])

    ctx.logger.info("policy[{0}]: removed".format(MONKEYED_POLICY_ID_M))
    assert MONKEYED_POLICY_ID_M not in policies

    # added policy first, updated policy second - each must match what was sent
    for policy_id, sent in ((MONKEYED_POLICY_ID_M_2, brand_new),
                            (MONKEYED_POLICY_ID_2, changed)):
        assert policy_id in policies
        stored = policies[policy_id]
        ctx.logger.info("policy[{0}]: {1}".format(policy_id, json.dumps(stored)))
        assert MonkeyedPolicyBody.is_the_same_dict(stored, sent)
        assert MonkeyedPolicyBody.is_the_same_dict(sent, stored)

    assert MONKEYED_POLICY_ID in policies
    assert MONKEYED_POLICY_ID_B in policies
| |
@pytest.mark.usefixtures("fix_consul")
@cfy_ctx(include_bad=True)
def test_bad_update_many_calcs():
    """test policy_update_many_calcs with malformed policy configs

    The updated policy carries a list instead of a dict as its config and
    the added policy carries a config with unexpected keys (one of them None).
    Both are expected to be stored as sent, while the two untouched policies
    keep their original content.
    """
    node_configure()

    runtime_properties = ctx.instance.runtime_properties
    ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))

    assert dcae_policy.POLICIES in runtime_properties
    policies = runtime_properties[dcae_policy.POLICIES]
    ctx.logger.info("policies: {0}".format(json.dumps(policies)))

    # updated policy whose config is a list rather than a dict
    damaged_policy = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_2, 2, priority="aa20")
    damaged_policy[POLICY_BODY][POLICY_CONFIG] = ["damaged config"]

    # added policy whose config holds unexpected keys, including a None value
    added_policy = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_M_2, 2,
                                                    False, priority="1")
    added_policy[POLICY_BODY][POLICY_CONFIG] = {"unexpected": "foo", "none": None}

    ctx.logger.info("policy_update: [{0}]".format(json.dumps(damaged_policy)))

    ctx.logger.info("policy[{0}]: not yet in policies".format(MONKEYED_POLICY_ID_M_2))
    assert MONKEYED_POLICY_ID_M_2 not in policies

    # added policies are keyed by policy-filter id; "--" is the fallback when none exist
    policy_filter_ids = list(runtime_properties.get(dcae_policy.POLICY_FILTERS) or ["--"])

    policy_update_many_calcs(updated_policies=[damaged_policy],
                             added_policies={
                                 policy_filter_ids[0]: {
                                     dcae_policy.POLICIES: {MONKEYED_POLICY_ID_M_2: added_policy}}
                             },
                             removed_policies=[MONKEYED_POLICY_ID_M])

    ctx.logger.info("policy[{0}]: removed".format(MONKEYED_POLICY_ID_M))
    assert MONKEYED_POLICY_ID_M not in policies

    assert MONKEYED_POLICY_ID_M_2 in policies
    policy = policies[MONKEYED_POLICY_ID_M_2]
    ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_M_2, json.dumps(policy)))
    assert MonkeyedPolicyBody.is_the_same_dict(policy, added_policy)
    assert MonkeyedPolicyBody.is_the_same_dict(added_policy, policy)

    assert MONKEYED_POLICY_ID_2 in policies
    policy = policies[MONKEYED_POLICY_ID_2]
    ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_2, json.dumps(policy)))
    assert MonkeyedPolicyBody.is_the_same_dict(policy, damaged_policy)
    assert MonkeyedPolicyBody.is_the_same_dict(damaged_policy, policy)

    # policies that were not part of the update must be unchanged
    assert MONKEYED_POLICY_ID in policies
    policy = policies[MONKEYED_POLICY_ID]
    expected_1 = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID, priority="1")
    ctx.logger.info("expected[{0}]: {1}".format(MONKEYED_POLICY_ID, json.dumps(expected_1)))
    ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID, json.dumps(policy)))
    assert MonkeyedPolicyBody.is_the_same_dict(policy, expected_1)
    assert MonkeyedPolicyBody.is_the_same_dict(expected_1, policy)

    assert MONKEYED_POLICY_ID_B in policies
    policy = policies[MONKEYED_POLICY_ID_B]
    expected_b = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_B, 4, priority="1.5")
    # log the expectation for policy B itself (expected_b, not expected_1)
    ctx.logger.info("expected[{0}]: {1}".format(MONKEYED_POLICY_ID_B, json.dumps(expected_b)))
    ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_B, json.dumps(policy)))
    assert MonkeyedPolicyBody.is_the_same_dict(policy, expected_b)
    assert MonkeyedPolicyBody.is_the_same_dict(expected_b, policy)
| |
@pytest.mark.usefixtures("fix_consul")
@cfy_ctx(include_bad=True, include_good=False)
def test_bad_policies():
    """test node_configure on a context built with include_bad=True, include_good=False"""
    node_configure()

    props = ctx.instance.runtime_properties
    dumped_props = json.dumps(props)
    ctx.logger.info("runtime_properties: {0}".format(dumped_props))

    # the policies key must be set by node_configure
    assert dcae_policy.POLICIES in props
    gathered = props[dcae_policy.POLICIES]
    dumped_policies = json.dumps(gathered)
    ctx.logger.info("policies: {0}".format(dumped_policies))
| |
@pytest.mark.usefixtures("fix_consul")
@cfy_ctx(include_bad=True, include_good=False)
def test_wrong_ctx_node_configure():
    """test node_configure when the current ctx is a relationship instead of a node"""
    # swap the current context for the first relationship of the node instance
    current_ctx.set(ctx.instance.relationships[0])
    wrong_ctx_type = ctx.type

    with pytest.raises(NonRecoverableError) as boom:
        node_configure()

    CurrentCtx.reset()
    error_text = str(boom.value)
    ctx.logger.info("{0} not a node boom: {1}".format(wrong_ctx_type, error_text))
    assert wrong_ctx_type == 'cloudify.relationships.depends_on'
    assert error_text == "can only invoke gather_policies_to_node on node"
| |
@pytest.mark.usefixtures("fix_consul")
@cfy_ctx(include_bad=True, include_good=False)
def test_wrong_ctx_policy_update():
    """test policy_update when the current ctx is a relationship instead of a node"""
    # read the policy bodies before the context gets swapped
    bodies_before = Policies.get_policy_bodies()
    current_ctx.set(ctx.instance.relationships[0])
    wrong_ctx_type = ctx.type

    with pytest.raises(NonRecoverableError) as boom:
        policy_update(updated_policies=None, added_policies=None, removed_policies=None)

    CurrentCtx.reset()
    error_text = str(boom.value)
    ctx.logger.info("{0} not a node boom: {1}".format(wrong_ctx_type, error_text))
    assert wrong_ctx_type == 'cloudify.relationships.depends_on'
    assert bodies_before == []
    assert error_text == "can only invoke update_policies_on_node on node"
| |
def test_defenses_on_decorators():
    """test that both Policies decorators return None when wrapping None (code coverage)"""
    decorator_factories = (
        Policies.gather_policies_to_node,
        Policies.update_policies_on_node,
    )
    for make_decorator in decorator_factories:
        wrapped_none = make_decorator()(None)
        assert wrapped_none is None
| |
def monkeyed_set_policies_boom(policies):
    """monkeypatch stand-in that always raises, reporting the policies as json

    A falsy ``policies`` value is reported as an empty json object.
    """
    dumped = json.dumps(policies if policies else {})
    message = "Policies._set_policies boom: {0}".format(dumped)
    raise Exception(message)
| |
@pytest.fixture()
def fix_boom_gather(monkeypatch):
    """fixture that replaces Policies._set_policies with monkeyed_set_policies_boom"""
    patched_attr = 'onap_dcae_dcaepolicy_lib.dcae_policy.Policies._set_policies'
    monkeypatch.setattr(patched_attr, monkeyed_set_policies_boom)
    return fix_boom_gather  # provide the fixture value
| |
@cfy_ctx(include_bad=True, include_good=False)
@pytest.mark.usefixtures("fix_boom_gather")
def test_exception_on_gather():
    """test that node_configure raises NonRecoverableError under the fix_boom_gather fixture"""
    with pytest.raises(NonRecoverableError) as boom:
        node_configure()

    error_text = str(boom.value)
    ctx.logger.info("monkeyed_set_policies_boom: {0}".format(error_text))
    assert error_text.startswith("Failed to set the policies ")
| |
def monkeyed_update_policies_boom(policies):
    """monkeypatch stand-in that always raises; the policies argument is ignored"""
    boom = Exception("Policies._update_policies boom")
    raise boom
| |
@pytest.fixture()
def fix_boom_update_policies(monkeypatch):
    """fixture that replaces Policies._update_policies with monkeyed_update_policies_boom"""
    patched_attr = 'onap_dcae_dcaepolicy_lib.dcae_policy.Policies._update_policies'
    monkeypatch.setattr(patched_attr, monkeyed_update_policies_boom)
    return fix_boom_update_policies  # provide the fixture value
| |
@cfy_ctx(include_bad=True, include_good=False)
@pytest.mark.usefixtures("fix_boom_update_policies")
def test_exception_on_update():
    """test that policy_update raises NonRecoverableError under fix_boom_update_policies"""
    with pytest.raises(NonRecoverableError) as boom:
        policy_update(updated_policies=None, added_policies=None, removed_policies=None)

    error_text = str(boom.value)
    ctx.logger.info("monkeyed_update_policies_boom: {0}".format(error_text))
    assert error_text.startswith("Failed to update the policies ")
| |
@cfy_ctx(include_bad=True, include_good=False)
def test_defenses_on_policy_update():
    """test that policy_update accepts None for all three policy arguments"""
    nothing_to_do = dict(updated_policies=None, added_policies=None, removed_policies=None)
    policy_update(**nothing_to_do)
    ctx.logger.info("policy_update() ok")
| |
@cfy_ctx(include_bad=True, include_good=False)
def test_defenses_on_set_policies():
    """test that Policies._set_policies({}) drops the policies key (mainly for code coverage)"""
    node_configure()

    props = ctx.instance.runtime_properties
    ctx.logger.info("runtime_properties: {0}".format(json.dumps(props)))

    assert dcae_policy.POLICIES in props
    gathered = props[dcae_policy.POLICIES]
    ctx.logger.info("policies: {0}".format(json.dumps(gathered)))

    # the second round repeats the call when the key is already gone
    for _ in range(2):
        Policies._set_policies({})

        assert dcae_policy.POLICIES not in props
| |
@pytest.mark.usefixtures("fix_consul")
@cfy_ctx(include_bad=True)
def test_delete_node():
    """test node_delete after node_configure has gathered the policies"""
    node_configure()

    props = ctx.instance.runtime_properties
    dumped_props = json.dumps(props)
    ctx.logger.info("runtime_properties: {0}".format(dumped_props))

    assert dcae_policy.POLICIES in props
    gathered = props[dcae_policy.POLICIES]
    dumped_policies = json.dumps(gathered)
    ctx.logger.info("policies: {0}".format(dumped_policies))

    # passes as long as node_delete does not raise
    node_delete()
| |
| |
@pytest.mark.usefixtures("fix_consul_boom")
@cfy_ctx(include_bad=True)
def test_delete_node_no_consul():
    """test node_delete after node_configure, run under the fix_consul_boom fixture"""
    node_configure()

    props = ctx.instance.runtime_properties
    dumped_props = json.dumps(props)
    ctx.logger.info("runtime_properties: {0}".format(dumped_props))

    assert dcae_policy.POLICIES in props
    gathered = props[dcae_policy.POLICIES]
    dumped_policies = json.dumps(gathered)
    ctx.logger.info("policies: {0}".format(dumped_policies))

    # passes as long as node_delete does not raise
    node_delete()
| |
| |
@pytest.mark.usefixtures("fix_consul_boom")
@cfy_ctx(include_bad=True)
def test_delete_node_no_policies():
    """test node_delete without node_configure - only the policies event and scn are set"""
    # seed the runtime properties by hand instead of calling node_configure
    seeded = (
        (PoliciesOutput.POLICIES_EVENT, {}),
        (PoliciesOutput.SERVICE_COMPONENT_NAME, "delete_node_empty"),
    )
    for prop_name, prop_value in seeded:
        ctx.instance.runtime_properties[prop_name] = prop_value

    props = ctx.instance.runtime_properties
    ctx.logger.info("runtime_properties: {0}".format(json.dumps(props)))

    assert dcae_policy.POLICIES not in props

    # passes as long as node_delete does not raise
    node_delete()
| |
| |
@pytest.mark.usefixtures("fix_consul_boom")
@cfy_ctx(include_bad=True)
def test_delete_node_empty():
    """test node_delete without node_configure and without touching runtime properties"""
    props = ctx.instance.runtime_properties
    dumped_props = json.dumps(props)
    ctx.logger.info("runtime_properties: {0}".format(dumped_props))

    # nothing was gathered, so the policies key must be absent
    assert dcae_policy.POLICIES not in props

    # passes as long as node_delete does not raise
    node_delete()
| |
@pytest.mark.usefixtures("fix_consul_boom")
@cfy_ctx(include_bad=True)
def test_delete_node_lost_scn():
    """test node_delete with the policies event set but no service component name"""
    # only the policies event is seeded - SERVICE_COMPONENT_NAME is left unset here
    empty_event = {}
    ctx.instance.runtime_properties[PoliciesOutput.POLICIES_EVENT] = empty_event

    props = ctx.instance.runtime_properties
    ctx.logger.info("runtime_properties: {0}".format(json.dumps(props)))

    assert dcae_policy.POLICIES not in props

    # passes as long as node_delete does not raise
    node_delete()
| |
| |
@pytest.mark.usefixtures("fix_consul_boom")
@cfy_ctx(include_bad=True)
def test_delete_node_empty_config():
    """test node_delete without node_configure - policies event and scn are set by hand"""
    event_key = PoliciesOutput.POLICIES_EVENT
    scn_key = PoliciesOutput.SERVICE_COMPONENT_NAME
    ctx.instance.runtime_properties[event_key] = {}
    ctx.instance.runtime_properties[scn_key] = "delete_node_empty"

    props = ctx.instance.runtime_properties
    dumped_props = json.dumps(props)
    ctx.logger.info("runtime_properties: {0}".format(dumped_props))

    assert dcae_policy.POLICIES not in props

    # passes as long as node_delete does not raise
    node_delete()
| |
@pytest.mark.usefixtures("fix_consul_boom")
@cfy_ctx(include_bad=True)
def test_delete_ms_no_consul_addr():
    """test node_delete without node_configure, run under the fix_consul_boom fixture"""
    # seed the runtime properties by hand instead of calling node_configure
    seeded = (
        (PoliciesOutput.POLICIES_EVENT, {}),
        (PoliciesOutput.SERVICE_COMPONENT_NAME, "delete_node_empty"),
    )
    for prop_name, prop_value in seeded:
        ctx.instance.runtime_properties[prop_name] = prop_value

    props = ctx.instance.runtime_properties
    ctx.logger.info("runtime_properties: {0}".format(json.dumps(props)))

    assert dcae_policy.POLICIES not in props

    # passes as long as node_delete does not raise
    node_delete()
| |
| |
@pytest.mark.usefixtures("fix_consul_boom")
@cfy_ctx(include_bad=True)
def test_delete_bad_config():
    """test node_delete with hand-seeded runtime properties and no gathered policies"""
    event_key = PoliciesOutput.POLICIES_EVENT
    scn_key = PoliciesOutput.SERVICE_COMPONENT_NAME
    ctx.instance.runtime_properties[event_key] = {}
    ctx.instance.runtime_properties[scn_key] = "delete_node_empty"

    props = ctx.instance.runtime_properties
    dumped_props = json.dumps(props)
    ctx.logger.info("runtime_properties: {0}".format(dumped_props))

    assert dcae_policy.POLICIES not in props

    # passes as long as node_delete does not raise
    node_delete()
| |