blob: 7d4f5a8974122aadf18fd0b0affbda4fa7a7a082 [file] [log] [blame]
# ============LICENSE_START=======================================================
# org.onap.dcae
# ================================================================================
# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
"""tests of decorators around the cloudify operations to handle policy actions"""
import copy
import json
import logging
from datetime import datetime, timedelta
from functools import wraps
import pytest
from cloudify import ctx
from cloudify.exceptions import NonRecoverableError
from cloudify.state import current_ctx
from onap_dcae_dcaepolicy_lib import POLICIES, dcae_policy
from onap_dcae_dcaepolicy_lib.dcae_policy import Policies
from onap_dcae_dcaepolicy_lib.utils import Utils
from tests.log_ctx import CtxLogger
from tests.mock_cloudify_ctx import (TARGET_NODE_ID, TARGET_NODE_NAME,
MockCloudifyContextFull)
POLICY_ID = 'policy_id'
POLICY_VERSION = "policyVersion"
POLICY_NAME = "policyName"
POLICY_BODY = 'policy_body'
POLICY_CONFIG = 'config'
DOCKER_CONFIG = "docker_config"
POLICY = "policy"
APPLY_ORDER = "apply_order"
MONKEYED_POLICY_ID = 'monkeyed.Config_peach'
MONKEYED_POLICY_ID_2 = 'monkeyed.Config_peach_2'
MONKEYED_POLICY_ID_M = 'monkeyed.Config_multi'
MONKEYED_POLICY_ID_M_2 = 'monkeyed.Config_multi_2'
MONKEYED_POLICY_ID_M_3 = 'monkeyed.Config_multi_3'
MONKEYED_POLICY_ID_B = 'monkeyed.Config_both'
APPLICATION_CONFIG = "application_config"
LOG_FILE = 'logs/test_onap_dcae_dcaepolicy_lib.log'
LOREM_IPSUM = """Lorem ipsum dolor sit amet consectetur ametist""".split()
RUN_TS = datetime.utcnow()
class MonkeyedLogHandler(object):
"""keep the shared logger handler here"""
_log_handler = None
@staticmethod
def add_handler_to(logger):
"""adds the local handler to the logger"""
if not MonkeyedLogHandler._log_handler:
MonkeyedLogHandler._log_handler = logging.FileHandler(LOG_FILE)
MonkeyedLogHandler._log_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(
fmt='%(asctime)s.%(msecs)03d %(levelname)+8s ' + \
'%(threadName)s %(name)s.%(funcName)s: %(message)s', \
datefmt='%Y%m%d_%H%M%S')
MonkeyedLogHandler._log_handler.setFormatter(formatter)
logger.addHandler(MonkeyedLogHandler._log_handler)
class MonkeyedPolicyBody(object):
"""policy body that policy-engine returns"""
@staticmethod
def create_policy_body(policy_id, policy_version=1, priority=None, **kwargs):
"""returns a fake policy-body"""
prev_ver = policy_version - 1
timestamp = RUN_TS + timedelta(hours=prev_ver)
this_ver = str(policy_version)
config = {
"policy_updated_from_ver": str(prev_ver),
"policy_updated_to_ver": this_ver,
"policy_hello": LOREM_IPSUM[prev_ver % len(LOREM_IPSUM)],
"policy_updated_ts": timestamp.isoformat()[:-3] + 'Z',
"updated_policy_id": policy_id
}
config.update(copy.deepcopy(kwargs) or {})
matching_conditions = {
"ONAPName": "DCAE",
"ConfigName": "alex_config_name"
}
if priority is not None:
matching_conditions["priority"] = priority
return {
"policyConfigMessage": "Config Retrieved! ",
"policyConfigStatus": "CONFIG_RETRIEVED",
"type": "JSON",
POLICY_NAME: "{0}.{1}.xml".format(policy_id, this_ver),
POLICY_VERSION: this_ver,
POLICY_CONFIG: config,
"matchingConditions": matching_conditions,
"responseAttributes": {},
"property": None
}
@staticmethod
def create_policy_bare(policy_id, policy_version=1, priority=None, **kwargs):
"""returns the whole policy object for policy_id and policy_version"""
return {
POLICY_ID: policy_id,
POLICY_BODY: MonkeyedPolicyBody.create_policy_body(
policy_id, policy_version, priority, **kwargs)
}
@staticmethod
def create_policy(policy_id, policy_version=1, policy_persistent=True, priority=None, **kwargs):
"""returns the whole policy object for policy_id and policy_version"""
policy = MonkeyedPolicyBody.create_policy_bare(
policy_id, policy_version, priority, **kwargs)
policy[dcae_policy.POLICY_PERSISTENT] = bool(policy_persistent)
return policy
@staticmethod
def is_the_same_dict(policy_body_1, policy_body_2):
"""check whether both policy_body objects are the same"""
if not isinstance(policy_body_1, dict) or not isinstance(policy_body_2, dict):
return False
for key in policy_body_1.keys():
if key not in policy_body_2:
return False
val_1 = policy_body_1[key]
val_2 = policy_body_2[key]
if isinstance(val_1, dict) \
and not MonkeyedPolicyBody.is_the_same_dict(val_1, val_2):
return False
if (val_1 is None and val_2 is not None) \
or (val_1 is not None and val_2 is None) \
or (val_1 != val_2):
return False
return True
class MonkeyedNode(object):
"""node in cloudify"""
BLUEPRINT_ID = 'test_dcae_policy_bp_id'
DEPLOYMENT_ID = 'test_dcae_policy_dpl_id'
EXECUTION_ID = 'test_dcae_policy_exe_id'
def __init__(self, node_id, node_name, node_type, properties=None,
relationships=None, runtime_properties=None):
self.node_id = node_id
self.node_name = node_name
self.ctx = MockCloudifyContextFull(
node_id=self.node_id,
node_name=self.node_name,
node_type=node_type,
blueprint_id=MonkeyedNode.BLUEPRINT_ID,
deployment_id=MonkeyedNode.DEPLOYMENT_ID,
execution_id=MonkeyedNode.EXECUTION_ID,
properties=properties,
relationships=relationships,
runtime_properties=runtime_properties
)
def operation_node_configure(**kwargs):
"""do the node-configure operation"""
ctx.logger.info("operation_node_configure kwargs: {0}".format(kwargs))
app_config = Policies.calc_latest_application_config()
ctx.instance.runtime_properties[APPLICATION_CONFIG] = app_config
ctx.logger.info("applied policy_configs to property app_config: {0}" \
.format(json.dumps(app_config)))
policy_configs = Policies.get_policy_configs()
if policy_configs:
ctx.logger.warn("TBD: apply policy_configs: {0}".format(json.dumps(policy_configs)))
@CtxLogger.log_ctx(pre_log=True, after_log=True, exe_task='exe_task')
@Policies.gather_policies_to_node()
def node_configure_default_order(**kwargs):
"""default policy sorting because no param of policy_apply_order_path"""
operation_node_configure(**kwargs)
@CtxLogger.log_ctx(pre_log=True, after_log=True, exe_task='exe_task')
@Policies.gather_policies_to_node(policy_apply_order_path="docker_config:policy:apply_order")
def node_configure(**kwargs):
"""decorate with @Policies.gather_policies_to_node on policy consumer node to
bring all policies to runtime_properties["policies"]
"""
operation_node_configure(**kwargs)
@CtxLogger.log_ctx(pre_log=True, after_log=True, exe_task='exe_task')
@Policies.gather_policies_to_node(policy_apply_order_path="docker_config:junk:junk")
def node_configure_wrong_order_path(**kwargs):
"""wrong data in param policy_apply_order_path"""
operation_node_configure(**kwargs)
@CtxLogger.log_ctx(pre_log=True, after_log=True, exe_task='exe_task')
@Policies.gather_policies_to_node(policy_apply_order_path=" ")
def node_configure_empty_order_path(**kwargs):
"""wrong data in param policy_apply_order_path"""
operation_node_configure(**kwargs)
@CtxLogger.log_ctx(pre_log=True, after_log=True, exe_task='execute_operation')
@Policies.update_policies_on_node(configs_only=True)
def policy_update(updated_policies, removed_policies=None, **kwargs):
"""decorate with @Policies.update_policies_on_node() to update runtime_properties["policies"]
:updated_policies: contains the list of changed policy-configs when configs_only=True (default).
Use configs_only=False to bring the full policy objects in :updated_policies:.
Use :Policies.shallow_merge_policies_into(): to merge the updated_policies into app_config
"""
# This is how to merge the policies into default app_config object
# (runtime_properties[APPLICATION_CONFIG] = application_config)
app_config = Policies.calc_latest_application_config()
ctx.logger.info("merged updated_policies {0}, removed_policies {1} into app_config {2}"
.format(json.dumps(updated_policies), json.dumps(removed_policies),
json.dumps(app_config)))
ctx.instance.runtime_properties[APPLICATION_CONFIG] = app_config
@CtxLogger.log_ctx(pre_log=True, after_log=True, exe_task='execute_operation')
@Policies.update_policies_on_node(configs_only=False)
def policy_update_not_only_config(updated_policies, removed_policies=None, **kwargs):
"""decorate with @Policies.update_policies_on_node() to update runtime_properties["policies"]
:updated_policies: contains the list of changed policy-configs when configs_only=True (default).
Use configs_only=False to bring the full policy objects in :updated_policies:.
Use :Policies.shallow_merge_policies_into(): to merge the updated_policies into app_config
"""
# This is how to merge the policies into default app_config object
# (runtime_properties[APPLICATION_CONFIG] = application_config)
app_config = Policies.calc_latest_application_config()
ctx.logger.info("merged updated_policies {0}, removed_policies {1} into app_config {2}"
.format(json.dumps(updated_policies), json.dumps(removed_policies),
json.dumps(app_config)))
ctx.instance.runtime_properties[APPLICATION_CONFIG] = app_config
@CtxLogger.log_ctx(pre_log=True, after_log=True, exe_task='execute_operation')
@Policies.update_policies_on_node(configs_only=False)
def policy_update_many_calcs(updated_policies, removed_policies=None, **kwargs):
"""decorate with @Policies.update_policies_on_node() to update runtime_properties["policies"]
:updated_policies: contains the list of changed policy-configs when configs_only=True (default).
Use configs_only=False to bring the full policy objects in :updated_policies:.
Use :Policies.shallow_merge_policies_into(): to merge the updated_policies into app_config
"""
# This is how to merge the policies into default app_config object
# (runtime_properties[APPLICATION_CONFIG] = application_config)
app_config = Policies.calc_latest_application_config()
ctx.logger.info("merged updated_policies {0}, removed_policies {1} into app_config {2}"
.format(json.dumps(updated_policies), json.dumps(removed_policies),
json.dumps(app_config)))
ctx.instance.runtime_properties[APPLICATION_CONFIG] = app_config
app_config = Policies.calc_latest_application_config(APPLICATION_CONFIG)
ctx.logger.info("merged again updated_policies {0}, removed_policies {1} into app_config {2}"
.format(json.dumps(updated_policies), json.dumps(removed_policies),
json.dumps(app_config)))
app_config = Policies.calc_latest_application_config(APPLICATION_CONFIG)
ctx.logger.info("merged again updated_policies {0}, removed_policies {1} into app_config {2}"
.format(json.dumps(updated_policies), json.dumps(removed_policies),
json.dumps(app_config)))
app_config = Policies.shallow_merge_policies_into(None)
ctx.logger.info("merged to empty updated_policies {0}, removed_policies {1} into app_config {2}"
.format(json.dumps(updated_policies), json.dumps(removed_policies),
json.dumps(app_config)))
app_config = ctx.instance.runtime_properties[APPLICATION_CONFIG]
app_config = Policies.shallow_merge_policies_into(app_config, default_config={})
ctx.logger.info(
"merged with empty defaults updated_policies {0}, removed_policies {1} into app_config {2}"
.format(json.dumps(updated_policies), json.dumps(removed_policies),
json.dumps(app_config)))
app_config = ctx.instance.runtime_properties[APPLICATION_CONFIG]
app_config = Policies.shallow_merge_policies_into(app_config,
default_config={"unexpected":"foo"})
ctx.logger.info(
"merged with unexpected defaults updated_policies {0}, removed_policies {1} into config {2}"
.format(json.dumps(updated_policies), json.dumps(removed_policies),
json.dumps(app_config)))
app_config = ctx.instance.runtime_properties[APPLICATION_CONFIG]
app_config = Policies.shallow_merge_policies_into(app_config)
ctx.logger.info("merged 3rd time updated_policies {0}, removed_policies {1} into app_config {2}"
.format(json.dumps(updated_policies), json.dumps(removed_policies),
json.dumps(app_config)))
class CurrentCtx(object):
"""cloudify context"""
_node_ms = None
@staticmethod
def set_current_ctx(include_bad=True, include_good=True):
"""set up the nodes for cloudify"""
Policies._init()
def add_target_to_relationships(relationships, node):
"""adds the node as the target of relationships"""
relationships.append({TARGET_NODE_ID: node.node_id, TARGET_NODE_NAME: node.node_name})
relationships = []
if include_good:
node_policy = MonkeyedNode(
'dcae_policy_node_id',
'dcae_policy_node_name',
dcae_policy.DCAE_POLICY_TYPE,
{POLICY_ID: MONKEYED_POLICY_ID},
None,
{POLICY_BODY: MonkeyedPolicyBody.create_policy_body(
MONKEYED_POLICY_ID, priority="1")}
)
add_target_to_relationships(relationships, node_policy)
if include_bad:
bad_policy_2 = MonkeyedNode(
'bad_policy_2_node_id',
'bad_policy_2_node_name',
dcae_policy.DCAE_POLICY_TYPE,
{POLICY_ID: MONKEYED_POLICY_ID_2},
None,
None
)
add_target_to_relationships(relationships, bad_policy_2)
if include_good:
node_policy_2 = MonkeyedNode(
'dcae_policy_node_id_2',
'dcae_policy_node_name_2',
dcae_policy.DCAE_POLICY_TYPE,
{POLICY_ID: MONKEYED_POLICY_ID_2},
None,
{POLICY_BODY: MonkeyedPolicyBody.create_policy_body(
MONKEYED_POLICY_ID_2, 4, priority="2")}
)
add_target_to_relationships(relationships, node_policy_2)
if include_bad:
bad_policy_3 = MonkeyedNode(
'bad_policy_3_node_id',
'bad_policy_3_node_name',
dcae_policy.DCAE_POLICY_TYPE,
{POLICY_ID: MONKEYED_POLICY_ID_2},
None,
None
)
add_target_to_relationships(relationships, bad_policy_3)
bad_policy_4 = MonkeyedNode(
'bad_policy_4_node_id',
'bad_policy_4_node_name',
dcae_policy.DCAE_POLICY_TYPE,
None,
None,
None
)
add_target_to_relationships(relationships, bad_policy_4)
weird_policy_5 = MonkeyedNode(
'weird_policy_5_node_id',
'weird_policy_5_node_name',
dcae_policy.DCAE_POLICY_TYPE,
None,
None,
{POLICY_BODY: MonkeyedPolicyBody.create_policy_body(
MONKEYED_POLICY_ID, 3, priority="2")}
)
add_target_to_relationships(relationships, weird_policy_5)
if include_good:
node_policies = MonkeyedNode(
'dcae_policies_node_id',
'dcae_policies_node_name',
dcae_policy.DCAE_POLICIES_TYPE,
{dcae_policy.POLICY_FILTER: {POLICY_NAME: MONKEYED_POLICY_ID_M + ".*"}},
None,
{dcae_policy.POLICIES_FILTERED: {
MONKEYED_POLICY_ID_M:
MonkeyedPolicyBody.create_policy_bare(MONKEYED_POLICY_ID_M, 3)}}
)
add_target_to_relationships(relationships, node_policies)
node_policies_2 = MonkeyedNode(
'dcae_policies_2_node_id',
'dcae_policies_2_node_name',
dcae_policy.DCAE_POLICIES_TYPE,
{dcae_policy.POLICY_FILTER: {POLICY_NAME: MONKEYED_POLICY_ID_M + ".*"}},
None,
{dcae_policy.POLICIES_FILTERED: {
MONKEYED_POLICY_ID_M:
MonkeyedPolicyBody.create_policy_bare(MONKEYED_POLICY_ID_M, 2)}}
)
add_target_to_relationships(relationships, node_policies_2)
policies_empty = MonkeyedNode(
'dcae_policies_empty_node_id',
'dcae_policies_empty_node_name',
dcae_policy.DCAE_POLICIES_TYPE,
{dcae_policy.POLICY_FILTER: {"empty": None}},
None,
None
)
add_target_to_relationships(relationships, policies_empty)
policies_empty_2 = MonkeyedNode(
'dcae_policies_empty_2_node_id',
'dcae_policies_empty_2_node_name',
dcae_policy.DCAE_POLICIES_TYPE,
None,
None,
None
)
add_target_to_relationships(relationships, policies_empty_2)
non_policies = MonkeyedNode(
'non_policies_node_id',
'non_policies_node_name',
"non.policy.type"
)
add_target_to_relationships(relationships, non_policies)
if include_good:
node_policies_b = MonkeyedNode(
'dcae_policies_b_node_id',
'dcae_policies_b_node_name',
dcae_policy.DCAE_POLICIES_TYPE,
{dcae_policy.POLICY_FILTER: {POLICY_NAME: MONKEYED_POLICY_ID_M + ".*"}},
None,
{dcae_policy.POLICIES_FILTERED: {
MONKEYED_POLICY_ID_B:
MonkeyedPolicyBody.create_policy_bare(MONKEYED_POLICY_ID_B, 1)}}
)
add_target_to_relationships(relationships, node_policies_b)
node_policies_b_2 = MonkeyedNode(
'dcae_policies_b_2_node_id',
'dcae_policies_b_2_node_name',
dcae_policy.DCAE_POLICIES_TYPE,
{dcae_policy.POLICY_FILTER: {POLICY_NAME: MONKEYED_POLICY_ID_M + ".*"}},
None,
{dcae_policy.POLICIES_FILTERED: {
MONKEYED_POLICY_ID_B:
MonkeyedPolicyBody.create_policy_bare(MONKEYED_POLICY_ID_B, 2)}}
)
add_target_to_relationships(relationships, node_policies_b_2)
if include_good:
node_policy_b = MonkeyedNode(
'dcae_policy_b_node_id',
'dcae_policy_b_node_name',
dcae_policy.DCAE_POLICY_TYPE,
{POLICY_ID: MONKEYED_POLICY_ID_B},
None,
{POLICY_BODY: MonkeyedPolicyBody.create_policy_body(
MONKEYED_POLICY_ID_B, 4, priority="1.5")}
)
add_target_to_relationships(relationships, node_policy_b)
node_policies_b_5 = MonkeyedNode(
'dcae_policies_b_5_node_id',
'dcae_policies_b_5_node_name',
dcae_policy.DCAE_POLICIES_TYPE,
{dcae_policy.POLICY_FILTER: {POLICY_NAME: MONKEYED_POLICY_ID_M + ".*"}},
None,
{dcae_policy.POLICIES_FILTERED: {
MONKEYED_POLICY_ID_B:
MonkeyedPolicyBody.create_policy_bare(MONKEYED_POLICY_ID_B, 5)}}
)
add_target_to_relationships(relationships, node_policies_b_5)
CurrentCtx._node_ms = MonkeyedNode(
'test_ms_id', 'test_ms_name', "ms.nodes.type",
{DOCKER_CONFIG: {POLICY: {
APPLY_ORDER: [
"matchingConditions:priority::number desc",
"config:db_client nulls-last",
"config:policy_updated_to_ver"]}},
APPLICATION_CONFIG: MonkeyedPolicyBody.create_policy_body(
"no_policy", db_port="123", weather="snow")[POLICY_CONFIG]
},
relationships
)
current_ctx.set(CurrentCtx._node_ms.ctx)
MonkeyedLogHandler.add_handler_to(CurrentCtx._node_ms.ctx.logger)
@staticmethod
def reset():
"""reset context"""
current_ctx.set(CurrentCtx._node_ms.ctx)
def test_utils():
"""test utils"""
field_value = Utils.get_field_value(
{
"before":"bubu",
"pre":{"field":"hehe"},
"hello":{"field":"haha"},
"post":{"field":"hmmm"},
"after":"bebe"
},
"hello:field")
assert field_value == "haha"
field_value = Utils.get_field_value(None, None)
assert field_value is None
field_value = Utils.get_field_value({"hello":{"field":"haha"}}, "wrong_root:field")
assert field_value is None
field_value = Utils.get_field_value({"hello":{"field":None}}, "hello:field")
assert field_value is None
field_value = Utils.get_field_value({"hello":{"field":None}}, "hello:field::number")
assert field_value is None
def cfy_ctx(include_bad=True, include_good=True):
"""test and safely clean up"""
def cfy_ctx_decorator(func):
"""test and safely clean up"""
if not func:
return
@wraps(func)
def ctx_wrapper(*args, **kwargs):
"""test"""
try:
CurrentCtx.set_current_ctx(include_bad, include_good)
func(*args, **kwargs)
finally:
ctx.logger.info("MockCloudifyContextFull.clear")
MockCloudifyContextFull.clear()
current_ctx.clear()
return ctx_wrapper
return cfy_ctx_decorator
@cfy_ctx(include_bad=True)
def test_gather_policies_to_node():
"""test gather_policies_to_node"""
node_configure_default_order()
runtime_properties = ctx.instance.runtime_properties
ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))
assert POLICIES in runtime_properties
policies = runtime_properties[POLICIES]
ctx.logger.info("policies: {0}".format(json.dumps(policies)))
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
policy_apply_order = runtime_properties[dcae_policy.POLICY_APPLY_ORDER]
ctx.logger.info("policy_apply_order: {0}".format(json.dumps(policy_apply_order)))
assert policy_apply_order == [
MONKEYED_POLICY_ID_B, MONKEYED_POLICY_ID_M, MONKEYED_POLICY_ID, MONKEYED_POLICY_ID_2]
@cfy_ctx(include_bad=True)
def test_policies_wrong_order():
"""test gather_policies_to_node"""
node_configure_wrong_order_path()
runtime_properties = ctx.instance.runtime_properties
ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))
assert POLICIES in runtime_properties
policies = runtime_properties[POLICIES]
ctx.logger.info("policies: {0}".format(json.dumps(policies)))
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
policy_apply_order = runtime_properties[dcae_policy.POLICY_APPLY_ORDER]
ctx.logger.info("policy_apply_order: {0}".format(json.dumps(policy_apply_order)))
assert policy_apply_order == [
MONKEYED_POLICY_ID_B, MONKEYED_POLICY_ID_M, MONKEYED_POLICY_ID, MONKEYED_POLICY_ID_2]
@cfy_ctx(include_bad=True)
def test_policies_empty_order():
"""test gather_policies_to_node"""
node_configure_empty_order_path()
runtime_properties = ctx.instance.runtime_properties
ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))
assert POLICIES in runtime_properties
policies = runtime_properties[POLICIES]
ctx.logger.info("policies: {0}".format(json.dumps(policies)))
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
policy_apply_order = runtime_properties[dcae_policy.POLICY_APPLY_ORDER]
ctx.logger.info("policy_apply_order: {0}".format(json.dumps(policy_apply_order)))
assert policy_apply_order == [
MONKEYED_POLICY_ID_B, MONKEYED_POLICY_ID_M, MONKEYED_POLICY_ID, MONKEYED_POLICY_ID_2]
@cfy_ctx(include_bad=True)
def test_policies_damaged_order():
"""test gather_policies_to_node"""
ctx.node.properties[DOCKER_CONFIG][POLICY][APPLY_ORDER] = " "
runtime_properties = ctx.instance.runtime_properties
ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))
node_configure()
ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))
assert POLICIES in runtime_properties
policies = runtime_properties[POLICIES]
ctx.logger.info("policies: {0}".format(json.dumps(policies)))
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
policy_apply_order = runtime_properties[dcae_policy.POLICY_APPLY_ORDER]
ctx.logger.info("policy_apply_order: {0}".format(json.dumps(policy_apply_order)))
assert policy_apply_order == [
MONKEYED_POLICY_ID_B, MONKEYED_POLICY_ID_M, MONKEYED_POLICY_ID, MONKEYED_POLICY_ID_2]
@cfy_ctx(include_bad=True)
def test_policies_bad_order():
"""test gather_policies_to_node"""
ctx.node.properties[DOCKER_CONFIG][POLICY][APPLY_ORDER] = [" ", "", "ha he", "hu::mu dah"]
runtime_properties = ctx.instance.runtime_properties
ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))
node_configure()
ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))
assert POLICIES in runtime_properties
policies = runtime_properties[POLICIES]
ctx.logger.info("policies: {0}".format(json.dumps(policies)))
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
policy_apply_order = runtime_properties[dcae_policy.POLICY_APPLY_ORDER]
ctx.logger.info("policy_apply_order: {0}".format(json.dumps(policy_apply_order)))
assert policy_apply_order == [
MONKEYED_POLICY_ID_B, MONKEYED_POLICY_ID_M, MONKEYED_POLICY_ID, MONKEYED_POLICY_ID_2]
@cfy_ctx(include_bad=True)
def test_policies_to_node():
"""test gather_policies_to_node"""
node_configure()
runtime_properties = ctx.instance.runtime_properties
ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))
assert POLICIES in runtime_properties
policies = runtime_properties[POLICIES]
ctx.logger.info("policies: {0}".format(json.dumps(policies)))
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
policy_apply_order = runtime_properties[dcae_policy.POLICY_APPLY_ORDER]
ctx.logger.info("policy_apply_order: {0}".format(json.dumps(policy_apply_order)))
assert policy_apply_order == [
MONKEYED_POLICY_ID_M, MONKEYED_POLICY_ID_2, MONKEYED_POLICY_ID_B, MONKEYED_POLICY_ID]
assert MONKEYED_POLICY_ID in policies
expected_1 = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID, priority="1")
policy = policies[MONKEYED_POLICY_ID]
ctx.logger.info("expected[{0}]: {1}".format(MONKEYED_POLICY_ID, json.dumps(expected_1)))
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, expected_1)
assert MonkeyedPolicyBody.is_the_same_dict(expected_1, policy)
assert MONKEYED_POLICY_ID_B in policies
expected_b = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_B, 4, priority="1.5")
policy = policies[MONKEYED_POLICY_ID_B]
ctx.logger.info("expected[{0}]: {1}".format(MONKEYED_POLICY_ID_B, json.dumps(expected_b)))
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_B, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, expected_b)
assert MonkeyedPolicyBody.is_the_same_dict(expected_b, policy)
assert MONKEYED_POLICY_ID_2 in policies
expected_2 = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_2, 4, priority="2")
policy = policies[MONKEYED_POLICY_ID_2]
ctx.logger.info("expected[{0}]: {1}".format(MONKEYED_POLICY_ID_2, json.dumps(expected_2)))
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_2, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, expected_2)
assert MonkeyedPolicyBody.is_the_same_dict(expected_2, policy)
assert MONKEYED_POLICY_ID_M in policies
expected_m = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_M, 2, False)
policy = policies[MONKEYED_POLICY_ID_M]
ctx.logger.info("expected[{0}]: {1}".format(MONKEYED_POLICY_ID_M, json.dumps(expected_m)))
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_M, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, expected_m)
assert MonkeyedPolicyBody.is_the_same_dict(expected_m, policy)
@cfy_ctx(include_bad=True)
def test_update_policies():
"""test policy_update"""
node_configure()
runtime_properties = ctx.instance.runtime_properties
ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))
assert POLICIES in runtime_properties
policies = runtime_properties[POLICIES]
ctx.logger.info("policies: {0}".format(json.dumps(policies)))
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
policy_apply_order = runtime_properties[dcae_policy.POLICY_APPLY_ORDER]
ctx.logger.info("policy_apply_order: {0}".format(json.dumps(policy_apply_order)))
assert policy_apply_order == [
MONKEYED_POLICY_ID_M, MONKEYED_POLICY_ID_2, MONKEYED_POLICY_ID_B, MONKEYED_POLICY_ID]
updated_policy = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_2, 2, priority="aa20")
added_policy = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_M_2, 2,
False, priority="1")
ctx.logger.info("policy_update: [{0}]".format(json.dumps(updated_policy)))
ctx.logger.info("policy[{0}]: not yet in policies".format(MONKEYED_POLICY_ID_M_2))
assert MONKEYED_POLICY_ID_M_2 not in policies
policy_filter_ids = runtime_properties.get(dcae_policy.POLICY_FILTERS, {}).keys() or ["--"]
policy_update(updated_policies=[updated_policy],
added_policies={
policy_filter_ids[0]: {
POLICIES: {MONKEYED_POLICY_ID_M_2: added_policy}}
},
removed_policies=[MONKEYED_POLICY_ID_M])
ctx.logger.info("policy[{0}]: removed".format(MONKEYED_POLICY_ID_M))
assert MONKEYED_POLICY_ID_M not in policies
# assert Policies._get_config_from_policy(policies[MONKEYED_POLICY_ID_M]) is None
assert MONKEYED_POLICY_ID_M_2 in policies
policy = policies[MONKEYED_POLICY_ID_M_2]
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_M_2, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, added_policy)
assert MonkeyedPolicyBody.is_the_same_dict(added_policy, policy)
assert MONKEYED_POLICY_ID_2 in policies
policy = policies[MONKEYED_POLICY_ID_2]
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_2, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, updated_policy)
assert MonkeyedPolicyBody.is_the_same_dict(updated_policy, policy)
assert MONKEYED_POLICY_ID in policies
policy = policies[MONKEYED_POLICY_ID]
expected_1 = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID, priority="1")
ctx.logger.info("expected[{0}]: {1}".format(MONKEYED_POLICY_ID, json.dumps(expected_1)))
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, expected_1)
assert MonkeyedPolicyBody.is_the_same_dict(expected_1, policy)
assert MONKEYED_POLICY_ID_B in policies
policy = policies[MONKEYED_POLICY_ID_B]
expected_b = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_B, 4, priority="1.5")
ctx.logger.info("expected[{0}]: {1}".format(MONKEYED_POLICY_ID_B, json.dumps(expected_1)))
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_B, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, expected_b)
assert MonkeyedPolicyBody.is_the_same_dict(expected_b, policy)
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
policy_apply_order = runtime_properties[dcae_policy.POLICY_APPLY_ORDER]
ctx.logger.info("policy_apply_order: {0}".format(json.dumps(policy_apply_order)))
assert policy_apply_order == [
MONKEYED_POLICY_ID_2, MONKEYED_POLICY_ID_B, MONKEYED_POLICY_ID, MONKEYED_POLICY_ID_M_2]
@cfy_ctx(include_bad=True)
def test_update_not_only_config():
"""test policy_update"""
node_configure()
runtime_properties = ctx.instance.runtime_properties
ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))
assert POLICIES in runtime_properties
policies = runtime_properties[POLICIES]
ctx.logger.info("policies: {0}".format(json.dumps(policies)))
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
policy_apply_order = runtime_properties[dcae_policy.POLICY_APPLY_ORDER]
ctx.logger.info("policy_apply_order: {0}".format(json.dumps(policy_apply_order)))
assert policy_apply_order == [
MONKEYED_POLICY_ID_M, MONKEYED_POLICY_ID_2, MONKEYED_POLICY_ID_B, MONKEYED_POLICY_ID]
updated_policy = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_2, 2, priority="aa20")
added_policy = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_M_2, 2,
False, priority="1")
ctx.logger.info("policy_update: [{0}]".format(json.dumps(updated_policy)))
ctx.logger.info("policy[{0}]: not yet in policies".format(MONKEYED_POLICY_ID_M_2))
assert MONKEYED_POLICY_ID_M_2 not in policies
policy_filter_ids = runtime_properties.get(dcae_policy.POLICY_FILTERS, {}).keys() or ["--"]
policy_update_not_only_config(updated_policies=[updated_policy],
added_policies={
policy_filter_ids[0]: {
POLICIES: {MONKEYED_POLICY_ID_M_2: added_policy}}
},
removed_policies=[MONKEYED_POLICY_ID_M])
ctx.logger.info("policy[{0}]: removed".format(MONKEYED_POLICY_ID_M))
assert MONKEYED_POLICY_ID_M not in policies
# assert Policies._get_config_from_policy(policies[MONKEYED_POLICY_ID_M]) is None
assert MONKEYED_POLICY_ID_M_2 in policies
policy = policies[MONKEYED_POLICY_ID_M_2]
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_M_2, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, added_policy)
assert MonkeyedPolicyBody.is_the_same_dict(added_policy, policy)
assert MONKEYED_POLICY_ID_2 in policies
policy = policies[MONKEYED_POLICY_ID_2]
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_2, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, updated_policy)
assert MonkeyedPolicyBody.is_the_same_dict(updated_policy, policy)
assert MONKEYED_POLICY_ID in policies
policy = policies[MONKEYED_POLICY_ID]
expected_1 = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID, priority="1")
ctx.logger.info("expected[{0}]: {1}".format(MONKEYED_POLICY_ID, json.dumps(expected_1)))
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, expected_1)
assert MonkeyedPolicyBody.is_the_same_dict(expected_1, policy)
assert MONKEYED_POLICY_ID_B in policies
policy = policies[MONKEYED_POLICY_ID_B]
expected_b = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_B, 4, priority="1.5")
ctx.logger.info("expected[{0}]: {1}".format(MONKEYED_POLICY_ID_B, json.dumps(expected_1)))
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_B, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, expected_b)
assert MonkeyedPolicyBody.is_the_same_dict(expected_b, policy)
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
policy_apply_order = runtime_properties[dcae_policy.POLICY_APPLY_ORDER]
ctx.logger.info("policy_apply_order: {0}".format(json.dumps(policy_apply_order)))
assert policy_apply_order == [
MONKEYED_POLICY_ID_2, MONKEYED_POLICY_ID_B, MONKEYED_POLICY_ID, MONKEYED_POLICY_ID_M_2]
@cfy_ctx(include_bad=True)
def test_update_policies_not():
"""test policy_update - ignore all policies with junk params"""
node_configure()
runtime_properties = ctx.instance.runtime_properties
ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))
assert POLICIES in runtime_properties
assert APPLICATION_CONFIG in runtime_properties
policies = runtime_properties[POLICIES]
app_config = runtime_properties[APPLICATION_CONFIG]
ctx.logger.info("policies: {0}".format(json.dumps(policies)))
ctx.logger.info("app_config: {0}".format(json.dumps(app_config)))
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
policy_apply_order = runtime_properties[dcae_policy.POLICY_APPLY_ORDER]
ctx.logger.info("policy_apply_order: {0}".format(json.dumps(policy_apply_order)))
assert policy_apply_order == [
MONKEYED_POLICY_ID_M, MONKEYED_POLICY_ID_2, MONKEYED_POLICY_ID_B, MONKEYED_POLICY_ID]
expected_policies = copy.deepcopy(policies)
expected_app_config = copy.deepcopy(app_config)
expected_policy_apply_order = copy.deepcopy(policy_apply_order)
existing_policy = MonkeyedPolicyBody.create_policy_bare(MONKEYED_POLICY_ID, priority="1")
damaged_policy = MonkeyedPolicyBody.create_policy_bare(MONKEYED_POLICY_ID_2)
del damaged_policy[POLICY_BODY][POLICY_CONFIG]
updated_policy = MonkeyedPolicyBody.create_policy_bare(MONKEYED_POLICY_ID_M_3, 3)
added_policy = MonkeyedPolicyBody.create_policy_bare(MONKEYED_POLICY_ID_M_2, 4)
junk_policy_filter_id = "<<<junk_removed_policy_id>>>"
unexpected_removed_policy_id = "<<<junk_removed_policy_id>>>"
ctx.logger.info("policy_update: [{0}]".format(json.dumps(updated_policy)))
assert MONKEYED_POLICY_ID in policies
assert MONKEYED_POLICY_ID_2 in policies
assert MONKEYED_POLICY_ID_M_2 not in policies
assert MONKEYED_POLICY_ID_M_3 not in policies
assert unexpected_removed_policy_id not in policies
assert junk_policy_filter_id not in runtime_properties.get(dcae_policy.POLICY_FILTERS, {})
policy_filter_ids = runtime_properties.get(dcae_policy.POLICY_FILTERS, {}).keys() or ["--"]
policy_update(updated_policies=[existing_policy, damaged_policy, updated_policy],
added_policies={
junk_policy_filter_id: {
POLICIES: {MONKEYED_POLICY_ID_M_2: added_policy}},
policy_filter_ids[0]: {
POLICIES: {MONKEYED_POLICY_ID_2: damaged_policy}}
},
removed_policies=[unexpected_removed_policy_id])
ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))
ctx.logger.info("policies not changed: {0}".format(json.dumps(policies)))
assert MonkeyedPolicyBody.is_the_same_dict(policies, expected_policies)
assert MonkeyedPolicyBody.is_the_same_dict(expected_policies, policies)
ctx.logger.info("app_config not changed: {0}".format(json.dumps(app_config)))
assert MonkeyedPolicyBody.is_the_same_dict(app_config, expected_app_config)
assert MonkeyedPolicyBody.is_the_same_dict(expected_app_config, app_config)
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
policy_apply_order = runtime_properties[dcae_policy.POLICY_APPLY_ORDER]
ctx.logger.info("policy_apply_order: {0}".format(json.dumps(policy_apply_order)))
assert policy_apply_order == expected_policy_apply_order
@cfy_ctx(include_bad=True)
def test_update_many_calcs():
"""test policy_update"""
node_configure()
runtime_properties = ctx.instance.runtime_properties
ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))
assert POLICIES in runtime_properties
policies = runtime_properties[POLICIES]
ctx.logger.info("policies: {0}".format(json.dumps(policies)))
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
policy_apply_order = runtime_properties[dcae_policy.POLICY_APPLY_ORDER]
ctx.logger.info("policy_apply_order: {0}".format(json.dumps(policy_apply_order)))
assert policy_apply_order == [
MONKEYED_POLICY_ID_M, MONKEYED_POLICY_ID_2, MONKEYED_POLICY_ID_B, MONKEYED_POLICY_ID]
updated_policy = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_2, 2, priority="aa20")
added_policy = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_M_2, 2,
False, priority="1")
ctx.logger.info("policy_update: [{0}]".format(json.dumps(updated_policy)))
ctx.logger.info("policy[{0}]: not yet in policies".format(MONKEYED_POLICY_ID_M_2))
assert MONKEYED_POLICY_ID_M_2 not in policies
policy_filter_ids = runtime_properties.get(dcae_policy.POLICY_FILTERS, {}).keys() or ["--"]
policy_update_many_calcs(updated_policies=[updated_policy],
added_policies={
policy_filter_ids[0]: {
POLICIES: {MONKEYED_POLICY_ID_M_2: added_policy}}
},
removed_policies=[MONKEYED_POLICY_ID_M])
ctx.logger.info("policy[{0}]: removed".format(MONKEYED_POLICY_ID_M))
assert MONKEYED_POLICY_ID_M not in policies
# assert Policies._get_config_from_policy(policies[MONKEYED_POLICY_ID_M]) is None
assert MONKEYED_POLICY_ID_M_2 in policies
policy = policies[MONKEYED_POLICY_ID_M_2]
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_M_2, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, added_policy)
assert MonkeyedPolicyBody.is_the_same_dict(added_policy, policy)
assert MONKEYED_POLICY_ID_2 in policies
policy = policies[MONKEYED_POLICY_ID_2]
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_2, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, updated_policy)
assert MonkeyedPolicyBody.is_the_same_dict(updated_policy, policy)
assert MONKEYED_POLICY_ID in policies
policy = policies[MONKEYED_POLICY_ID]
expected_1 = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID, priority="1")
ctx.logger.info("expected[{0}]: {1}".format(MONKEYED_POLICY_ID, json.dumps(expected_1)))
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, expected_1)
assert MonkeyedPolicyBody.is_the_same_dict(expected_1, policy)
assert MONKEYED_POLICY_ID_B in policies
policy = policies[MONKEYED_POLICY_ID_B]
expected_b = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_B, 4, priority="1.5")
ctx.logger.info("expected[{0}]: {1}".format(MONKEYED_POLICY_ID_B, json.dumps(expected_1)))
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_B, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, expected_b)
assert MonkeyedPolicyBody.is_the_same_dict(expected_b, policy)
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
policy_apply_order = runtime_properties[dcae_policy.POLICY_APPLY_ORDER]
ctx.logger.info("policy_apply_order: {0}".format(json.dumps(policy_apply_order)))
assert policy_apply_order == [
MONKEYED_POLICY_ID_2, MONKEYED_POLICY_ID_B, MONKEYED_POLICY_ID, MONKEYED_POLICY_ID_M_2]
@cfy_ctx(include_bad=True)
def test_remove_all_policies():
"""test policy_update - remove all policies"""
node_configure()
runtime_properties = ctx.instance.runtime_properties
ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))
assert POLICIES in runtime_properties
policies = runtime_properties[POLICIES]
ctx.logger.info("policies: {0}".format(json.dumps(policies)))
remove_policy_ids = policies.keys()
policy_update(updated_policies=None, added_policies=None, removed_policies=remove_policy_ids)
ctx.logger.info("removed: {0}".format(remove_policy_ids))
ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))
assert POLICIES in runtime_properties
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
assert runtime_properties[dcae_policy.POLICY_APPLY_ORDER] == []
assert dcae_policy.POLICY_DEFAULTED_FIELDS in runtime_properties
assert Policies.get_policy_configs() == []
defaulted_fields = runtime_properties[dcae_policy.POLICY_DEFAULTED_FIELDS]
expected_defaulted_fields = dict(
(k, True)
for k in MonkeyedPolicyBody.create_policy_body(MONKEYED_POLICY_ID)[POLICY_CONFIG]
)
assert MonkeyedPolicyBody.is_the_same_dict(defaulted_fields, expected_defaulted_fields)
assert MonkeyedPolicyBody.is_the_same_dict(expected_defaulted_fields, defaulted_fields)
assert APPLICATION_CONFIG in runtime_properties
assert APPLICATION_CONFIG in ctx.node.properties
app_config = runtime_properties[APPLICATION_CONFIG]
expected_config = dict(ctx.node.properties[APPLICATION_CONFIG])
ctx.logger.info("expected = default application_config: {0}".format(json.dumps(app_config)))
assert MonkeyedPolicyBody.is_the_same_dict(app_config, expected_config)
assert MonkeyedPolicyBody.is_the_same_dict(expected_config, app_config)
@cfy_ctx(include_bad=True)
def test_remove_all_policies_twice():
"""test policy_update - remove all policies twice"""
node_configure()
runtime_properties = ctx.instance.runtime_properties
ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))
assert POLICIES in runtime_properties
policies = runtime_properties[POLICIES]
ctx.logger.info("policies: {0}".format(json.dumps(policies)))
remove_policy_ids = policies.keys()
policy_update(updated_policies=None, added_policies=None, removed_policies=remove_policy_ids)
policy_update(updated_policies=None, added_policies=None, removed_policies=remove_policy_ids)
ctx.logger.info("removed: {0}".format(remove_policy_ids))
ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))
assert POLICIES in runtime_properties
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
assert runtime_properties[dcae_policy.POLICY_APPLY_ORDER] == []
assert dcae_policy.POLICY_DEFAULTED_FIELDS in runtime_properties
assert Policies.get_policy_configs() == []
defaulted_fields = runtime_properties[dcae_policy.POLICY_DEFAULTED_FIELDS]
expected_defaulted_fields = dict(
(k, True)
for k in MonkeyedPolicyBody.create_policy_body(MONKEYED_POLICY_ID)[POLICY_CONFIG]
)
assert MonkeyedPolicyBody.is_the_same_dict(defaulted_fields, expected_defaulted_fields)
assert MonkeyedPolicyBody.is_the_same_dict(expected_defaulted_fields, defaulted_fields)
assert APPLICATION_CONFIG in runtime_properties
assert APPLICATION_CONFIG in ctx.node.properties
app_config = runtime_properties[APPLICATION_CONFIG]
expected_config = dict(ctx.node.properties[APPLICATION_CONFIG])
ctx.logger.info("expected = default application_config: {0}".format(json.dumps(app_config)))
assert MonkeyedPolicyBody.is_the_same_dict(app_config, expected_config)
assert MonkeyedPolicyBody.is_the_same_dict(expected_config, app_config)
@cfy_ctx(include_bad=True)
def test_remove_then_update():
"""test policy_update"""
node_configure()
runtime_properties = ctx.instance.runtime_properties
ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))
assert POLICIES in runtime_properties
policies = runtime_properties[POLICIES]
ctx.logger.info("policies: {0}".format(json.dumps(policies)))
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
policy_apply_order = runtime_properties[dcae_policy.POLICY_APPLY_ORDER]
ctx.logger.info("policy_apply_order: {0}".format(json.dumps(policy_apply_order)))
assert policy_apply_order == [
MONKEYED_POLICY_ID_M, MONKEYED_POLICY_ID_2, MONKEYED_POLICY_ID_B, MONKEYED_POLICY_ID]
remove_policy_ids = policies.keys()
policy_update(updated_policies=None, added_policies=None, removed_policies=remove_policy_ids)
updated_policy = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_2, 2, priority="aa20")
added_policy = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_M_2, 2,
False, priority="1")
ctx.logger.info("policy_update: [{0}]".format(json.dumps(updated_policy)))
ctx.logger.info("policy[{0}]: not yet in policies".format(MONKEYED_POLICY_ID_M_2))
assert MONKEYED_POLICY_ID_M_2 not in policies
policy_filter_ids = runtime_properties.get(dcae_policy.POLICY_FILTERS, {}).keys() or ["--"]
policy_update(updated_policies=[updated_policy],
added_policies={
policy_filter_ids[0]: {
POLICIES: {MONKEYED_POLICY_ID_M_2: added_policy}}
},
removed_policies=[MONKEYED_POLICY_ID_M])
ctx.logger.info("policy[{0}]: removed".format(MONKEYED_POLICY_ID_M))
assert MONKEYED_POLICY_ID_M not in policies
assert MONKEYED_POLICY_ID_M_2 in policies
policy = policies[MONKEYED_POLICY_ID_M_2]
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_M_2, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, added_policy)
assert MonkeyedPolicyBody.is_the_same_dict(added_policy, policy)
assert MONKEYED_POLICY_ID_2 in policies
policy = policies[MONKEYED_POLICY_ID_2]
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_2, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, updated_policy)
assert MonkeyedPolicyBody.is_the_same_dict(updated_policy, policy)
assert MONKEYED_POLICY_ID in policies
assert Policies._get_config_from_policy(policies[MONKEYED_POLICY_ID]) is None
assert MONKEYED_POLICY_ID_B in policies
assert Policies._get_config_from_policy(policies[MONKEYED_POLICY_ID_B]) is None
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
policy_apply_order = runtime_properties[dcae_policy.POLICY_APPLY_ORDER]
ctx.logger.info("policy_apply_order: {0}".format(json.dumps(policy_apply_order)))
assert policy_apply_order == [MONKEYED_POLICY_ID_2, MONKEYED_POLICY_ID_M_2]
@cfy_ctx(include_bad=True)
def test_remove_update_many_calcs():
"""test policy_update"""
node_configure()
runtime_properties = ctx.instance.runtime_properties
ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))
assert POLICIES in runtime_properties
policies = runtime_properties[POLICIES]
ctx.logger.info("policies: {0}".format(json.dumps(policies)))
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
policy_apply_order = runtime_properties[dcae_policy.POLICY_APPLY_ORDER]
ctx.logger.info("policy_apply_order: {0}".format(json.dumps(policy_apply_order)))
assert policy_apply_order == [
MONKEYED_POLICY_ID_M, MONKEYED_POLICY_ID_2, MONKEYED_POLICY_ID_B, MONKEYED_POLICY_ID]
remove_policy_ids = policies.keys()
policy_update_many_calcs(updated_policies=None,
added_policies=None,
removed_policies=remove_policy_ids)
assert POLICIES in runtime_properties
policies = runtime_properties[POLICIES]
ctx.logger.info("policies: {0}".format(json.dumps(policies)))
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
policy_apply_order = runtime_properties[dcae_policy.POLICY_APPLY_ORDER]
ctx.logger.info("policy_apply_order: {0}".format(json.dumps(policy_apply_order)))
assert policy_apply_order == []
assert dcae_policy.POLICY_DEFAULTED_FIELDS in runtime_properties
assert Policies.get_policy_configs() == []
defaulted_fields = runtime_properties[dcae_policy.POLICY_DEFAULTED_FIELDS]
expected_defaulted_fields = dict(
(k, True)
for k in MonkeyedPolicyBody.create_policy_body(MONKEYED_POLICY_ID)[POLICY_CONFIG]
)
assert MonkeyedPolicyBody.is_the_same_dict(defaulted_fields, expected_defaulted_fields)
assert MonkeyedPolicyBody.is_the_same_dict(expected_defaulted_fields, defaulted_fields)
updated_policy = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_2, 2, priority="aa20")
added_policy = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_M_2, 2,
False, priority="1")
ctx.logger.info("policy_update: [{0}]".format(json.dumps(updated_policy)))
ctx.logger.info("policy[{0}]: not yet in policies".format(MONKEYED_POLICY_ID_M_2))
assert MONKEYED_POLICY_ID_M_2 not in policies
policy_filter_ids = runtime_properties.get(dcae_policy.POLICY_FILTERS, {}).keys() or ["--"]
policy_update_many_calcs(updated_policies=[updated_policy],
added_policies={
policy_filter_ids[0]: {
POLICIES: {MONKEYED_POLICY_ID_M_2: added_policy}}
},
removed_policies=[MONKEYED_POLICY_ID_M])
ctx.logger.info("policy[{0}]: removed".format(MONKEYED_POLICY_ID_M))
assert MONKEYED_POLICY_ID_M not in policies
assert MONKEYED_POLICY_ID_M_2 in policies
policy = policies[MONKEYED_POLICY_ID_M_2]
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_M_2, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, added_policy)
assert MonkeyedPolicyBody.is_the_same_dict(added_policy, policy)
assert MONKEYED_POLICY_ID_2 in policies
policy = policies[MONKEYED_POLICY_ID_2]
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_2, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, updated_policy)
assert MonkeyedPolicyBody.is_the_same_dict(updated_policy, policy)
assert MONKEYED_POLICY_ID in policies
assert Policies._get_config_from_policy(policies[MONKEYED_POLICY_ID]) is None
assert MONKEYED_POLICY_ID_B in policies
assert Policies._get_config_from_policy(policies[MONKEYED_POLICY_ID_B]) is None
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
policy_apply_order = runtime_properties[dcae_policy.POLICY_APPLY_ORDER]
ctx.logger.info("policy_apply_order: {0}".format(json.dumps(policy_apply_order)))
assert policy_apply_order == [MONKEYED_POLICY_ID_2, MONKEYED_POLICY_ID_M_2]
@cfy_ctx(include_bad=True)
def test_bad_update_many_calcs():
"""test policy_update"""
node_configure()
runtime_properties = ctx.instance.runtime_properties
ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))
assert POLICIES in runtime_properties
policies = runtime_properties[POLICIES]
ctx.logger.info("policies: {0}".format(json.dumps(policies)))
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
policy_apply_order = runtime_properties[dcae_policy.POLICY_APPLY_ORDER]
ctx.logger.info("policy_apply_order: {0}".format(json.dumps(policy_apply_order)))
assert policy_apply_order == [
MONKEYED_POLICY_ID_M, MONKEYED_POLICY_ID_2, MONKEYED_POLICY_ID_B, MONKEYED_POLICY_ID]
damaged_policy = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_2, 2, priority="aa20")
damaged_policy[POLICY_BODY][POLICY_CONFIG] = ["damaged config"]
added_policy = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_M_2, 2,
False, priority="1")
added_policy[POLICY_BODY][POLICY_CONFIG] = {"unexpected":"foo", "none": None}
ctx.logger.info("policy_update: [{0}]".format(json.dumps(damaged_policy)))
ctx.logger.info("policy[{0}]: not yet in policies".format(MONKEYED_POLICY_ID_M_2))
assert MONKEYED_POLICY_ID_M_2 not in policies
policy_filter_ids = runtime_properties.get(dcae_policy.POLICY_FILTERS, {}).keys() or ["--"]
policy_update_many_calcs(updated_policies=[damaged_policy],
added_policies={
policy_filter_ids[0]: {
POLICIES: {MONKEYED_POLICY_ID_M_2: added_policy}}
},
removed_policies=[MONKEYED_POLICY_ID_M])
ctx.logger.info("policy[{0}]: removed".format(MONKEYED_POLICY_ID_M))
assert MONKEYED_POLICY_ID_M not in policies
assert MONKEYED_POLICY_ID_M_2 in policies
policy = policies[MONKEYED_POLICY_ID_M_2]
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_M_2, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, added_policy)
assert MonkeyedPolicyBody.is_the_same_dict(added_policy, policy)
assert MONKEYED_POLICY_ID_2 in policies
policy = policies[MONKEYED_POLICY_ID_2]
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_2, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, damaged_policy)
assert MonkeyedPolicyBody.is_the_same_dict(damaged_policy, policy)
assert MONKEYED_POLICY_ID in policies
policy = policies[MONKEYED_POLICY_ID]
expected_1 = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID, priority="1")
ctx.logger.info("expected[{0}]: {1}".format(MONKEYED_POLICY_ID, json.dumps(expected_1)))
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, expected_1)
assert MonkeyedPolicyBody.is_the_same_dict(expected_1, policy)
assert MONKEYED_POLICY_ID_B in policies
policy = policies[MONKEYED_POLICY_ID_B]
expected_b = MonkeyedPolicyBody.create_policy(MONKEYED_POLICY_ID_B, 4, priority="1.5")
ctx.logger.info("expected[{0}]: {1}".format(MONKEYED_POLICY_ID_B, json.dumps(expected_1)))
ctx.logger.info("policy[{0}]: {1}".format(MONKEYED_POLICY_ID_B, json.dumps(policy)))
assert MonkeyedPolicyBody.is_the_same_dict(policy, expected_b)
assert MonkeyedPolicyBody.is_the_same_dict(expected_b, policy)
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
policy_apply_order = runtime_properties[dcae_policy.POLICY_APPLY_ORDER]
ctx.logger.info("policy_apply_order: {0}".format(json.dumps(policy_apply_order)))
assert policy_apply_order == [
MONKEYED_POLICY_ID_2, MONKEYED_POLICY_ID_B, MONKEYED_POLICY_ID_M_2, MONKEYED_POLICY_ID]
@cfy_ctx(include_bad=True, include_good=False)
def test_bad_policies():
"""test bad policy nodes"""
node_configure()
runtime_properties = ctx.instance.runtime_properties
ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))
assert POLICIES in runtime_properties
policies = runtime_properties[POLICIES]
ctx.logger.info("policies: {0}".format(json.dumps(policies)))
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
policy_apply_order = runtime_properties[dcae_policy.POLICY_APPLY_ORDER]
ctx.logger.info("policy_apply_order: {0}".format(json.dumps(policy_apply_order)))
assert policy_apply_order == []
@cfy_ctx(include_bad=True, include_good=False)
def test_wrong_ctx_node_configure():
"""test wrong ctx"""
current_ctx.set(ctx.instance.relationships[0])
ctx_type = ctx.type
with pytest.raises(NonRecoverableError) as excinfo:
node_configure()
CurrentCtx.reset()
ctx.logger.info("{0} not a node boom: {1}".format(ctx_type, str(excinfo.value)))
assert ctx_type == 'cloudify.relationships.depends_on'
assert str(excinfo.value) == "can only invoke gather_policies_to_node on node"
@cfy_ctx(include_bad=True, include_good=False)
def test_wrong_ctx_policy_update():
"""test wrong ctx"""
current_ctx.set(ctx.instance.relationships[0])
ctx_type = ctx.type
no_policy_configs = Policies.get_policy_configs()
with pytest.raises(NonRecoverableError) as excinfo:
policy_update(updated_policies=None, added_policies=None, removed_policies=None)
CurrentCtx.reset()
ctx.logger.info("{0} not a node boom: {1}".format(ctx_type, str(excinfo.value)))
assert ctx_type == 'cloudify.relationships.depends_on'
assert no_policy_configs == []
assert str(excinfo.value) == "can only invoke update_policies_on_node on node"
def test_defenses_on_decorators():
"""test defenses of code mainly for code coverage"""
should_be_none = Policies.gather_policies_to_node()(None)
assert should_be_none is None
should_be_none = Policies.update_policies_on_node()(None)
assert should_be_none is None
def monkeyed_set_policies_boom(policies):
"""monkeypatch for exception"""
raise Exception("Policies._set_policies boom: {0}".format(json.dumps(policies or {})))
@pytest.fixture()
def fix_boom_gather(monkeypatch):
"""monkeyed boom"""
monkeypatch.setattr('onap_dcae_dcaepolicy_lib.dcae_policy.Policies._set_policies',
monkeyed_set_policies_boom)
return fix_boom_gather # provide the fixture value
@cfy_ctx(include_bad=True, include_good=False)
@pytest.mark.usefixtures("fix_boom_gather")
def test_exception_on_gather():
"""test exception on gather"""
with pytest.raises(NonRecoverableError) as excinfo:
node_configure()
ctx.logger.info("monkeyed_set_policies_boom: {0}".format(str(excinfo.value)))
assert str(excinfo.value).startswith("Failed to set the policies ")
def monkeyed_update_policies_boom(policies):
"""monkeypatch for exception"""
raise Exception("Policies._update_policies boom")
@pytest.fixture()
def fix_boom_update_policies(monkeypatch):
"""monkeyed boom"""
monkeypatch.setattr('onap_dcae_dcaepolicy_lib.dcae_policy.Policies._update_policies',
monkeyed_update_policies_boom)
return fix_boom_update_policies # provide the fixture value
@cfy_ctx(include_bad=True, include_good=False)
@pytest.mark.usefixtures("fix_boom_update_policies")
def test_exception_on_update():
"""test exception on update_policies"""
with pytest.raises(NonRecoverableError) as excinfo:
policy_update(updated_policies=None, added_policies=None, removed_policies=None)
ctx.logger.info("monkeyed_update_policies_boom: {0}".format(str(excinfo.value)))
assert str(excinfo.value).startswith("Failed to update the policies ")
@cfy_ctx(include_bad=True, include_good=False)
def test_defenses_on_policy_update():
"""test defenses on policy_update"""
policy_update(updated_policies=None, added_policies=None, removed_policies=None)
ctx.logger.info("policy_update() ok")
@cfy_ctx(include_bad=True, include_good=False)
def test_defenses_on_set_policies():
"""test defenses of code mainly for code coverage"""
node_configure()
runtime_properties = ctx.instance.runtime_properties
ctx.logger.info("runtime_properties: {0}".format(json.dumps(runtime_properties)))
assert POLICIES in runtime_properties
policies = runtime_properties[POLICIES]
ctx.logger.info("policies: {0}".format(json.dumps(policies)))
assert dcae_policy.POLICY_APPLY_ORDER in runtime_properties
policy_apply_order = runtime_properties[dcae_policy.POLICY_APPLY_ORDER]
ctx.logger.info("policy_apply_order: {0}".format(json.dumps(policy_apply_order)))
Policies._set_policies({})
assert POLICIES not in runtime_properties
assert dcae_policy.POLICY_APPLY_ORDER not in runtime_properties
Policies._set_policies({})
assert POLICIES not in runtime_properties
assert dcae_policy.POLICY_APPLY_ORDER not in runtime_properties