| # ============LICENSE_START======================================================= |
| # Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. |
| # ================================================================================ |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # ============LICENSE_END========================================================= |
| # |
| """record all the messages going outside policy-handler during testing""" |
| |
| import copy |
| import json |
| |
| from policyhandler.config import Config |
| from policyhandler.onap.audit import (REQUEST_X_ECOMP_REQUESTID, |
| REQUEST_X_ONAP_REQUESTID) |
| from policyhandler.utils import Utils |
| |
# keys of the tracked message record built by MockHttpResponse.to_json()
# and read back from the expected records by Tracker
REQUEST = "request"
STATUS_CODE = "status_code"
RESPONSE = "res"
# key whose value Tracker._hide_volatiles() masks before requests are compared
PEP_INSTANCE = "ONAPInstance"
# module-level logger obtained from the project's Utils helper
_LOGGER = Utils.get_logger(__file__)
| |
class _MockHttpRequestInResponse:
    """Mock of the http request that is kept inside the mocked response."""

    def __init__(self, method, uri, **kwargs):
        """Remember method and uri plus private copies of the optional parts.

        Only the "params", "json" and "headers" keyword arguments are read.
        Each of them is deep-copied; a missing one is stored as None.
        """
        self.method, self.uri = method, uri
        copies = {
            name: copy.deepcopy(kwargs.get(name))
            for name in ("params", "json", "headers")
        }
        self.params = copies["params"]
        self.req_json = copies["json"]
        self.headers = copies["headers"]

    def to_json(self):
        """Return the stored request fields as a dict."""
        return dict(
            method=self.method,
            uri=self.uri,
            params=self.params,
            json=self.req_json,
            headers=self.headers,
        )
| |
| |
class MockHttpResponse:
    """Mocked http response that is derived from the outgoing request."""

    def __init__(self, method, uri, res_json=None, **kwargs):
        """Register the request with Tracker and settle status_code and body.

        Tracker.get_response() supplies the initial (status_code, body) pair.
        A truthy res_json always replaces the body with its deep copy and,
        when Tracker gave no status_code, takes the status from the
        "status_code" keyword argument (default 200).
        A status_code that is still None at the end becomes 500.
        """
        self.request = _MockHttpRequestInResponse(method, uri, **kwargs)
        self.headers = {}

        status_code, body = Tracker.get_response(self.request.to_json())
        if res_json:
            if status_code is None:
                status_code = kwargs.get(STATUS_CODE, 200)
            body = copy.deepcopy(res_json)
        if status_code is None:
            status_code = 500

        self.status_code = status_code
        self.res = body
        self.text = json.dumps(body)

        _LOGGER.info("MockHttpResponse: %s", self)

    def json(self):
        """Return the response body."""
        return self.res

    def raise_for_status(self):
        """Do nothing - the mocked response never raises on its status."""

    def to_json(self):
        """Return the request, status_code and response body as one dict."""
        message = {REQUEST: self.request.to_json()}
        message[STATUS_CODE] = self.status_code
        message[RESPONSE] = self.res
        return message

    def __str__(self):
        """Serialize the message with sorted keys for logging."""
        message = self.to_json()
        return json.dumps(message, sort_keys=True)
| |
| |
class Tracker:
    """Record all the messages going outside policy-handler during testing.

    All the state is kept on the class itself and every method is static,
    so a single tracker is shared by everything that imports this module.
    """
    # name of the test passed to the latest reset()
    test_name = None
    # names of all the tests passed to reset() so far
    test_names = []

    # requests received since the latest reset(), in arrival order
    requests = []
    # expected records (request, status_code, res) for the current test
    expected = []

    # test_name -> True once validate() has been started for that test
    validated_tests = {}
    # test_name -> True once validate() has succeeded for that test
    valid_tests = {}

    # test_name -> list of expected records, loaded from the expectations files
    main_history = {}
    pdp_api_v0_history = {}

    @staticmethod
    def _load_expectations(path):
        """Return the dict stored in the json file at path, or {} when unusable.

        A missing/unreadable file or invalid json is logged and tolerated
        because running without expectations is a supported (best-effort) mode.
        """
        try:
            with open(path, 'r', encoding="utf-8") as expectations:
                loaded = json.load(expectations)
        except (OSError, ValueError) as error:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueError
            _LOGGER.warning("no expectations loaded from %s: %s", path, error)
            return {}
        return loaded if isinstance(loaded, dict) else {}

    @staticmethod
    def _init():
        """load expected data from json files"""
        Tracker.main_history = Tracker._load_expectations(
            "tests/main/expectations.json")
        Tracker.pdp_api_v0_history = Tracker._load_expectations(
            "tests/pdp_api_v0/expectations.json")

    @staticmethod
    def reset(test_name=None):
        """Start tracking test_name: drop recorded requests, pick its expectations.

        The expectation files are loaded on the first call only, that is
        while no test name has been registered yet.
        """
        if not Tracker.test_names:
            Tracker._init()

        Tracker.test_name = test_name
        Tracker.requests.clear()
        Tracker.test_names.append(test_name)

        history = (Tracker.main_history if Config.is_pdp_api_default()
                   else Tracker.pdp_api_v0_history)
        # a null entry in the json file must not break len(Tracker.expected)
        Tracker.expected = history.get(Tracker.test_name) or []

    @staticmethod
    def get_response(request):
        """
        track the request to the history of requests
        and return the response with the status_code from the expected history queue

        Returns a (status_code, response) tuple.  When the request is beyond
        the expected queue or differs from the expected request at the same
        position, status_code is None and response is a diagnostic dict
        under the "unit-test-tracker" key.
        """
        request_idx = len(Tracker.requests)
        request = copy.deepcopy(request)
        Tracker.requests.append(request)

        if request_idx >= len(Tracker.expected):
            unexpected_request = {"unit-test-tracker": {
                "request_idx": request_idx, "out-of-range": len(Tracker.expected),
                "received_request": copy.deepcopy(request)
            }}
            _LOGGER.error("unexpected_request[%s]: %s", request_idx,
                          json.dumps(unexpected_request, sort_keys=True))
            return None, unexpected_request

        expected = Tracker.expected[request_idx] or {}
        masked_request = Tracker._hide_volatiles(copy.deepcopy(request))
        expected_request = Tracker._hide_volatiles(copy.deepcopy(expected.get(REQUEST)))
        if Utils.are_the_same(masked_request, expected_request):
            _LOGGER.info("as expected[%s]: %s", request_idx,
                         json.dumps(expected, sort_keys=True))
            # hand out a copy so that callers cannot alter the shared history
            return expected.get(STATUS_CODE), copy.deepcopy(expected.get(RESPONSE))

        unexpected_request = {"unit-test-tracker": {
            "request_idx": request_idx,
            "received_request": copy.deepcopy(request),
            "expected": copy.deepcopy(expected.get(REQUEST))
        }}
        _LOGGER.error("unexpected_request[%s]: %s", request_idx,
                      json.dumps(unexpected_request, sort_keys=True))
        return None, unexpected_request

    @staticmethod
    def to_string():
        """stringify message history for logging"""
        return json.dumps(Tracker.requests, sort_keys=True)

    @staticmethod
    def get_status(test_name=None):
        """Return the validation status of test_name.

        "success" - validate() passed, "failed" - validate() was started but
        did not pass, "covered" - only reset() was called, "unknown" - never seen.
        """
        if Tracker.valid_tests.get(test_name):
            return "success"
        if Tracker.validated_tests.get(test_name):
            return "failed"
        if test_name in Tracker.test_names:
            return "covered"
        return "unknown"

    @staticmethod
    def _log_untracked(history, tracked):
        """Log every test name of history that is missing from tracked."""
        for test_name in history:
            if test_name not in tracked:
                _LOGGER.info("untracked: %s", test_name)

    @staticmethod
    def log_all_tests():
        """log the covered and not covered test names"""
        _LOGGER.info("-"*75)
        _LOGGER.info("tracked test_names[%s]", len(Tracker.test_names))
        for idx, test_name in enumerate(Tracker.test_names):
            _LOGGER.info("%s[%s]: %s", Tracker.get_status(test_name), (idx + 1), test_name)

        # build the lookup once instead of scanning the list per history entry
        tracked = set(Tracker.test_names)

        _LOGGER.info("not tracked test_names listed in main.expectations")
        Tracker._log_untracked(Tracker.main_history, tracked)

        _LOGGER.info("not tracked test_names listed in pdp_api_v0.expectations")
        Tracker._log_untracked(Tracker.pdp_api_v0_history, tracked)

    @staticmethod
    def _hide_volatiles(obj):
        """Replace the volatile field values with "*" and return obj.

        A dict is altered in place, nested dicts included.  Anything that is
        not a dict is returned untouched, and lists are not descended into.
        """
        if not isinstance(obj, dict):
            return obj

        volatile_keys = (REQUEST_X_ONAP_REQUESTID, REQUEST_X_ECOMP_REQUESTID,
                         RESPONSE, PEP_INSTANCE)
        # only values of existing keys are replaced, so the dict size stays
        # the same and iterating over it while assigning is safe
        for key, value in obj.items():
            if key in volatile_keys:
                obj[key] = "*"
            elif isinstance(value, dict):
                Tracker._hide_volatiles(value)

        return obj

    @staticmethod
    def validate():
        """Validate that the message history is as expected.

        Raises AssertionError when the masked requests differ from the masked
        expected requests; the current test is then left marked as validated
        but not valid.
        """
        _LOGGER.info("Tracker.validate(%s)", Tracker.test_name)
        Tracker.validated_tests[Tracker.test_name] = True
        requests = [Tracker._hide_volatiles(copy.deepcopy(request))
                    for request in Tracker.requests]
        # tolerate null records the same way get_response() does
        expected_reqs = [Tracker._hide_volatiles(copy.deepcopy((expected or {}).get(REQUEST)))
                         for expected in Tracker.expected]

        _LOGGER.info("requests: %s", json.dumps(requests, sort_keys=True))
        _LOGGER.info("expected: %s", json.dumps(expected_reqs, sort_keys=True))
        assert Utils.are_the_same(requests, expected_reqs), \
            "unexpected history of requests for {}".format(Tracker.test_name)

        _LOGGER.info("history valid for Tracker.validate(%s)", Tracker.test_name)
        Tracker.valid_tests[Tracker.test_name] = True