blob: fb4c66d699859a2efb9bad1524a572363047ce6b [file] [log] [blame]
# ============LICENSE_START=======================================================
# INTEGRATION CSIT
# ================================================================================
# Copyright (C) 2018 Nokia Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
import json
import logging
from functools import partial
from sys import argv
from http.server import BaseHTTPRequestHandler, HTTPServer
DEFAULT_PORT = 8443
class SOHandler(BaseHTTPRequestHandler):
def __init__(self, expected_requests, expected_responses, requests, responses, *args, **kwargs):
self._expected_requests = expected_requests
self._expected_responses = expected_responses
self._requests = requests["requests"]
self._responses = responses["responses"]
self._known_endpoints = self._get_known_endpoints()
super().__init__(*args, **kwargs)
def do_POST(self):
logging.info(
'POST called. Expected POST REQUEST: ' + json.dumps(
self._expected_requests["post"]) + '\nExpected POST response: ' +
json.dumps(self._expected_responses["post"]))
self.send_response(200)
self._set_headers()
self.wfile.write(json.dumps(self._expected_responses["post"]).encode("utf-8"))
return
def do_GET(self):
if self._does_path_exist_as_endpoint():
self.rest_method("GET")
else:
logging.info('GET called. Expected GET REQUEST: '
+ json.dumps(self._expected_requests["get"])
+ '\nExpected GET response: '
+ json.dumps(self._expected_responses["get"]))
self.send_response(200)
self._set_headers()
self.wfile.write(json.dumps(self._expected_responses["get"]).encode("utf-8"))
def do_PUT(self):
if self._does_path_exist_as_endpoint():
self.rest_method("PUT")
else:
request_body_json = self._get_request_body()
if request_body_json is not None:
self._apply_expected_data(request_body_json)
logging.info("EXPECTED RESPONSES: " + str(self._expected_responses))
logging.info("EXPECTED REQUESTS: " + str(self._expected_requests))
response_status = 200
else:
response_status = 400
self.send_response(response_status)
self._set_headers()
def rest_method(self,method):
expected_response = self._get_response_by_endpoint(self.path,method)
if expected_response:
self.send_response(expected_response["responseCode"])
self._set_headers()
self.wfile.write(json.dumps(expected_response["body"]).encode("utf-8"))
else:
response_body = "{\"message:\" \"no response for endpoint\"}"
self.send_response(400)
self._set_headers()
self.wfile.write(json.dumps(response_body).encode("utf-8"))
def _get_request_body(self):
content_len = int(self.headers['Content-Length'], 0)
parsed_req_body = None
if content_len > 0:
body = self.rfile.read(content_len)
body_decoded = body.decode('utf8')
logging.info("BODY: %s type: %s body decoded: %s type: %s", str(body), type(body), str(body_decoded),
type(body_decoded))
parsed_req_body = json.loads(body_decoded)
return parsed_req_body
def _apply_expected_data(self, request_body_json):
if self.path == '/setResponse':
logging.info("IN PUT /setResponse: " + str(request_body_json))
self._expected_responses.update(request_body_json)
elif self.path == '/setRequest':
logging.info("IN PUT /setRequest: " + str(request_body_json))
self._expected_requests.update(request_body_json)
def _set_headers(self):
self.send_header('Content-Type', 'application/json')
self.end_headers()
def _get_response_by_endpoint(self, endpoint, method):
for response in self._responses:
if response["path"] == endpoint and response["method"] == method:
return response
def _does_path_exist_as_endpoint(self):
does_exist = False
for ep in self._known_endpoints:
if ep == self.path:
does_exist = True
break
return does_exist
def _create_path_from_request(self, request):
base_uri = request["path"]
query_params = request["queryParams"]
return base_uri + self._get_params_uri(query_params)
def _get_known_endpoints(self):
endpoints = []
for request in self._requests:
endpoints.append(self._create_path_from_request(request))
return endpoints
@staticmethod
def _get_params_uri(query_params):
params_uri = "?"
for parameter in query_params:
params_uri += parameter + '=' + query_params.get(parameter, '') + '&'
return params_uri[:-1]
class JsonFileToDictReader(object):
@staticmethod
def read_expected_test_data(expected_responses_filename):
with open(expected_responses_filename, 'r') as file:
return json.load(file)
def init_so_simulator():
requests = JsonFileToDictReader.read_expected_test_data("test_data_assets/requests.json")
responses = JsonFileToDictReader.read_expected_test_data("test_data_assets/responses.json")
expected_so_requests = JsonFileToDictReader.read_expected_test_data(argv[1])
expected_so_responses = JsonFileToDictReader.read_expected_test_data(argv[2])
logging.basicConfig(level=logging.INFO)
handler = partial(SOHandler, expected_so_requests, expected_so_responses, requests, responses)
handler.protocol_version = "HTTP/1.0"
httpd = HTTPServer(('', DEFAULT_PORT), handler)
logging.info("serving on: " + str(httpd.socket.getsockname()))
httpd.serve_forever()
if __name__ == '__main__':
init_so_simulator()