Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 1 | /* |
| 2 | ================================================================================== |
| 3 | Copyright (c) 2019 AT&T Intellectual Property. |
| 4 | Copyright (c) 2019 Nokia |
| 5 | |
| 6 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 7 | you may not use this file except in compliance with the License. |
| 8 | You may obtain a copy of the License at |
| 9 | |
| 10 | http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | |
| 12 | Unless required by applicable law or agreed to in writing, software |
| 13 | distributed under the License is distributed on an "AS IS" BASIS, |
| 14 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 15 | See the License for the specific language governing permissions and |
| 16 | limitations under the License. |
| 17 | ================================================================================== |
| 18 | */ |
| 19 | |
| 20 | package xapp |
| 21 | |
| 22 | /* |
| 23 | #include <time.h> |
| 24 | #include <stdlib.h> |
| 25 | #include <stdio.h> |
| 26 | #include <string.h> |
| 27 | #include <rmr/rmr.h> |
| 28 | #include <rmr/RIC_message_types.h> |
| 29 | |
| 30 | void write_bytes_array(unsigned char *dst, void *data, int len) { |
| 31 | memcpy((void *)dst, (void *)data, len); |
| 32 | } |
| 33 | |
| 34 | #cgo CFLAGS: -I../ |
| 35 | #cgo LDFLAGS: -lrmr_nng -lnng |
| 36 | */ |
| 37 | import "C" |
| 38 | |
| 39 | import ( |
| 40 | "github.com/spf13/viper" |
| 41 | "strconv" |
Mohamed Abukar | d969b20 | 2019-07-01 17:14:44 +0300 | [diff] [blame] | 42 | "strings" |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 43 | "time" |
| 44 | "unsafe" |
| 45 | ) |
| 46 | |
| 47 | var RMRCounterOpts = []CounterOpts{ |
| 48 | {Name: "Transmitted", Help: "The total number of transmited RMR messages"}, |
| 49 | {Name: "Received", Help: "The total number of received RMR messages"}, |
| 50 | {Name: "TransmitError", Help: "The total number of RMR transmission errors"}, |
| 51 | {Name: "ReceiveError", Help: "The total number of RMR receive errors"}, |
| 52 | } |
| 53 | |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 54 | type RMRParams struct { |
Mohamed Abukar | 19461e1 | 2019-08-23 08:46:11 +0300 | [diff] [blame] | 55 | Mtype int |
| 56 | Payload []byte |
| 57 | PayloadLen int |
| 58 | Meid *RMRMeid |
| 59 | Xid string |
| 60 | SubId int |
| 61 | Src string |
| 62 | Mbuf *C.rmr_mbuf_t |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 63 | } |
| 64 | |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 65 | func NewRMRClient() *RMRClient { |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 66 | p := C.CString(viper.GetString("rmr.protPort")) |
| 67 | m := C.int(viper.GetInt("rmr.maxSize")) |
| 68 | defer C.free(unsafe.Pointer(p)) |
| 69 | |
Mohamed Abukar | 349a098 | 2019-06-08 18:15:42 +0300 | [diff] [blame] | 70 | ctx := C.rmr_init(p, m, C.int(0)) |
| 71 | if ctx == nil { |
Mohamed Abukar | 775722c | 2019-06-10 16:41:57 +0300 | [diff] [blame] | 72 | Logger.Error("rmrClient: Initializing RMR context failed, bailing out!") |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 73 | } |
| 74 | |
Mohamed Abukar | 349a098 | 2019-06-08 18:15:42 +0300 | [diff] [blame] | 75 | return &RMRClient{ |
Mohamed Abukar | 775722c | 2019-06-10 16:41:57 +0300 | [diff] [blame] | 76 | context: ctx, |
Mohamed Abukar | 349a098 | 2019-06-08 18:15:42 +0300 | [diff] [blame] | 77 | consumers: make([]MessageConsumer, 0), |
Mohamed Abukar | 775722c | 2019-06-10 16:41:57 +0300 | [diff] [blame] | 78 | stat: Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"), |
Mohamed Abukar | 349a098 | 2019-06-08 18:15:42 +0300 | [diff] [blame] | 79 | } |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 80 | } |
| 81 | |
| 82 | func (m *RMRClient) Start(c MessageConsumer) { |
Mohamed Abukar | d969b20 | 2019-07-01 17:14:44 +0300 | [diff] [blame] | 83 | if c != nil { |
| 84 | m.consumers = append(m.consumers, c) |
| 85 | } |
| 86 | |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 87 | for { |
| 88 | Logger.Info("rmrClient: Waiting for RMR to be ready ...") |
| 89 | |
| 90 | if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 { |
| 91 | break |
| 92 | } |
| 93 | time.Sleep(10 * time.Second) |
| 94 | } |
| 95 | m.wg.Add(viper.GetInt("rmr.numWorkers")) |
| 96 | |
Mohamed Abukar | d969b20 | 2019-07-01 17:14:44 +0300 | [diff] [blame] | 97 | if m.readyCb != nil { |
| 98 | go m.readyCb(m.readyCbParams) |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 99 | } |
| 100 | |
| 101 | for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ { |
| 102 | go m.Worker("worker-"+strconv.Itoa(w), 0) |
| 103 | } |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 104 | m.Wait() |
| 105 | } |
| 106 | |
| 107 | func (m *RMRClient) Worker(taskName string, msgSize int) { |
| 108 | p := viper.GetString("rmr.protPort") |
| 109 | Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p) |
| 110 | |
| 111 | defer m.wg.Done() |
| 112 | for { |
| 113 | rxBuffer := C.rmr_rcv_msg(m.context, nil) |
| 114 | if rxBuffer == nil { |
| 115 | m.UpdateStatCounter("ReceiveError") |
| 116 | continue |
| 117 | } |
| 118 | m.UpdateStatCounter("Received") |
| 119 | |
| 120 | go m.parseMessage(rxBuffer) |
| 121 | } |
| 122 | } |
| 123 | |
| 124 | func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) { |
| 125 | if len(m.consumers) == 0 { |
| 126 | Logger.Info("rmrClient: No message handlers defined, message discarded!") |
| 127 | return |
| 128 | } |
| 129 | |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 130 | params := &RMRParams{} |
| 131 | params.Mbuf = rxBuffer |
| 132 | params.Mtype = int(rxBuffer.mtype) |
| 133 | params.SubId = int(rxBuffer.sub_id) |
| 134 | params.Meid = &RMRMeid{} |
| 135 | |
Mohamed Abukar | d969b20 | 2019-07-01 17:14:44 +0300 | [diff] [blame] | 136 | meidBuf := make([]byte, int(C.RMR_MAX_MEID)) |
| 137 | if meidCstr := C.rmr_get_meid(rxBuffer, (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil { |
Mohamed Abukar | 19461e1 | 2019-08-23 08:46:11 +0300 | [diff] [blame] | 138 | params.Meid.RanName = strings.TrimRight(string(meidBuf), "\000") |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 139 | } |
| 140 | |
| 141 | xidBuf := make([]byte, int(C.RMR_MAX_XID)) |
| 142 | if xidCstr := C.rmr_get_xact(rxBuffer, (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil { |
| 143 | params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000") |
| 144 | } |
| 145 | |
| 146 | srcBuf := make([]byte, int(C.RMR_MAX_SRC)) |
| 147 | if srcStr := C.rmr_get_src(rxBuffer, (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil { |
| 148 | params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000") |
Mohamed Abukar | d969b20 | 2019-07-01 17:14:44 +0300 | [diff] [blame] | 149 | } |
| 150 | |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 151 | for _, c := range m.consumers { |
| 152 | cptr := unsafe.Pointer(rxBuffer.payload) |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 153 | params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len)) |
| 154 | params.PayloadLen = int(rxBuffer.len) |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 155 | |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 156 | err := c.Consume(params) |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 157 | if err != nil { |
| 158 | Logger.Warn("rmrClient: Consumer returned error: %v", err) |
| 159 | } |
| 160 | } |
| 161 | } |
| 162 | |
| 163 | func (m *RMRClient) Allocate() *C.rmr_mbuf_t { |
| 164 | buf := C.rmr_alloc_msg(m.context, 0) |
| 165 | if buf == nil { |
Mohamed Abukar | 775722c | 2019-06-10 16:41:57 +0300 | [diff] [blame] | 166 | Logger.Error("rmrClient: Allocating message buffer failed!") |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 167 | } |
| 168 | |
| 169 | return buf |
| 170 | } |
| 171 | |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 172 | func (m *RMRClient) SendMsg(params *RMRParams) bool { |
| 173 | return m.Send(params, false) |
Mohamed Abukar | 09d0a37 | 2019-07-14 08:45:30 +0300 | [diff] [blame] | 174 | } |
| 175 | |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 176 | func (m *RMRClient) SendRts(params *RMRParams) bool { |
| 177 | return m.Send(params, true) |
| 178 | } |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 179 | |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 180 | func (m *RMRClient) Send(params *RMRParams, isRts bool) bool { |
| 181 | buf := params.Mbuf |
| 182 | if buf == nil { |
| 183 | buf = m.Allocate() |
| 184 | } |
| 185 | |
| 186 | buf.mtype = C.int(params.Mtype) |
| 187 | buf.sub_id = C.int(params.SubId) |
| 188 | buf.len = C.int(len(params.Payload)) |
Mohamed Abukar | 688e6c9 | 2019-09-05 13:12:42 +0300 | [diff] [blame] | 189 | if params.PayloadLen != 0 { |
Mohamed Abukar | 549a131 | 2019-09-05 13:17:49 +0300 | [diff] [blame] | 190 | buf.len = C.int(params.PayloadLen) |
Mohamed Abukar | 688e6c9 | 2019-09-05 13:12:42 +0300 | [diff] [blame] | 191 | } |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 192 | datap := C.CBytes(params.Payload) |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 193 | defer C.free(datap) |
| 194 | |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 195 | if params != nil { |
| 196 | if params.Meid != nil { |
| 197 | b := make([]byte, int(C.RMR_MAX_MEID)) |
Mohamed Abukar | 19461e1 | 2019-08-23 08:46:11 +0300 | [diff] [blame] | 198 | copy(b, []byte(params.Meid.RanName)) |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 199 | C.rmr_bytes2meid(buf, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b))) |
| 200 | } |
| 201 | xidLen := len(params.Xid) |
| 202 | if xidLen > 0 && xidLen <= C.RMR_MAX_XID { |
| 203 | b := make([]byte, int(C.RMR_MAX_MEID)) |
| 204 | copy(b, []byte(params.Xid)) |
| 205 | C.rmr_bytes2xact(buf, (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b))) |
| 206 | } |
Mohamed Abukar | d969b20 | 2019-07-01 17:14:44 +0300 | [diff] [blame] | 207 | } |
| 208 | C.write_bytes_array(buf.payload, datap, buf.len) |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 209 | |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 210 | return m.SendBuf(buf, isRts) |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 211 | } |
| 212 | |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 213 | func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t, isRts bool) bool { |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 214 | for i := 0; i < 10; i++ { |
| 215 | txBuffer.state = 0 |
Mohamed Abukar | f11ab7a | 2019-08-14 16:55:01 +0300 | [diff] [blame] | 216 | if isRts { |
| 217 | txBuffer = C.rmr_rts_msg(m.context, txBuffer) |
| 218 | } else { |
| 219 | txBuffer = C.rmr_send_msg(m.context, txBuffer) |
| 220 | } |
| 221 | |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 222 | if txBuffer == nil { |
| 223 | break |
| 224 | } else if txBuffer.state != C.RMR_OK { |
| 225 | if txBuffer.state != C.RMR_ERR_RETRY { |
| 226 | time.Sleep(100 * time.Microsecond) |
| 227 | m.UpdateStatCounter("TransmitError") |
| 228 | } |
| 229 | for j := 0; j < 100 && txBuffer.state == C.RMR_ERR_RETRY; j++ { |
| 230 | txBuffer = C.rmr_send_msg(m.context, txBuffer) |
| 231 | } |
| 232 | } |
| 233 | |
| 234 | if txBuffer.state == C.RMR_OK { |
| 235 | m.UpdateStatCounter("Transmitted") |
| 236 | return true |
| 237 | } |
| 238 | } |
| 239 | m.UpdateStatCounter("TransmitError") |
| 240 | return false |
| 241 | } |
| 242 | |
| 243 | func (m *RMRClient) UpdateStatCounter(name string) { |
| 244 | m.mux.Lock() |
| 245 | m.stat[name].Inc() |
| 246 | m.mux.Unlock() |
| 247 | } |
| 248 | |
| 249 | func (m *RMRClient) RegisterMetrics() { |
| 250 | m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR") |
| 251 | } |
| 252 | |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 253 | func (m *RMRClient) Wait() { |
| 254 | m.wg.Wait() |
| 255 | } |
| 256 | |
| 257 | func (m *RMRClient) IsReady() bool { |
| 258 | return m.ready != 0 |
| 259 | } |
| 260 | |
Mohamed Abukar | 192518d | 2019-06-11 18:06:50 +0300 | [diff] [blame] | 261 | func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) { |
Mohamed Abukar | 349a098 | 2019-06-08 18:15:42 +0300 | [diff] [blame] | 262 | m.readyCb = cb |
Mohamed Abukar | 192518d | 2019-06-11 18:06:50 +0300 | [diff] [blame] | 263 | m.readyCbParams = params |
Mohamed Abukar | 349a098 | 2019-06-08 18:15:42 +0300 | [diff] [blame] | 264 | } |
Mohamed Abukar | 775722c | 2019-06-10 16:41:57 +0300 | [diff] [blame] | 265 | |
| 266 | func (m *RMRClient) GetRicMessageId(name string) (int, bool) { |
| 267 | id, ok := RICMessageTypes[name] |
| 268 | return id, ok |
| 269 | } |
| 270 | |
| 271 | func (m *RMRClient) GetRicMessageName(id int) (s string) { |
| 272 | for k, v := range RICMessageTypes { |
| 273 | if id == v { |
| 274 | return k |
| 275 | } |
| 276 | } |
| 277 | return |
| 278 | } |
Mohamed Abukar | d969b20 | 2019-07-01 17:14:44 +0300 | [diff] [blame] | 279 | |
| 280 | // To be removed ... |
| 281 | func (m *RMRClient) GetStat() (r RMRStatistics) { |
| 282 | return |
| 283 | } |