blob: c0e1c8faa9f86a2c50ce883516ed2c81a2fa615f [file] [log] [blame]
Mohamed Abukar2e78e422019-06-02 11:45:52 +03001/*
2==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
5
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17==================================================================================
18*/
19
20package xapp
21
22/*
23#include <time.h>
24#include <stdlib.h>
25#include <stdio.h>
26#include <string.h>
27#include <rmr/rmr.h>
28#include <rmr/RIC_message_types.h>
29
30void write_bytes_array(unsigned char *dst, void *data, int len) {
31 memcpy((void *)dst, (void *)data, len);
32}
33
34#cgo CFLAGS: -I../
35#cgo LDFLAGS: -lrmr_nng -lnng
36*/
37import "C"
38
39import (
40 "github.com/spf13/viper"
41 "strconv"
Mohamed Abukard969b202019-07-01 17:14:44 +030042 "strings"
Mohamed Abukar2e78e422019-06-02 11:45:52 +030043 "time"
44 "unsafe"
45)
46
47var RMRCounterOpts = []CounterOpts{
48 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
49 {Name: "Received", Help: "The total number of received RMR messages"},
50 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
51 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
52}
53
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +030054type RMRParams struct {
Mohamed Abukar19461e12019-08-23 08:46:11 +030055 Mtype int
56 Payload []byte
57 PayloadLen int
58 Meid *RMRMeid
59 Xid string
60 SubId int
61 Src string
62 Mbuf *C.rmr_mbuf_t
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +030063}
64
Mohamed Abukar2e78e422019-06-02 11:45:52 +030065func NewRMRClient() *RMRClient {
Mohamed Abukar2e78e422019-06-02 11:45:52 +030066 p := C.CString(viper.GetString("rmr.protPort"))
67 m := C.int(viper.GetInt("rmr.maxSize"))
68 defer C.free(unsafe.Pointer(p))
69
Mohamed Abukar349a0982019-06-08 18:15:42 +030070 ctx := C.rmr_init(p, m, C.int(0))
71 if ctx == nil {
Mohamed Abukar775722c2019-06-10 16:41:57 +030072 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
Mohamed Abukar2e78e422019-06-02 11:45:52 +030073 }
74
Mohamed Abukar349a0982019-06-08 18:15:42 +030075 return &RMRClient{
Mohamed Abukar775722c2019-06-10 16:41:57 +030076 context: ctx,
Mohamed Abukar349a0982019-06-08 18:15:42 +030077 consumers: make([]MessageConsumer, 0),
Mohamed Abukar775722c2019-06-10 16:41:57 +030078 stat: Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
Mohamed Abukar349a0982019-06-08 18:15:42 +030079 }
Mohamed Abukar2e78e422019-06-02 11:45:52 +030080}
81
82func (m *RMRClient) Start(c MessageConsumer) {
Mohamed Abukard969b202019-07-01 17:14:44 +030083 if c != nil {
84 m.consumers = append(m.consumers, c)
85 }
86
Mohamed Abukar2e78e422019-06-02 11:45:52 +030087 for {
88 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
89
90 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
91 break
92 }
93 time.Sleep(10 * time.Second)
94 }
95 m.wg.Add(viper.GetInt("rmr.numWorkers"))
96
Mohamed Abukard969b202019-07-01 17:14:44 +030097 if m.readyCb != nil {
98 go m.readyCb(m.readyCbParams)
Mohamed Abukar2e78e422019-06-02 11:45:52 +030099 }
100
101 for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
102 go m.Worker("worker-"+strconv.Itoa(w), 0)
103 }
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300104 m.Wait()
105}
106
107func (m *RMRClient) Worker(taskName string, msgSize int) {
108 p := viper.GetString("rmr.protPort")
109 Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
110
111 defer m.wg.Done()
112 for {
113 rxBuffer := C.rmr_rcv_msg(m.context, nil)
114 if rxBuffer == nil {
115 m.UpdateStatCounter("ReceiveError")
116 continue
117 }
118 m.UpdateStatCounter("Received")
119
120 go m.parseMessage(rxBuffer)
121 }
122}
123
124func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
125 if len(m.consumers) == 0 {
126 Logger.Info("rmrClient: No message handlers defined, message discarded!")
127 return
128 }
129
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300130 params := &RMRParams{}
131 params.Mbuf = rxBuffer
132 params.Mtype = int(rxBuffer.mtype)
133 params.SubId = int(rxBuffer.sub_id)
134 params.Meid = &RMRMeid{}
135
Mohamed Abukard969b202019-07-01 17:14:44 +0300136 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
137 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
Mohamed Abukar19461e12019-08-23 08:46:11 +0300138 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300139 }
140
141 xidBuf := make([]byte, int(C.RMR_MAX_XID))
142 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
143 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
144 }
145
146 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
147 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
148 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
Mohamed Abukard969b202019-07-01 17:14:44 +0300149 }
150
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300151 for _, c := range m.consumers {
152 cptr := unsafe.Pointer(rxBuffer.payload)
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300153 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
154 params.PayloadLen = int(rxBuffer.len)
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300155
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300156 err := c.Consume(params)
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300157 if err != nil {
158 Logger.Warn("rmrClient: Consumer returned error: %v", err)
159 }
160 }
161}
162
163func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
164 buf := C.rmr_alloc_msg(m.context, 0)
165 if buf == nil {
Mohamed Abukar775722c2019-06-10 16:41:57 +0300166 Logger.Error("rmrClient: Allocating message buffer failed!")
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300167 }
168
169 return buf
170}
171
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300172func (m *RMRClient) SendMsg(params *RMRParams) bool {
173 return m.Send(params, false)
Mohamed Abukar09d0a372019-07-14 08:45:30 +0300174}
175
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300176func (m *RMRClient) SendRts(params *RMRParams) bool {
177 return m.Send(params, true)
178}
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300179
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300180func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
181 buf := params.Mbuf
182 if buf == nil {
183 buf = m.Allocate()
184 }
185
186 buf.mtype = C.int(params.Mtype)
187 buf.sub_id = C.int(params.SubId)
188 buf.len = C.int(len(params.Payload))
Mohamed Abukar688e6c92019-09-05 13:12:42 +0300189 if params.PayloadLen != 0 {
Mohamed Abukar549a1312019-09-05 13:17:49 +0300190 buf.len = C.int(params.PayloadLen)
Mohamed Abukar688e6c92019-09-05 13:12:42 +0300191 }
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300192 datap := C.CBytes(params.Payload)
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300193 defer C.free(datap)
194
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300195 if params != nil {
196 if params.Meid != nil {
197 b := make([]byte, int(C.RMR_MAX_MEID))
Mohamed Abukar19461e12019-08-23 08:46:11 +0300198 copy(b, []byte(params.Meid.RanName))
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300199 C.rmr_bytes2meid(buf, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
200 }
201 xidLen := len(params.Xid)
202 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
203 b := make([]byte, int(C.RMR_MAX_MEID))
204 copy(b, []byte(params.Xid))
205 C.rmr_bytes2xact(buf, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
206 }
Mohamed Abukard969b202019-07-01 17:14:44 +0300207 }
208 C.write_bytes_array(buf.payload, datap, buf.len)
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300209
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300210 return m.SendBuf(buf, isRts)
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300211}
212
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300213func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) bool {
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300214 for i := 0; i < 10; i++ {
215 txBuffer.state = 0
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300216 if isRts {
217 txBuffer = C.rmr_rts_msg(m.context, txBuffer)
218 } else {
219 txBuffer = C.rmr_send_msg(m.context, txBuffer)
220 }
221
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300222 if txBuffer == nil {
223 break
224 } else if txBuffer.state != C.RMR_OK {
225 if txBuffer.state != C.RMR_ERR_RETRY {
226 time.Sleep(100 * time.Microsecond)
227 m.UpdateStatCounter("TransmitError")
228 }
229 for j := 0; j < 100 && txBuffer.state == C.RMR_ERR_RETRY; j++ {
230 txBuffer = C.rmr_send_msg(m.context, txBuffer)
231 }
232 }
233
234 if txBuffer.state == C.RMR_OK {
235 m.UpdateStatCounter("Transmitted")
236 return true
237 }
238 }
239 m.UpdateStatCounter("TransmitError")
240 return false
241}
242
243func (m *RMRClient) UpdateStatCounter(name string) {
244 m.mux.Lock()
245 m.stat[name].Inc()
246 m.mux.Unlock()
247}
248
249func (m *RMRClient) RegisterMetrics() {
250 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
251}
252
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300253func (m *RMRClient) Wait() {
254 m.wg.Wait()
255}
256
257func (m *RMRClient) IsReady() bool {
258 return m.ready != 0
259}
260
Mohamed Abukar192518d2019-06-11 18:06:50 +0300261func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
Mohamed Abukar349a0982019-06-08 18:15:42 +0300262 m.readyCb = cb
Mohamed Abukar192518d2019-06-11 18:06:50 +0300263 m.readyCbParams = params
Mohamed Abukar349a0982019-06-08 18:15:42 +0300264}
Mohamed Abukar775722c2019-06-10 16:41:57 +0300265
266func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
267 id, ok := RICMessageTypes[name]
268 return id, ok
269}
270
271func (m *RMRClient) GetRicMessageName(id int) (s string) {
272 for k, v := range RICMessageTypes {
273 if id == v {
274 return k
275 }
276 }
277 return
278}
Mohamed Abukard969b202019-07-01 17:14:44 +0300279
280// To be removed ...
281func (m *RMRClient) GetStat() (r RMRStatistics) {
282 return
283}