blob: bd6caecce973bd81990882335b09b5ca3eee97cb [file] [log] [blame]
ac25505082fd72018-03-20 12:35:48 +01001#!/usr/bin/env python3
2import os
3import json
4import copy
5import random
6import requests
7import uuid
8import time
9from datetime import datetime
10
11def luck(n=2):
12 """ gives 1 chance out of n (default: 2) to return True """
13 assert n > 1
14 return bool(random.randint(0, n-1))
15def now_dmaap_timestamp():
16 return str(datetime.now().timestamp()).replace(".","")[:13]
17def now_notification_time():
18 return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f+00:00")
19
20CONTROL_LOOP_NAMES = [
ac255074c616e2018-03-28 14:42:49 +020021 'CL-vCPE-d925ed73',
22 'CL-vCPE-37b1c91e',
23 'CL-vCPE-c2597657',
24 'CL-vCPE-a11318ba',
25 'CL-vCPE-5321c558',
ac25505082fd72018-03-20 12:35:48 +010026]
27
28TEMPLATES = {
29 'event_abated' :'event_abated.json',
30 'event_onset' :'event_onset.json',
31 'notification_active' :'notification_active.json',
32 'notification_final_failed' :'notification_final_failed.json',
33 'notification_final_open' :'notification_final_open.json',
34 'notification_final_success' :'notification_final_success.json',
35 'notification_operation_failure' :'notification_operation_failure.json',
36 'notification_operation' :'notification_operation.json',
37 'notification_operation_success' :'notification_operation_success.json',
38 'notification_rejected_disabled' :'notification_rejected_disabled.json',
39 'notification_rejected_missing' :'notification_rejected_missing.json',
40}
41
ac2550e7bf2692018-03-29 14:24:04 +020042ERROR_MESSAGES = [
43 ('APPC', 'APPC1 : timeout on restart','RESTART'),
44 ('APPC', 'APPC2 : cannot restart','RESTART'),
45 ('SO', 'SO1 : scale up failed', 'SCALEUP'),
46 ('SO', 'SO2 : scale down failed', 'SCALEDOWN'),
47]
48
ac25505082fd72018-03-20 12:35:48 +010049for key in TEMPLATES:
50 with open(TEMPLATES[key]) as f:
51 content = f.read()
52 TEMPLATES[key] = json.loads(content)
53
54
55class DMaaPMessage(dict):
56
57 dmaap_host_url = "http://dmaap.host.url:9200/"
58 dmaap_username = None
59 dmaap_password = None
60
61 @classmethod
62 def from_template(cls, tmpl, **kwargs):
63 obj = cls()
64 obj.update(copy.deepcopy(TEMPLATES[tmpl]))
65 for keys,value in kwargs.items():
66 current_node = obj
67 keys = keys.split(".")
68 key = keys[0]
69 for i in range(len(keys) - 1):
70 current_node = current_node[keys[i]]
71 key = keys[i]
72 current_node[key] = value
73 return obj
74
75 def publish(self, topic):
76 url = "%s/events/%s" % (self.dmaap_host_url, topic)
77 auth = None
78 if self.dmaap_username and self.dmaap_password:
79 auth = (self.dmaap_username, self.dmaap_password)
80 response = requests.post(url, data=json.dumps(self), auth=auth)
81 return response.status_code
82
83class Event(DMaaPMessage):
84
85 topic = "DCAE-CL-EVENT"
86
87 @staticmethod
88 def abated(**kwargs):
89 return Event.from_template('event_abated', **kwargs)
90
91 @staticmethod
92 def onset(**kwargs):
93 return Event.from_template('event_onset', **kwargs)
94
95 def publish(self):
96 return super().publish(self.topic)
97
98
99class Notification(DMaaPMessage):
100
101 topic = "POLICY-CL-MGT"
102
103 @classmethod
104 def from_template(cls, tmpl, **kwargs):
105 kwargs['notificationTime'] = now_notification_time()
106 return super().from_template(tmpl, **kwargs)
107
108 @staticmethod
109 def active(**kwargs):
110 return Notification.from_template('notification_active', **kwargs)
111
112 @staticmethod
113 def final(**kwargs):
114 class FinalNotification(Notification):
115 @staticmethod
116 def success(**kwargs):
117 return FinalNotification.from_template('notification_final_success', **kwargs)
118 @staticmethod
119 def failed(**kwargs):
ac2550e7bf2692018-03-29 14:24:04 +0200120 msg = FinalNotification.from_template('notification_final_failed', **kwargs)
121 error = ERROR_MESSAGES[random.randint(0, len(ERROR_MESSAGES) - 1)]
122 h = msg['history'][-1]
123 h['actor'],h['message'],h['operation'] = error[0],error[1],error[2]
124 return msg
ac25505082fd72018-03-20 12:35:48 +0100125 @staticmethod
126 def open(**kwargs):
127 return FinalNotification.from_template('notification_final_open', **kwargs)
128 return FinalNotification
129
130 @staticmethod
131 def operation(**kwargs):
132 class OperationNotification(Notification):
133 @staticmethod
134 def success(**kwargs):
135 return OperationNotification.from_template('notification_operation_success', **kwargs)
136 @staticmethod
137 def failure(**kwargs):
138 return OperationNotification.from_template('notification_operation_failure', **kwargs)
139 return OperationNotification.from_template('notification_operation', **kwargs)
140
141 @staticmethod
142 def rejected(**kwargs):
143 class RejectedNotification(Notification):
144 @staticmethod
145 def disabled(**kwargs):
146 return RejectedNotification.from_template('notification_rejected_disabled', **kwargs)
147 @staticmethod
148 def missing_fields(**kwargs):
149 return RejectedNotification.from_template('notification_rejected_missing', **kwargs)
150
151 return RejectedNotification
152
153 def publish(self):
154 return super().publish(self.topic)
155
156
157
158class CLStatus(object):
159
160 def __init__(self, dmaap_url=None,
161 missing=None, disabled=None, op_failure=None):
162 self._stopped = False
ac2550e7bf2692018-03-29 14:24:04 +0200163 def maybe(thing, ):
ac25505082fd72018-03-20 12:35:48 +0100164 if thing is None:
165 thing = not luck(10)
166 return thing
167 self._missing = maybe(missing)
168 self._disabled = maybe(disabled)
169 self._op_failure = maybe(op_failure)
170 self._config = dict(
171 requestID=str(uuid.uuid4()),
172 closedLoopControlName=CONTROL_LOOP_NAMES[random.randint(0, len(CONTROL_LOOP_NAMES) - 1)]
173 )
174
175 def __iter__(self):
176 return next(self)
177
178 def __next__(self):
179 if self._stopped:
180 raise StopIteration()
181 config = self._config
182 config.update(dict(closedLoopAlarmStart=now_dmaap_timestamp()))
183 yield Event.onset(**config)
184 if self._missing:
185 self._stopped = True
186 yield Notification.rejected().missing_fields(**config)
187 raise StopIteration()
188 elif self._disabled:
189 self._stopped = True
190 yield Notification.rejected().disabled(**config)
191 raise StopIteration()
192
193 yield Notification.active(**config)
194 yield Notification.operation(**config)
195
196 config['closedLoopAlarmEnd'] = now_dmaap_timestamp()
197 if self._op_failure:
198 yield Notification.operation().failure(**config)
199 self._stopped = True
200 yield Notification.final().failed(**config)
201 else:
202 yield Notification.operation().success(**config)
203 yield Event.abated(**config)
204 self._stopped = True
205 yield Notification.final().success(**config)
206 raise StopIteration()
207
ac255069a7eff2018-03-27 14:00:27 +0200208def print_usage():
209 print("""
210 ./ds_mocker.py <DMAAP_URL> <EVENT_TOPIC> [NOTIFICATION_TOPIC [REQUEST_TOPIC]]
211 """)
212 exit()
213
214def push(test_datas):
215 for current_i, status in enumerate(test_datas):
ac255074c616e2018-03-28 14:42:49 +0200216 time.sleep(random.randint(0,3))
ac255069a7eff2018-03-27 14:00:27 +0200217 for s in status:
218 # print(s)
219 status_code = s.publish()
220 if status_code != 200:
221 print("Error when publishing : status_code={}".format(status_code))
222 exit(1)
223 time.sleep(random.randint(0,3))
224 print("%03d,missing:%5s,disabled:%5s,op_failure:%5s - %s" % (current_i, status._missing, status._disabled, status._op_failure, status._config))
225
226
ac2550e7bf2692018-03-29 14:24:04 +0200227
ac255069a7eff2018-03-27 14:00:27 +0200228def generate_dataset_1():
ac2550e7bf2692018-03-29 14:24:04 +0200229 test_datas = [CLStatus(missing=False, disabled=False, op_failure=False) for i in range(300)] \
ac25505082fd72018-03-20 12:35:48 +0100230 + [CLStatus(missing=True, disabled=False, op_failure=False) for i in range(5)] \
231 + [CLStatus(missing=False, disabled=True, op_failure=False) for i in range(6)] \
ac2550e7bf2692018-03-29 14:24:04 +0200232 + [CLStatus(missing=False, disabled=False, op_failure=True) for i in range(12)]
ac255069a7eff2018-03-27 14:00:27 +0200233 random.shuffle(test_datas)
234 return test_datas
ac25505082fd72018-03-20 12:35:48 +0100235
ac2550e7bf2692018-03-29 14:24:04 +0200236def generate_error_dataset_1():
237 test_datas = [CLStatus(missing=False, disabled=False, op_failure=True) for i in range(60)]
238 random.shuffle(test_datas)
239 return test_datas
240
241
242DATASETS = {
243 'dataset_1': generate_dataset_1,
244 'op_failure_1': generate_error_dataset_1,
245}
ac255069a7eff2018-03-27 14:00:27 +0200246
247if __name__ == "__main__":
248 import sys
249 if len(sys.argv) < 3:
250 print_usage()
251
252 DMaaPMessage.dmaap_host_url = sys.argv[1]
253 Event.topic = sys.argv[2]
254 Notification.topic = len(sys.argv) > 3 and sys.argv[3] or sys.argv[2]
255 # Request.topic = len(sys.argv) > 4 or Notification.topic
ac2550e7bf2692018-03-29 14:24:04 +0200256 #push(DATASETS['op_failure_1']())
257 push(DATASETS['dataset_1']())