# blob: 8609a2e39ba0ccc9c191c1dd3a934b4abaf54604 [file] [log] [blame]
# ==================================================================================
# Copyright (c) 2020 HCL Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
import time
import pandas as pd
from influxdb import DataFrameClient
from configparser import ConfigParser
from mdclogpy import Logger
from influxdb.exceptions import InfluxDBClientError, InfluxDBServerError
from requests.exceptions import RequestException, ConnectionError
# Module-level logger (named after this module) used by the database helpers below.
logger = Logger(name=__name__)
class DATABASE(object):
    r"""Wrapper around an InfluxDB ``DataFrameClient`` for reading UE data and
    writing DataFrames back to a measurement.

    The constructor arguments are only fallbacks: ``config()`` runs at the end
    of ``__init__`` and overwrites the connection settings with the values of
    the ``[influxdb]`` section of the configuration file when that section
    is present.

    Parameters
    ----------
    dbname: str (default='Timeseries')
        database to use
    user: str (default='root')
        user to connect
    password: str (default='root')
        password of the user
    host: str (default='r4-influxdb.ricplt')
        hostname to connect to InfluxDB
    port: str (default='8086')
        port to connect to InfluxDB
    path: str (default='')
        path handed to ``DataFrameClient``
    ssl: bool (default=False)
        handed to ``DataFrameClient`` as both ``ssl`` and ``verify_ssl``

    Attributes
    ----------
    client: DataFrameClient or None
        set by ``connect()``; ``None`` until then
    data: object or None
        entry fetched by the last ``read_data()`` call, ``None`` if nothing was found
    meas: str or None
        measurement name taken from the ``[influxdb]`` config section
    thpt, rsrp, rsrq, rssinr, prb, ue, anomaly: str or None
        values taken from the ``[features]`` config section
    """

    def __init__(self, dbname='Timeseries', user='root', password='root',
                 host="r4-influxdb.ricplt", port='8086', path='', ssl=False):
        self.data = None
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.path = path
        self.ssl = ssl
        self.dbname = dbname
        self.client = None
        # Declared here so the attributes exist even when the configuration
        # file (or one of its sections) is missing.
        self.meas = None
        self.thpt = None
        self.rsrp = None
        self.rsrq = None
        self.rssinr = None
        self.prb = None
        self.ue = None
        self.anomaly = None
        self.config()

    def connect(self):
        """Create a new client and check that the server answers a ping.

        Any previously created client is closed first.

        Returns
        -------
        bool
            True when the ping succeeded; False after a failure, in which case
            the method has already slept for 120 seconds before returning.
        """
        if self.client is not None:
            self.client.close()
        try:
            self.client = DataFrameClient(self.host, port=self.port, username=self.user,
                                          password=self.password, path=self.path,
                                          ssl=self.ssl, database=self.dbname,
                                          verify_ssl=self.ssl)
            version = self.client.request('ping', expected_response_code=204).headers['X-Influxdb-Version']
            logger.info("Conected to Influx Database, InfluxDB version : {}".format(version))
            return True
        except (RequestException, InfluxDBClientError, InfluxDBServerError, ConnectionError) as e:
            logger.error("Failed to establish a new connection with InflulxDB, Please check your url/hostname")
            logger.error("Connection error: {}".format(e))
            # Back off before handing control back to the caller.
            time.sleep(120)
            return False

    def read_data(self, train=False, valid=False, limit=False):
        """Query the configured measurement and store the result in ``self.data``.

        The time window is selected by the first option that is set, in this
        order: ``train``, ``valid``, ``limit``; with none set, only the last
        1600 ms are requested.

        Parameters
        ----------
        train: bool (default=False)
            fetch rows between 75 and 5 minutes old
        valid: bool (default=False)
            fetch rows from the last 5 minutes
        limit: int (default=False)
            fetch at most ``limit`` rows from the last minute

        ``self.data`` is reset to None first and stays None when the query
        fails or returns nothing for the measurement.
        """
        self.data = None
        query = 'select * from ' + self.meas
        if train:
            query += ' where time<now()-5m and time>now()-75m'
        elif valid:
            query += ' where time>now()-5m'
        elif limit:
            query += ' where time>now()-1m limit ' + str(limit)
        else:
            query += ' where time>now()-1600ms'
        result = self.query(query)
        if not result:
            return
        # .get() so a result without the measurement is treated as "no data"
        # instead of raising KeyError.
        frame = result.get(self.meas)
        if frame is not None and len(frame) != 0:
            self.data = frame

    def write_anomaly(self, df, meas='AD'):
        """Write a DataFrame to the given measurement.

        Parameters
        ----------
        df: DataFrame
            data handed to ``client.write_points``
        meas: str (default='AD')
            target measurement

        Request and InfluxDB errors are logged, not raised.
        """
        try:
            self.client.write_points(df, meas)
        except (RequestException, InfluxDBClientError, InfluxDBServerError) as e:
            logger.error('Failed to send metrics to influxdb: {}'.format(e))

    def query(self, query):
        """Run a query through the client.

        Returns
        -------
        The client's query result, or False when the request failed
        (the error is logged).
        """
        try:
            result = self.client.query(query)
        except (RequestException, InfluxDBClientError, InfluxDBServerError, ConnectionError) as e:
            logger.error('Failed to connect to influxdb: {}'.format(e))
            result = False
        return result

    def config(self, config_file='src/ad_config.ini'):
        """Load connection settings and feature names from an INI file.

        Parameters
        ----------
        config_file: str (default='src/ad_config.ini')
            file to read; attributes are left untouched when it cannot be read
            or when the corresponding section is absent
        """
        cfg = ConfigParser()
        if not cfg.read(config_file):
            logger.error('Unable to read configuration file: {}'.format(config_file))
            return
        if cfg.has_section('influxdb'):
            section = 'influxdb'
            self.host = cfg.get(section, "host")
            self.port = cfg.get(section, "port")
            self.user = cfg.get(section, "user")
            self.password = cfg.get(section, "password")
            self.path = cfg.get(section, "path")
            # cfg.get() returns a string and any non-empty string (even
            # "False") is truthy, so convert to a real bool here.
            # Empty or unrecognised values fall back to False.
            ssl_text = cfg.get(section, "ssl").strip().lower()
            self.ssl = cfg.BOOLEAN_STATES.get(ssl_text, False)
            self.dbname = cfg.get(section, "database")
            self.meas = cfg.get(section, "measurement")
        if cfg.has_section('features'):
            section = 'features'
            self.thpt = cfg.get(section, "thpt")
            self.rsrp = cfg.get(section, "rsrp")
            self.rsrq = cfg.get(section, "rsrq")
            self.rssinr = cfg.get(section, "rssinr")
            self.prb = cfg.get(section, "prb_usage")
            self.ue = cfg.get(section, "ue")
            self.anomaly = cfg.get(section, "anomaly")
class DUMMY(DATABASE):
    """Offline stand-in for DATABASE that serves UE data from a CSV file.

    ``connect``, ``read_data``, ``write_anomaly`` and ``query`` are overridden
    so that no InfluxDB client is used; data comes from 'src/ue.csv', which is
    loaded once in the constructor.
    """

    def __init__(self):
        # The parent constructor also loads the configuration file, which
        # provides ``self.anomaly`` used by read_data().
        super().__init__()
        self.ue_data = pd.read_csv('src/ue.csv')

    def connect(self):
        """Always report a successful connection."""
        return True

    def read_data(self, train=False, valid=False, limit=100000):
        """Store the first ``limit`` rows of the CSV data in ``self.data``.

        Parameters
        ----------
        train: bool (default=False)
            when set, the column named by ``self.anomaly`` is dropped
        valid: bool (default=False)
            accepted for signature compatibility with DATABASE; not used
        limit: int (default=100000)
            number of leading rows to keep
        """
        rows = self.ue_data.head(limit)
        self.data = rows.drop(self.anomaly, axis=1) if train else rows

    def write_anomaly(self, df, meas_name='AD', meas=None):
        """Discard the data; nothing is written.

        ``meas`` is accepted in addition to ``meas_name`` so that keyword
        calls written against ``DATABASE.write_anomaly(df, meas=...)`` also
        work on this stand-in.
        """
        pass

    def query(self, query=None):
        """Ignore the query and return the first CSV row under the 'UEReports' key."""
        return {'UEReports': self.ue_data.head(1)}