blob: 44512999cba04b37142b1ae9284272926d5289c9 [file] [log] [blame]
Peter Szilagyi16d84d62019-04-24 14:51:02 +00001/*
2==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
5
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
wahidw761934a2019-11-27 06:07:26 +000017
18 This source code is part of the near-RT RIC (RAN Intelligent Controller)
19 platform project (RICP).
20
Peter Szilagyi16d84d62019-04-24 14:51:02 +000021==================================================================================
22*/
23/*
24 Mnemonic: nngpipe.go
25 Abstract: mangos (NNG) Pipeline SBI implementation
26 Date: 12 March 2019
27*/
28
29package sbi
30
wahidwcd7867c2020-02-05 10:01:12 +000031/*
32#include <time.h>
33#include <stdlib.h>
34#include <stdio.h>
35#include <string.h>
36#include <rmr/rmr.h>
37#include <rmr/RIC_message_types.h>
38
39
40#cgo CFLAGS: -I../
41#cgo LDFLAGS: -lrmr_nng -lnng
42*/
43import "C"
44
Peter Szilagyi16d84d62019-04-24 14:51:02 +000045import (
wahidwdd6b0562020-03-31 03:09:45 +000046 "bytes"
47 "crypto/md5"
Peter Szilagyi16d84d62019-04-24 14:51:02 +000048 "errors"
wahidwa8596ec2019-12-05 06:30:42 +000049 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
Peter Szilagyi16d84d62019-04-24 14:51:02 +000050 "nanomsg.org/go/mangos/v2"
51 "nanomsg.org/go/mangos/v2/protocol/push"
52 _ "nanomsg.org/go/mangos/v2/transport/all"
prabhukaliswamyb47d12d2019-12-03 15:06:30 +000053 "routing-manager/pkg/rtmgr"
54 "strconv"
wahidw0e38e1b2020-02-18 18:20:55 +000055 "time"
wahidwdd6b0562020-03-31 03:09:45 +000056 "fmt"
Peter Szilagyi16d84d62019-04-24 14:51:02 +000057)
58
kalnagy92162652019-07-02 15:15:49 +020059type NngPush struct {
60 Sbi
61 NewSocket CreateNewNngSocketHandler
wahidw0e38e1b2020-02-18 18:20:55 +000062 rcChan chan *xapp.RMRParams
Peter Szilagyi16d84d62019-04-24 14:51:02 +000063}
64
wahidwdd6b0562020-03-31 03:09:45 +000065type RMRParams struct {
66 *xapp.RMRParams
67}
68
69
70func (params *RMRParams) String() string {
71 var b bytes.Buffer
72 sum := md5.Sum(params.Payload)
73 fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Payhash=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid.RanName, params.PayloadLen, len(params.Payload), sum)
74 return b.String()
75}
76
kalnagy92162652019-07-02 15:15:49 +020077func NewNngPush() *NngPush {
78 instance := new(NngPush)
79 instance.NewSocket = createNewPushSocket
80 return instance
Peter Szilagyi16d84d62019-04-24 14:51:02 +000081}
82
kalnagy92162652019-07-02 15:15:49 +020083func createNewPushSocket() (NngSocket, error) {
wahidwa8596ec2019-12-05 06:30:42 +000084 xapp.Logger.Debug("Invoked: createNewPushSocket()")
kalnagy92162652019-07-02 15:15:49 +020085 socket, err := push.NewSocket()
Peter Szilagyi16d84d62019-04-24 14:51:02 +000086 if err != nil {
kalnagy92162652019-07-02 15:15:49 +020087 return nil, errors.New("can't create new push socket due to:" + err.Error())
Peter Szilagyi16d84d62019-04-24 14:51:02 +000088 }
kalnagy92162652019-07-02 15:15:49 +020089 socket.SetPipeEventHook(pipeEventHandler)
90 return socket, nil
Peter Szilagyi16d84d62019-04-24 14:51:02 +000091}
92
93func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
wahidwa8596ec2019-12-05 06:30:42 +000094 xapp.Logger.Debug("Invoked: pipeEventHandler()")
95 xapp.Logger.Debug("Received pipe event for " + pipe.Address() + " address")
Peter Szilagyi16d84d62019-04-24 14:51:02 +000096 for _, ep := range rtmgr.Eps {
zkoczkaaaf8d392019-10-02 17:16:06 +020097 uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
Peter Szilagyi16d84d62019-04-24 14:51:02 +000098 if uri == pipe.Address() {
99 switch event {
100 case 1:
101 ep.IsReady = true
wahidwa8596ec2019-12-05 06:30:42 +0000102 xapp.Logger.Debug("Endpoint " + uri + " successfully attached")
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000103 default:
104 ep.IsReady = false
wahidwa8596ec2019-12-05 06:30:42 +0000105 xapp.Logger.Debug("Endpoint " + uri + " has been detached")
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000106 }
kalnagy92162652019-07-02 15:15:49 +0200107 }
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000108 }
109}
110
kalnagy92162652019-07-02 15:15:49 +0200111func (c *NngPush) Initialize(ip string) error {
112 return nil
113}
114
115func (c *NngPush) Terminate() error {
116 return nil
117}
118
119func (c *NngPush) AddEndpoint(ep *rtmgr.Endpoint) error {
wahidwdd6b0562020-03-31 03:09:45 +0000120
wahidwa8596ec2019-12-05 06:30:42 +0000121 xapp.Logger.Debug("Invoked sbi.AddEndpoint")
122 xapp.Logger.Debug("args: %v", *ep)
wahidwdd6b0562020-03-31 03:09:45 +0000123 endpoint := ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
124 ep.Whid = int(xapp.Rmr.Openwh(endpoint))
125 if ep.Whid < 0 {
126 return errors.New("can't open warmhole connection for endpoint:" + ep.Uuid + " due to invalid Wormhole ID: " + string(ep.Whid))
127 }else {
128 xapp.Logger.Debug("Wormhole ID is %v and EP is %v",ep.Whid,endpoint)
kalnagy92162652019-07-02 15:15:49 +0200129 }
wahidwdd6b0562020-03-31 03:09:45 +0000130
kalnagy92162652019-07-02 15:15:49 +0200131 return nil
132}
133
134func (c *NngPush) DeleteEndpoint(ep *rtmgr.Endpoint) error {
wahidwa8596ec2019-12-05 06:30:42 +0000135 xapp.Logger.Debug("Invoked sbi. DeleteEndpoint")
136 xapp.Logger.Debug("args: %v", *ep)
wahidwdd6b0562020-03-31 03:09:45 +0000137
138 xapp.Rmr.Closewh(ep.Whid)
kalnagy92162652019-07-02 15:15:49 +0200139 return nil
140}
141
142func (c *NngPush) UpdateEndpoints(rcs *rtmgr.RicComponents) {
143 c.updateEndpoints(rcs, c)
144}
145
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000146/*
147NOTE: Asynchronous dial starts a goroutine which keep maintains the connection to the given endpoint
148*/
kalnagy92162652019-07-02 15:15:49 +0200149func (c *NngPush) dial(ep *rtmgr.Endpoint) error {
wahidwa8596ec2019-12-05 06:30:42 +0000150 xapp.Logger.Debug("Dialing to endpoint: " + ep.Uuid)
zkoczkaaaf8d392019-10-02 17:16:06 +0200151 uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000152 options := make(map[string]interface{})
153 options[mangos.OptionDialAsynch] = true
kalnagy92162652019-07-02 15:15:49 +0200154 if err := ep.Socket.(NngSocket).DialOptions(uri, options); err != nil {
155 return errors.New("can't dial on push socket to " + uri + " due to: " + err.Error())
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000156 }
kalnagy92162652019-07-02 15:15:49 +0200157 return nil
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000158}
159
kalnagy92162652019-07-02 15:15:49 +0200160func (c *NngPush) DistributeAll(policies *[]string) error {
wahidwa8596ec2019-12-05 06:30:42 +0000161 xapp.Logger.Debug("Invoked: sbi.DistributeAll")
162 xapp.Logger.Debug("args: %v", *policies)
wahidwdd6b0562020-03-31 03:09:45 +0000163
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000164 for _, ep := range rtmgr.Eps {
wahidwdd6b0562020-03-31 03:09:45 +0000165 go c.send(ep, policies)
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000166 }
wahidwdd6b0562020-03-31 03:09:45 +0000167
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000168 return nil
169}
170
kalnagy92162652019-07-02 15:15:49 +0200171func (c *NngPush) send(ep *rtmgr.Endpoint, policies *[]string) {
wahidwa8596ec2019-12-05 06:30:42 +0000172 xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
wahidwdd6b0562020-03-31 03:09:45 +0000173
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000174 for _, pe := range *policies {
wahidwdd6b0562020-03-31 03:09:45 +0000175 params := &RMRParams{&xapp.RMRParams{}}
176 params.Mtype = 20
177 params.PayloadLen = len([]byte(pe))
178 params.Payload =[]byte(pe)
179 params.Mbuf = nil
180 params.Whid = ep.Whid
181 time.Sleep(1 * time.Millisecond)
182 xapp.Rmr.SendMsg(params.RMRParams)
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000183 }
wahidwa8596ec2019-12-05 06:30:42 +0000184 xapp.Logger.Info("NNG PUSH to endpoint " + ep.Uuid + ": OK (# of Entries:" + strconv.Itoa(len(*policies)) + ")")
Peter Szilagyi16d84d62019-04-24 14:51:02 +0000185}
wahidwdd6b0562020-03-31 03:09:45 +0000186
187func (c *NngPush) CreateEndpoint(payload string) (*rtmgr.Endpoint){
188 return c.createEndpoint(payload, c)
189}
190
191func (c *NngPush) DistributeToEp(policies *[]string, ep *rtmgr.Endpoint) error {
192 xapp.Logger.Debug("Invoked: sbi.DistributeToEp")
193 xapp.Logger.Debug("args: %v", *policies)
194
195 go c.send(ep, policies)
196
197 return nil
198}
199