blob: c835408fa303f19c83d1c5b1fcaa6c8fe47212f6 [file] [log] [blame]
Mohamed Abukar2e78e422019-06-02 11:45:52 +03001/*
2==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
5
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17==================================================================================
18*/
19
20package xapp
21
22/*
23#include <time.h>
24#include <stdlib.h>
25#include <stdio.h>
26#include <string.h>
27#include <rmr/rmr.h>
28#include <rmr/RIC_message_types.h>
29
30void write_bytes_array(unsigned char *dst, void *data, int len) {
31 memcpy((void *)dst, (void *)data, len);
32}
33
34#cgo CFLAGS: -I../
35#cgo LDFLAGS: -lrmr_nng -lnng
36*/
37import "C"
38
39import (
40 "github.com/spf13/viper"
41 "strconv"
42 "sync"
43 "time"
44 "unsafe"
45)
46
47var RMRCounterOpts = []CounterOpts{
48 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
49 {Name: "Received", Help: "The total number of received RMR messages"},
50 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
51 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
52}
53
54// To be removed ...
55type RMRStatistics struct{}
56
57type RMRClient struct {
Mohamed Abukar192518d2019-06-11 18:06:50 +030058 context unsafe.Pointer
59 ready int
60 wg sync.WaitGroup
61 mux sync.Mutex
62 stat map[string]Counter
63 consumers []MessageConsumer
64 readyCb ReadyCB
65 readyCbParams interface{}
Mohamed Abukar2e78e422019-06-02 11:45:52 +030066}
67
68type MessageConsumer interface {
69 Consume(mtype int, sid int, len int, payload []byte) error
70}
71
72func NewRMRClient() *RMRClient {
Mohamed Abukar2e78e422019-06-02 11:45:52 +030073 p := C.CString(viper.GetString("rmr.protPort"))
74 m := C.int(viper.GetInt("rmr.maxSize"))
75 defer C.free(unsafe.Pointer(p))
76
Mohamed Abukar349a0982019-06-08 18:15:42 +030077 ctx := C.rmr_init(p, m, C.int(0))
78 if ctx == nil {
Mohamed Abukar775722c2019-06-10 16:41:57 +030079 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
Mohamed Abukar2e78e422019-06-02 11:45:52 +030080 }
81
Mohamed Abukar349a0982019-06-08 18:15:42 +030082 return &RMRClient{
Mohamed Abukar775722c2019-06-10 16:41:57 +030083 context: ctx,
Mohamed Abukar349a0982019-06-08 18:15:42 +030084 consumers: make([]MessageConsumer, 0),
Mohamed Abukar775722c2019-06-10 16:41:57 +030085 stat: Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
Mohamed Abukar349a0982019-06-08 18:15:42 +030086 }
Mohamed Abukar2e78e422019-06-02 11:45:52 +030087}
88
89func (m *RMRClient) Start(c MessageConsumer) {
Mohamed Abukar2e78e422019-06-02 11:45:52 +030090 for {
91 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
92
93 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
94 break
95 }
96 time.Sleep(10 * time.Second)
97 }
98 m.wg.Add(viper.GetInt("rmr.numWorkers"))
99
100 if c != nil {
101 m.consumers = append(m.consumers, c)
102 }
103
104 for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
105 go m.Worker("worker-"+strconv.Itoa(w), 0)
106 }
107
Mohamed Abukar349a0982019-06-08 18:15:42 +0300108 if m.readyCb != nil {
Mohamed Abukar192518d2019-06-11 18:06:50 +0300109 m.readyCb(m.readyCbParams)
Mohamed Abukar349a0982019-06-08 18:15:42 +0300110 }
111
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300112 m.Wait()
113}
114
115func (m *RMRClient) Worker(taskName string, msgSize int) {
116 p := viper.GetString("rmr.protPort")
117 Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
118
119 defer m.wg.Done()
120 for {
121 rxBuffer := C.rmr_rcv_msg(m.context, nil)
122 if rxBuffer == nil {
123 m.UpdateStatCounter("ReceiveError")
124 continue
125 }
126 m.UpdateStatCounter("Received")
127
128 go m.parseMessage(rxBuffer)
129 }
130}
131
132func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
133 if len(m.consumers) == 0 {
134 Logger.Info("rmrClient: No message handlers defined, message discarded!")
135 return
136 }
137
138 for _, c := range m.consumers {
139 cptr := unsafe.Pointer(rxBuffer.payload)
140 payload := C.GoBytes(cptr, C.int(rxBuffer.len))
141
142 err := c.Consume(int(rxBuffer.mtype), int(rxBuffer.sub_id), int(rxBuffer.len), payload)
143 if err != nil {
144 Logger.Warn("rmrClient: Consumer returned error: %v", err)
145 }
146 }
147}
148
149func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
150 buf := C.rmr_alloc_msg(m.context, 0)
151 if buf == nil {
Mohamed Abukar775722c2019-06-10 16:41:57 +0300152 Logger.Error("rmrClient: Allocating message buffer failed!")
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300153 }
154
155 return buf
156}
157
158func (m *RMRClient) Send(mtype int, sid int, len int, payload []byte) bool {
159 buf := m.Allocate()
160
161 buf.mtype = C.int(mtype)
162 buf.sub_id = C.int(sid)
163 buf.len = C.int(len)
164 datap := C.CBytes(payload)
165 defer C.free(datap)
166
167 C.write_bytes_array(buf.payload, datap, C.int(len))
168
169 return m.SendBuf(buf)
170}
171
172func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t) bool {
173 for i := 0; i < 10; i++ {
174 txBuffer.state = 0
175 txBuffer := C.rmr_send_msg(m.context, txBuffer)
176 if txBuffer == nil {
177 break
178 } else if txBuffer.state != C.RMR_OK {
179 if txBuffer.state != C.RMR_ERR_RETRY {
180 time.Sleep(100 * time.Microsecond)
181 m.UpdateStatCounter("TransmitError")
182 }
183 for j := 0; j < 100 && txBuffer.state == C.RMR_ERR_RETRY; j++ {
184 txBuffer = C.rmr_send_msg(m.context, txBuffer)
185 }
186 }
187
188 if txBuffer.state == C.RMR_OK {
189 m.UpdateStatCounter("Transmitted")
190 return true
191 }
192 }
193 m.UpdateStatCounter("TransmitError")
194 return false
195}
196
197func (m *RMRClient) UpdateStatCounter(name string) {
198 m.mux.Lock()
199 m.stat[name].Inc()
200 m.mux.Unlock()
201}
202
203func (m *RMRClient) RegisterMetrics() {
204 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
205}
206
207// To be removed ...
208func (m *RMRClient) GetStat() (r RMRStatistics) {
209 return
210}
211
212func (m *RMRClient) Wait() {
213 m.wg.Wait()
214}
215
216func (m *RMRClient) IsReady() bool {
217 return m.ready != 0
218}
219
Mohamed Abukar192518d2019-06-11 18:06:50 +0300220func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
Mohamed Abukar349a0982019-06-08 18:15:42 +0300221 m.readyCb = cb
Mohamed Abukar192518d2019-06-11 18:06:50 +0300222 m.readyCbParams = params
Mohamed Abukar349a0982019-06-08 18:15:42 +0300223}
Mohamed Abukar775722c2019-06-10 16:41:57 +0300224
225func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
226 id, ok := RICMessageTypes[name]
227 return id, ok
228}
229
230func (m *RMRClient) GetRicMessageName(id int) (s string) {
231 for k, v := range RICMessageTypes {
232 if id == v {
233 return k
234 }
235 }
236 return
237}