blob: 18b60fe6f56aaab559e8016ea19d6e69f89c1a3c [file] [log] [blame]
# org.onap.dcae
# ============LICENSE_START====================================================
# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
# =============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END======================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
import base64
import json
import os
import requests
from threading import Thread
import uuid
from onap_dcae_cbs_docker_client.client import get_config
try:
from http.server import BaseHTTPRequestHandler, HTTPServer
except ImportError:
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
_httpport = int(os.environ['DCAEPORT']) if 'DCAEPORT' in os.environ else 80
_clientid = uuid.uuid4().hex
_groupid = uuid.uuid4().hex
class _handler(BaseHTTPRequestHandler):
def do_GET(self):
if '/healthcheck' == self.path:
if self.server.env._health():
self.send_response(202)
self.end_headers()
else:
self.send_error(503)
elif '/reconfigure' == self.path:
self.server.env._loadconfig()
self.server.env._reconf()
self.send_response(202)
self.end_headers()
else:
self.send_error(404)
def _genauth(sinfo):
"""
Return authentication parameters for stream, if present.
"""
user = sinfo['aaf_username'] if 'aaf_username' in sinfo else None
password = sinfo['aaf_password'] if 'aaf_password' in sinfo else None
if user and password:
return { 'auth': (user, password) }
else:
return {}
class DcaeEnv:
def __init__(self, healthCB = lambda:True, reconfigCB = lambda:None):
"""
Initialize environment, but don't start web server or invoke any callbacks.
"""
self._health = healthCB
self._reconf = reconfigCB
self._unread = {}
self._server = None
self._loadconfig()
def start(self):
"""
Start web server to receive health checks and reconfigure requests
"""
if self._server is not None:
return
self._server = HTTPServer(('', _httpport), _handler)
self._server.env = self
th = Thread(target=self._server.serve_forever, name='webserver')
th.daemon = True
th.start()
def stop(self):
"""
Stop web server
"""
if self._server is None:
return
self._server.shutdown()
self._server.env = None
self._server = None
def _loadconfig(self):
self._config = get_config()
def hasdata(self, stream):
"""
Return whether there is any unprocessed received data for the specified
data stream. That is, if an earlier getdata() call returned more than
one record, and the additional records have not yet been retrieved.
"""
return stream in self._unread
def getdata(self, stream, timeout_ms = 15000, limit = 10):
"""
Try to retrieve data from Message Router for the specified data stream.
If no data is retrieved, within the specified timeout, return None.
"""
sinfo = self._config['streams_subscribes'][stream]
if stream in self._unread:
x = self._unread[stream]
ret = x.pop()
if len(x) == 0:
del self._unread[stream]
return ret
gid = sinfo['client_id'] if 'client_id' in sinfo and sinfo['client_id'] else _groupid
resp = requests.get('{0}/{1}/{2}?timeout={3}&limit={4}'.format(sinfo['dmaap_info']['topic_url'], gid, _clientid, timeout_ms, limit), **_genauth(sinfo))
resp.raise_for_status()
x = resp.json()
if len(x) == 0:
return None
if len(x) == 1:
return x[0]
x.reverse()
ret = x.pop()
self._unread[stream] = x
return ret
def senddata(self, stream, partition, data):
"""
Publish data to the specified stream.
"""
sinfo = self._config['streams_publishes'][stream]
body = '{0}.{1}.{2}{3}'.format(len(partition), len(data), partition, data)
resp = requests.post('{0}'.format(sinfo['dmaap_info']['topic_url']), headers={'Content-Type': 'application/cambria'}, data=body, **_genauth(sinfo))
resp.raise_for_status()
def getconfig(self):
"""
Get the latest version of the configuration data.
"""
return self._config
def reconfigure():
"""
Make the web request to reconfigure (locally)
"""
requests.get('http://localhost:{0}/reconfigure'.format(_httpport))