| # ============LICENSE_START======================================================= |
| # Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved. |
| # ================================================================================ |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # ============LICENSE_END========================================================= |
| # |
| """ |
| standard pytest file that contains the shared fixtures |
| https://docs.pytest.org/en/latest/fixture.html |
| """ |
| |
| import time |
| |
| import pytest |
| from policyhandler import pdp_client |
| from policyhandler.deploy_handler import DeployHandler |
| from policyhandler.onap.audit import Audit |
| from policyhandler.pdp_api.dmaap_mr import DmaapMr |
| from policyhandler.utils import Utils |
| |
| from ..mock_tracker import MockHttpResponse |
| |
# module-level logger used by every fixture below for setup/teardown tracing
_LOGGER = Utils.get_logger(__file__)
| |
@pytest.fixture()
def fix_pdp_post(monkeypatch):
    """
    fixture that replaces the POST of the PolicyRest requests-session
    with a stub returning MockHttpResponse

    PolicyRest is re-initialized (lazy-init flag reset, then _lazy_init)
    before the patch is applied.  The patch is reverted by monkeypatch
    when the test is over.
    """
    _LOGGER.info("setup fix_pdp_post")

    # reset the lazy-init flag and run the init again before patching
    policy_rest = pdp_client.PolicyRest
    policy_rest._lazy_inited = False
    policy_rest._lazy_init()

    def fake_post(uri, json=None, **kwargs):
        """stand-in for the POST to policy-engine"""
        return MockHttpResponse("post", uri, json=json, **kwargs)

    monkeypatch.setattr(
        'policyhandler.pdp_client.PolicyRest._requests_session.post',
        fake_post
    )

    yield fix_pdp_post

    _LOGGER.info("teardown fix_pdp_post")
| |
@pytest.fixture()
def fix_deploy_handler(monkeypatch):
    """
    fixture that replaces PUT and GET of the DeployHandler requests-session
    with stubs returning MockHttpResponse

    DeployHandler is lazy-inited here only when its _lazy_inited flag
    is False; in that case an Audit is created for the init and is
    closed with audit_done("teardown") after the test.
    """
    _LOGGER.info("setup fix_deploy_handler")

    def fake_request(method):
        """build a stub for the given http method of the deployment-handler session"""
        def respond(uri, **kwargs):
            """return the mocked response instead of calling deployment-handler"""
            return MockHttpResponse(method, uri, **kwargs)
        return respond

    init_audit = None
    if DeployHandler._lazy_inited is False:
        init_audit = Audit(req_message="fix_deploy_handler")
        DeployHandler._lazy_init(init_audit)

    monkeypatch.setattr(
        'policyhandler.deploy_handler.DeployHandler._requests_session.put',
        fake_request("put")
    )
    monkeypatch.setattr(
        'policyhandler.deploy_handler.DeployHandler._requests_session.get',
        fake_request("get")
    )

    yield fix_deploy_handler

    # only close the audit when this fixture was the one that opened it
    if init_audit:
        init_audit.audit_done("teardown")
    _LOGGER.info("teardown fix_deploy_handler")
| |
@pytest.fixture()
def fix_dmaap_mr(monkeypatch):
    """
    fixture that replaces GET of the DmaapMr requests-session with a
    delayed stub returning MockHttpResponse

    The stub sleeps 3 seconds when the request carries truthy "params"
    and 0.5 seconds otherwise, then sends back the mocked response.
    DmaapMr is always re-initialized with a fresh Audit that is closed
    with audit_done("teardown") after the test.
    """
    _LOGGER.info("setup fix_dmaap_mr")

    def delayed_get(uri, **kwargs):
        """sleep first, then return the mocked response for the get to dmaap_mr"""
        note, pause = (
            ("--- fix_dmaap_mr --- sleeping 3 secs...", 3)
            if kwargs.get("params")
            else ("--- fix_dmaap_mr --- sleeping 0.5 secs...", 0.5)
        )
        _LOGGER.info(note)
        time.sleep(pause)
        _LOGGER.info("--- fix_dmaap_mr --- send back the response")
        return MockHttpResponse("get", uri, **kwargs)

    fixture_audit = Audit(req_message="fix_dmaap_mr")
    # reset the lazy-init flag so that _lazy_init runs again for this test
    DmaapMr._lazy_inited = False
    DmaapMr._lazy_init(fixture_audit)

    monkeypatch.setattr(
        'policyhandler.pdp_api.dmaap_mr.DmaapMr._requests_session.get',
        delayed_get
    )

    yield fix_dmaap_mr

    fixture_audit.audit_done("teardown")
    _LOGGER.info("teardown fix_dmaap_mr")