#!/usr/bin/env python3
import copy
import json
import os
import random
import time
import uuid
from datetime import datetime, timezone

import requests
10
def luck(n=2):
    """Return ``False`` with 1 chance out of ``n`` (default: 2), else ``True``.

    A uniform integer is drawn from ``[0, n - 1]`` and converted to ``bool``,
    so only a draw of ``0`` gives ``False``; ``True`` therefore comes back
    with probability ``(n - 1) / n``.  Negate the result to get a rare
    ``True``: ``not luck(10)`` is ``True`` about one time in ten.

    ``n`` must be greater than 1.
    """
    assert n > 1
    return bool(random.randint(0, n - 1))
def now_dmaap_timestamp():
    """Return the current time as a string of whole milliseconds since the epoch.

    The value is computed arithmetically rather than by deleting the "." from
    the float's text form: that text carries a variable number of decimals,
    so slicing it to a fixed width can return a short or wrong timestamp.
    """
    return str(int(time.time() * 1000))
def now_notification_time():
    """Return the current UTC time as ``YYYY-MM-DD HH:MM:SS.ffffff+00:00``.

    The ``+00:00`` suffix is a literal part of the format, so the clock is
    read in UTC to keep the printed offset truthful on hosts whose local
    time zone is not UTC.
    """
    return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f+00:00")
19
# Closed-loop control names; each CLStatus picks one of them at random.
CONTROL_LOOP_NAMES = [
    'ClosedLoop-vUSP-SIG-d925ed73-8231-4d02-9545-db4e101f88f8',
    'ClosedLoop-vUSP-SIG-37b1c91e-fd6b-4abd-af15-771902d5fdb1',
    'ClosedLoop-vUSP-SIG-c2597657-7113-4efb-a1f9-397a7a24c3e1',
    'ClosedLoop-vUSP-SIG-a11318ba-4c61-46b8-a9da-bd40bec11d45',
    'ClosedLoop-vUSP-SIG-5321c558-2254-4efb-ac24-99dd54edc94f',
]
27
# Template name -> JSON file name.  The loop below replaces each file name
# with the parsed content of that file, so after import TEMPLATES maps each
# template name to its decoded JSON document.
TEMPLATES = {
    'event_abated': 'event_abated.json',
    'event_onset': 'event_onset.json',
    'notification_active': 'notification_active.json',
    'notification_final_failed': 'notification_final_failed.json',
    'notification_final_open': 'notification_final_open.json',
    'notification_final_success': 'notification_final_success.json',
    'notification_operation_failure': 'notification_operation_failure.json',
    'notification_operation': 'notification_operation.json',
    'notification_operation_success': 'notification_operation_success.json',
    'notification_rejected_disabled': 'notification_rejected_disabled.json',
    'notification_rejected_missing': 'notification_rejected_missing.json',
}

# File names are opened as given, i.e. relative to the current working
# directory.  Iterate over a snapshot of the items because the entries are
# reassigned inside the loop, and read as UTF-8 so decoding does not depend
# on the host locale.
for template_name, template_filename in list(TEMPLATES.items()):
    with open(template_filename, encoding="utf-8") as template_file:
        TEMPLATES[template_name] = json.load(template_file)
46
47
class DMaaPMessage(dict):
    """A message body (a plain ``dict``) that can be built from a template and POSTed.

    Instances are created with :meth:`from_template` and sent with
    :meth:`publish`.  Connection settings are class attributes so they can be
    set once for every message.
    """

    # Base URL of the endpoint; a trailing "/" is tolerated.
    dmaap_host_url = "http://dmaap.host.url:9200/"
    # HTTP basic-auth credentials; sent only when both are set.
    dmaap_username = None
    dmaap_password = None

    @staticmethod
    def _set_path(node, dotted_key, value):
        """Store *value* under *dotted_key* in *node*, descending nested mappings.

        ``"a.b.c"`` walks ``node["a"]["b"]`` and assigns key ``"c"`` there.
        A missing intermediate key raises ``KeyError``.
        """
        *parents, leaf = dotted_key.split(".")
        for part in parents:
            node = node[part]
        node[leaf] = value

    @classmethod
    def from_template(cls, tmpl, **kwargs):
        """Return a new message copied from template *tmpl* with overrides applied.

        Args:
            tmpl: key of the template in the module-level ``TEMPLATES`` mapping.
            **kwargs: values to set on the copy.  A key containing dots
                addresses a nested field (``{"a.b": 1}`` sets
                ``message["a"]["b"]``); such keys must be passed through
                ``**`` unpacking.

        Returns:
            An instance of *cls*; the template itself is deep-copied and
            never modified.

        Raises:
            KeyError: unknown template or missing intermediate nested key.
        """
        obj = cls()
        obj.update(copy.deepcopy(TEMPLATES[tmpl]))
        for dotted_key, value in kwargs.items():
            cls._set_path(obj, dotted_key, value)
        return obj

    def _events_url(self, topic):
        """Return the events URL for *topic* without doubling the separating slash."""
        return "%s/events/%s" % (self.dmaap_host_url.rstrip("/"), topic)

    def publish(self, topic):
        """POST this message, serialized as JSON, to *topic*.

        Returns:
            The HTTP status code of the response.
        """
        auth = None
        if self.dmaap_username and self.dmaap_password:
            auth = (self.dmaap_username, self.dmaap_password)
        response = requests.post(self._events_url(topic), data=json.dumps(self), auth=auth)
        return response.status_code
75
class Event(DMaaPMessage):
    """A message published on the ``DCAE-CL-EVENT`` topic."""

    topic = "DCAE-CL-EVENT"

    @staticmethod
    def abated(**kwargs):
        """Build an event from the ``event_abated`` template with *kwargs* applied."""
        return Event.from_template('event_abated', **kwargs)

    @staticmethod
    def onset(**kwargs):
        """Build an event from the ``event_onset`` template with *kwargs* applied."""
        return Event.from_template('event_onset', **kwargs)

    def publish(self, topic=None):
        """Publish this event and return the HTTP status code.

        *topic* defaults to the class-level ``topic``; it is accepted so the
        method stays call-compatible with ``DMaaPMessage.publish(topic)``.
        """
        return super().publish(self.topic if topic is None else topic)
90
91
class Notification(DMaaPMessage):
    """A message published on the ``POLICY-CL-MGT`` topic.

    Factories are reached through chained calls:

    * ``Notification.active(**fields)``
    * ``Notification.operation(**fields)``, and on its result
      ``.success(**fields)`` / ``.failure(**fields)``
    * ``Notification.final().success|failed|open(**fields)``
    * ``Notification.rejected().disabled|missing_fields(**fields)``
    """

    topic = "POLICY-CL-MGT"

    @classmethod
    def from_template(cls, tmpl, **kwargs):
        """Build a notification from *tmpl*, stamping ``notificationTime``.

        Any ``notificationTime`` supplied by the caller is overwritten with
        the value of ``now_notification_time()``.
        """
        kwargs['notificationTime'] = now_notification_time()
        return super().from_template(tmpl, **kwargs)

    @staticmethod
    def active(**kwargs):
        """Build a notification from the ``notification_active`` template."""
        return Notification.from_template('notification_active', **kwargs)

    @staticmethod
    def final(**kwargs):
        """Return the class exposing ``success`` / ``failed`` / ``open`` factories.

        *kwargs* are accepted for call-shape symmetry and ignored.
        """
        return FinalNotification

    @staticmethod
    def operation(**kwargs):
        """Build a notification from the ``notification_operation`` template.

        The returned object also exposes the ``success`` and ``failure``
        factories, which is what makes ``Notification.operation().failure(...)``
        work.
        """
        return OperationNotification.from_template('notification_operation', **kwargs)

    @staticmethod
    def rejected(**kwargs):
        """Return the class exposing ``disabled`` / ``missing_fields`` factories.

        *kwargs* are accepted for call-shape symmetry and ignored.
        """
        return RejectedNotification

    def publish(self, topic=None):
        """Publish this notification and return the HTTP status code.

        *topic* defaults to the class-level ``topic``; it is accepted so the
        method stays call-compatible with ``DMaaPMessage.publish(topic)``.
        """
        return super().publish(self.topic if topic is None else topic)


# The helper classes below are defined once at module level instead of being
# re-created inside every Notification.final()/operation()/rejected() call.

class FinalNotification(Notification):
    """Factories for the ``notification_final_*`` templates."""

    @staticmethod
    def success(**kwargs):
        return FinalNotification.from_template('notification_final_success', **kwargs)

    @staticmethod
    def failed(**kwargs):
        return FinalNotification.from_template('notification_final_failed', **kwargs)

    @staticmethod
    def open(**kwargs):
        return FinalNotification.from_template('notification_final_open', **kwargs)


class OperationNotification(Notification):
    """Factories for the ``notification_operation_*`` result templates."""

    @staticmethod
    def success(**kwargs):
        return OperationNotification.from_template('notification_operation_success', **kwargs)

    @staticmethod
    def failure(**kwargs):
        return OperationNotification.from_template('notification_operation_failure', **kwargs)


class RejectedNotification(Notification):
    """Factories for the ``notification_rejected_*`` templates."""

    @staticmethod
    def disabled(**kwargs):
        return RejectedNotification.from_template('notification_rejected_disabled', **kwargs)

    @staticmethod
    def missing_fields(**kwargs):
        return RejectedNotification.from_template('notification_rejected_missing', **kwargs)
144
145
146
class CLStatus(object):
    """One simulated closed-loop run, iterable as the messages it produces.

    Iterating yields ``Event`` and ``Notification`` objects in order:

    * always an onset event first;
    * if ``missing``: a "rejected / missing fields" notification, then stop;
    * else if ``disabled``: a "rejected / disabled" notification, then stop;
    * otherwise an active and an operation notification, followed by either
      operation-failure + final-failed, or operation-success + abated event
      + final-success.

    A run can be played only once: iterating again yields nothing.
    """

    def __init__(self, dmaap_url=None,
                 missing=None, disabled=None, op_failure=None):
        """Set up the scenario flags and the fields shared by every message.

        Args:
            dmaap_url: accepted but not used by this class.
            missing, disabled, op_failure: scenario flags; a flag left as
                ``None`` is drawn at random with ``not luck(10)``.
        """
        self._stopped = False

        def maybe(thing):
            if thing is None:
                thing = not luck(10)
            return thing

        self._missing = maybe(missing)
        self._disabled = maybe(disabled)
        self._op_failure = maybe(op_failure)
        self._config = dict(
            requestID=str(uuid.uuid4()),
            closedLoopControlName=random.choice(CONTROL_LOOP_NAMES),
        )

    def __iter__(self):
        """Return a generator over the messages of this run."""
        return self._messages()

    def __next__(self):
        """Return the message generator.

        Kept for backward compatibility: ``next(status)`` has always handed
        back the generator rather than a single message.
        """
        return self._messages()

    def _messages(self):
        """Generate the run's messages; see the class docstring for the order.

        The generator finishes with a plain ``return``: raising
        ``StopIteration`` inside a generator is turned into ``RuntimeError``
        on Python 3.7+ (PEP 479).
        """
        if self._stopped:
            return
        # The shared dict is updated in place so the timestamps remain
        # visible on ``self._config`` after the run.
        config = self._config
        config.update(dict(closedLoopAlarmStart=now_dmaap_timestamp()))
        yield Event.onset(**config)
        if self._missing:
            self._stopped = True
            yield Notification.rejected().missing_fields(**config)
            return
        elif self._disabled:
            self._stopped = True
            yield Notification.rejected().disabled(**config)
            return

        yield Notification.active(**config)
        yield Notification.operation(**config)

        config['closedLoopAlarmEnd'] = now_dmaap_timestamp()
        if self._op_failure:
            yield Notification.operation().failure(**config)
            self._stopped = True
            yield Notification.final().failed(**config)
        else:
            yield Notification.operation().success(**config)
            yield Event.abated(**config)
            self._stopped = True
            yield Notification.final().success(**config)
196
# Simulation plan: 45 nominal runs, 5 rejected for missing fields, 6 rejected
# as disabled and 7 ending in an operation failure, played in random order.
test_datas = (
    [CLStatus(missing=False, disabled=False, op_failure=False) for i in range(45)]
    + [CLStatus(missing=True, disabled=False, op_failure=False) for i in range(5)]
    + [CLStatus(missing=False, disabled=True, op_failure=False) for i in range(6)]
    + [CLStatus(missing=False, disabled=False, op_failure=True) for i in range(7)]
)
random.shuffle(test_datas)

for current_i, status in enumerate(test_datas):
    # Random pause before each run and after each message.
    time.sleep(random.randint(0, 6))
    for s in status:
        status_code = s.publish()
        if status_code != 200:
            print("Error when publishing : status_code={}".format(status_code))
            # SystemExit instead of exit(): exit() is only injected by the
            # ``site`` module and is not guaranteed to exist.
            raise SystemExit(1)
        time.sleep(random.randint(0, 3))
    print("%03d,missing:%5s,disabled:%5s,op_failure:%5s - %s" % (current_i, status._missing, status._disabled, status._op_failure, status._config))