blob: 4c245614065d57d1437ad3ef45099a58deb9e52d [file] [log] [blame]
Gary Wu9abb61c2018-09-27 10:38:50 -07001'''
2Created on Aug 15, 2017
3
4@author: sw6830
5'''
6import os
7import posixpath
8import BaseHTTPServer
9import urllib
10import urlparse
11import cgi
12import sys
13import shutil
14import mimetypes
15from jsonschema import validate
16import jsonschema
17import json
18import DcaeVariables
19import SimpleHTTPServer
Gary Wu9abb61c2018-09-27 10:38:50 -070020
21try:
22 from cStringIO import StringIO
23except ImportError:
24 from StringIO import StringIO
25
26EvtSchema = None
27DMaaPHttpd = None
28
29
30def clean_up_event():
31 sz = DcaeVariables.VESEventQ.qsize()
32 for i in range(sz):
33 try:
34 self.evtQueue.get_nowait()
35 except:
36 pass
37
38
39def enque_event(evt):
40 if DcaeVariables.VESEventQ is not None:
41 try:
42 DcaeVariables.VESEventQ.put(evt)
Gary Wu9abb61c2018-09-27 10:38:50 -070043 return True
44 except Exception as e:
45 print (str(e))
46 return False
47 return False
48
49
50def deque_event(wait_sec=25):
51 if DcaeVariables.IsRobotRun:
Aleksandra Maciagafd6c8fa2019-11-06 15:14:11 +010052 pass
Gary Wu9abb61c2018-09-27 10:38:50 -070053 try:
54 evt = DcaeVariables.VESEventQ.get(True, wait_sec)
Gary Wu9abb61c2018-09-27 10:38:50 -070055 return evt
56 except Exception as e:
57 if DcaeVariables.IsRobotRun:
Aleksandra Maciagafd6c8fa2019-11-06 15:14:11 +010058 pass
59
Gary Wu9abb61c2018-09-27 10:38:50 -070060 else:
61 print("DMaaP Event dequeue timeout")
62 return None
63
64
65class DMaaPHandler(BaseHTTPServer.BaseHTTPRequestHandler):
66
67 def do_PUT(self):
68 self.send_response(405)
69 return
70
71 def do_POST(self):
Gary Wu9abb61c2018-09-27 10:38:50 -070072 resp_code = 0
73 # Parse the form data posted
74 '''
75 form = cgi.FieldStorage(
76 fp=self.rfile,
77 headers=self.headers,
78 environ={'REQUEST_METHOD':'POST',
79 'CONTENT_TYPE':self.headers['Content-Type'],
80 })
81
82
83 form = cgi.FieldStorage(
84 fp=self.rfile,
85 headers=self.headers,
86 environ={"REQUEST_METHOD": "POST"})
87
88 for item in form.list:
89 print "%s=%s" % (item.name, item.value)
90
91 '''
92
93 if 'POST' not in self.requestline:
94 resp_code = 405
95
96 '''
97 if resp_code == 0:
98 if '/eventlistener/v5' not in self.requestline and '/eventlistener/v5/eventBatch' not in self.requestline and \
99 '/eventlistener/v5/clientThrottlingState' not in self.requestline:
100 resp_code = 404
101
102
103 if resp_code == 0:
104 if 'Y29uc29sZTpaakprWWpsbE1qbGpNVEkyTTJJeg==' not in str(self.headers):
105 resp_code = 401
106 '''
107
108 if resp_code == 0:
Bartosz Gardziejewskia926d942020-07-29 11:13:36 +0200109 topic = self.extract_topic_from_path()
Gary Wu9abb61c2018-09-27 10:38:50 -0700110 content_len = int(self.headers.getheader('content-length', 0))
111 post_body = self.rfile.read(content_len)
112
Gary Wu9abb61c2018-09-27 10:38:50 -0700113 indx = post_body.index("{")
114 if indx != 0:
115 post_body = post_body[indx:]
Bartosz Gardziejewskia926d942020-07-29 11:13:36 +0200116
117 event = "\""+topic+"\":" + post_body
118 if not enque_event(event):
Gary Wu9abb61c2018-09-27 10:38:50 -0700119 print "enque event fails"
120
121 global EvtSchema
122 try:
123 if EvtSchema is None:
Aleksandra Maciaga72ae9c32020-04-01 13:14:14 +0200124 with open(DcaeVariables.CommonEventSchema) as opened_file:
Gary Wu9abb61c2018-09-27 10:38:50 -0700125 EvtSchema = json.load(opened_file)
126 decoded_body = json.loads(post_body)
127 jsonschema.validate(decoded_body, EvtSchema)
128 except:
129 resp_code = 400
130
131 # Begin the response
132 if not DcaeVariables.IsRobotRun:
133 print ("Response Message:")
134
135 '''
136 {
137 "200" : {
138 "description" : "Success",
139 "schema" : {
140 "$ref" : "#/definitions/DR_Pub"
141 }
142 }
143
144 rspStr = "{'responses' : {'200' : {'description' : 'Success'}}}"
145 rspStr1 = "{'count': 1, 'serverTimeMs': 3}"
146
147 '''
148
149 if resp_code == 0:
150 if 'clientThrottlingState' in self.requestline:
151 self.send_response(204)
152 else:
153 self.send_response(200)
154 self.send_header('Content-Type', 'application/json')
155 self.end_headers()
Gary Wu9abb61c2018-09-27 10:38:50 -0700156 self.wfile.write("{'count': 1, 'serverTimeMs': 3}")
157 self.wfile.close()
158 else:
159 self.send_response(resp_code)
160
161 '''
162 self.end_headers()
163 self.wfile.write('Client: %s\n' % str(self.client_address))
164 self.wfile.write('User-agent: %s\n' % str(self.headers['user-agent']))
165 self.wfile.write('Path: %s\n' % self.path)
166 self.wfile.write('Form data:\n')
167 self.wfile.close()
168
169 # Echo back information about what was posted in the form
170 for field in form.keys():
171 field_item = form[field]
172 if field_item.filename:
173 # The field contains an uploaded file
174 file_data = field_item.file.read()
175 file_len = len(file_data)
176 del file_data
177 self.wfile.write('\tUploaded %s as "%s" (%d bytes)\n' % \
178 (field, field_item.filename, file_len))
179 else:
180 # Regular form value
181 self.wfile.write('\t%s=%s\n' % (field, form[field].value))
182 '''
183 return
184
Bartosz Gardziejewskia926d942020-07-29 11:13:36 +0200185 def extract_topic_from_path(self):
186 return self.path["/events/".__len__():]
187
Gary Wu9abb61c2018-09-27 10:38:50 -0700188 def do_GET(self):
189 """Serve a GET request."""
190 f = self.send_head()
191 if f:
192 try:
193 self.copyfile(f, self.wfile)
194 finally:
195 f.close()
196
197 def do_HEAD(self):
198 """Serve a HEAD request."""
199 f = self.send_head()
200 if f:
201 f.close()
202
203 def send_head(self):
204 """Common code for GET and HEAD commands.
205
206 This sends the response code and MIME headers.
207
208 Return value is either a file object (which has to be copied
209 to the outputfile by the caller unless the command was HEAD,
210 and must be closed by the caller under all circumstances), or
211 None, in which case the caller has nothing further to do.
212
213 """
214 path = self.translate_path(self.path)
215 if os.path.isdir(path):
216 parts = urlparse.urlsplit(self.path)
217 if not parts.path.endswith('/'):
218 # redirect browser - doing basically what apache does
219 self.send_response(301)
220 new_parts = (parts[0], parts[1], parts[2] + '/',
221 parts[3], parts[4])
222 new_url = urlparse.urlunsplit(new_parts)
223 self.send_header("Location", new_url)
224 self.end_headers()
225 return None
226 for index in "index.html", "index.htm":
227 index = os.path.join(path, index)
228 if os.path.exists(index):
229 path = index
230 break
231 else:
232 return self.list_directory(path)
233 ctype = self.guess_type(path)
234 try:
235 # Always read in binary mode. Opening files in text mode may cause
236 # newline translations, making the actual size of the content
237 # transmitted *less* than the content-length!
238 f = open(path, 'rb')
239 except IOError:
240 self.send_error(404, "File not found")
241 return None
242 try:
243 self.send_response(200)
244 self.send_header("Content-type", ctype)
245 fs = os.fstat(f.fileno())
246 self.send_header("Content-Length", str(fs[6]))
247 self.send_header("Last-Modified", self.date_time_string(fs.st_mtime))
248 self.end_headers()
249 return f
250 except:
251 f.close()
252 raise
253
254 def list_directory(self, path):
255 """Helper to produce a directory listing (absent index.html).
256
257 Return value is either a file object, or None (indicating an
258 error). In either case, the headers are sent, making the
259 interface the same as for send_head().
260
261 """
262 try:
263 list_dir = os.listdir(path)
264 except os.error:
265 self.send_error(404, "No permission to list directory")
266 return None
267 list_dir.sort(key=lambda a: a.lower())
268 f = StringIO()
269 displaypath = cgi.escape(urllib.unquote(self.path))
270 f.write('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">')
271 f.write("<html>\n<title>Directory listing for %s</title>\n" % displaypath)
272 f.write("<body>\n<h2>Directory listing for %s</h2>\n" % displaypath)
273 f.write("<hr>\n<ul>\n")
274 for name in list_dir:
275 fullname = os.path.join(path, name)
276 displayname = linkname = name
277 # Append / for directories or @ for symbolic links
278 if os.path.isdir(fullname):
279 displayname = name + "/"
280 linkname = name + "/"
281 if os.path.islink(fullname):
282 displayname = name + "@"
283 # Note: a link to a directory displays with @ and links with /
284 f.write('<li><a href="%s">%s</a>\n'
285 % (urllib.quote(linkname), cgi.escape(displayname)))
286 f.write("</ul>\n<hr>\n</body>\n</html>\n")
287 length = f.tell()
288 f.seek(0)
289 self.send_response(200)
290 encoding = sys.getfilesystemencoding()
291 self.send_header("Content-type", "text/html; charset=%s" % encoding)
292 self.send_header("Content-Length", str(length))
293 self.end_headers()
294 return f
295
296 @staticmethod
297 def translate_path(path):
298 """Translate a /-separated PATH to the local filename syntax.
299
300 Components that mean special things to the local file system
301 (e.g. drive or directory names) are ignored. (XXX They should
302 probably be diagnosed.)
303
304 """
305 # abandon query parameters
306 path = path.split('?', 1)[0]
307 path = path.split('#', 1)[0]
308 # Don't forget explicit trailing slash when normalizing. Issue17324
309 trailing_slash = path.rstrip().endswith('/')
310 path = posixpath.normpath(urllib.unquote(path))
311 words = path.split('/')
312 words = filter(None, words)
313 path = os.getcwd()
314 for word in words:
315 if os.path.dirname(word) or word in (os.curdir, os.pardir):
316 # Ignore components that are not a simple file/directory name
317 continue
318 path = os.path.join(path, word)
319 if trailing_slash:
320 path += '/'
321 return path
322
323 @staticmethod
324 def copyfile(source, outputfile):
325 """Copy all data between two file objects.
326
327 The SOURCE argument is a file object open for reading
328 (or anything with a read() method) and the DESTINATION
329 argument is a file object open for writing (or
330 anything with a write() method).
331
332 The only reason for overriding this would be to change
333 the block size or perhaps to replace newlines by CRLF
334 -- note however that this the default server uses this
335 to copy binary data as well.
336
337 """
338 shutil.copyfileobj(source, outputfile)
339
340 def guess_type(self, path):
341 """Guess the type of a file.
342
343 Argument is a PATH (a filename).
344
345 Return value is a string of the form type/subtype,
346 usable for a MIME Content-type header.
347
348 The default implementation looks the file's extension
349 up in the table self.extensions_map, using application/octet-stream
350 as a default; however it would be permissible (if
351 slow) to look inside the data to make a better guess.
352
353 """
354
355 base, ext = posixpath.splitext(path)
356 if ext in self.extensions_map:
357 return self.extensions_map[ext]
358 ext = ext.lower()
359 if ext in self.extensions_map:
360 return self.extensions_map[ext]
361 else:
362 return self.extensions_map['']
363
364 if not mimetypes.inited:
365 mimetypes.init() # try to read system mime.types
366 extensions_map = mimetypes.types_map.copy()
367 extensions_map.update({
368 '': 'application/octet-stream', # Default
369 '.py': 'text/plain',
370 '.c': 'text/plain',
371 '.h': 'text/plain',
372 })
373
374
375def test(handler_class=DMaaPHandler, server_class=BaseHTTPServer.HTTPServer, protocol="HTTP/1.0", port=3904):
Aleksandra Maciaga72ae9c32020-04-01 13:14:14 +0200376 print "Load event schema file: " + DcaeVariables.CommonEventSchema
377 with open(DcaeVariables.CommonEventSchema) as opened_file:
Gary Wu9abb61c2018-09-27 10:38:50 -0700378 global EvtSchema
379 EvtSchema = json.load(opened_file)
380
381 server_address = ('', port)
382
383 handler_class.protocol_version = protocol
384 httpd = server_class(server_address, handler_class)
385
386 global DMaaPHttpd
387 DMaaPHttpd = httpd
388 DcaeVariables.HTTPD = httpd
389
390 sa = httpd.socket.getsockname()
391 print "Serving HTTP on", sa[0], "port", sa[1], "..."
392 # httpd.serve_forever()
393
394
395def _main_(handler_class=DMaaPHandler, server_class=BaseHTTPServer.HTTPServer, protocol="HTTP/1.0"):
396
397 if sys.argv[1:]:
398 port = int(sys.argv[1])
399 else:
400 port = 3904
401
Aleksandra Maciaga72ae9c32020-04-01 13:14:14 +0200402 print "Load event schema file: " + DcaeVariables.CommonEventSchema
403 with open(DcaeVariables.CommonEventSchema) as opened_file:
Gary Wu9abb61c2018-09-27 10:38:50 -0700404 global EvtSchema
405 EvtSchema = json.load(opened_file)
406
407 server_address = ('', port)
408
409 handler_class.protocol_version = protocol
410 httpd = server_class(server_address, handler_class)
411
412 sa = httpd.socket.getsockname()
413 print "Serving HTTP on", sa[0], "port", sa[1], "..."
414 httpd.serve_forever()
415
416
417if __name__ == '__main__':
418 _main_()