blob: 0bb14c34e0097b7a0bb50f8c9c7d47f05c366b21 [file] [log] [blame]
Peter Szilagyi16d84d62019-04-24 14:51:02 +00001/*
2==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
5
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
wahidw761934a2019-11-27 06:07:26 +000017
18 This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 platform project (RICP).
20
Peter Szilagyi16d84d62019-04-24 14:51:02 +000021==================================================================================
22*/
23/*
24 Mnemonic: nngpipe.go
25 Abstract: mangos (NNG) Pipeline SBI implementation
26 Date: 12 March 2019
27*/
28
29package sbi
30
31import (
wahidwdd6b0562020-03-31 03:09:45 +000032 "bytes"
33 "crypto/md5"
Peter Szilagyi16d84d62019-04-24 14:51:02 +000034 "errors"
wahidwa8596ec2019-12-05 06:30:42 +000035 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
prabhukaliswamyb47d12d2019-12-03 15:06:30 +000036 "routing-manager/pkg/rtmgr"
37 "strconv"
wahidw37c510b2020-04-15 10:48:50 +000038 //"time"
wahidwdd6b0562020-03-31 03:09:45 +000039 "fmt"
Peter Szilagyi16d84d62019-04-24 14:51:02 +000040)
41
kalnagy92162652019-07-02 15:15:49 +020042type NngPush struct {
43 Sbi
wahidw37c510b2020-04-15 10:48:50 +000044 rcChan chan *xapp.RMRParams
Peter Szilagyi16d84d62019-04-24 14:51:02 +000045}
46
wahidwdd6b0562020-03-31 03:09:45 +000047type RMRParams struct {
wahidw37c510b2020-04-15 10:48:50 +000048 *xapp.RMRParams
wahidwdd6b0562020-03-31 03:09:45 +000049}
50
wahidwdd6b0562020-03-31 03:09:45 +000051func (params *RMRParams) String() string {
wahidw37c510b2020-04-15 10:48:50 +000052 var b bytes.Buffer
53 sum := md5.Sum(params.Payload)
54 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
55 return b.String()
wahidwdd6b0562020-03-31 03:09:45 +000056}
57
kalnagy92162652019-07-02 15:15:49 +020058func NewNngPush() *NngPush {
59 instance := new(NngPush)
kalnagy92162652019-07-02 15:15:49 +020060 return instance
Peter Szilagyi16d84d62019-04-24 14:51:02 +000061}
62
kalnagy92162652019-07-02 15:15:49 +020063func (c *NngPush) Initialize(ip string) error {
64 return nil
65}
66
67func (c *NngPush) Terminate() error {
68 return nil
69}
70
71func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
wahidwdd6b0562020-03-31 03:09:45 +000072
wahidwa8596ec2019-12-05 06:30:42 +000073 xapp.Logger.Debug("Invoked sbi.AddEndpoint")
wahidwdd6b0562020-03-31 03:09:45 +000074 endpoint := ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
75 ep.Whid = int(xapp.Rmr.Openwh(endpoint))
wahidw37c510b2020-04-15 10:48:50 +000076 if ep.Whid < 0 {
wahidwdd6b0562020-03-31 03:09:45 +000077 return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
wahidw37c510b2020-04-15 10:48:50 +000078 } else {
79 xapp.Logger.Debug("Wormhole ID is %v and EP is %v", ep.Whid, endpoint)
kalnagy92162652019-07-02 15:15:49 +020080 }
wahidwdd6b0562020-03-31 03:09:45 +000081
kalnagy92162652019-07-02 15:15:49 +020082 return nil
83}
84
85func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
wahidwa8596ec2019-12-05 06:30:42 +000086 xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
87 xapp.Logger.Debug("args: %v", *ep)
wahidwdd6b0562020-03-31 03:09:45 +000088
89 xapp.Rmr.Closewh(ep.Whid)
kalnagy92162652019-07-02 15:15:49 +020090 return nil
91}
92
93func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
94 c.updateEndpoints(rcs, c)
95}
96
kalnagy92162652019-07-02 15:15:49 +020097func (c *NngPush) DistributeAll(policies *[]string) error {
wahidwa8596ec2019-12-05 06:30:42 +000098 xapp.Logger.Debug("Invoked: sbi.DistributeAll")
99 xapp.Logger.Debug("args: %v", *policies)
wahidwdd6b0562020-03-31 03:09:45 +0000100
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000101 for _, ep := range rtmgr.Eps {
wahidwdd6b0562020-03-31 03:09:45 +0000102 go c.send(ep, policies)
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000103 }
wahidwdd6b0562020-03-31 03:09:45 +0000104
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000105 return nil
106}
107
kalnagy92162652019-07-02 15:15:49 +0200108func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
wahidwa8596ec2019-12-05 06:30:42 +0000109 xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
wahidwdd6b0562020-03-31 03:09:45 +0000110
wahidw37c510b2020-04-15 10:48:50 +0000111 var policy = []byte{}
112 cumulative_policy := 0
113 count := 0
114 maxrecord := xapp.Config.GetInt("maxrecord")
115 if maxrecord == 0 {
116 maxrecord = 10
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000117 }
wahidw37c510b2020-04-15 10:48:50 +0000118
119 for _, pe := range *policies {
120 b := []byte(pe)
121 for j := 0; j < len(b); j++ {
122 policy = append(policy, b[j])
123 }
124 count++
125 cumulative_policy++
126 if count == maxrecord || cumulative_policy == len(*policies) {
127 params := &RMRParams{&xapp.RMRParams{}}
128 params.Mtype = 20
129 params.PayloadLen = len(policy)
130 params.Payload = []byte(policy)
131 params.Mbuf = nil
132 params.Whid = ep.Whid
133 xapp.Rmr.SendMsg(params.RMRParams)
134 count = 0
135 policy = nil
136 xapp.Logger.Debug("Sent message with payload len = %d", params.PayloadLen)
137 }
138 }
139
wahidwa8596ec2019-12-05 06:30:42 +0000140 xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000141}
wahidwdd6b0562020-03-31 03:09:45 +0000142
wahidw37c510b2020-04-15 10:48:50 +0000143func (c *NngPush) CreateEndpoint(payload string) *rtmgr.Endpoint {
wahidwdd6b0562020-03-31 03:09:45 +0000144 return c.createEndpoint(payload, c)
145}
146
147func (c *NngPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
148 xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
149 xapp.Logger.Debug("args: %v", *policies)
150
151 go c.send(ep, policies)
152
153 return nil
154}