Add version v0.1.0
* Introduces NNGPush SBI module
* Fixes bugs in argument handling
* Uses separate RMR policy generators for Push and Pub communication
Change-Id: I2907c0649619d1a02da6d15b41bc2ad668c6245b
Signed-off-by: Peter Szilagyi <peter.3.szilagyi@nokia.com>
diff --git a/pkg/sbi/sbi.go b/pkg/sbi/sbi.go
index a2da825..83b3790 100644
--- a/pkg/sbi/sbi.go
+++ b/pkg/sbi/sbi.go
@@ -1,4 +1,5 @@
/*
+w
==================================================================================
Copyright (c) 2019 AT&T Intellectual Property.
Copyright (c) 2019 Nokia
@@ -28,8 +29,14 @@
"errors"
"fmt"
"rtmgr"
+ "strconv"
)
+// Default address parts for the NNG pub/sub and pipeline sockets.
+// Each socket kind has a URL prefix and a number; the number is presumably
+// the TCP port appended to the prefix — TODO confirm against the callers.
+const (
+	DEFAULT_NNG_PUBSUB_SOCKET_PREFIX   = "tcp://"
+	DEFAULT_NNG_PUBSUB_SOCKET_NUMBER   = 4560
+	DEFAULT_NNG_PIPELINE_SOCKET_PREFIX = "tcp://"
+	DEFAULT_NNG_PIPELINE_SOCKET_NUMBER = 4561
+)
+
var (
+ // SupportedSbis lists the SBI engine configurations that GetSbi searches by engine name.
SupportedSbis = []*SbiEngineConfig{
&SbiEngineConfig{
@@ -40,6 +47,8 @@
},
openSocket(openNngPub),
closeSocket(closeNngPub),
+ createEndpointSocket(createNngPubEndpointSocket),
+ destroyEndpointSocket(destroyNngPubEndpointSocket),
distributeAll(publishAll),
true,
},
@@ -49,10 +58,12 @@
Version: "v1",
Protocol: "nngpipeline",
},
- openSocket(nil),
- closeSocket(nil),
- distributeAll(nil),
- false,
+ openSocket(openNngPush),
+ closeSocket(closeNngPush),
+ createEndpointSocket(createNngPushEndpointSocket),
+ destroyEndpointSocket(destroyNngPushEndpointSocket),
+ distributeAll(pushAll),
+ true,
},
}
)
@@ -68,9 +79,50 @@
+// GetSbi returns the entry of SupportedSbis whose engine name equals sbiName
+// and which is flagged as available. An error is returned when no such entry
+// exists.
func GetSbi(sbiName string) (*SbiEngineConfig, error) {
for _, sbi := range SupportedSbis {
if sbi.Engine.Name == sbiName && sbi.IsAvailable {
return sbi, nil
}
}
- return nil, errors.New("SBI:" + sbiName + "is not supported or still not a available")
+ return nil, errors.New("SBI:" + sbiName + " is not supported or still not available")
+}
+
+// pruneEndpointList sweeps rtmgr.Eps. Endpoints whose Keepalive flag is not
+// set have their socket destroyed through sbii and are removed from the map;
+// every remaining endpoint gets its Keepalive flag cleared.
+func pruneEndpointList(sbii *SbiEngineConfig) {
+	for _, endpoint := range rtmgr.Eps {
+		if endpoint.Keepalive {
+			// Survivor: clear the mark on the map entry stored under its Uuid.
+			rtmgr.Eps[endpoint.Uuid].Keepalive = false
+			continue
+		}
+		sbii.DestroyEndpointSocket(endpoint)
+		delete(rtmgr.Eps, endpoint.Uuid)
+	}
+}
+
+// UpdateEndpointList brings rtmgr.Eps in line with the instances of the given
+// xApps. Endpoints are keyed by "<ip>:<port>". An endpoint already in the map
+// only has its Keepalive flag set. For an unknown one a new rtmgr.Endpoint is
+// built, its socket is created through sbii, and it is stored in the map; if
+// socket creation fails the error is logged and the instance is skipped.
+// pruneEndpointList is run once all instances have been processed.
+//
+// xapps is dereferenced unconditionally and must not be nil.
+func UpdateEndpointList(xapps *[]rtmgr.XApp, sbii *SbiEngineConfig) {
+	for _, app := range *xapps {
+		for _, inst := range app.Instances {
+			key := inst.Ip + ":" + strconv.Itoa(int(inst.Port))
+			if known, found := rtmgr.Eps[key]; found {
+				known.Keepalive = true
+				continue
+			}
+			// Positional literal: the value order must follow the field
+			// order of rtmgr.Endpoint.
+			candidate := &rtmgr.Endpoint{
+				key,
+				inst.Name,
+				app.Name,
+				inst.Ip,
+				inst.Port,
+				inst.TxMessages,
+				inst.RxMessages,
+				nil,
+				false,
+				true,
+			}
+			if err := sbii.CreateEndpointSocket(candidate); err != nil {
+				rtmgr.Logger.Error("can't create socket for endpoint: " + candidate.Name + " due to:" + err.Error())
+				continue
+			}
+			rtmgr.Eps[key] = candidate
+		}
+	}
+	pruneEndpointList(sbii)
}