blob: dbd5d5910f1c036eb7e59abb88cd11f2643d7ff3 [file] [log] [blame]
Peter Szilagyi16d84d62019-04-24 14:51:02 +00001/*
2==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
5
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
wahidw761934a2019-11-27 06:07:26 +000017
18 This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 platform project (RICP).
20
Peter Szilagyi16d84d62019-04-24 14:51:02 +000021==================================================================================
22*/
23/*
wahidwbce67472020-06-15 13:52:55 +000024 Mnemonic: rmrpipe.go
25 Abstract: mangos (RMR) Pipeline SBI implementation
Peter Szilagyi16d84d62019-04-24 14:51:02 +000026 Date: 12 March 2019
27*/
28
29package sbi
30
wahidwbce67472020-06-15 13:52:55 +000031/*
32#include <rmr/rmr.h>
33*/
34import "C"
35
Peter Szilagyi16d84d62019-04-24 14:51:02 +000036import (
wahidwdd6b0562020-03-31 03:09:45 +000037 "bytes"
38 "crypto/md5"
Peter Szilagyi16d84d62019-04-24 14:51:02 +000039 "errors"
wahidwa8596ec2019-12-05 06:30:42 +000040 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
prabhukaliswamyb47d12d2019-12-03 15:06:30 +000041 "routing-manager/pkg/rtmgr"
42 "strconv"
wahidwbce67472020-06-15 13:52:55 +000043 "strings"
wahidwdd6b0562020-03-31 03:09:45 +000044 "fmt"
Peter Szilagyi16d84d62019-04-24 14:51:02 +000045)
46
wahidwbce67472020-06-15 13:52:55 +000047var rmrcallid = 1
48var rmrdynamiccallid = 201
49
50type RmrPush struct {
kalnagy92162652019-07-02 15:15:49 +020051 Sbi
wahidw37c510b2020-04-15 10:48:50 +000052 rcChan chan *xapp.RMRParams
Peter Szilagyi16d84d62019-04-24 14:51:02 +000053}
54
wahidwbce67472020-06-15 13:52:55 +000055type EPStatus struct {
56 endpoint string
57 status bool
58}
59
wahidwdd6b0562020-03-31 03:09:45 +000060type RMRParams struct {
wahidw37c510b2020-04-15 10:48:50 +000061 *xapp.RMRParams
wahidwdd6b0562020-03-31 03:09:45 +000062}
63
wahidwdd6b0562020-03-31 03:09:45 +000064func (params *RMRParams) String() string {
wahidw37c510b2020-04-15 10:48:50 +000065 var b bytes.Buffer
66 sum := md5.Sum(params.Payload)
67 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
68 return b.String()
wahidwdd6b0562020-03-31 03:09:45 +000069}
70
wahidwbce67472020-06-15 13:52:55 +000071func NewRmrPush() *RmrPush {
72 instance := new(RmrPush)
kalnagy92162652019-07-02 15:15:49 +020073 return instance
Peter Szilagyi16d84d62019-04-24 14:51:02 +000074}
75
wahidwbce67472020-06-15 13:52:55 +000076func (c *RmrPush) Initialize(ip string) error {
kalnagy92162652019-07-02 15:15:49 +020077 return nil
78}
79
wahidwbce67472020-06-15 13:52:55 +000080func (c *RmrPush) Terminate() error {
kalnagy92162652019-07-02 15:15:49 +020081 return nil
82}
83
wahidwbce67472020-06-15 13:52:55 +000084func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
wahidwdd6b0562020-03-31 03:09:45 +000085
wahidwa8596ec2019-12-05 06:30:42 +000086 xapp.Logger.Debug("Invoked sbi.AddEndpoint")
wahidwbce67472020-06-15 13:52:55 +000087 endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
wahidwdd6b0562020-03-31 03:09:45 +000088 ep.Whid = int(xapp.Rmr.Openwh(endpoint))
wahidw37c510b2020-04-15 10:48:50 +000089 if ep.Whid < 0 {
wahidwdd6b0562020-03-31 03:09:45 +000090 return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
wahidw37c510b2020-04-15 10:48:50 +000091 } else {
92 xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint)
kalnagy92162652019-07-02 15:15:49 +020093 }
wahidwdd6b0562020-03-31 03:09:45 +000094
kalnagy92162652019-07-02 15:15:49 +020095 return nil
96}
97
wahidwbce67472020-06-15 13:52:55 +000098func (c *RmrPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
wahidwa8596ec2019-12-05 06:30:42 +000099 xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
100 xapp.Logger.Debug("args: %v", *ep)
wahidwdd6b0562020-03-31 03:09:45 +0000101
102 xapp.Rmr.Closewh(ep.Whid)
kalnagy92162652019-07-02 15:15:49 +0200103 return nil
104}
105
wahidwbce67472020-06-15 13:52:55 +0000106func (c *RmrPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
kalnagy92162652019-07-02 15:15:49 +0200107 c.updateEndpoints(rcs, c)
108}
109
wahidwbce67472020-06-15 13:52:55 +0000110func (c *RmrPush) DistributeAll(policies *[]string) error {
wahidwa8596ec2019-12-05 06:30:42 +0000111 xapp.Logger.Debug("Invoked: sbi.DistributeAll")
112 xapp.Logger.Debug("args: %v", *policies)
wahidwdd6b0562020-03-31 03:09:45 +0000113
wahidwbce67472020-06-15 13:52:55 +0000114 /*for _, ep := range rtmgr.Eps {
wahidwdd6b0562020-03-31 03:09:45 +0000115 go c.send(ep, policies)
wahidwbce67472020-06-15 13:52:55 +0000116 }*/
117 channel := make(chan EPStatus)
118
119 if rmrcallid == 200 {
120 rmrcallid = 1
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000121 }
wahidwdd6b0562020-03-31 03:09:45 +0000122
wahidwbce67472020-06-15 13:52:55 +0000123 for _, ep := range rtmgr.Eps {
124 go c.send_sync(ep, policies, channel, rmrcallid)
125 }
rangajal20374692020-08-11 06:51:17 +0000126 for rEp, id := range rtmgr.RmrEp {
127 go c.send_rt_process(rEp,id,policies,rmrcallid)
128 }
129
wahidwbce67472020-06-15 13:52:55 +0000130 rmrcallid++
131
132 count := 0
133 result := make([]EPStatus, len(rtmgr.Eps))
134 for i, _ := range result {
135 result[i] = <-channel
136 if result[i].status == true {
137 count++
138 } else {
139 xapp.Logger.Error("RMR send failed for endpoint %v", result[i].endpoint)
140 }
141 }
142
143 if count < len(rtmgr.Eps) {
144 return errors.New(" RMR response count " + string(count) + " is less than half of endpoint list " + string(len(rtmgr.Eps)))
145 }
146
147
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000148 return nil
149}
150
wahidwbce67472020-06-15 13:52:55 +0000151func (c *RmrPush) send_sync(ep *rtmgr.Endpoint, policies *[]string, channel chan EPStatus, call_id int) {
152 xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
wahidwdd6b0562020-03-31 03:09:45 +0000153
wahidwbce67472020-06-15 13:52:55 +0000154 ret := c.send_data(ep, policies, call_id)
wahidw37c510b2020-04-15 10:48:50 +0000155
wahidwbce67472020-06-15 13:52:55 +0000156 channel <- EPStatus{ep.Uuid, ret}
wahidw37c510b2020-04-15 10:48:50 +0000157
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000158}
wahidwdd6b0562020-03-31 03:09:45 +0000159
wahidwbce67472020-06-15 13:52:55 +0000160func (c *RmrPush) send_data(ep *rtmgr.Endpoint, policies *[]string, call_id int) bool {
161 xapp.Logger.Debug("Invoked send_data to endpoint: " + ep.Uuid + " call_id: " + strconv.Itoa(call_id))
162 var state int
163 var retstr string
164
165 var policy = []byte{}
166
167 for _, pe := range *policies {
168 b:= []byte(pe)
169 for j:=0; j<len(b); j++{
170 policy = append(policy,b[j])
171 }
172 }
173 params := &RMRParams{&xapp.RMRParams{}}
174 params.Mtype = 20
175 params.PayloadLen = len(policy)
176 params.Payload =[]byte(policy)
177 params.Mbuf = nil
178 params.Whid = ep.Whid
179 params.Callid = call_id
180 params.Timeout = 200
181 state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
182 routestatus := strings.Split(retstr," ")
183 if state != C.RMR_OK && routestatus[0] == "OK" {
184 xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
185 return false
186 } else {
187 xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
188 return true
189 }
190
191 xapp.Logger.Error("Route Update to endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " xapp.Rmr.SendCallMsg not called")
192 return false
193}
194
rangajal20374692020-08-11 06:51:17 +0000195func (c *RmrPush) CreateEndpoint(payload string,rmrsrc string)*rtmgr.Endpoint {
196 return c.createEndpoint(payload,rmrsrc, c)
wahidwdd6b0562020-03-31 03:09:45 +0000197}
198
wahidwbce67472020-06-15 13:52:55 +0000199func (c *RmrPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
wahidwdd6b0562020-03-31 03:09:45 +0000200 xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
201 xapp.Logger.Debug("args: %v", *policies)
202
wahidwbce67472020-06-15 13:52:55 +0000203 if rmrdynamiccallid == 255 {
204 rmrdynamiccallid = 201
205 }
206
207 go c.send_data(ep, policies,rmrdynamiccallid)
208 rmrdynamiccallid++
wahidwdd6b0562020-03-31 03:09:45 +0000209
210 return nil
211}
rangajal20374692020-08-11 06:51:17 +0000212
213func (c *RmrPush) send_rt_process(ep string,whid int, policies *[]string, call_id int) bool {
214 xapp.Logger.Debug("Invoked send_rt_process to endpoint: " + ep + " call_id: " + strconv.Itoa(call_id) + "whid: " + strconv.Itoa(whid))
215 var state int
216 var retstr string
217
218 var policy = []byte{}
219
220 for _, pe := range *policies {
221 b:= []byte(pe)
222 for j:=0; j<len(b); j++{
223 policy = append(policy,b[j])
224 }
225 }
226 params := &RMRParams{&xapp.RMRParams{}}
227 params.Mtype = 20
228 params.PayloadLen = len(policy)
229 params.Payload =[]byte(policy)
230 params.Mbuf = nil
231 params.Whid = whid
232 params.Callid = call_id
233 params.Timeout = 200
234 state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
235 routestatus := strings.Split(retstr," ")
236 if state != C.RMR_OK && routestatus[0] == "OK" {
237 xapp.Logger.Error("Updating Routes to Endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
238 return false
239 } else {
240 xapp.Logger.Info("Update Routes to Endpoint: " + ep + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ",whi_id: " + strconv.Itoa(whid) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
241 return true
242 }
243
244 xapp.Logger.Error("Route Update to endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " xapp.Rmr.SendCallMsg not called")
245 return false
246}
247