Fixed integration and subscription routes related issues for R3

Change-Id: I8f89ad2524b6dbe33f088f1685d9b3d5820b7e54
Signed-off-by: prabhukaliswamy <prabhu.k@nokia.com>
diff --git a/RELNOTES b/RELNOTES
index 92fde07..2d57a92 100644
--- a/RELNOTES
+++ b/RELNOTES
@@ -1,3 +1,6 @@
+### v0.4.1
+* Fixed integration and subscription routes related issues for R3
+
 ### v0.3.9
 * Added RSM platform component routes for message types RAN_CONNECTED, RAN_RESTARTED, RAN_RECONFIGURED, RIC_RES_STATUS_REQ,RIC_RES_STATUS_RESP and RIC_RES_STATUS_FAILURE
 * xApp manager interface changes for Subscription Request API
diff --git a/cmd/rtmgr.go b/cmd/rtmgr.go
index 101c743..568b7ad 100644
--- a/cmd/rtmgr.go
+++ b/cmd/rtmgr.go
@@ -47,7 +47,7 @@
 )
 
 const SERVICENAME = "rtmgr"
-const INTERVAL time.Duration = 2
+const INTERVAL time.Duration = 60
 
 var (
 	args map[string]*string
@@ -123,7 +123,6 @@
 	go serveSBI(triggerSBI, sbiEngine, sdlEngine, rpeEngine)
 
 	for {
-		time.Sleep(INTERVAL * time.Second)
 		if *args["nbi"] == "httpGetter" {
 			data, err := nbiEngine.(*nbi.HttpGetter).FetchAllXApps(*args["xm-url"])
 			if err != nil {
@@ -134,6 +133,9 @@
 		}
 
 		triggerSBI <- true
+
+		time.Sleep(INTERVAL * time.Second)
+		rtmgr.Logger.Debug("Periodic loop timed out. Setting triggerSBI flag to distribute updated routes.")
 	}
 }
 
diff --git a/container-tag.yaml b/container-tag.yaml
index dc6f324..3308c36 100644
--- a/container-tag.yaml
+++ b/container-tag.yaml
@@ -2,4 +2,4 @@
 # By default this file is in the docker build directory,
 # but the location can configured in the JJB template.
 ---
-tag: 0.4.0
+tag: 0.4.1
diff --git a/pkg/nbi/httprestful.go b/pkg/nbi/httprestful.go
index 9606288..8ceca1d 100644
--- a/pkg/nbi/httprestful.go
+++ b/pkg/nbi/httprestful.go
@@ -35,6 +35,8 @@
 	"encoding/json"
 	"errors"
 	"fmt"
+	"github.com/go-openapi/loads"
+	"github.com/go-openapi/runtime/middleware"
 	"net/url"
 	"os"
 	"routing-manager/pkg/models"
@@ -46,9 +48,6 @@
 	"routing-manager/pkg/sdl"
 	"strconv"
 	"time"
-
-	"github.com/go-openapi/loads"
-	"github.com/go-openapi/runtime/middleware"
 )
 
 //var myClient = &http.Client{Timeout: 1 * time.Second}
@@ -217,6 +216,8 @@
 			if err != nil {
 				return handle.NewProvideXappSubscriptionHandleBadRequest()
 			} else {
+				//Delay the reponse as add subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
+				time.Sleep(1 * time.Second)
 				return handle.NewGetHandlesOK()
 			}
 		})
@@ -226,6 +227,8 @@
 			if err != nil {
 				return handle.NewDeleteXappSubscriptionHandleNoContent()
 			} else {
+				//Delay the reponse as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
+				time.Sleep(1 * time.Second)
 				return handle.NewGetHandlesOK()
 			}
 		})
@@ -237,7 +240,7 @@
 }
 
 func httpGetXApps(xmurl string) (*[]rtmgr.XApp, error) {
-	rtmgr.Logger.Info("Invoked httpgetter.fetchXappList: " + xmurl)
+	rtmgr.Logger.Info("Invoked httprestful.httpGetXApps: " + xmurl)
 	r, err := myClient.Get(xmurl)
 	if err != nil {
 		return nil, err
@@ -252,10 +255,10 @@
 			rtmgr.Logger.Warn("Json decode failed: " + err.Error())
 		}
 		rtmgr.Logger.Info("HTTP GET: OK")
-		rtmgr.Logger.Debug("httpgetter.fetchXappList returns: %v", xapps)
+		rtmgr.Logger.Debug("httprestful.httpGetXApps returns: %v", xapps)
 		return &xapps, err
 	}
-	rtmgr.Logger.Warn("httpgetter got an unexpected http status code: %v", r.StatusCode)
+	rtmgr.Logger.Warn("httprestful got an unexpected http status code: %v", r.StatusCode)
 	return nil, nil
 }
 
@@ -315,8 +318,12 @@
 			if err != nil {
 				rtmgr.Logger.Error("cannot get data from rest api dute to: " + err.Error())
 			} else if data != nil {
-				sdlEngine.WriteXApps(fileName, data)
-				triggerSBI <- true
+				rtmgr.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.")
+				alldata, err1 := httpGetXApps(xmurl)
+				if alldata != nil && err1 == nil {
+					sdlEngine.WriteXApps(fileName, alldata)
+					triggerSBI <- true
+				}
 			}
 		}
 	}()
diff --git a/pkg/rpe/rpe.go b/pkg/rpe/rpe.go
index 9a36564..aa8e12f 100644
--- a/pkg/rpe/rpe.go
+++ b/pkg/rpe/rpe.go
@@ -33,8 +33,8 @@
 	"errors"
 	"routing-manager/pkg/rtmgr"
 	"routing-manager/pkg/sbi"
-	"runtime"
 	"strconv"
+	"runtime"
 )
 
 var (
@@ -90,26 +90,26 @@
 		rxList := []rtmgr.EndpointList{[]rtmgr.Endpoint{*rx}}
 		messageId := rtmgr.MessageTypes[messageType]
 		route := rtmgr.RouteTableEntry{
-			MessageType: messageId,
-			TxList:      txList,
-			RxGroups:    rxList,
-			SubID:       subId}
-		*routeTable = append(*routeTable, route)
-		rtmgr.Logger.Debug("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx.Uuid, rx.Uuid, subId)
-		rtmgr.Logger.Trace("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx, rx, subId)
-	} else {
-		pc, _, _, ok := runtime.Caller(1)
-		details := runtime.FuncForPC(pc)
-		if ok && details != nil {
-			rtmgr.Logger.Error("Route addition skipped: Either TX or RX endpoint not present. Caller function is %s", details.Name())
+				MessageType: messageId,
+				TxList:      txList,
+				RxGroups:    rxList,
+				SubID:       subId}
+			*routeTable = append(*routeTable, route)
+			rtmgr.Logger.Debug("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx.Uuid, rx.Uuid, subId)
+			rtmgr.Logger.Trace("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx, rx, subId)
+		} else {
+			pc,_,_,ok := runtime.Caller(1)
+			details := runtime.FuncForPC(pc)
+			if ok && details != nil {
+				rtmgr.Logger.Error("Route addition skipped: Either TX or RX endpoint not present. Caller function is %s", details.Name())
+			}
 		}
-	}
 }
 
 func (r *Rpe) generateXappRoutes(xAppEp *rtmgr.Endpoint, e2TermEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
 	rtmgr.Logger.Debug("rpe.generateXappRoutes invoked")
 	rtmgr.Logger.Debug("Endpoint: %v, xAppType: %v", xAppEp.Name, xAppEp.XAppType)
-	if xAppEp.XAppType != sbi.PlatformType && len(xAppEp.TxMessages) > 0 && len(xAppEp.RxMessages) > 0 {
+	if xAppEp.XAppType != sbi.PlatformType && ( len(xAppEp.TxMessages) > 0 || len(xAppEp.RxMessages) > 0 ) {
 		//xApp -> Subscription Manager
 		r.addRoute("RIC_SUB_REQ", xAppEp, subManEp, routeTable, -1)
 		r.addRoute("RIC_SUB_DEL_REQ", xAppEp, subManEp, routeTable, -1)
@@ -121,7 +121,7 @@
 	}
 }
 
-func (r *Rpe) generateSubscriptionRoutes(e2TermEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
+func (r *Rpe) generateSubscriptionRoutes(selectedxAppEp *rtmgr.Endpoint, e2TermEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
 	rtmgr.Logger.Debug("rpe.addSubscriptionRoutes invoked")
 	subscriptionList := &rtmgr.Subs
 	for _, subscription := range *subscriptionList {
@@ -129,15 +129,18 @@
 		xAppUuid := subscription.Fqdn + ":" + strconv.Itoa(int(subscription.Port))
 		rtmgr.Logger.Debug("xApp UUID: %v", xAppUuid)
 		xAppEp := getEndpointByUuid(xAppUuid)
-		//Subscription Manager -> xApp
-		r.addRoute("RIC_SUB_RESP", subManEp, xAppEp, routeTable, subscription.SubID)
-		r.addRoute("RIC_SUB_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID)
-		r.addRoute("RIC_SUB_DEL_RESP", subManEp, xAppEp, routeTable, subscription.SubID)
-		r.addRoute("RIC_SUB_DEL_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID)
-		//E2 Termination -> xApp
-		r.addRoute("RIC_INDICATION", e2TermEp, xAppEp, routeTable, subscription.SubID)
-		r.addRoute("RIC_CONTROL_ACK", e2TermEp, xAppEp, routeTable, subscription.SubID)
-		r.addRoute("RIC_CONTROL_FAILURE", e2TermEp, xAppEp, routeTable, subscription.SubID)
+		if xAppEp.Uuid == selectedxAppEp.Uuid { 
+			rtmgr.Logger.Debug("xApp UUID is matched for selected xApp.UUID: %v and xApp.Name: %v", selectedxAppEp.Uuid, selectedxAppEp.Name)
+			//Subscription Manager -> xApp
+			r.addRoute("RIC_SUB_RESP", subManEp, xAppEp, routeTable, subscription.SubID)
+			r.addRoute("RIC_SUB_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID)
+			r.addRoute("RIC_SUB_DEL_RESP", subManEp, xAppEp, routeTable, subscription.SubID)
+			r.addRoute("RIC_SUB_DEL_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID)
+			//E2 Termination -> xApp
+			r.addRoute("RIC_INDICATION", e2TermEp, xAppEp, routeTable, subscription.SubID)
+			r.addRoute("RIC_CONTROL_ACK", e2TermEp, xAppEp, routeTable, subscription.SubID)
+			r.addRoute("RIC_CONTROL_FAILURE", e2TermEp, xAppEp, routeTable, subscription.SubID)
+		}
 	}
 }
 
@@ -227,9 +230,9 @@
 
 	for _, endPoint := range endPointList {
 		rtmgr.Logger.Debug("Endpoint: %v, xAppType: %v", endPoint.Name, endPoint.XAppType)
-		if endPoint.XAppType != sbi.PlatformType && len(endPoint.TxMessages) > 0 && len(endPoint.RxMessages) > 0 {
+		if endPoint.XAppType != sbi.PlatformType && ( len(endPoint.TxMessages) > 0 || len(endPoint.RxMessages) > 0 ) {
 			r.generateXappRoutes(endPoint, e2TermEp, subManEp, routeTable)
-			r.generateSubscriptionRoutes(e2TermEp, subManEp, routeTable)
+			r.generateSubscriptionRoutes(endPoint, e2TermEp, subManEp, routeTable)
 		}
 	}
 	return routeTable
diff --git a/pkg/sbi/nngpush.go b/pkg/sbi/nngpush.go
index 6e3e225..4f56753 100644
--- a/pkg/sbi/nngpush.go
+++ b/pkg/sbi/nngpush.go
@@ -30,12 +30,11 @@
 
 import (
 	"errors"
-	"routing-manager/pkg/rtmgr"
-	"strconv"
-
 	"nanomsg.org/go/mangos/v2"
 	"nanomsg.org/go/mangos/v2/protocol/push"
 	_ "nanomsg.org/go/mangos/v2/transport/all"
+	"routing-manager/pkg/rtmgr"
+	"strconv"
 )
 
 type NngPush struct {
@@ -61,6 +60,7 @@
 
 func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
 	rtmgr.Logger.Debug("Invoked: pipeEventHandler()")
+	rtmgr.Logger.Debug("Received pipe event for " + pipe.Address() + " address")
 	for _, ep := range rtmgr.Eps {
 		uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
 		if uri == pipe.Address() {
diff --git a/pkg/sbi/sbi.go b/pkg/sbi/sbi.go
index 7d07160..2737cc1 100644
--- a/pkg/sbi/sbi.go
+++ b/pkg/sbi/sbi.go
@@ -64,6 +64,7 @@
 }
 
 func (s *Sbi) pruneEndpointList(sbi Engine) {
+	rtmgr.Logger.Debug("pruneEndpointList invoked.")
 	for _, ep := range rtmgr.Eps {
 		if !ep.Keepalive {
 			rtmgr.Logger.Debug("deleting %v", ep)