blob: 8609a2e39ba0ccc9c191c1dd3a934b4abaf54604 [file] [log] [blame]
Deepanshu Karnwal77f7c382022-12-10 16:23:18 +05301# ==================================================================================
2# Copyright (c) 2020 HCL Technologies Limited.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15# ==================================================================================
16import time
17import pandas as pd
18from influxdb import DataFrameClient
19from configparser import ConfigParser
20from mdclogpy import Logger
21from influxdb.exceptions import InfluxDBClientError, InfluxDBServerError
22from requests.exceptions import RequestException, ConnectionError
23
24logger = Logger(name=__name__)
25
26
class DATABASE(object):
    r"""Client wrapper that reads UE data from, and writes anomalies to, InfluxDB.

    The connection settings passed to the constructor are only fallbacks:
    when ``src/ad_config.ini`` contains an ``[influxdb]`` section its values
    replace them, and a ``[features]`` section supplies the column names used
    by the rest of the application (see ``config``).

    Parameters
    ----------
    dbname: str (default='Timeseries')
        database to read from / write to
    user: str (default='root')
        user to connect
    password: str (default='root')
        password of the user
    host: str (default='r4-influxdb.ricplt')
        hostname to connect to InfluxDB
    port: int or str (default='8086')
        port to connect to InfluxDB
    path: str (default='')
        path of InfluxDB on the server
    ssl: bool (default=False)
        passed to the client as both ``ssl`` and ``verify_ssl``

    Attributes
    ----------
    client: influxDB client
        DataFrameClient api to connect influxDB (None until ``connect`` runs)
    data: DataFrame
        fetched data from database (None when the last read returned nothing)
    """

    def __init__(self, dbname='Timeseries', user='root', password='root', host="r4-influxdb.ricplt", port='8086', path='', ssl=False):
        self.data = None
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.path = path
        self.ssl = self._as_bool(ssl)
        self.dbname = dbname
        self.client = None
        # Values found in src/ad_config.ini override the arguments above.
        self.config()

    @staticmethod
    def _as_bool(value):
        """Interpret a constructor/config value as a real boolean.

        ConfigParser hands back strings, and the string ``"False"`` is truthy,
        so textual values are matched against the usual "on" spellings instead
        of being passed through ``bool``.
        """
        if isinstance(value, str):
            return value.strip().lower() in ('1', 'true', 'yes', 'on')
        return bool(value)

    def connect(self):
        """Open a fresh client connection and ping the server.

        Any previously opened client is closed first.

        Returns
        -------
        bool
            True when the ping succeeded; False after a failed attempt (the
            method sleeps 120 seconds before returning so callers can simply
            loop on it).
        """
        if self.client is not None:
            self.client.close()

        try:
            self.client = DataFrameClient(self.host, port=self.port, username=self.user, password=self.password, path=self.path, ssl=self.ssl, database=self.dbname, verify_ssl=self.ssl)
            version = self.client.request('ping', expected_response_code=204).headers['X-Influxdb-Version']
            logger.info("Conected to Influx Database, InfluxDB version : {}".format(version))
            return True

        except (RequestException, InfluxDBClientError, InfluxDBServerError, ConnectionError):
            logger.error("Failed to establish a new connection with InflulxDB, Please check your url/hostname")
            # Back off before the caller retries.
            time.sleep(120)
            return False

    def read_data(self, train=False, valid=False, limit=False):
        """Query the configured measurement and store the result in ``self.data``.

        The time window depends on the flags, checked in this order:
        no flag set -> last 1600 ms; ``train`` -> between 75 and 5 minutes
        ago; ``valid`` -> last 5 minutes; ``limit`` -> last minute, capped at
        ``limit`` rows. ``self.data`` is left as None when the query fails or
        returns no rows.

        Parameters
        ----------
        train: bool (default=False)
        valid: bool (default=False)
        limit: int (default=False)
        """
        self.data = None
        query = 'select * from ' + self.meas
        if not train and not valid and not limit:
            query += ' where time>now()-1600ms'
        elif train:
            query += ' where time<now()-5m and time>now()-75m'
        elif valid:
            query += ' where time>now()-5m'
        elif limit:
            query += ' where time>now()-1m limit '+str(limit)
        result = self.query(query)
        if result and len(result[self.meas]) != 0:
            self.data = result[self.meas]

    def write_anomaly(self, df, meas='AD'):
        """Write the rows of ``df`` to the given measurement.

        Failures are logged, not raised.

        Parameters
        ----------
        df: DataFrame
        meas: str (default='AD')
        """
        try:
            self.client.write_points(df, meas)
        except (RequestException, InfluxDBClientError, InfluxDBServerError) as e:
            # Keep the error detail in the application log instead of stdout.
            logger.error('Failed to send metrics to influxdb: {}'.format(e))

    def query(self, query):
        """Run ``query`` on the client; return its result, or False on failure."""
        try:
            result = self.client.query(query)
        except (RequestException, InfluxDBClientError, InfluxDBServerError, ConnectionError) as e:
            logger.error('Failed to connect to influxdb: {}'.format(e))
            result = False
        return result

    def config(self):
        """Load connection settings and feature names from ``src/ad_config.ini``.

        A missing file or missing section leaves the corresponding attributes
        untouched; a present section must define every option read here.
        """
        cfg = ConfigParser()
        cfg.read('src/ad_config.ini')

        if cfg.has_section('influxdb'):
            section = 'influxdb'
            self.host = cfg.get(section, "host")
            self.port = cfg.get(section, "port")
            self.user = cfg.get(section, "user")
            self.password = cfg.get(section, "password")
            self.path = cfg.get(section, "path")
            # cfg.get returns text; "False" must not end up enabling TLS.
            self.ssl = self._as_bool(cfg.get(section, "ssl"))
            self.dbname = cfg.get(section, "database")
            self.meas = cfg.get(section, "measurement")

        if cfg.has_section('features'):
            section = 'features'
            self.thpt = cfg.get(section, "thpt")
            self.rsrp = cfg.get(section, "rsrp")
            self.rsrq = cfg.get(section, "rsrq")
            self.rssinr = cfg.get(section, "rssinr")
            self.prb = cfg.get(section, "prb_usage")
            self.ue = cfg.get(section, "ue")
            self.anomaly = cfg.get(section, "anomaly")
142
143
class DUMMY(DATABASE):
    """Offline stand-in for DATABASE that serves UE data from ``src/ue.csv``.

    No InfluxDB connection is made: ``connect`` always succeeds, writes are
    discarded and reads are answered from the CSV loaded at construction.
    """

    def __init__(self):
        # The base constructor still loads src/ad_config.ini, which supplies
        # the feature names (e.g. ``self.anomaly``) used by read_data.
        super().__init__()
        self.ue_data = pd.read_csv('src/ue.csv')

    def connect(self):
        """Report success without contacting any server."""
        return True

    def read_data(self, train=False, valid=False, limit=100000):
        """Store the first ``limit`` CSV rows in ``self.data``.

        With ``train`` set, the column named by ``self.anomaly`` is dropped
        from the stored frame. ``valid`` is accepted for signature
        compatibility with DATABASE.read_data and is not used.
        """
        sample = self.ue_data.head(limit)
        if train:
            sample = sample.drop(self.anomaly, axis=1)
        self.data = sample

    def write_anomaly(self, df, meas_name='AD', meas=None):
        """Discard the data.

        ``meas`` is accepted (and ignored) so that keyword calls written
        against DATABASE.write_anomaly(df, meas=...) also work on the stub.
        """
        pass

    def query(self, query=None):
        """Return the first CSV row under the 'UEReports' key, ignoring ``query``."""
        return {'UEReports': self.ue_data.head(1)}