''' | |
Created on Aug 15, 2017 | |
@author: sw6830 | |
''' | |
import os | |
import posixpath | |
import BaseHTTPServer | |
import urllib | |
import urlparse | |
import cgi, sys, shutil, mimetypes | |
from jsonschema import validate | |
import jsonschema, json | |
import DcaeVariables | |
import SimpleHTTPServer | |
from robot.api import logger | |
try: | |
from cStringIO import StringIO | |
except ImportError: | |
from StringIO import StringIO | |
EvtSchema = None | |
DMaaPHttpd = None | |
def cleanUpEvent(): | |
sz = DcaeVariables.VESEventQ.qsize() | |
for i in range(sz): | |
try: | |
self.evtQueue.get_nowait() | |
except: | |
pass | |
def enqueEvent(evt): | |
if DcaeVariables.VESEventQ != None: | |
try: | |
DcaeVariables.VESEventQ.put(evt) | |
if DcaeVariables.IsRobotRun: | |
logger.console("DMaaP Event enqued - size=" + str(len(evt))) | |
else: | |
print ("DMaaP Event enqueued - size=" + str(len(evt))) | |
return True | |
except Exception as e: | |
print (str(e)) | |
return False | |
return False | |
def dequeEvent(waitSec=25): | |
if DcaeVariables.IsRobotRun: | |
logger.console("Enter DequeEvent") | |
try: | |
evt = DcaeVariables.VESEventQ.get(True, waitSec) | |
if DcaeVariables.IsRobotRun: | |
logger.console("DMaaP Event dequeued - size=" + str(len(evt))) | |
else: | |
print("DMaaP Event dequeued - size=" + str(len(evt))) | |
return evt | |
except Exception as e: | |
if DcaeVariables.IsRobotRun: | |
logger.console(str(e)) | |
logger.console("DMaaP Event dequeue timeout") | |
else: | |
print("DMaaP Event dequeue timeout") | |
return None | |
class DMaaPHandler(BaseHTTPServer.BaseHTTPRequestHandler): | |
def do_PUT(self): | |
self.send_response(405) | |
return | |
def do_POST(self): | |
respCode = 0 | |
# Parse the form data posted | |
''' | |
form = cgi.FieldStorage( | |
fp=self.rfile, | |
headers=self.headers, | |
environ={'REQUEST_METHOD':'POST', | |
'CONTENT_TYPE':self.headers['Content-Type'], | |
}) | |
form = cgi.FieldStorage( | |
fp=self.rfile, | |
headers=self.headers, | |
environ={"REQUEST_METHOD": "POST"}) | |
for item in form.list: | |
print "%s=%s" % (item.name, item.value) | |
''' | |
if 'POST' not in self.requestline: | |
respCode = 405 | |
''' | |
if respCode == 0: | |
if '/eventlistener/v5' not in self.requestline and '/eventlistener/v5/eventBatch' not in self.requestline and \ | |
'/eventlistener/v5/clientThrottlingState' not in self.requestline: | |
respCode = 404 | |
if respCode == 0: | |
if 'Y29uc29sZTpaakprWWpsbE1qbGpNVEkyTTJJeg==' not in str(self.headers): | |
respCode = 401 | |
''' | |
if respCode == 0: | |
content_len = int(self.headers.getheader('content-length', 0)) | |
post_body = self.rfile.read(content_len) | |
if DcaeVariables.IsRobotRun: | |
logger.console("\n" + "DMaaP Receive Event:\n" + post_body) | |
else: | |
print("\n" + "DMaaP Receive Event:") | |
print (post_body) | |
indx = post_body.index("{") | |
if indx != 0: | |
post_body = post_body[indx:] | |
if enqueEvent(post_body) == False: | |
print "enque event fails" | |
global EvtSchema | |
try: | |
if EvtSchema == None: | |
with open(DcaeVariables.CommonEventSchemaV5) as file: | |
EvtSchema = json.load(file) | |
decoded_body = json.loads(post_body) | |
jsonschema.validate(decoded_body, EvtSchema) | |
except: | |
respCode = 400 | |
# Begin the response | |
if DcaeVariables.IsRobotRun == False: | |
print ("Response Message:") | |
''' | |
{ | |
"200" : { | |
"description" : "Success", | |
"schema" : { | |
"$ref" : "#/definitions/DR_Pub" | |
} | |
} | |
rspStr = "{'responses' : {'200' : {'description' : 'Success'}}}" | |
rspStr1 = "{'count': 1, 'serverTimeMs': 3}" | |
''' | |
if respCode == 0: | |
if 'clientThrottlingState' in self.requestline: | |
self.send_response(204) | |
else: | |
self.send_response(200) | |
self.send_header('Content-Type', 'application/json') | |
self.end_headers() | |
#self.wfile.write("{'responses' : {'200' : {'description' : 'Success'}}}") | |
self.wfile.write("{'count': 1, 'serverTimeMs': 3}") | |
self.wfile.close() | |
else: | |
self.send_response(respCode) | |
''' | |
self.end_headers() | |
self.wfile.write('Client: %s\n' % str(self.client_address)) | |
self.wfile.write('User-agent: %s\n' % str(self.headers['user-agent'])) | |
self.wfile.write('Path: %s\n' % self.path) | |
self.wfile.write('Form data:\n') | |
self.wfile.close() | |
# Echo back information about what was posted in the form | |
for field in form.keys(): | |
field_item = form[field] | |
if field_item.filename: | |
# The field contains an uploaded file | |
file_data = field_item.file.read() | |
file_len = len(file_data) | |
del file_data | |
self.wfile.write('\tUploaded %s as "%s" (%d bytes)\n' % \ | |
(field, field_item.filename, file_len)) | |
else: | |
# Regular form value | |
self.wfile.write('\t%s=%s\n' % (field, form[field].value)) | |
''' | |
return | |
def do_GET(self): | |
"""Serve a GET request.""" | |
f = self.send_head() | |
if f: | |
try: | |
self.copyfile(f, self.wfile) | |
finally: | |
f.close() | |
def do_HEAD(self): | |
"""Serve a HEAD request.""" | |
f = self.send_head() | |
if f: | |
f.close() | |
def send_head(self): | |
"""Common code for GET and HEAD commands. | |
This sends the response code and MIME headers. | |
Return value is either a file object (which has to be copied | |
to the outputfile by the caller unless the command was HEAD, | |
and must be closed by the caller under all circumstances), or | |
None, in which case the caller has nothing further to do. | |
""" | |
path = self.translate_path(self.path) | |
f = None | |
if os.path.isdir(path): | |
parts = urlparse.urlsplit(self.path) | |
if not parts.path.endswith('/'): | |
# redirect browser - doing basically what apache does | |
self.send_response(301) | |
new_parts = (parts[0], parts[1], parts[2] + '/', | |
parts[3], parts[4]) | |
new_url = urlparse.urlunsplit(new_parts) | |
self.send_header("Location", new_url) | |
self.end_headers() | |
return None | |
for index in "index.html", "index.htm": | |
index = os.path.join(path, index) | |
if os.path.exists(index): | |
path = index | |
break | |
else: | |
return self.list_directory(path) | |
ctype = self.guess_type(path) | |
try: | |
# Always read in binary mode. Opening files in text mode may cause | |
# newline translations, making the actual size of the content | |
# transmitted *less* than the content-length! | |
f = open(path, 'rb') | |
except IOError: | |
self.send_error(404, "File not found") | |
return None | |
try: | |
self.send_response(200) | |
self.send_header("Content-type", ctype) | |
fs = os.fstat(f.fileno()) | |
self.send_header("Content-Length", str(fs[6])) | |
self.send_header("Last-Modified", self.date_time_string(fs.st_mtime)) | |
self.end_headers() | |
return f | |
except: | |
f.close() | |
raise | |
def list_directory(self, path): | |
"""Helper to produce a directory listing (absent index.html). | |
Return value is either a file object, or None (indicating an | |
error). In either case, the headers are sent, making the | |
interface the same as for send_head(). | |
""" | |
try: | |
list = os.listdir(path) | |
except os.error: | |
self.send_error(404, "No permission to list directory") | |
return None | |
list.sort(key=lambda a: a.lower()) | |
f = StringIO() | |
displaypath = cgi.escape(urllib.unquote(self.path)) | |
f.write('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">') | |
f.write("<html>\n<title>Directory listing for %s</title>\n" % displaypath) | |
f.write("<body>\n<h2>Directory listing for %s</h2>\n" % displaypath) | |
f.write("<hr>\n<ul>\n") | |
for name in list: | |
fullname = os.path.join(path, name) | |
displayname = linkname = name | |
# Append / for directories or @ for symbolic links | |
if os.path.isdir(fullname): | |
displayname = name + "/" | |
linkname = name + "/" | |
if os.path.islink(fullname): | |
displayname = name + "@" | |
# Note: a link to a directory displays with @ and links with / | |
f.write('<li><a href="%s">%s</a>\n' | |
% (urllib.quote(linkname), cgi.escape(displayname))) | |
f.write("</ul>\n<hr>\n</body>\n</html>\n") | |
length = f.tell() | |
f.seek(0) | |
self.send_response(200) | |
encoding = sys.getfilesystemencoding() | |
self.send_header("Content-type", "text/html; charset=%s" % encoding) | |
self.send_header("Content-Length", str(length)) | |
self.end_headers() | |
return f | |
def translate_path(self, path): | |
"""Translate a /-separated PATH to the local filename syntax. | |
Components that mean special things to the local file system | |
(e.g. drive or directory names) are ignored. (XXX They should | |
probably be diagnosed.) | |
""" | |
# abandon query parameters | |
path = path.split('?',1)[0] | |
path = path.split('#',1)[0] | |
# Don't forget explicit trailing slash when normalizing. Issue17324 | |
trailing_slash = path.rstrip().endswith('/') | |
path = posixpath.normpath(urllib.unquote(path)) | |
words = path.split('/') | |
words = filter(None, words) | |
path = os.getcwd() | |
for word in words: | |
if os.path.dirname(word) or word in (os.curdir, os.pardir): | |
# Ignore components that are not a simple file/directory name | |
continue | |
path = os.path.join(path, word) | |
if trailing_slash: | |
path += '/' | |
return path | |
def copyfile(self, source, outputfile): | |
"""Copy all data between two file objects. | |
The SOURCE argument is a file object open for reading | |
(or anything with a read() method) and the DESTINATION | |
argument is a file object open for writing (or | |
anything with a write() method). | |
The only reason for overriding this would be to change | |
the block size or perhaps to replace newlines by CRLF | |
-- note however that this the default server uses this | |
to copy binary data as well. | |
""" | |
shutil.copyfileobj(source, outputfile) | |
def guess_type(self, path): | |
"""Guess the type of a file. | |
Argument is a PATH (a filename). | |
Return value is a string of the form type/subtype, | |
usable for a MIME Content-type header. | |
The default implementation looks the file's extension | |
up in the table self.extensions_map, using application/octet-stream | |
as a default; however it would be permissible (if | |
slow) to look inside the data to make a better guess. | |
""" | |
base, ext = posixpath.splitext(path) | |
if ext in self.extensions_map: | |
return self.extensions_map[ext] | |
ext = ext.lower() | |
if ext in self.extensions_map: | |
return self.extensions_map[ext] | |
else: | |
return self.extensions_map[''] | |
if not mimetypes.inited: | |
mimetypes.init() # try to read system mime.types | |
extensions_map = mimetypes.types_map.copy() | |
extensions_map.update({ | |
'': 'application/octet-stream', # Default | |
'.py': 'text/plain', | |
'.c': 'text/plain', | |
'.h': 'text/plain', | |
}) | |
def test(HandlerClass = DMaaPHandler, | |
ServerClass = BaseHTTPServer.HTTPServer, protocol="HTTP/1.0", port=3904): | |
print "Load event schema file: " + DcaeVariables.CommonEventSchemaV5 | |
with open(DcaeVariables.CommonEventSchemaV5) as file: | |
global EvtSchema | |
EvtSchema = json.load(file) | |
server_address = ('', port) | |
HandlerClass.protocol_version = protocol | |
httpd = ServerClass(server_address, HandlerClass) | |
global DMaaPHttpd | |
DMaaPHttpd = httpd | |
DcaeVariables.HTTPD = httpd | |
sa = httpd.socket.getsockname() | |
print "Serving HTTP on", sa[0], "port", sa[1], "..." | |
#httpd.serve_forever() | |
def _main_ (HandlerClass = DMaaPHandler, | |
ServerClass = BaseHTTPServer.HTTPServer, protocol="HTTP/1.0"): | |
if sys.argv[1:]: | |
port = int(sys.argv[1]) | |
else: | |
port = 3904 | |
print "Load event schema file: " + DcaeVariables.CommonEventSchemaV5 | |
with open(DcaeVariables.CommonEventSchemaV5) as file: | |
global EvtSchema | |
EvtSchema = json.load(file) | |
server_address = ('', port) | |
HandlerClass.protocol_version = protocol | |
httpd = ServerClass(server_address, HandlerClass) | |
sa = httpd.socket.getsockname() | |
print "Serving HTTP on", sa[0], "port", sa[1], "..." | |
httpd.serve_forever() | |
if __name__ == '__main__': | |
_main_() |