blob: d90da022b7a8abf28cd5dcb14c4282f6dfbe6abb [file] [log] [blame]
Mohamed Abukar2e78e422019-06-02 11:45:52 +03001/*
2==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
5
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17==================================================================================
18*/
19
20package xapp
21
22/*
23#include <time.h>
24#include <stdlib.h>
25#include <stdio.h>
26#include <string.h>
27#include <rmr/rmr.h>
28#include <rmr/RIC_message_types.h>
29
30void write_bytes_array(unsigned char *dst, void *data, int len) {
31 memcpy((void *)dst, (void *)data, len);
32}
33
34#cgo CFLAGS: -I../
35#cgo LDFLAGS: -lrmr_nng -lnng
36*/
37import "C"
38
39import (
Mohamed Abukar467a99f2019-11-20 16:07:49 +020040 "fmt"
Mohamed Abukar2e78e422019-06-02 11:45:52 +030041 "github.com/spf13/viper"
42 "strconv"
Mohamed Abukard969b202019-07-01 17:14:44 +030043 "strings"
Mohamed Abukar2e78e422019-06-02 11:45:52 +030044 "time"
45 "unsafe"
46)
47
48var RMRCounterOpts = []CounterOpts{
49 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
50 {Name: "Received", Help: "The total number of received RMR messages"},
51 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
52 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
53}
54
Mohamed Abukar467a99f2019-11-20 16:07:49 +020055var RMRErrors = map[int]string{
56 C.RMR_OK: "state is good",
57 C.RMR_ERR_BADARG: "argument passed to function was unusable",
58 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
59 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
60 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
61 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
62 C.RMR_ERR_CALLFAILED: "unable to send call() message",
63 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
64 C.RMR_ERR_WHID: "wormhole id was invalid",
65 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
66 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
67 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
68 C.RMR_ERR_TIMEOUT: "message processing call timed out",
69 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
70 C.RMR_ERR_TRUNC: "received message likely truncated",
71 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
72 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
73}
74
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +030075type RMRParams struct {
Mohamed Abukar19461e12019-08-23 08:46:11 +030076 Mtype int
77 Payload []byte
78 PayloadLen int
79 Meid *RMRMeid
80 Xid string
81 SubId int
82 Src string
83 Mbuf *C.rmr_mbuf_t
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +030084}
85
Mohamed Abukar2e78e422019-06-02 11:45:52 +030086func NewRMRClient() *RMRClient {
Mohamed Abukar2e78e422019-06-02 11:45:52 +030087 p := C.CString(viper.GetString("rmr.protPort"))
88 m := C.int(viper.GetInt("rmr.maxSize"))
89 defer C.free(unsafe.Pointer(p))
90
Mohamed Abukar349a0982019-06-08 18:15:42 +030091 ctx := C.rmr_init(p, m, C.int(0))
92 if ctx == nil {
Mohamed Abukar775722c2019-06-10 16:41:57 +030093 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
Mohamed Abukar2e78e422019-06-02 11:45:52 +030094 }
95
Mohamed Abukar349a0982019-06-08 18:15:42 +030096 return &RMRClient{
Mohamed Abukar775722c2019-06-10 16:41:57 +030097 context: ctx,
Mohamed Abukar349a0982019-06-08 18:15:42 +030098 consumers: make([]MessageConsumer, 0),
Mohamed Abukar775722c2019-06-10 16:41:57 +030099 stat: Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
Mohamed Abukar349a0982019-06-08 18:15:42 +0300100 }
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300101}
102
103func (m *RMRClient) Start(c MessageConsumer) {
Mohamed Abukard969b202019-07-01 17:14:44 +0300104 if c != nil {
105 m.consumers = append(m.consumers, c)
106 }
107
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300108 for {
109 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
110
111 if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
112 break
113 }
114 time.Sleep(10 * time.Second)
115 }
116 m.wg.Add(viper.GetInt("rmr.numWorkers"))
117
Mohamed Abukard969b202019-07-01 17:14:44 +0300118 if m.readyCb != nil {
119 go m.readyCb(m.readyCbParams)
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300120 }
121
122 for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
123 go m.Worker("worker-"+strconv.Itoa(w), 0)
124 }
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300125 m.Wait()
126}
127
128func (m *RMRClient) Worker(taskName string, msgSize int) {
129 p := viper.GetString("rmr.protPort")
130 Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
131
132 defer m.wg.Done()
133 for {
134 rxBuffer := C.rmr_rcv_msg(m.context, nil)
135 if rxBuffer == nil {
Mohamed Abukar467a99f2019-11-20 16:07:49 +0200136 m.LogMBufError("RecvMsg failed", rxBuffer)
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300137 m.UpdateStatCounter("ReceiveError")
138 continue
139 }
140 m.UpdateStatCounter("Received")
141
142 go m.parseMessage(rxBuffer)
143 }
144}
145
146func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
147 if len(m.consumers) == 0 {
148 Logger.Info("rmrClient: No message handlers defined, message discarded!")
149 return
150 }
151
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300152 params := &RMRParams{}
153 params.Mbuf = rxBuffer
154 params.Mtype = int(rxBuffer.mtype)
155 params.SubId = int(rxBuffer.sub_id)
156 params.Meid = &RMRMeid{}
157
Mohamed Abukard969b202019-07-01 17:14:44 +0300158 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
159 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
Mohamed Abukar19461e12019-08-23 08:46:11 +0300160 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300161 }
162
163 xidBuf := make([]byte, int(C.RMR_MAX_XID))
164 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
165 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
166 }
167
168 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
169 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
170 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
Mohamed Abukard969b202019-07-01 17:14:44 +0300171 }
172
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300173 for _, c := range m.consumers {
174 cptr := unsafe.Pointer(rxBuffer.payload)
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300175 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
176 params.PayloadLen = int(rxBuffer.len)
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300177
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300178 err := c.Consume(params)
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300179 if err != nil {
180 Logger.Warn("rmrClient: Consumer returned error: %v", err)
181 }
182 }
183}
184
185func (m *RMRClient) Allocate() *C.rmr_mbuf_t {
186 buf := C.rmr_alloc_msg(m.context, 0)
187 if buf == nil {
Mohamed Abukar775722c2019-06-10 16:41:57 +0300188 Logger.Error("rmrClient: Allocating message buffer failed!")
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300189 }
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300190 return buf
191}
192
Mohamed Abukar02139602019-09-30 08:55:35 +0300193func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
194 if mbuf == nil {
Mohamed Abukar02139602019-09-30 08:55:35 +0300195 return
196 }
197 C.rmr_free_msg(mbuf)
198}
199
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300200func (m *RMRClient) SendMsg(params *RMRParams) bool {
Mohamed Abukard5dc8e12019-10-15 09:58:17 +0300201 return m.Send(params, false)
Mohamed Abukar09d0a372019-07-14 08:45:30 +0300202}
203
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300204func (m *RMRClient) SendRts(params *RMRParams) bool {
Mohamed Abukard5dc8e12019-10-15 09:58:17 +0300205 return m.Send(params, true)
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300206}
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300207
Mohamed Abukard5dc8e12019-10-15 09:58:17 +0300208func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
Mohamed Abukarf0ee2c62019-10-14 19:29:24 +0300209 txBuffer := params.Mbuf
210 if txBuffer == nil {
211 txBuffer = m.Allocate()
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300212 }
213
Mohamed Abukarf0ee2c62019-10-14 19:29:24 +0300214 txBuffer.mtype = C.int(params.Mtype)
215 txBuffer.sub_id = C.int(params.SubId)
216 txBuffer.len = C.int(len(params.Payload))
Mohamed Abukar688e6c92019-09-05 13:12:42 +0300217 if params.PayloadLen != 0 {
Mohamed Abukarf0ee2c62019-10-14 19:29:24 +0300218 txBuffer.len = C.int(params.PayloadLen)
Mohamed Abukar688e6c92019-09-05 13:12:42 +0300219 }
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300220 datap := C.CBytes(params.Payload)
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300221 defer C.free(datap)
222
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300223 if params != nil {
224 if params.Meid != nil {
225 b := make([]byte, int(C.RMR_MAX_MEID))
Mohamed Abukar19461e12019-08-23 08:46:11 +0300226 copy(b, []byte(params.Meid.RanName))
Mohamed Abukarf0ee2c62019-10-14 19:29:24 +0300227 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300228 }
229 xidLen := len(params.Xid)
230 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
Juha Hyttinen6e075ce2019-11-06 08:42:34 +0200231 b := make([]byte, int(C.RMR_MAX_XID))
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300232 copy(b, []byte(params.Xid))
Mohamed Abukarf0ee2c62019-10-14 19:29:24 +0300233 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300234 }
Mohamed Abukard969b202019-07-01 17:14:44 +0300235 }
Mohamed Abukarf0ee2c62019-10-14 19:29:24 +0300236 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300237
Mohamed Abukard5dc8e12019-10-15 09:58:17 +0300238 return m.SendBuf(txBuffer, isRts)
239}
240
241func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) bool {
242 var (
243 currBuffer *C.rmr_mbuf_t
244 state bool = true
245 counterName string = "Transmitted"
246 )
247
Mohamed Abukarf0ee2c62019-10-14 19:29:24 +0300248 txBuffer.state = 0
Mohamed Abukarf0ee2c62019-10-14 19:29:24 +0300249 if isRts {
250 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
251 } else {
252 currBuffer = C.rmr_send_msg(m.context, txBuffer)
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300253 }
Mohamed Abukarf0ee2c62019-10-14 19:29:24 +0300254
Mohamed Abukard5dc8e12019-10-15 09:58:17 +0300255 if currBuffer == nil {
256 m.UpdateStatCounter("TransmitError")
Mohamed Abukar467a99f2019-11-20 16:07:49 +0200257 return m.LogMBufError("SendBuf failed", txBuffer)
Mohamed Abukarf0ee2c62019-10-14 19:29:24 +0300258 }
Mohamed Abukard5dc8e12019-10-15 09:58:17 +0300259
260 // Just quick retry seems to help for K8s issue
261 for j := 0; j < 3 && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
262 if isRts {
263 currBuffer = C.rmr_rts_msg(m.context, currBuffer)
264 } else {
265 currBuffer = C.rmr_send_msg(m.context, currBuffer)
266 }
267 }
268
269 if currBuffer.state != C.RMR_OK {
Mohamed Abukard5dc8e12019-10-15 09:58:17 +0300270 counterName = "TransmitError"
Mohamed Abukar467a99f2019-11-20 16:07:49 +0200271 state = m.LogMBufError("SendBuf failed", currBuffer)
Mohamed Abukard5dc8e12019-10-15 09:58:17 +0300272 }
273
274 m.UpdateStatCounter(counterName)
275 m.Free(currBuffer)
276 return state
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300277}
278
279func (m *RMRClient) UpdateStatCounter(name string) {
280 m.mux.Lock()
281 m.stat[name].Inc()
282 m.mux.Unlock()
283}
284
285func (m *RMRClient) RegisterMetrics() {
286 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
287}
288
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300289func (m *RMRClient) Wait() {
290 m.wg.Wait()
291}
292
293func (m *RMRClient) IsReady() bool {
294 return m.ready != 0
295}
296
Mohamed Abukar192518d2019-06-11 18:06:50 +0300297func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
Mohamed Abukar349a0982019-06-08 18:15:42 +0300298 m.readyCb = cb
Mohamed Abukar192518d2019-06-11 18:06:50 +0300299 m.readyCbParams = params
Mohamed Abukar349a0982019-06-08 18:15:42 +0300300}
Mohamed Abukar775722c2019-06-10 16:41:57 +0300301
302func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
303 id, ok := RICMessageTypes[name]
304 return id, ok
305}
306
307func (m *RMRClient) GetRicMessageName(id int) (s string) {
308 for k, v := range RICMessageTypes {
309 if id == v {
310 return k
311 }
312 }
313 return
314}
Mohamed Abukard969b202019-07-01 17:14:44 +0300315
Mohamed Abukar467a99f2019-11-20 16:07:49 +0200316func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) bool {
317 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
318 return false
319}
320
Mohamed Abukard969b202019-07-01 17:14:44 +0300321// To be removed ...
322func (m *RMRClient) GetStat() (r RMRStatistics) {
323 return
324}