blob: 116303458231ce998125d4648f0a8281c3d289a0 [file] [log] [blame]
# Copyright (c) 2019 AT&T Intellectual Property. #
# #
# Licensed under the Apache License, Version 2.0 (the "License"); #
# you may not use this file except in compliance with the License. #
# You may obtain a copy of the License at #
# #
# http://www.apache.org/licenses/LICENSE-2.0 #
# #
# Unless required by applicable law or agreed to in writing, software #
# distributed under the License is distributed on an "AS IS" BASIS, #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
# See the License for the specific language governing permissions and #
# limitations under the License. #
################################################################################
from flask import Flask, request, make_response, jsonify, g
import json
import uuid
import traceback
import pyping
import paramiko
import socket
import os
import subprocess
import datetime
import logging
from logging import FileHandler
#redirect http to https
app = Flask(__name__)
# Prevents print statement every time an endpoint is triggered.
logging.getLogger("werkzeug").setLevel(logging.WARNING)
def unix_time_millis(dt):
epoch = datetime.datetime.utcfromtimestamp(0)
return (dt - epoch).total_seconds() * 1000.0
def pingServer(targetHost):
try:
response = subprocess.check_output(
['ping', '-c', '1', targetHost], # execute the ping command
stderr = subprocess.STDOUT, # retrieve all the output
universal_newlines = True # return as string
)
except subprocess.CalledProcessError as e:
app.logger.error(e)
app.logger.error('failed getting response from ' + str(targetHost))
response = None
return response
@app.route("/otf/vth/ping/v1/health", methods = ['GET'])
def getHealth():
return "UP"
@app.route('/otf/vth/sample/v1', methods = ['POST'])
def sample():
startTime = unix_time_millis(datetime.datetime.now())
responseData = {
"vthResponse": {
"testDurationMS": "",
"dateTimeUTC": "",
"abstractMessage": "Success",
"resultData": {}
}
}
responseData['vthResponse']['dateTimeUTC'] = str(datetime.datetime.now())
endTime = unix_time_millis(datetime.datetime.now())
responseData['vthResponse']['testDurationMS'] = endTime - startTime
responseData['vthResponse']['resultData']['result'] = "Executed test successfully in " + str(responseData['vthResponse']['testDurationMS']) + " milliseconds."
app.logger.info('hit sample endpoint. response: ' + str(responseData))
return jsonify(responseData)
@app.route('/otf/vth/ping/v1', methods = ['POST'])
def testHead():
responseData = {
"vthResponse": {
"testDurationMS": "",
"dateTimeUTC": "",
"abstractMessage": "",
"resultData": {}
}
}
responseData['vthResponse']['dateTimeUTC'] = str(datetime.datetime.now())
startTime = unix_time_millis(datetime.datetime.now())
try:
if not request.is_json:
raise ValueError('Request must be a valid JSON object.')
requestData = request.get_json()
app.logger.info('ping endpoint. request: ' + str(requestData))
if 'vthInput' in requestData:
vthInput = requestData['vthInput']
expectedKeys = ['vthName', 'testConfig', 'testData']
receivedKeys = vthInput.keys();
testData = ""
testConfig = ""
if sorted(expectedKeys) == sorted(receivedKeys):
testData = vthInput['testData']
# Check if a target host is provided.
if 'targetHost' not in testData:
raise KeyError('targetHost is required to ping server.')
# Check if the target host IP address is in the correct format.
# This excludes IPv6. Use IPy to check both IPv6/IPv4.
try:
socket.inet_aton(testData['targetHost'])
except socket.error:
raise ValueError('Invalid IP address assigned to targetHost')
# Don't use a jump server by default.
if 'useJumpServer' not in testData:
testData['useJumpServer'] = False
else:
raise ValueError('Missing one or more expected keys: {expectedKeys}.'.format(expectedKeys = expectedKeys))
if testData['useJumpServer'] == False:
responseData['vthResponse']['resultData']['result'] = pingServer(testData['targetHost'])
else:
testConfig = vthInput['testConfig']
if 'jumpServer' not in testConfig:
raise KeyError('Cannot use jump server when jumpServer key is missing.')
jumpServer = testConfig['jumpServer']
if 'host' not in testConfig['jumpServer']:
raise KeyError('Missing host value in jumpServer.')
host = testConfig['jumpServer']['host']
if 'credentials' not in jumpServer:
raise KeyError('Missing credentials in jumpServer.')
credentials = jumpServer['credentials']
if 'username' not in credentials:
raise KeyError('Missing username in credentials.')
username = credentials['username']
if 'password' not in credentials:
raise KeyError('Missing password in credentials.')
password = credentials['password']
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(host, username = username, password = password)
command = "ping -c 1 " + testData['targetHost']
ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command)
output = ssh_stdout.read()
error = ssh_stderr.read()
responseData['vthResponse']['resultData']['result'] = output
else:
raise KeyError('Missing vthInput parameter(s)')
# record the end time of the test
endTime = unix_time_millis(datetime.datetime.now())
# Calculate the total duration of the test
totalTime = endTime - startTime
# Set the test duration in the result
responseData['vthResponse']['testDurationMS'] = totalTime
responseData['vthResponse']['abstractMessage'] = 'Result from pinging {host}'.format(host = testData['targetHost'])
app.logger.info('ping endpoint. response: ' + str(responseData))
return jsonify(responseData)
except Exception as e:
app.logger.info(e)
responseData['vthResponse']['abstractMessage'] = str(e)
resp = make_response(json.dumps(responseData))
endTime = unix_time_millis(datetime.datetime.now())
totalTime = endTime - startTime
return resp
if __name__ == '__main__':
logHandler = FileHandler('otf/logs/pingVTH.log', mode='a')
# logHandler = FileHandler('pingVTH.log', mode='a')
logHandler.setLevel(logging.INFO)
app.logger.setLevel(logging.INFO)
app.logger.addHandler(logHandler)
context = ('opt/cert/otf.pem', 'opt/cert/privateKey.pem')
app.run(debug = False, host = '0.0.0.0', port = 5000, ssl_context = context)
# app.run(debug = False, host = '0.0.0.0', port = 5000)