blob: 1b0bed0eaf13c91a3bfd7e1f6e7349dff35377ee [file] [log] [blame]
Peter Szilagyi16d84d62019-04-24 14:51:02 +00001/*
2==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
5
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
wahidw761934a2019-11-27 06:07:26 +000017
18 This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 platform project (RICP).
20
Peter Szilagyi16d84d62019-04-24 14:51:02 +000021==================================================================================
22*/
23/*
wahidwbce67472020-06-15 13:52:55 +000024 Mnemonic: rmrpipe.go
25 Abstract: mangos (RMR) Pipeline SBI implementation
Peter Szilagyi16d84d62019-04-24 14:51:02 +000026 Date: 12 March 2019
27*/
28
29package sbi
30
wahidwbce67472020-06-15 13:52:55 +000031/*
32#include <rmr/rmr.h>
33*/
34import "C"
35
Peter Szilagyi16d84d62019-04-24 14:51:02 +000036import (
wahidwdd6b0562020-03-31 03:09:45 +000037 "bytes"
38 "crypto/md5"
Peter Szilagyi16d84d62019-04-24 14:51:02 +000039 "errors"
wahidwa8596ec2019-12-05 06:30:42 +000040 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
prabhukaliswamyb47d12d2019-12-03 15:06:30 +000041 "routing-manager/pkg/rtmgr"
42 "strconv"
wahidwbce67472020-06-15 13:52:55 +000043 "strings"
wahidwdd6b0562020-03-31 03:09:45 +000044 "fmt"
Peter Szilagyi16d84d62019-04-24 14:51:02 +000045)
46
// Call-id counters copied into RMRParams.Callid before a message is sent
// with xapp.Rmr.SendCallMsg.
var (
	// rmrcallid is the call id used by DistributeAll. It starts at 1 and is
	// reset to 1 once it has reached 200.
	rmrcallid = 1

	// rmrdynamiccallid is the call id used by DistributeToEp. It starts at
	// 201 and is reset to 201 once it has reached 255.
	rmrdynamiccallid = 201
)
49
50type RmrPush struct {
kalnagy92162652019-07-02 15:15:49 +020051 Sbi
wahidw37c510b2020-04-15 10:48:50 +000052 rcChan chan *xapp.RMRParams
Peter Szilagyi16d84d62019-04-24 14:51:02 +000053}
54
// EPStatus is the per-endpoint outcome that send_sync reports back to
// DistributeAll over a channel.
type EPStatus struct {
	// endpoint is the Uuid of the endpoint the policies were pushed to.
	endpoint string
	// status is the boolean result returned by send_data for that endpoint.
	status bool
}
59
wahidwdd6b0562020-03-31 03:09:45 +000060type RMRParams struct {
wahidw37c510b2020-04-15 10:48:50 +000061 *xapp.RMRParams
wahidwdd6b0562020-03-31 03:09:45 +000062}
63
wahidwdd6b0562020-03-31 03:09:45 +000064func (params *RMRParams) String() string {
wahidw37c510b2020-04-15 10:48:50 +000065 var b bytes.Buffer
66 sum := md5.Sum(params.Payload)
67 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
68 return b.String()
wahidwdd6b0562020-03-31 03:09:45 +000069}
70
wahidwbce67472020-06-15 13:52:55 +000071func NewRmrPush() *RmrPush {
72 instance := new(RmrPush)
kalnagy92162652019-07-02 15:15:49 +020073 return instance
Peter Szilagyi16d84d62019-04-24 14:51:02 +000074}
75
wahidwbce67472020-06-15 13:52:55 +000076func (c *RmrPush) Initialize(ip string) error {
kalnagy92162652019-07-02 15:15:49 +020077 return nil
78}
79
wahidwbce67472020-06-15 13:52:55 +000080func (c *RmrPush) Terminate() error {
kalnagy92162652019-07-02 15:15:49 +020081 return nil
82}
83
wahidwbce67472020-06-15 13:52:55 +000084func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
wahidwdd6b0562020-03-31 03:09:45 +000085
wahidwa8596ec2019-12-05 06:30:42 +000086 xapp.Logger.Debug("Invoked sbi.AddEndpoint")
wahidwbce67472020-06-15 13:52:55 +000087 endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
wahidwdd6b0562020-03-31 03:09:45 +000088 ep.Whid = int(xapp.Rmr.Openwh(endpoint))
wahidw37c510b2020-04-15 10:48:50 +000089 if ep.Whid < 0 {
wahidwdd6b0562020-03-31 03:09:45 +000090 return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
wahidw37c510b2020-04-15 10:48:50 +000091 } else {
92 xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint)
kalnagy92162652019-07-02 15:15:49 +020093 }
wahidwdd6b0562020-03-31 03:09:45 +000094
kalnagy92162652019-07-02 15:15:49 +020095 return nil
96}
97
wahidwbce67472020-06-15 13:52:55 +000098func (c *RmrPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
wahidwa8596ec2019-12-05 06:30:42 +000099 xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
100 xapp.Logger.Debug("args: %v", *ep)
wahidwdd6b0562020-03-31 03:09:45 +0000101
102 xapp.Rmr.Closewh(ep.Whid)
kalnagy92162652019-07-02 15:15:49 +0200103 return nil
104}
105
wahidwbce67472020-06-15 13:52:55 +0000106func (c *RmrPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
kalnagy92162652019-07-02 15:15:49 +0200107 c.updateEndpoints(rcs, c)
108}
109
wahidwbce67472020-06-15 13:52:55 +0000110func (c *RmrPush) DistributeAll(policies *[]string) error {
wahidwa8596ec2019-12-05 06:30:42 +0000111 xapp.Logger.Debug("Invoked: sbi.DistributeAll")
112 xapp.Logger.Debug("args: %v", *policies)
wahidwdd6b0562020-03-31 03:09:45 +0000113
wahidwbce67472020-06-15 13:52:55 +0000114 /*for _, ep := range rtmgr.Eps {
wahidwdd6b0562020-03-31 03:09:45 +0000115 go c.send(ep, policies)
wahidwbce67472020-06-15 13:52:55 +0000116 }*/
117 channel := make(chan EPStatus)
118
119 if rmrcallid == 200 {
120 rmrcallid = 1
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000121 }
wahidwdd6b0562020-03-31 03:09:45 +0000122
wahidwbce67472020-06-15 13:52:55 +0000123 for _, ep := range rtmgr.Eps {
124 go c.send_sync(ep, policies, channel, rmrcallid)
125 }
rangajal20374692020-08-11 06:51:17 +0000126
wahidwbce67472020-06-15 13:52:55 +0000127 rmrcallid++
128
129 count := 0
130 result := make([]EPStatus, len(rtmgr.Eps))
131 for i, _ := range result {
132 result[i] = <-channel
133 if result[i].status == true {
134 count++
135 } else {
136 xapp.Logger.Error("RMR send failed for endpoint %v", result[i].endpoint)
137 }
138 }
139
140 if count < len(rtmgr.Eps) {
141 return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps)))
142 }
143
144
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000145 return nil
146}
147
wahidwbce67472020-06-15 13:52:55 +0000148func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, channel chan EPStatus, call_id int) {
149 xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
wahidwdd6b0562020-03-31 03:09:45 +0000150
wahidwbce67472020-06-15 13:52:55 +0000151 ret := c.send_data(ep, policies, call_id)
wahidw37c510b2020-04-15 10:48:50 +0000152
wahidwbce67472020-06-15 13:52:55 +0000153 channel <- EPStatus{ep.Uuid, ret}
wahidw37c510b2020-04-15 10:48:50 +0000154
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000155}
wahidwdd6b0562020-03-31 03:09:45 +0000156
wahidwbce67472020-06-15 13:52:55 +0000157func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool {
158 xapp.Logger.Debug("Invoked send_data to endpoint: " + ep.Uuid + " call_id: " + strconv.Itoa(call_id))
159 var state int
160 var retstr string
161
162 var policy = []byte{}
163
164 for _, pe := range *policies {
165 b:= []byte(pe)
166 for j:=0; j<len(b); j++{
167 policy = append(policy,b[j])
168 }
169 }
170 params := &RMRParams{&xapp.RMRParams{}}
171 params.Mtype = 20
172 params.PayloadLen = len(policy)
173 params.Payload =[]byte(policy)
174 params.Mbuf = nil
175 params.Whid = ep.Whid
176 params.Callid = call_id
177 params.Timeout = 200
178 state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
179 routestatus := strings.Split(retstr," ")
180 if state != C.RMR_OK && routestatus[0] == "OK" {
181 xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
182 return false
183 } else {
184 xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
185 return true
186 }
187
188 xapp.Logger.Error("Route Update to endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " xapp.Rmr.SendCallMsg not called")
189 return false
190}
191
wahidw6a9ce492020-08-15 11:29:43 +0530192func (c *RmrPush) CreateEndpoint(payload string,rmrsrc string)(ep *string,whid int) {
rangajal20374692020-08-11 06:51:17 +0000193 return c.createEndpoint(payload,rmrsrc, c)
wahidwdd6b0562020-03-31 03:09:45 +0000194}
195
wahidw6a9ce492020-08-15 11:29:43 +0530196func (c *RmrPush) DistributeToEp(policies *[]string, ep string, whid int) error {
wahidwdd6b0562020-03-31 03:09:45 +0000197 xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
198 xapp.Logger.Debug("args: %v", *policies)
199
wahidwbce67472020-06-15 13:52:55 +0000200 if rmrdynamiccallid == 255 {
201 rmrdynamiccallid = 201
202 }
203
wahidw6a9ce492020-08-15 11:29:43 +0530204 go c.sendDynamicRoutes(ep, whid, policies,rmrdynamiccallid)
wahidwbce67472020-06-15 13:52:55 +0000205 rmrdynamiccallid++
wahidwdd6b0562020-03-31 03:09:45 +0000206
207 return nil
208}
rangajal20374692020-08-11 06:51:17 +0000209
wahidw6a9ce492020-08-15 11:29:43 +0530210func (c *RmrPush) sendDynamicRoutes(ep string,whid int, policies *[]string, call_id int) bool {
rangajal20374692020-08-11 06:51:17 +0000211 xapp.Logger.Debug("Invoked send_rt_process to endpoint: " + ep + " call_id: " + strconv.Itoa(call_id) + "whid: " + strconv.Itoa(whid))
212 var state int
213 var retstr string
214
215 var policy = []byte{}
216
217 for _, pe := range *policies {
218 b:= []byte(pe)
219 for j:=0; j<len(b); j++{
220 policy = append(policy,b[j])
221 }
222 }
223 params := &RMRParams{&xapp.RMRParams{}}
224 params.Mtype = 20
225 params.PayloadLen = len(policy)
226 params.Payload =[]byte(policy)
227 params.Mbuf = nil
228 params.Whid = whid
229 params.Callid = call_id
230 params.Timeout = 200
231 state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
232 routestatus := strings.Split(retstr," ")
233 if state != C.RMR_OK && routestatus[0] == "OK" {
234 xapp.Logger.Error("Updating Routes to Endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
235 return false
236 } else {
237 xapp.Logger.Info("Update Routes to Endpoint: " + ep + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ",whi_id: " + strconv.Itoa(whid) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
238 return true
239 }
240
241 xapp.Logger.Error("Route Update to endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " xapp.Rmr.SendCallMsg not called")
242 return false
243}
244