blob: 90bc64b6430d19f906572086aef59c4e4d5dd4f7 [file] [log] [blame]
Mohamed Abukar2e78e422019-06-02 11:45:52 +03001/*
2==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
5
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17==================================================================================
18*/
19
20package xapp
21
22/*
23#include <time.h>
24#include <stdlib.h>
25#include <stdio.h>
26#include <string.h>
Juha Hyttinend28b8dd2020-05-27 09:21:08 +030027#include <sys/epoll.h>
28#include <unistd.h>
Mohamed Abukar2e78e422019-06-02 11:45:52 +030029#include <rmr/rmr.h>
30#include <rmr/RIC_message_types.h>
31
32void write_bytes_array(unsigned char *dst, void *data, int len) {
33 memcpy((void *)dst, (void *)data, len);
34}
35
Juha Hyttinend28b8dd2020-05-27 09:21:08 +030036int init_epoll(int rcv_fd) {
37 struct epoll_event epe;
38 int epoll_fd = epoll_create1( 0 );
39 epe.events = EPOLLIN;
40 epe.data.fd = rcv_fd;
41 epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe );
42 return epoll_fd;
43}
44
45void close_epoll(int epoll_fd) {
46 if(epoll_fd >= 0) {
47 close(epoll_fd);
48 }
49}
50
51int wait_epoll(int epoll_fd,int rcv_fd) {
52 struct epoll_event events[1];
53 if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) {
54 if( events[0].data.fd == rcv_fd ) {
55 return 1;
56 }
57 }
58 return 0;
59}
60
Mohamed Abukar2e78e422019-06-02 11:45:52 +030061#cgo CFLAGS: -I../
wahidw1c24a3c2020-03-10 08:45:02 +000062#cgo LDFLAGS: -lrmr_si
Mohamed Abukar2e78e422019-06-02 11:45:52 +030063*/
64import "C"
65
66import (
Mohamed Abukar467a99f2019-11-20 16:07:49 +020067 "fmt"
Mohamed Abukar2e78e422019-06-02 11:45:52 +030068 "github.com/spf13/viper"
Mohamed Abukard969b202019-07-01 17:14:44 +030069 "strings"
Mohamed Abukar2e78e422019-06-02 11:45:52 +030070 "time"
71 "unsafe"
72)
73
74var RMRCounterOpts = []CounterOpts{
75 {Name: "Transmitted", Help: "The total number of transmited RMR messages"},
76 {Name: "Received", Help: "The total number of received RMR messages"},
77 {Name: "TransmitError", Help: "The total number of RMR transmission errors"},
78 {Name: "ReceiveError", Help: "The total number of RMR receive errors"},
79}
80
Mohamed Abukar467a99f2019-11-20 16:07:49 +020081var RMRErrors = map[int]string{
82 C.RMR_OK: "state is good",
83 C.RMR_ERR_BADARG: "argument passed to function was unusable",
84 C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type",
85 C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message",
86 C.RMR_ERR_NOHDR: "message didn't contain a valid header",
87 C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason",
88 C.RMR_ERR_CALLFAILED: "unable to send call() message",
89 C.RMR_ERR_NOWHOPEN: "no wormholes are open",
90 C.RMR_ERR_WHID: "wormhole id was invalid",
91 C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size",
92 C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)",
93 C.RMR_ERR_RCVFAILED: "receive failed (hard error)",
94 C.RMR_ERR_TIMEOUT: "message processing call timed out",
95 C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer",
96 C.RMR_ERR_TRUNC: "received message likely truncated",
97 C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed",
98 C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
99}
100
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300101type RMRParams struct {
Mohamed Abukar19461e12019-08-23 08:46:11 +0300102 Mtype int
103 Payload []byte
104 PayloadLen int
105 Meid *RMRMeid
106 Xid string
107 SubId int
108 Src string
109 Mbuf *C.rmr_mbuf_t
wahidwad0a2712020-03-04 09:54:15 +0000110 Whid int
wahidwe0948fe2020-03-19 14:49:41 +0000111 Callid int
112 Timeout int
Mohamed Abukar791a77f2020-02-12 19:08:42 +0200113 status int
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300114}
115
Juha Hyttinend28b8dd2020-05-27 09:21:08 +0300116func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient {
Juha Hyttinenf49009a2019-11-26 10:28:14 +0200117 p := C.CString(protPort)
118 m := C.int(maxSize)
wahidwad0a2712020-03-04 09:54:15 +0000119 c := C.int(threadType)
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300120 defer C.free(unsafe.Pointer(p))
wahidwad0a2712020-03-04 09:54:15 +0000121 ctx := C.rmr_init(p, m, c)
Mohamed Abukar349a0982019-06-08 18:15:42 +0300122 if ctx == nil {
Mohamed Abukar775722c2019-06-10 16:41:57 +0300123 Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300124 }
Mohamed Abukar349a0982019-06-08 18:15:42 +0300125 return &RMRClient{
Juha Hyttinend28b8dd2020-05-27 09:21:08 +0300126 protPort: protPort,
127 context: ctx,
128 consumers: make([]MessageConsumer, 0),
129 stat: Metric.RegisterCounterGroup(RMRCounterOpts, statDesc),
Mohamed Abukar349a0982019-06-08 18:15:42 +0300130 }
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300131}
132
Juha Hyttinenf49009a2019-11-26 10:28:14 +0200133func NewRMRClient() *RMRClient {
Juha Hyttinend28b8dd2020-05-27 09:21:08 +0300134 return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.threadType"), "RMR")
Juha Hyttinenf49009a2019-11-26 10:28:14 +0200135}
136
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300137func (m *RMRClient) Start(c MessageConsumer) {
Mohamed Abukard969b202019-07-01 17:14:44 +0300138 if c != nil {
139 m.consumers = append(m.consumers, c)
140 }
141
Juha Hyttinencc3300c2019-12-11 09:29:12 +0200142 var counter int = 0
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300143 for {
Juha Hyttinend28b8dd2020-05-27 09:21:08 +0300144 m.contextMux.Lock()
145 m.ready = int(C.rmr_ready(m.context))
146 m.contextMux.Unlock()
147 if m.ready == 1 {
Juha Hyttinencc3300c2019-12-11 09:29:12 +0200148 Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter)
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300149 break
150 }
Juha Hyttinencc3300c2019-12-11 09:29:12 +0200151 if counter%10 == 0 {
152 Logger.Info("rmrClient: Waiting for RMR to be ready ...")
153 }
154 time.Sleep(1 * time.Second)
155 counter++
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300156 }
Juha Hyttinend28b8dd2020-05-27 09:21:08 +0300157 m.wg.Add(1)
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300158
Mohamed Abukard969b202019-07-01 17:14:44 +0300159 if m.readyCb != nil {
160 go m.readyCb(m.readyCbParams)
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300161 }
162
Juha Hyttinend28b8dd2020-05-27 09:21:08 +0300163 go func() {
164 m.contextMux.Lock()
165 rfd := C.rmr_get_rcvfd(m.context)
166 m.contextMux.Unlock()
167 efd := C.init_epoll(rfd)
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300168
Juha Hyttinend28b8dd2020-05-27 09:21:08 +0300169 defer m.wg.Done()
170 for {
171 if int(C.wait_epoll(efd, rfd)) == 0 {
172 continue
173 }
174 m.contextMux.Lock()
175 rxBuffer := C.rmr_rcv_msg(m.context, nil)
176 m.contextMux.Unlock()
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300177
Juha Hyttinend28b8dd2020-05-27 09:21:08 +0300178 if rxBuffer == nil {
179 m.LogMBufError("RecvMsg failed", rxBuffer)
180 m.UpdateStatCounter("ReceiveError")
181 continue
182 }
183 m.UpdateStatCounter("Received")
184 m.parseMessage(rxBuffer)
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300185 }
Juha Hyttinend28b8dd2020-05-27 09:21:08 +0300186 }()
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300187
Juha Hyttinend28b8dd2020-05-27 09:21:08 +0300188 m.wg.Wait()
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300189}
190
191func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) {
192 if len(m.consumers) == 0 {
193 Logger.Info("rmrClient: No message handlers defined, message discarded!")
194 return
195 }
196
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300197 params := &RMRParams{}
198 params.Mbuf = rxBuffer
199 params.Mtype = int(rxBuffer.mtype)
200 params.SubId = int(rxBuffer.sub_id)
201 params.Meid = &RMRMeid{}
202
Mohamed Abukard969b202019-07-01 17:14:44 +0300203 meidBuf := make([]byte, int(C.RMR_MAX_MEID))
204 if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
Mohamed Abukar19461e12019-08-23 08:46:11 +0300205 params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000")
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300206 }
207
208 xidBuf := make([]byte, int(C.RMR_MAX_XID))
209 if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
210 params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
211 }
212
213 srcBuf := make([]byte, int(C.RMR_MAX_SRC))
214 if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
215 params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
Mohamed Abukard969b202019-07-01 17:14:44 +0300216 }
217
Mohamed Abukarb4c70392020-02-13 20:43:15 +0200218 // Default case: a single consumer
219 if len(m.consumers) == 1 && m.consumers[0] != nil {
220 params.PayloadLen = int(rxBuffer.len)
221 params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen]
222 err := m.consumers[0].Consume(params)
223 if err != nil {
224 Logger.Warn("rmrClient: Consumer returned error: %v", err)
225 }
226 return
227 }
228
229 // Special case for multiple consumers
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300230 for _, c := range m.consumers {
231 cptr := unsafe.Pointer(rxBuffer.payload)
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300232 params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
233 params.PayloadLen = int(rxBuffer.len)
Mohamed Abukarb4c70392020-02-13 20:43:15 +0200234 params.Mtype = int(rxBuffer.mtype)
235 params.SubId = int(rxBuffer.sub_id)
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300236
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300237 err := c.Consume(params)
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300238 if err != nil {
239 Logger.Warn("rmrClient: Consumer returned error: %v", err)
240 }
241 }
242}
243
Juha Hyttinen1307b2d2020-04-15 13:45:51 +0300244func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t {
Juha Hyttinend28b8dd2020-05-27 09:21:08 +0300245 m.contextMux.Lock()
246 defer m.contextMux.Unlock()
Juha Hyttinen1307b2d2020-04-15 13:45:51 +0300247 buf := C.rmr_alloc_msg(m.context, C.int(size))
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300248 if buf == nil {
Mohamed Abukar775722c2019-06-10 16:41:57 +0300249 Logger.Error("rmrClient: Allocating message buffer failed!")
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300250 }
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300251 return buf
252}
253
Mohamed Abukar02139602019-09-30 08:55:35 +0300254func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) {
255 if mbuf == nil {
Mohamed Abukar02139602019-09-30 08:55:35 +0300256 return
257 }
Juha Hyttinend28b8dd2020-05-27 09:21:08 +0300258 m.contextMux.Lock()
259 defer m.contextMux.Unlock()
Mohamed Abukar02139602019-09-30 08:55:35 +0300260 C.rmr_free_msg(mbuf)
261}
262
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300263func (m *RMRClient) SendMsg(params *RMRParams) bool {
Mohamed Abukard5dc8e12019-10-15 09:58:17 +0300264 return m.Send(params, false)
Mohamed Abukar09d0a372019-07-14 08:45:30 +0300265}
266
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300267func (m *RMRClient) SendRts(params *RMRParams) bool {
Mohamed Abukard5dc8e12019-10-15 09:58:17 +0300268 return m.Send(params, true)
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300269}
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300270
wahidwe0948fe2020-03-19 14:49:41 +0000271func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
Juha Hyttinen1307b2d2020-04-15 13:45:51 +0300272 if params.Mbuf != nil {
273 m.Free(params.Mbuf)
274 params.Mbuf = nil
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300275 }
276
Juha Hyttinen1307b2d2020-04-15 13:45:51 +0300277 payLen := len(params.Payload)
278 if params.PayloadLen != 0 {
279 payLen = params.PayloadLen
280 }
281
282 txBuffer := m.Allocate(payLen)
283 if txBuffer == nil {
284 return nil
285 }
Mohamed Abukarf0ee2c62019-10-14 19:29:24 +0300286 txBuffer.mtype = C.int(params.Mtype)
287 txBuffer.sub_id = C.int(params.SubId)
Juha Hyttinen1307b2d2020-04-15 13:45:51 +0300288 txBuffer.len = C.int(payLen)
289
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300290 datap := C.CBytes(params.Payload)
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300291 defer C.free(datap)
292
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300293 if params != nil {
294 if params.Meid != nil {
295 b := make([]byte, int(C.RMR_MAX_MEID))
Mohamed Abukar19461e12019-08-23 08:46:11 +0300296 copy(b, []byte(params.Meid.RanName))
Mohamed Abukarf0ee2c62019-10-14 19:29:24 +0300297 C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300298 }
299 xidLen := len(params.Xid)
300 if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
Juha Hyttinen6e075ce2019-11-06 08:42:34 +0200301 b := make([]byte, int(C.RMR_MAX_XID))
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300302 copy(b, []byte(params.Xid))
Mohamed Abukarf0ee2c62019-10-14 19:29:24 +0300303 C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
Mohamed Abukarf11ab7a2019-08-14 16:55:01 +0300304 }
Mohamed Abukard969b202019-07-01 17:14:44 +0300305 }
Mohamed Abukarf0ee2c62019-10-14 19:29:24 +0300306 C.write_bytes_array(txBuffer.payload, datap, txBuffer.len)
wahidwe0948fe2020-03-19 14:49:41 +0000307 return txBuffer
308}
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300309
wahidwe0948fe2020-03-19 14:49:41 +0000310func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
311
312 txBuffer := m.CopyBuffer(params)
313 if txBuffer == nil {
314 return false
315 }
wahidwad0a2712020-03-04 09:54:15 +0000316 params.status = m.SendBuf(txBuffer, isRts, params.Whid)
Mohamed Abukar791a77f2020-02-12 19:08:42 +0200317 if params.status == int(C.RMR_OK) {
318 return true
319 }
320 return false
Mohamed Abukard5dc8e12019-10-15 09:58:17 +0300321}
322
wahidwad0a2712020-03-04 09:54:15 +0000323func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int {
Mohamed Abukard5dc8e12019-10-15 09:58:17 +0300324 var (
325 currBuffer *C.rmr_mbuf_t
Mohamed Abukard5dc8e12019-10-15 09:58:17 +0300326 counterName string = "Transmitted"
327 )
328
Juha Hyttinend28b8dd2020-05-27 09:21:08 +0300329 m.contextMux.Lock()
Mohamed Abukarf0ee2c62019-10-14 19:29:24 +0300330 txBuffer.state = 0
wahidwad0a2712020-03-04 09:54:15 +0000331 if whid != 0 {
332 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
Mohamed Abukarf0ee2c62019-10-14 19:29:24 +0300333 } else {
wahidwad0a2712020-03-04 09:54:15 +0000334 if isRts {
335 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
336 } else {
337 currBuffer = C.rmr_send_msg(m.context, txBuffer)
338 }
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300339 }
Juha Hyttinend28b8dd2020-05-27 09:21:08 +0300340 m.contextMux.Unlock()
Mohamed Abukarf0ee2c62019-10-14 19:29:24 +0300341
Mohamed Abukard5dc8e12019-10-15 09:58:17 +0300342 if currBuffer == nil {
343 m.UpdateStatCounter("TransmitError")
Mohamed Abukar467a99f2019-11-20 16:07:49 +0200344 return m.LogMBufError("SendBuf failed", txBuffer)
Mohamed Abukarf0ee2c62019-10-14 19:29:24 +0300345 }
Mohamed Abukard5dc8e12019-10-15 09:58:17 +0300346
347 // Just quick retry seems to help for K8s issue
Mohamed Abukar791a77f2020-02-12 19:08:42 +0200348 maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure")
349 if maxRetryOnFailure == 0 {
350 maxRetryOnFailure = 5
351 }
352
353 for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ {
Juha Hyttinend28b8dd2020-05-27 09:21:08 +0300354 m.contextMux.Lock()
wahidwad0a2712020-03-04 09:54:15 +0000355 if whid != 0 {
356 currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer)
Mohamed Abukard5dc8e12019-10-15 09:58:17 +0300357 } else {
wahidwad0a2712020-03-04 09:54:15 +0000358 if isRts {
359 currBuffer = C.rmr_rts_msg(m.context, txBuffer)
360 } else {
361 currBuffer = C.rmr_send_msg(m.context, txBuffer)
362 }
Mohamed Abukard5dc8e12019-10-15 09:58:17 +0300363 }
Juha Hyttinend28b8dd2020-05-27 09:21:08 +0300364 m.contextMux.Unlock()
Mohamed Abukard5dc8e12019-10-15 09:58:17 +0300365 }
366
367 if currBuffer.state != C.RMR_OK {
Mohamed Abukard5dc8e12019-10-15 09:58:17 +0300368 counterName = "TransmitError"
Mohamed Abukar791a77f2020-02-12 19:08:42 +0200369 m.LogMBufError("SendBuf failed", currBuffer)
Mohamed Abukard5dc8e12019-10-15 09:58:17 +0300370 }
371
372 m.UpdateStatCounter(counterName)
Juha Hyttinend1fcac72020-02-25 09:20:26 +0200373 defer m.Free(currBuffer)
Mohamed Abukar791a77f2020-02-12 19:08:42 +0200374
375 return int(currBuffer.state)
376}
377
wahidwe0948fe2020-03-19 14:49:41 +0000378func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) {
379 var (
380 currBuffer *C.rmr_mbuf_t
381 counterName string = "Transmitted"
382 )
383 txBuffer := m.CopyBuffer(params)
384 if txBuffer == nil {
385 return C.RMR_ERR_INITFAILED, ""
386 }
387
388 txBuffer.state = 0
389
Juha Hyttinend28b8dd2020-05-27 09:21:08 +0300390 m.contextMux.Lock()
wahidwe0948fe2020-03-19 14:49:41 +0000391 currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout))
Juha Hyttinend28b8dd2020-05-27 09:21:08 +0300392 m.contextMux.Unlock()
wahidwe0948fe2020-03-19 14:49:41 +0000393
394 if currBuffer == nil {
395 m.UpdateStatCounter("TransmitError")
396 return m.LogMBufError("SendBuf failed", txBuffer), ""
397 }
398
399 if currBuffer.state != C.RMR_OK {
400 counterName = "TransmitError"
401 m.LogMBufError("SendBuf failed", currBuffer)
402 }
403
404 m.UpdateStatCounter(counterName)
405 defer m.Free(currBuffer)
406
407 cptr := unsafe.Pointer(currBuffer.payload)
408 payload := C.GoBytes(cptr, C.int(currBuffer.len))
409
410 return int(currBuffer.state), string(payload)
411}
412
wahidwad0a2712020-03-04 09:54:15 +0000413func (m *RMRClient) Openwh(target string) C.rmr_whid_t {
414 return m.Wh_open(target)
415}
416
417func (m *RMRClient) Wh_open(target string) C.rmr_whid_t {
Juha Hyttinend28b8dd2020-05-27 09:21:08 +0300418 m.contextMux.Lock()
419 defer m.contextMux.Unlock()
wahidwad0a2712020-03-04 09:54:15 +0000420 endpoint := C.CString(target)
421 return C.rmr_wh_open(m.context, endpoint)
422}
423
424func (m *RMRClient) Closewh(whid int) {
425 m.Wh_close(C.rmr_whid_t(whid))
426}
427
428func (m *RMRClient) Wh_close(whid C.rmr_whid_t) {
Juha Hyttinend28b8dd2020-05-27 09:21:08 +0300429 m.contextMux.Lock()
430 defer m.contextMux.Unlock()
wahidwad0a2712020-03-04 09:54:15 +0000431 C.rmr_wh_close(m.context, whid)
432}
433
Mohamed Abukar791a77f2020-02-12 19:08:42 +0200434func (m *RMRClient) IsRetryError(params *RMRParams) bool {
435 if params.status == int(C.RMR_ERR_RETRY) {
436 return true
437 }
438 return false
439}
440
441func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool {
442 if params.status == int(C.RMR_ERR_NOENDPT) {
443 return true
444 }
445 return false
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300446}
447
448func (m *RMRClient) UpdateStatCounter(name string) {
449 m.mux.Lock()
450 m.stat[name].Inc()
451 m.mux.Unlock()
452}
453
454func (m *RMRClient) RegisterMetrics() {
455 m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
456}
457
Mohamed Abukar2e78e422019-06-02 11:45:52 +0300458func (m *RMRClient) Wait() {
459 m.wg.Wait()
460}
461
462func (m *RMRClient) IsReady() bool {
463 return m.ready != 0
464}
465
Mohamed Abukar192518d2019-06-11 18:06:50 +0300466func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
Mohamed Abukar349a0982019-06-08 18:15:42 +0300467 m.readyCb = cb
Mohamed Abukar192518d2019-06-11 18:06:50 +0300468 m.readyCbParams = params
Mohamed Abukar349a0982019-06-08 18:15:42 +0300469}
Mohamed Abukar775722c2019-06-10 16:41:57 +0300470
471func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
472 id, ok := RICMessageTypes[name]
473 return id, ok
474}
475
476func (m *RMRClient) GetRicMessageName(id int) (s string) {
477 for k, v := range RICMessageTypes {
478 if id == v {
479 return k
480 }
481 }
482 return
483}
Mohamed Abukard969b202019-07-01 17:14:44 +0300484
Mohamed Abukar791a77f2020-02-12 19:08:42 +0200485func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int {
Mohamed Abukar467a99f2019-11-20 16:07:49 +0200486 Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)]))
Mohamed Abukar791a77f2020-02-12 19:08:42 +0200487 return int(mbuf.state)
Mohamed Abukar467a99f2019-11-20 16:07:49 +0200488}
489
Mohamed Abukard969b202019-07-01 17:14:44 +0300490// To be removed ...
491func (m *RMRClient) GetStat() (r RMRStatistics) {
492 return
493}