blob: 11f5d0b9f83a1bed0ee58d3aedeb20e81fdc1578 [file] [log] [blame]
# ============LICENSE_START=======================================================
# Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
"""
standard pytest file that contains the shared fixtures
https://docs.pytest.org/en/latest/fixture.html
"""
import time
import pytest
from policyhandler import pdp_client
from policyhandler.deploy_handler import DeployHandler
from policyhandler.onap.audit import Audit
from policyhandler.pdp_api.dmaap_mr import DmaapMr
from policyhandler.utils import Utils
from ..mock_tracker import MockHttpResponse
_LOGGER = Utils.get_logger(__file__)
@pytest.fixture()
def fix_pdp_post(monkeypatch):
"""monkeyed request /decision/v1 to PDP"""
def monkeyed_policy_rest_post(uri, json=None, **kwargs):
"""monkeypatch for the POST to policy-engine"""
return MockHttpResponse("post", uri, json=json, **kwargs)
_LOGGER.info("setup fix_pdp_post")
pdp_client.PolicyRest._lazy_inited = False
pdp_client.PolicyRest._lazy_init()
monkeypatch.setattr('policyhandler.pdp_client.PolicyRest._requests_session.post',
monkeyed_policy_rest_post)
yield fix_pdp_post
_LOGGER.info("teardown fix_pdp_post")
@pytest.fixture()
def fix_deploy_handler(monkeypatch):
"""monkeyed requests to deployment-handler"""
def monkeyed_deploy_handler_put(uri, **kwargs):
"""monkeypatch for policy-update request.put to deploy_handler"""
return MockHttpResponse("put", uri, **kwargs)
def monkeyed_deploy_handler_get(uri, **kwargs):
"""monkeypatch policy-update request.get to deploy_handler"""
return MockHttpResponse("get", uri, **kwargs)
_LOGGER.info("setup fix_deploy_handler")
audit = None
if DeployHandler._lazy_inited is False:
audit = Audit(req_message="fix_deploy_handler")
DeployHandler._lazy_init(audit)
monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.put',
monkeyed_deploy_handler_put)
monkeypatch.setattr('policyhandler.deploy_handler.DeployHandler._requests_session.get',
monkeyed_deploy_handler_get)
yield fix_deploy_handler
if audit:
audit.audit_done("teardown")
_LOGGER.info("teardown fix_deploy_handler")
@pytest.fixture()
def fix_dmaap_mr(monkeypatch):
"""monkeyed requests to dmaap_mr"""
def monkeyed_dmaap_mr_get(uri, **kwargs):
"""monkeypatch policy-update request.get to dmaap_mr"""
if kwargs.get("params"):
_LOGGER.info("--- fix_dmaap_mr --- sleeping 3 secs...")
time.sleep(3)
else:
_LOGGER.info("--- fix_dmaap_mr --- sleeping 0.5 secs...")
time.sleep(0.5)
_LOGGER.info("--- fix_dmaap_mr --- send back the response")
return MockHttpResponse("get", uri, **kwargs)
_LOGGER.info("setup fix_dmaap_mr")
audit = Audit(req_message="fix_dmaap_mr")
DmaapMr._lazy_inited = False
DmaapMr._lazy_init(audit)
monkeypatch.setattr('policyhandler.pdp_api.dmaap_mr.DmaapMr._requests_session.get',
monkeyed_dmaap_mr_get)
yield fix_dmaap_mr
audit.audit_done("teardown")
_LOGGER.info("teardown fix_dmaap_mr")