Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 1 | /* |
| 2 | ================================================================================== |
| 3 | Copyright (c) 2019 AT&T Intellectual Property. |
| 4 | Copyright (c) 2019 Nokia |
| 5 | |
| 6 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 7 | you may not use this file except in compliance with the License. |
| 8 | You may obtain a copy of the License at |
| 9 | |
| 10 | http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | |
| 12 | Unless required by applicable law or agreed to in writing, software |
| 13 | distributed under the License is distributed on an "AS IS" BASIS, |
| 14 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 15 | See the License for the specific language governing permissions and |
| 16 | limitations under the License. |
| 17 | ================================================================================== |
| 18 | */ |
| 19 | |
| 20 | package xapp |
| 21 | |
| 22 | /* |
| 23 | #include <time.h> |
| 24 | #include <stdlib.h> |
| 25 | #include <stdio.h> |
| 26 | #include <string.h> |
| 27 | #include <rmr/rmr.h> |
| 28 | #include <rmr/RIC_message_types.h> |
| 29 | |
// write_bytes_array copies len bytes from data into dst.
// The call is a no-op when either pointer is NULL or len is not positive;
// without this guard a negative len would be converted to a huge size_t
// by memcpy. Only line comments are used here because this code lives
// inside a Go block comment (the cgo preamble).
void write_bytes_array(unsigned char *dst, void *data, int len) {
	if (dst == NULL || data == NULL || len <= 0) {
		return;
	}
	memcpy((void *)dst, (void *)data, (size_t)len);
}
| 33 | |
| 34 | #cgo CFLAGS: -I../ |
| 35 | #cgo LDFLAGS: -lrmr_nng -lnng |
| 36 | */ |
| 37 | import "C" |
| 38 | |
| 39 | import ( |
| 40 | "github.com/spf13/viper" |
| 41 | "strconv" |
| 42 | "sync" |
| 43 | "time" |
| 44 | "unsafe" |
| 45 | ) |
| 46 | |
| 47 | var RMRCounterOpts = []CounterOpts{ |
| 48 | {Name: "Transmitted", Help: "The total number of transmited RMR messages"}, |
| 49 | {Name: "Received", Help: "The total number of received RMR messages"}, |
| 50 | {Name: "TransmitError", Help: "The total number of RMR transmission errors"}, |
| 51 | {Name: "ReceiveError", Help: "The total number of RMR receive errors"}, |
| 52 | } |
| 53 | |
// RMRStatistics is an empty placeholder type returned by GetStat.
// It carries no data and is slated for removal.
type RMRStatistics struct{}
| 56 | |
| 57 | type RMRClient struct { |
Mohamed Abukar | 192518d | 2019-06-11 18:06:50 +0300 | [diff] [blame^] | 58 | context unsafe.Pointer |
| 59 | ready int |
| 60 | wg sync.WaitGroup |
| 61 | mux sync.Mutex |
| 62 | stat map[string]Counter |
| 63 | consumers []MessageConsumer |
| 64 | readyCb ReadyCB |
| 65 | readyCbParams interface{} |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 66 | } |
| 67 | |
// MessageConsumer is implemented by anything that wants to receive the
// messages read by an RMRClient.
//
// Consume is called with the message type, the subscription id, the
// payload length reported by RMR and a Go copy of the payload bytes.
// A non-nil error is logged by the client.
type MessageConsumer interface {
	Consume(mtype, sid, len int, payload []byte) error
}
| 71 | |
| 72 | func NewRMRClient() *RMRClient { |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 73 | p := C.CString(viper.GetString("rmr.protPort")) |
| 74 | m := C.int(viper.GetInt("rmr.maxSize")) |
| 75 | defer C.free(unsafe.Pointer(p)) |
| 76 | |
Mohamed Abukar | 349a098 | 2019-06-08 18:15:42 +0300 | [diff] [blame] | 77 | ctx := C.rmr_init(p, m, C.int(0)) |
| 78 | if ctx == nil { |
Mohamed Abukar | 775722c | 2019-06-10 16:41:57 +0300 | [diff] [blame] | 79 | Logger.Error("rmrClient: Initializing RMR context failed, bailing out!") |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 80 | } |
| 81 | |
Mohamed Abukar | 349a098 | 2019-06-08 18:15:42 +0300 | [diff] [blame] | 82 | return &RMRClient{ |
Mohamed Abukar | 775722c | 2019-06-10 16:41:57 +0300 | [diff] [blame] | 83 | context: ctx, |
Mohamed Abukar | 349a098 | 2019-06-08 18:15:42 +0300 | [diff] [blame] | 84 | consumers: make([]MessageConsumer, 0), |
Mohamed Abukar | 775722c | 2019-06-10 16:41:57 +0300 | [diff] [blame] | 85 | stat: Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"), |
Mohamed Abukar | 349a098 | 2019-06-08 18:15:42 +0300 | [diff] [blame] | 86 | } |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 87 | } |
| 88 | |
| 89 | func (m *RMRClient) Start(c MessageConsumer) { |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 90 | for { |
| 91 | Logger.Info("rmrClient: Waiting for RMR to be ready ...") |
| 92 | |
| 93 | if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 { |
| 94 | break |
| 95 | } |
| 96 | time.Sleep(10 * time.Second) |
| 97 | } |
| 98 | m.wg.Add(viper.GetInt("rmr.numWorkers")) |
| 99 | |
| 100 | if c != nil { |
| 101 | m.consumers = append(m.consumers, c) |
| 102 | } |
| 103 | |
| 104 | for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ { |
| 105 | go m.Worker("worker-"+strconv.Itoa(w), 0) |
| 106 | } |
| 107 | |
Mohamed Abukar | 349a098 | 2019-06-08 18:15:42 +0300 | [diff] [blame] | 108 | if m.readyCb != nil { |
Mohamed Abukar | 192518d | 2019-06-11 18:06:50 +0300 | [diff] [blame^] | 109 | m.readyCb(m.readyCbParams) |
Mohamed Abukar | 349a098 | 2019-06-08 18:15:42 +0300 | [diff] [blame] | 110 | } |
| 111 | |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 112 | m.Wait() |
| 113 | } |
| 114 | |
| 115 | func (m *RMRClient) Worker(taskName string, msgSize int) { |
| 116 | p := viper.GetString("rmr.protPort") |
| 117 | Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p) |
| 118 | |
| 119 | defer m.wg.Done() |
| 120 | for { |
| 121 | rxBuffer := C.rmr_rcv_msg(m.context, nil) |
| 122 | if rxBuffer == nil { |
| 123 | m.UpdateStatCounter("ReceiveError") |
| 124 | continue |
| 125 | } |
| 126 | m.UpdateStatCounter("Received") |
| 127 | |
| 128 | go m.parseMessage(rxBuffer) |
| 129 | } |
| 130 | } |
| 131 | |
| 132 | func (m *RMRClient) parseMessage(rxBuffer *C.rmr_mbuf_t) { |
| 133 | if len(m.consumers) == 0 { |
| 134 | Logger.Info("rmrClient: No message handlers defined, message discarded!") |
| 135 | return |
| 136 | } |
| 137 | |
| 138 | for _, c := range m.consumers { |
| 139 | cptr := unsafe.Pointer(rxBuffer.payload) |
| 140 | payload := C.GoBytes(cptr, C.int(rxBuffer.len)) |
| 141 | |
| 142 | err := c.Consume(int(rxBuffer.mtype), int(rxBuffer.sub_id), int(rxBuffer.len), payload) |
| 143 | if err != nil { |
| 144 | Logger.Warn("rmrClient: Consumer returned error: %v", err) |
| 145 | } |
| 146 | } |
| 147 | } |
| 148 | |
| 149 | func (m *RMRClient) Allocate() *C.rmr_mbuf_t { |
| 150 | buf := C.rmr_alloc_msg(m.context, 0) |
| 151 | if buf == nil { |
Mohamed Abukar | 775722c | 2019-06-10 16:41:57 +0300 | [diff] [blame] | 152 | Logger.Error("rmrClient: Allocating message buffer failed!") |
Mohamed Abukar | 2e78e42 | 2019-06-02 11:45:52 +0300 | [diff] [blame] | 153 | } |
| 154 | |
| 155 | return buf |
| 156 | } |
| 157 | |
| 158 | func (m *RMRClient) Send(mtype int, sid int, len int, payload []byte) bool { |
| 159 | buf := m.Allocate() |
| 160 | |
| 161 | buf.mtype = C.int(mtype) |
| 162 | buf.sub_id = C.int(sid) |
| 163 | buf.len = C.int(len) |
| 164 | datap := C.CBytes(payload) |
| 165 | defer C.free(datap) |
| 166 | |
| 167 | C.write_bytes_array(buf.payload, datap, C.int(len)) |
| 168 | |
| 169 | return m.SendBuf(buf) |
| 170 | } |
| 171 | |
| 172 | func (m *RMRClient) SendBuf(txBuffer *C.rmr_mbuf_t) bool { |
| 173 | for i := 0; i < 10; i++ { |
| 174 | txBuffer.state = 0 |
| 175 | txBuffer := C.rmr_send_msg(m.context, txBuffer) |
| 176 | if txBuffer == nil { |
| 177 | break |
| 178 | } else if txBuffer.state != C.RMR_OK { |
| 179 | if txBuffer.state != C.RMR_ERR_RETRY { |
| 180 | time.Sleep(100 * time.Microsecond) |
| 181 | m.UpdateStatCounter("TransmitError") |
| 182 | } |
| 183 | for j := 0; j < 100 && txBuffer.state == C.RMR_ERR_RETRY; j++ { |
| 184 | txBuffer = C.rmr_send_msg(m.context, txBuffer) |
| 185 | } |
| 186 | } |
| 187 | |
| 188 | if txBuffer.state == C.RMR_OK { |
| 189 | m.UpdateStatCounter("Transmitted") |
| 190 | return true |
| 191 | } |
| 192 | } |
| 193 | m.UpdateStatCounter("TransmitError") |
| 194 | return false |
| 195 | } |
| 196 | |
| 197 | func (m *RMRClient) UpdateStatCounter(name string) { |
| 198 | m.mux.Lock() |
| 199 | m.stat[name].Inc() |
| 200 | m.mux.Unlock() |
| 201 | } |
| 202 | |
| 203 | func (m *RMRClient) RegisterMetrics() { |
| 204 | m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR") |
| 205 | } |
| 206 | |
| 207 | // To be removed ... |
| 208 | func (m *RMRClient) GetStat() (r RMRStatistics) { |
| 209 | return |
| 210 | } |
| 211 | |
| 212 | func (m *RMRClient) Wait() { |
| 213 | m.wg.Wait() |
| 214 | } |
| 215 | |
| 216 | func (m *RMRClient) IsReady() bool { |
| 217 | return m.ready != 0 |
| 218 | } |
| 219 | |
Mohamed Abukar | 192518d | 2019-06-11 18:06:50 +0300 | [diff] [blame^] | 220 | func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) { |
Mohamed Abukar | 349a098 | 2019-06-08 18:15:42 +0300 | [diff] [blame] | 221 | m.readyCb = cb |
Mohamed Abukar | 192518d | 2019-06-11 18:06:50 +0300 | [diff] [blame^] | 222 | m.readyCbParams = params |
Mohamed Abukar | 349a098 | 2019-06-08 18:15:42 +0300 | [diff] [blame] | 223 | } |
Mohamed Abukar | 775722c | 2019-06-10 16:41:57 +0300 | [diff] [blame] | 224 | |
| 225 | func (m *RMRClient) GetRicMessageId(name string) (int, bool) { |
| 226 | id, ok := RICMessageTypes[name] |
| 227 | return id, ok |
| 228 | } |
| 229 | |
| 230 | func (m *RMRClient) GetRicMessageName(id int) (s string) { |
| 231 | for k, v := range RICMessageTypes { |
| 232 | if id == v { |
| 233 | return k |
| 234 | } |
| 235 | } |
| 236 | return |
| 237 | } |