Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 1 | /* |
| 2 | ================================================================================== |
| 3 | Copyright (c) 2019 AT&T Intellectual Property. |
| 4 | Copyright (c) 2019 Nokia |
| 5 | |
| 6 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 7 | you may not use this file except in compliance with the License. |
| 8 | You may obtain a copy of the License at |
| 9 | |
| 10 | http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | |
| 12 | Unless required by applicable law or agreed to in writing, software |
| 13 | distributed under the License is distributed on an "AS IS" BASIS, |
| 14 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 15 | See the License for the specific language governing permissions and |
| 16 | limitations under the License. |
| 17 | ================================================================================== |
| 18 | */ |
| 19 | |
| 20 | package xapp |
| 21 | |
| 22 | /* |
| 23 | #include <time.h> |
| 24 | #include <stdlib.h> |
| 25 | #include <stdio.h> |
| 26 | #include <string.h> |
Juha Hyttinen | d28b8dd | 2020-05-27 09:21:08 +0300 | [diff] [blame^] | 27 | #include <sys/epoll.h> |
| 28 | #include <unistd.h> |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 29 | #include <rmr/rmr.h> |
| 30 | #include <rmr/RIC_message_types.h> |
| 31 | |
| 32 | void write_bytes_array(unsigned char *dst, void *data, int len) { |
| 33 | memcpy((void *)dst, (void *)data, len); |
| 34 | } |
| 35 | |
Juha Hyttinen | d28b8dd | 2020-05-27 09:21:08 +0300 | [diff] [blame^] | 36 | int init_epoll(int rcv_fd) { |
| 37 | struct epoll_event epe; |
| 38 | int epoll_fd = epoll_create1( 0 ); |
| 39 | epe.events = EPOLLIN; |
| 40 | epe.data.fd = rcv_fd; |
| 41 | epoll_ctl( epoll_fd, EPOLL_CTL_ADD, rcv_fd, &epe ); |
| 42 | return epoll_fd; |
| 43 | } |
| 44 | |
| 45 | void close_epoll(int epoll_fd) { |
| 46 | if(epoll_fd >= 0) { |
| 47 | close(epoll_fd); |
| 48 | } |
| 49 | } |
| 50 | |
| 51 | int wait_epoll(int epoll_fd,int rcv_fd) { |
| 52 | struct epoll_event events[1]; |
| 53 | if( epoll_wait( epoll_fd, events, 1, -1 ) > 0 ) { |
| 54 | if( events[0].data.fd == rcv_fd ) { |
| 55 | return 1; |
| 56 | } |
| 57 | } |
| 58 | return 0; |
| 59 | } |
| 60 | |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 61 | #cgo CFLAGS: -I../ |
wahidw | 1c24a3c | 2020-03-10 08:45:02 +0000 | [diff] [blame] | 62 | #cgo LDFLAGS: -lrmr_si |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 63 | */ |
| 64 | import "C" |
| 65 | |
| 66 | import ( |
Mohamed Abukar | 467a99f | 2019-11-20 16:07:49 +0200 | [diff] [blame] | 67 | "fmt" |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 68 | "github.com/spf13/viper" |
Mohamed Abukar | d969b20 | 2019-07-01 17:14:44 +0300 | [diff] [blame] | 69 | "strings" |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 70 | "time" |
| 71 | "unsafe" |
| 72 | ) |
| 73 | |
| 74 | var RMRCounterOpts = []CounterOpts{ |
| 75 | {Name: "Transmitted", Help: "The total number of transmited RMR messages"}, |
| 76 | {Name: "Received", Help: "The total number of received RMR messages"}, |
| 77 | {Name: "TransmitError", Help: "The total number of RMR transmission errors"}, |
| 78 | {Name: "ReceiveError", Help: "The total number of RMR receive errors"}, |
| 79 | } |
| 80 | |
Mohamed Abukar | 467a99f | 2019-11-20 16:07:49 +0200 | [diff] [blame] | 81 | var RMRErrors = map[int]string{ |
| 82 | C.RMR_OK: "state is good", |
| 83 | C.RMR_ERR_BADARG: "argument passed to function was unusable", |
| 84 | C.RMR_ERR_NOENDPT: "send/call could not find an endpoint based on msg type", |
| 85 | C.RMR_ERR_EMPTY: "msg received had no payload; attempt to send an empty message", |
| 86 | C.RMR_ERR_NOHDR: "message didn't contain a valid header", |
| 87 | C.RMR_ERR_SENDFAILED: "send failed; errno has nano reason", |
| 88 | C.RMR_ERR_CALLFAILED: "unable to send call() message", |
| 89 | C.RMR_ERR_NOWHOPEN: "no wormholes are open", |
| 90 | C.RMR_ERR_WHID: "wormhole id was invalid", |
| 91 | C.RMR_ERR_OVERFLOW: "operation would have busted through a buffer/field size", |
| 92 | C.RMR_ERR_RETRY: "request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)", |
| 93 | C.RMR_ERR_RCVFAILED: "receive failed (hard error)", |
| 94 | C.RMR_ERR_TIMEOUT: "message processing call timed out", |
| 95 | C.RMR_ERR_UNSET: "the message hasn't been populated with a transport buffer", |
| 96 | C.RMR_ERR_TRUNC: "received message likely truncated", |
| 97 | C.RMR_ERR_INITFAILED: "initialization of something (probably message) failed", |
| 98 | C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request", |
| 99 | } |
| 100 | |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 101 | type RMRParams struct { |
Mohamed Abukar | 19461e1 | 2019-08-23 08:46:11 +0300 | [diff] [blame] | 102 | Mtype int |
| 103 | Payload []byte |
| 104 | PayloadLen int |
| 105 | Meid *RMRMeid |
| 106 | Xid string |
| 107 | SubId int |
| 108 | Src string |
| 109 | Mbuf *C.rmr_mbuf_t |
wahidw | ad0a271 | 2020-03-04 09:54:15 +0000 | [diff] [blame] | 110 | Whid int |
wahidw | e0948fe | 2020-03-19 14:49:41 +0000 | [diff] [blame] | 111 | Callid int |
| 112 | Timeout int |
Mohamed Abukar | 791a77f | 2020-02-12 19:08:42 +0200 | [diff] [blame] | 113 | status int |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 114 | } |
| 115 | |
Juha Hyttinen | d28b8dd | 2020-05-27 09:21:08 +0300 | [diff] [blame^] | 116 | func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient { |
Juha Hyttinen | f49009a | 2019-11-26 10:28:14 +0200 | [diff] [blame] | 117 | p := C.CString(protPort) |
| 118 | m := C.int(maxSize) |
wahidw | ad0a271 | 2020-03-04 09:54:15 +0000 | [diff] [blame] | 119 | c := C.int(threadType) |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 120 | defer C.free(unsafe.Pointer(p)) |
wahidw | ad0a271 | 2020-03-04 09:54:15 +0000 | [diff] [blame] | 121 | ctx := C.rmr_init(p, m, c) |
Mohamed Abukar | 349a098 | 2019-06-08 18:15:42 +0300 | [diff] [blame] | 122 | if ctx == nil { |
Mohamed Abukar | 775722c | 2019-06-10 16:41:57 +0300 | [diff] [blame] | 123 | Logger.Error("rmrClient: Initializing RMR context failed, bailing out!") |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 124 | } |
Mohamed Abukar | 349a098 | 2019-06-08 18:15:42 +0300 | [diff] [blame] | 125 | return &RMRClient{ |
Juha Hyttinen | d28b8dd | 2020-05-27 09:21:08 +0300 | [diff] [blame^] | 126 | protPort: protPort, |
| 127 | context: ctx, |
| 128 | consumers: make([]MessageConsumer, 0), |
| 129 | stat: Metric.RegisterCounterGroup(RMRCounterOpts, statDesc), |
Mohamed Abukar | 349a098 | 2019-06-08 18:15:42 +0300 | [diff] [blame] | 130 | } |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 131 | } |
| 132 | |
Juha Hyttinen | f49009a | 2019-11-26 10:28:14 +0200 | [diff] [blame] | 133 | func NewRMRClient() *RMRClient { |
Juha Hyttinen | d28b8dd | 2020-05-27 09:21:08 +0300 | [diff] [blame^] | 134 | return NewRMRClientWithParams(viper.GetString("rmr.protPort"), viper.GetInt("rmr.maxSize"), viper.GetInt("rmr.threadType"), "RMR") |
Juha Hyttinen | f49009a | 2019-11-26 10:28:14 +0200 | [diff] [blame] | 135 | } |
| 136 | |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 137 | func (m *RMRClient) Start(c MessageConsumer) { |
Mohamed Abukar | d969b20 | 2019-07-01 17:14:44 +0300 | [diff] [blame] | 138 | if c != nil { |
| 139 | m.consumers = append(m.consumers, c) |
| 140 | } |
| 141 | |
Juha Hyttinen | cc3300c | 2019-12-11 09:29:12 +0200 | [diff] [blame] | 142 | var counter int = 0 |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 143 | for { |
Juha Hyttinen | d28b8dd | 2020-05-27 09:21:08 +0300 | [diff] [blame^] | 144 | m.contextMux.Lock() |
| 145 | m.ready = int(C.rmr_ready(m.context)) |
| 146 | m.contextMux.Unlock() |
| 147 | if m.ready == 1 { |
Juha Hyttinen | cc3300c | 2019-12-11 09:29:12 +0200 | [diff] [blame] | 148 | Logger.Info("rmrClient: RMR is ready after %d seconds waiting...", counter) |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 149 | break |
| 150 | } |
Juha Hyttinen | cc3300c | 2019-12-11 09:29:12 +0200 | [diff] [blame] | 151 | if counter%10 == 0 { |
| 152 | Logger.Info("rmrClient: Waiting for RMR to be ready ...") |
| 153 | } |
| 154 | time.Sleep(1 * time.Second) |
| 155 | counter++ |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 156 | } |
Juha Hyttinen | d28b8dd | 2020-05-27 09:21:08 +0300 | [diff] [blame^] | 157 | m.wg.Add(1) |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 158 | |
Mohamed Abukar | d969b20 | 2019-07-01 17:14:44 +0300 | [diff] [blame] | 159 | if m.readyCb != nil { |
| 160 | go m.readyCb(m.readyCbParams) |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 161 | } |
| 162 | |
Juha Hyttinen | d28b8dd | 2020-05-27 09:21:08 +0300 | [diff] [blame^] | 163 | go func() { |
| 164 | m.contextMux.Lock() |
| 165 | rfd := C.rmr_get_rcvfd(m.context) |
| 166 | m.contextMux.Unlock() |
| 167 | efd := C.init_epoll(rfd) |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 168 | |
Juha Hyttinen | d28b8dd | 2020-05-27 09:21:08 +0300 | [diff] [blame^] | 169 | defer m.wg.Done() |
| 170 | for { |
| 171 | if int(C.wait_epoll(efd, rfd)) == 0 { |
| 172 | continue |
| 173 | } |
| 174 | m.contextMux.Lock() |
| 175 | rxBuffer := C.rmr_rcv_msg(m.context, nil) |
| 176 | m.contextMux.Unlock() |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 177 | |
Juha Hyttinen | d28b8dd | 2020-05-27 09:21:08 +0300 | [diff] [blame^] | 178 | if rxBuffer == nil { |
| 179 | m.LogMBufError("RecvMsg failed", rxBuffer) |
| 180 | m.UpdateStatCounter("ReceiveError") |
| 181 | continue |
| 182 | } |
| 183 | m.UpdateStatCounter("Received") |
| 184 | m.parseMessage(rxBuffer) |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 185 | } |
Juha Hyttinen | d28b8dd | 2020-05-27 09:21:08 +0300 | [diff] [blame^] | 186 | }() |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 187 | |
Juha Hyttinen | d28b8dd | 2020-05-27 09:21:08 +0300 | [diff] [blame^] | 188 | m.wg.Wait() |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 189 | } |
| 190 | |
| 191 | func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) { |
| 192 | if len(m.consumers) == 0 { |
| 193 | Logger.Info("rmrClient: No message handlers defined, message discarded!") |
| 194 | return |
| 195 | } |
| 196 | |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 197 | params := &RMRParams{} |
| 198 | params.Mbuf = rxBuffer |
| 199 | params.Mtype = int(rxBuffer.mtype) |
| 200 | params.SubId = int(rxBuffer.sub_id) |
| 201 | params.Meid = &RMRMeid{} |
| 202 | |
Mohamed Abukar | d969b20 | 2019-07-01 17:14:44 +0300 | [diff] [blame] | 203 | meidBuf := make([]byte, int(C.RMR_MAX_MEID)) |
| 204 | if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil { |
Mohamed Abukar | 19461e1 | 2019-08-23 08:46:11 +0300 | [diff] [blame] | 205 | params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000") |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 206 | } |
| 207 | |
| 208 | xidBuf := make([]byte, int(C.RMR_MAX_XID)) |
| 209 | if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil { |
| 210 | params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000") |
| 211 | } |
| 212 | |
| 213 | srcBuf := make([]byte, int(C.RMR_MAX_SRC)) |
| 214 | if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil { |
| 215 | params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000") |
Mohamed Abukar | d969b20 | 2019-07-01 17:14:44 +0300 | [diff] [blame] | 216 | } |
| 217 | |
Mohamed Abukar | b4c7039 | 2020-02-13 20:43:15 +0200 | [diff] [blame] | 218 | // Default case: a single consumer |
| 219 | if len(m.consumers) == 1 && m.consumers[0] != nil { |
| 220 | params.PayloadLen = int(rxBuffer.len) |
| 221 | params.Payload = (*[1 << 30]byte)(unsafe.Pointer(rxBuffer.payload))[:params.PayloadLen:params.PayloadLen] |
| 222 | err := m.consumers[0].Consume(params) |
| 223 | if err != nil { |
| 224 | Logger.Warn("rmrClient: Consumer returned error: %v", err) |
| 225 | } |
| 226 | return |
| 227 | } |
| 228 | |
| 229 | // Special case for multiple consumers |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 230 | for _, c := range m.consumers { |
| 231 | cptr := unsafe.Pointer(rxBuffer.payload) |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 232 | params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len)) |
| 233 | params.PayloadLen = int(rxBuffer.len) |
Mohamed Abukar | b4c7039 | 2020-02-13 20:43:15 +0200 | [diff] [blame] | 234 | params.Mtype = int(rxBuffer.mtype) |
| 235 | params.SubId = int(rxBuffer.sub_id) |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 236 | |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 237 | err := c.Consume(params) |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 238 | if err != nil { |
| 239 | Logger.Warn("rmrClient: Consumer returned error: %v", err) |
| 240 | } |
| 241 | } |
| 242 | } |
| 243 | |
Juha Hyttinen | 1307b2d | 2020-04-15 13:45:51 +0300 | [diff] [blame] | 244 | func (m *RMRClient) Allocate(size int) *C.rmr_mbuf_t { |
Juha Hyttinen | d28b8dd | 2020-05-27 09:21:08 +0300 | [diff] [blame^] | 245 | m.contextMux.Lock() |
| 246 | defer m.contextMux.Unlock() |
Juha Hyttinen | 1307b2d | 2020-04-15 13:45:51 +0300 | [diff] [blame] | 247 | buf := C.rmr_alloc_msg(m.context, C.int(size)) |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 248 | if buf == nil { |
Mohamed Abukar | 775722c | 2019-06-10 16:41:57 +0300 | [diff] [blame] | 249 | Logger.Error("rmrClient: Allocating message buffer failed!") |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 250 | } |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 251 | return buf |
| 252 | } |
| 253 | |
Mohamed Abukar | 0213960 | 2019-09-30 08:55:35 +0300 | [diff] [blame] | 254 | func (m *RMRClient) Free(mbuf *C.rmr_mbuf_t) { |
| 255 | if mbuf == nil { |
Mohamed Abukar | 0213960 | 2019-09-30 08:55:35 +0300 | [diff] [blame] | 256 | return |
| 257 | } |
Juha Hyttinen | d28b8dd | 2020-05-27 09:21:08 +0300 | [diff] [blame^] | 258 | m.contextMux.Lock() |
| 259 | defer m.contextMux.Unlock() |
Mohamed Abukar | 0213960 | 2019-09-30 08:55:35 +0300 | [diff] [blame] | 260 | C.rmr_free_msg(mbuf) |
| 261 | } |
| 262 | |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 263 | func (m *RMRClient) SendMsg(params *RMRParams) bool { |
Mohamed Abukar | d5dc8e1 | 2019-10-15 09:58:17 +0300 | [diff] [blame] | 264 | return m.Send(params, false) |
Mohamed Abukar | 09d0a37 | 2019-07-14 08:45:30 +0300 | [diff] [blame] | 265 | } |
| 266 | |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 267 | func (m *RMRClient) SendRts(params *RMRParams) bool { |
Mohamed Abukar | d5dc8e1 | 2019-10-15 09:58:17 +0300 | [diff] [blame] | 268 | return m.Send(params, true) |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 269 | } |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 270 | |
wahidw | e0948fe | 2020-03-19 14:49:41 +0000 | [diff] [blame] | 271 | func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t { |
Juha Hyttinen | 1307b2d | 2020-04-15 13:45:51 +0300 | [diff] [blame] | 272 | if params.Mbuf != nil { |
| 273 | m.Free(params.Mbuf) |
| 274 | params.Mbuf = nil |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 275 | } |
| 276 | |
Juha Hyttinen | 1307b2d | 2020-04-15 13:45:51 +0300 | [diff] [blame] | 277 | payLen := len(params.Payload) |
| 278 | if params.PayloadLen != 0 { |
| 279 | payLen = params.PayloadLen |
| 280 | } |
| 281 | |
| 282 | txBuffer := m.Allocate(payLen) |
| 283 | if txBuffer == nil { |
| 284 | return nil |
| 285 | } |
Mohamed Abukar | f0ee2c6 | 2019-10-14 19:29:24 +0300 | [diff] [blame] | 286 | txBuffer.mtype = C.int(params.Mtype) |
| 287 | txBuffer.sub_id = C.int(params.SubId) |
Juha Hyttinen | 1307b2d | 2020-04-15 13:45:51 +0300 | [diff] [blame] | 288 | txBuffer.len = C.int(payLen) |
| 289 | |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 290 | datap := C.CBytes(params.Payload) |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 291 | defer C.free(datap) |
| 292 | |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 293 | if params != nil { |
| 294 | if params.Meid != nil { |
| 295 | b := make([]byte, int(C.RMR_MAX_MEID)) |
Mohamed Abukar | 19461e1 | 2019-08-23 08:46:11 +0300 | [diff] [blame] | 296 | copy(b, []byte(params.Meid.RanName)) |
Mohamed Abukar | f0ee2c6 | 2019-10-14 19:29:24 +0300 | [diff] [blame] | 297 | C.rmr_bytes2meid(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b))) |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 298 | } |
| 299 | xidLen := len(params.Xid) |
| 300 | if xidLen > 0 && xidLen <= C.RMR_MAX_XID { |
Juha Hyttinen | 6e075ce | 2019-11-06 08:42:34 +0200 | [diff] [blame] | 301 | b := make([]byte, int(C.RMR_MAX_XID)) |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 302 | copy(b, []byte(params.Xid)) |
Mohamed Abukar | f0ee2c6 | 2019-10-14 19:29:24 +0300 | [diff] [blame] | 303 | C.rmr_bytes2xact(txBuffer, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b))) |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 304 | } |
Mohamed Abukar | d969b20 | 2019-07-01 17:14:44 +0300 | [diff] [blame] | 305 | } |
Mohamed Abukar | f0ee2c6 | 2019-10-14 19:29:24 +0300 | [diff] [blame] | 306 | C.write_bytes_array(txBuffer.payload, datap, txBuffer.len) |
wahidw | e0948fe | 2020-03-19 14:49:41 +0000 | [diff] [blame] | 307 | return txBuffer |
| 308 | } |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 309 | |
wahidw | e0948fe | 2020-03-19 14:49:41 +0000 | [diff] [blame] | 310 | func (m *RMRClient) Send(params *RMRParams, isRts bool) bool { |
| 311 | |
| 312 | txBuffer := m.CopyBuffer(params) |
| 313 | if txBuffer == nil { |
| 314 | return false |
| 315 | } |
wahidw | ad0a271 | 2020-03-04 09:54:15 +0000 | [diff] [blame] | 316 | params.status = m.SendBuf(txBuffer, isRts, params.Whid) |
Mohamed Abukar | 791a77f | 2020-02-12 19:08:42 +0200 | [diff] [blame] | 317 | if params.status == int(C.RMR_OK) { |
| 318 | return true |
| 319 | } |
| 320 | return false |
Mohamed Abukar | d5dc8e1 | 2019-10-15 09:58:17 +0300 | [diff] [blame] | 321 | } |
| 322 | |
wahidw | ad0a271 | 2020-03-04 09:54:15 +0000 | [diff] [blame] | 323 | func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool, whid int) int { |
Mohamed Abukar | d5dc8e1 | 2019-10-15 09:58:17 +0300 | [diff] [blame] | 324 | var ( |
| 325 | currBuffer *C.rmr_mbuf_t |
Mohamed Abukar | d5dc8e1 | 2019-10-15 09:58:17 +0300 | [diff] [blame] | 326 | counterName string = "Transmitted" |
| 327 | ) |
| 328 | |
Juha Hyttinen | d28b8dd | 2020-05-27 09:21:08 +0300 | [diff] [blame^] | 329 | m.contextMux.Lock() |
Mohamed Abukar | f0ee2c6 | 2019-10-14 19:29:24 +0300 | [diff] [blame] | 330 | txBuffer.state = 0 |
wahidw | ad0a271 | 2020-03-04 09:54:15 +0000 | [diff] [blame] | 331 | if whid != 0 { |
| 332 | currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer) |
Mohamed Abukar | f0ee2c6 | 2019-10-14 19:29:24 +0300 | [diff] [blame] | 333 | } else { |
wahidw | ad0a271 | 2020-03-04 09:54:15 +0000 | [diff] [blame] | 334 | if isRts { |
| 335 | currBuffer = C.rmr_rts_msg(m.context, txBuffer) |
| 336 | } else { |
| 337 | currBuffer = C.rmr_send_msg(m.context, txBuffer) |
| 338 | } |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 339 | } |
Juha Hyttinen | d28b8dd | 2020-05-27 09:21:08 +0300 | [diff] [blame^] | 340 | m.contextMux.Unlock() |
Mohamed Abukar | f0ee2c6 | 2019-10-14 19:29:24 +0300 | [diff] [blame] | 341 | |
Mohamed Abukar | d5dc8e1 | 2019-10-15 09:58:17 +0300 | [diff] [blame] | 342 | if currBuffer == nil { |
| 343 | m.UpdateStatCounter("TransmitError") |
Mohamed Abukar | 467a99f | 2019-11-20 16:07:49 +0200 | [diff] [blame] | 344 | return m.LogMBufError("SendBuf failed", txBuffer) |
Mohamed Abukar | f0ee2c6 | 2019-10-14 19:29:24 +0300 | [diff] [blame] | 345 | } |
Mohamed Abukar | d5dc8e1 | 2019-10-15 09:58:17 +0300 | [diff] [blame] | 346 | |
| 347 | // Just quick retry seems to help for K8s issue |
Mohamed Abukar | 791a77f | 2020-02-12 19:08:42 +0200 | [diff] [blame] | 348 | maxRetryOnFailure := viper.GetInt("rmr.maxRetryOnFailure") |
| 349 | if maxRetryOnFailure == 0 { |
| 350 | maxRetryOnFailure = 5 |
| 351 | } |
| 352 | |
| 353 | for j := 0; j < maxRetryOnFailure && currBuffer != nil && currBuffer.state == C.RMR_ERR_RETRY; j++ { |
Juha Hyttinen | d28b8dd | 2020-05-27 09:21:08 +0300 | [diff] [blame^] | 354 | m.contextMux.Lock() |
wahidw | ad0a271 | 2020-03-04 09:54:15 +0000 | [diff] [blame] | 355 | if whid != 0 { |
| 356 | currBuffer = C.rmr_wh_send_msg(m.context, C.rmr_whid_t(whid), txBuffer) |
Mohamed Abukar | d5dc8e1 | 2019-10-15 09:58:17 +0300 | [diff] [blame] | 357 | } else { |
wahidw | ad0a271 | 2020-03-04 09:54:15 +0000 | [diff] [blame] | 358 | if isRts { |
| 359 | currBuffer = C.rmr_rts_msg(m.context, txBuffer) |
| 360 | } else { |
| 361 | currBuffer = C.rmr_send_msg(m.context, txBuffer) |
| 362 | } |
Mohamed Abukar | d5dc8e1 | 2019-10-15 09:58:17 +0300 | [diff] [blame] | 363 | } |
Juha Hyttinen | d28b8dd | 2020-05-27 09:21:08 +0300 | [diff] [blame^] | 364 | m.contextMux.Unlock() |
Mohamed Abukar | d5dc8e1 | 2019-10-15 09:58:17 +0300 | [diff] [blame] | 365 | } |
| 366 | |
| 367 | if currBuffer.state != C.RMR_OK { |
Mohamed Abukar | d5dc8e1 | 2019-10-15 09:58:17 +0300 | [diff] [blame] | 368 | counterName = "TransmitError" |
Mohamed Abukar | 791a77f | 2020-02-12 19:08:42 +0200 | [diff] [blame] | 369 | m.LogMBufError("SendBuf failed", currBuffer) |
Mohamed Abukar | d5dc8e1 | 2019-10-15 09:58:17 +0300 | [diff] [blame] | 370 | } |
| 371 | |
| 372 | m.UpdateStatCounter(counterName) |
Juha Hyttinen | d1fcac7 | 2020-02-25 09:20:26 +0200 | [diff] [blame] | 373 | defer m.Free(currBuffer) |
Mohamed Abukar | 791a77f | 2020-02-12 19:08:42 +0200 | [diff] [blame] | 374 | |
| 375 | return int(currBuffer.state) |
| 376 | } |
| 377 | |
wahidw | e0948fe | 2020-03-19 14:49:41 +0000 | [diff] [blame] | 378 | func (m *RMRClient) SendCallMsg(params *RMRParams) (int, string) { |
| 379 | var ( |
| 380 | currBuffer *C.rmr_mbuf_t |
| 381 | counterName string = "Transmitted" |
| 382 | ) |
| 383 | txBuffer := m.CopyBuffer(params) |
| 384 | if txBuffer == nil { |
| 385 | return C.RMR_ERR_INITFAILED, "" |
| 386 | } |
| 387 | |
| 388 | txBuffer.state = 0 |
| 389 | |
Juha Hyttinen | d28b8dd | 2020-05-27 09:21:08 +0300 | [diff] [blame^] | 390 | m.contextMux.Lock() |
wahidw | e0948fe | 2020-03-19 14:49:41 +0000 | [diff] [blame] | 391 | currBuffer = C.rmr_wh_call(m.context, C.int(params.Whid), txBuffer, C.int(params.Callid), C.int(params.Timeout)) |
Juha Hyttinen | d28b8dd | 2020-05-27 09:21:08 +0300 | [diff] [blame^] | 392 | m.contextMux.Unlock() |
wahidw | e0948fe | 2020-03-19 14:49:41 +0000 | [diff] [blame] | 393 | |
| 394 | if currBuffer == nil { |
| 395 | m.UpdateStatCounter("TransmitError") |
| 396 | return m.LogMBufError("SendBuf failed", txBuffer), "" |
| 397 | } |
| 398 | |
| 399 | if currBuffer.state != C.RMR_OK { |
| 400 | counterName = "TransmitError" |
| 401 | m.LogMBufError("SendBuf failed", currBuffer) |
| 402 | } |
| 403 | |
| 404 | m.UpdateStatCounter(counterName) |
| 405 | defer m.Free(currBuffer) |
| 406 | |
| 407 | cptr := unsafe.Pointer(currBuffer.payload) |
| 408 | payload := C.GoBytes(cptr, C.int(currBuffer.len)) |
| 409 | |
| 410 | return int(currBuffer.state), string(payload) |
| 411 | } |
| 412 | |
wahidw | ad0a271 | 2020-03-04 09:54:15 +0000 | [diff] [blame] | 413 | func (m *RMRClient) Openwh(target string) C.rmr_whid_t { |
| 414 | return m.Wh_open(target) |
| 415 | } |
| 416 | |
| 417 | func (m *RMRClient) Wh_open(target string) C.rmr_whid_t { |
Juha Hyttinen | d28b8dd | 2020-05-27 09:21:08 +0300 | [diff] [blame^] | 418 | m.contextMux.Lock() |
| 419 | defer m.contextMux.Unlock() |
wahidw | ad0a271 | 2020-03-04 09:54:15 +0000 | [diff] [blame] | 420 | endpoint := C.CString(target) |
| 421 | return C.rmr_wh_open(m.context, endpoint) |
| 422 | } |
| 423 | |
| 424 | func (m *RMRClient) Closewh(whid int) { |
| 425 | m.Wh_close(C.rmr_whid_t(whid)) |
| 426 | } |
| 427 | |
| 428 | func (m *RMRClient) Wh_close(whid C.rmr_whid_t) { |
Juha Hyttinen | d28b8dd | 2020-05-27 09:21:08 +0300 | [diff] [blame^] | 429 | m.contextMux.Lock() |
| 430 | defer m.contextMux.Unlock() |
wahidw | ad0a271 | 2020-03-04 09:54:15 +0000 | [diff] [blame] | 431 | C.rmr_wh_close(m.context, whid) |
| 432 | } |
| 433 | |
Mohamed Abukar | 791a77f | 2020-02-12 19:08:42 +0200 | [diff] [blame] | 434 | func (m *RMRClient) IsRetryError(params *RMRParams) bool { |
| 435 | if params.status == int(C.RMR_ERR_RETRY) { |
| 436 | return true |
| 437 | } |
| 438 | return false |
| 439 | } |
| 440 | |
| 441 | func (m *RMRClient) IsNoEndPointError(params *RMRParams) bool { |
| 442 | if params.status == int(C.RMR_ERR_NOENDPT) { |
| 443 | return true |
| 444 | } |
| 445 | return false |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 446 | } |
| 447 | |
| 448 | func (m *RMRClient) UpdateStatCounter(name string) { |
| 449 | m.mux.Lock() |
| 450 | m.stat[name].Inc() |
| 451 | m.mux.Unlock() |
| 452 | } |
| 453 | |
| 454 | func (m *RMRClient) RegisterMetrics() { |
| 455 | m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR") |
| 456 | } |
| 457 | |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 458 | func (m *RMRClient) Wait() { |
| 459 | m.wg.Wait() |
| 460 | } |
| 461 | |
| 462 | func (m *RMRClient) IsReady() bool { |
| 463 | return m.ready != 0 |
| 464 | } |
| 465 | |
Mohamed Abukar | 192518d | 2019-06-11 18:06:50 +0300 | [diff] [blame] | 466 | func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) { |
Mohamed Abukar | 349a098 | 2019-06-08 18:15:42 +0300 | [diff] [blame] | 467 | m.readyCb = cb |
Mohamed Abukar | 192518d | 2019-06-11 18:06:50 +0300 | [diff] [blame] | 468 | m.readyCbParams = params |
Mohamed Abukar | 349a098 | 2019-06-08 18:15:42 +0300 | [diff] [blame] | 469 | } |
Mohamed Abukar | 775722c | 2019-06-10 16:41:57 +0300 | [diff] [blame] | 470 | |
| 471 | func (m *RMRClient) GetRicMessageId(name string) (int, bool) { |
| 472 | id, ok := RICMessageTypes[name] |
| 473 | return id, ok |
| 474 | } |
| 475 | |
| 476 | func (m *RMRClient) GetRicMessageName(id int) (s string) { |
| 477 | for k, v := range RICMessageTypes { |
| 478 | if id == v { |
| 479 | return k |
| 480 | } |
| 481 | } |
| 482 | return |
| 483 | } |
Mohamed Abukar | d969b20 | 2019-07-01 17:14:44 +0300 | [diff] [blame] | 484 | |
Mohamed Abukar | 791a77f | 2020-02-12 19:08:42 +0200 | [diff] [blame] | 485 | func (m *RMRClient) LogMBufError(text string, mbuf *C.rmr_mbuf_t) int { |
Mohamed Abukar | 467a99f | 2019-11-20 16:07:49 +0200 | [diff] [blame] | 486 | Logger.Debug(fmt.Sprintf("rmrClient: %s -> [tp=%v] %v - %s", text, mbuf.tp_state, mbuf.state, RMRErrors[int(mbuf.state)])) |
Mohamed Abukar | 791a77f | 2020-02-12 19:08:42 +0200 | [diff] [blame] | 487 | return int(mbuf.state) |
Mohamed Abukar | 467a99f | 2019-11-20 16:07:49 +0200 | [diff] [blame] | 488 | } |
| 489 | |
Mohamed Abukar | d969b20 | 2019-07-01 17:14:44 +0300 | [diff] [blame] | 490 | // To be removed ... |
| 491 | func (m *RMRClient) GetStat() (r RMRStatistics) { |
| 492 | return |
| 493 | } |