Add support for subscription response callback

Change-Id: If35eb7b2ee3e4841dfdcb9345172aa337afaf140
Signed-off-by: Mohamed Abukar <abukar.mohamed@nokia.com>
diff --git a/pkg/xapp/subscription.go b/pkg/xapp/subscription.go
index 6ceabf9..cf8dc04 100755
--- a/pkg/xapp/subscription.go
+++ b/pkg/xapp/subscription.go
@@ -20,14 +20,17 @@
 package xapp
 
 import (
+	"bytes"
 	"encoding/json"
 	"fmt"
 	"github.com/go-openapi/loads"
 	httptransport "github.com/go-openapi/runtime/client"
 	"github.com/go-openapi/runtime/middleware"
 	"github.com/go-openapi/strfmt"
+	"github.com/spf13/viper"
 	"io/ioutil"
 	"net/http"
+	"strings"
 	"time"
 
 	apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
@@ -48,6 +51,7 @@
 type SubscriptionHandler func(models.SubscriptionType, interface{}) (*models.SubscriptionResponse, error)
 type SubscriptionQueryHandler func() (models.SubscriptionList, error)
 type SubscriptionDeleteHandler func(string) error
+type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse)
 
 type Subscriber struct {
 	localAddr  string
@@ -56,6 +60,8 @@
 	remoteUrl  string
 	remoteProt []string
 	timeout    time.Duration
+	clientUrl  string
+	clientCB   SubscriptionResponseCallback
 }
 
 func NewSubscriber(host string, timo int) *Subscriber {
@@ -67,18 +73,34 @@
 		timo = 20
 	}
 
-	return &Subscriber{
+	r := &Subscriber{
 		remoteHost: host,
 		remoteUrl:  "/ric/v1",
 		remoteProt: []string{"http"},
 		timeout:    time.Duration(timo) * time.Second,
 		localAddr:  "0.0.0.0",
 		localPort:  8088,
+		clientUrl:  "/ric/v1/subscriptions/response",
+	}
+	Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST")
+
+	return r
+}
+
+// ResponseHandler decodes a SubscriptionResponse from the request body and hands it to the callback, if one is set.
+func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) {
+	if req.Body == nil {
+		return
+	}
+	var payload apimodel.SubscriptionResponse
+	if json.NewDecoder(req.Body).Decode(&payload) == nil && r.clientCB != nil {
+		r.clientCB(&payload)
+	}
+	req.Body.Close()
+}
 
 // Server interface: listen and receive subscription requests
-func (r *Subscriber) Listen(add SubscriptionHandler, get SubscriptionQueryHandler, del SubscriptionDeleteHandler) error {
+func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error {
 	swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
 	if err != nil {
 		return err
@@ -86,10 +108,10 @@
 
 	api := operations.NewXappFrameworkAPI(swaggerSpec)
 
-	// Subscription: query
+	// Subscription: Query
 	api.QueryGetAllSubscriptionsHandler = query.GetAllSubscriptionsHandlerFunc(
 		func(p query.GetAllSubscriptionsParams) middleware.Responder {
-			if resp, err := get(); err == nil {
+			if resp, err := getSubscription(); err == nil {
 				return query.NewGetAllSubscriptionsOK().WithPayload(resp)
 			}
 			return query.NewGetAllSubscriptionsInternalServerError()
@@ -98,25 +120,25 @@
 	// SubscriptionType: Report
 	api.ReportSubscribeReportHandler = report.SubscribeReportHandlerFunc(
 		func(p report.SubscribeReportParams) middleware.Responder {
-			if resp, err := add(models.SubscriptionTypeReport, p.ReportParams); err == nil {
+			if resp, err := createSubscription(models.SubscriptionTypeReport, p.ReportParams); err == nil {
 				return report.NewSubscribeReportCreated().WithPayload(resp)
 			}
 			return report.NewSubscribeReportInternalServerError()
 		})
 
-	// SubscriptionType: policy
+	// SubscriptionType: Policy
 	api.PolicySubscribePolicyHandler = policy.SubscribePolicyHandlerFunc(
 		func(p policy.SubscribePolicyParams) middleware.Responder {
-			if resp, err := add(models.SubscriptionTypePolicy, p.PolicyParams); err == nil {
+			if resp, err := createSubscription(models.SubscriptionTypePolicy, p.PolicyParams); err == nil {
 				return policy.NewSubscribePolicyCreated().WithPayload(resp)
 			}
 			return policy.NewSubscribePolicyInternalServerError()
 		})
 
-	// SubscriptionType: delete
+	// Subscription: Delete
 	api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc(
 		func(p common.UnsubscribeParams) middleware.Responder {
-			if err := del(p.SubscriptionID); err == nil {
+			if err := delSubscription(p.SubscriptionID); err == nil {
 				return common.NewUnsubscribeNoContent()
 			}
 			return common.NewUnsubscribeInternalServerError()
@@ -134,6 +156,50 @@
 	return nil
 }
 
+// Notify POSTs resp as JSON to the client's response URL (clientEndpoint plus the port taken from local.host),
+// making up to subscription.retryCount attempts (default 10), subscription.retryDelay seconds apart (default 5).
+func (r *Subscriber) Notify(resp *models.SubscriptionResponse, clientEndpoint string) (err error) {
+	respData, err := json.Marshal(resp)
+	if err != nil {
+		Logger.Error("json.Marshal failed: %v", err)
+		return err
+	}
+	hostPort := strings.Split(viper.GetString("local.host"), ":")
+	if len(hostPort) < 2 { // no ":port" configured: fail instead of panicking on hostPort[1]
+		return fmt.Errorf("local.host %q has no port", hostPort[0])
+	}
+	clientUrl := fmt.Sprintf("http://%s:%s%s", clientEndpoint, hostPort[1], r.clientUrl)
+
+	retries := viper.GetInt("subscription.retryCount")
+	if retries == 0 {
+		retries = 10
+	}
+	delay := viper.GetInt("subscription.retryDelay")
+	if delay == 0 {
+		delay = 5
+	}
+
+	for i := 0; i < retries; i++ {
+		var httpResp *http.Response
+		httpResp, err = http.Post(clientUrl, "application/json", bytes.NewBuffer(respData)) // "=" keeps the last failure in the named result
+		if err == nil {
+			httpResp.Body.Close() // release the connection on every attempt
+			if httpResp.StatusCode == http.StatusOK {
+				return nil
+			}
+			err = fmt.Errorf("clientUrl=%s statusCode=%d", clientUrl, httpResp.StatusCode)
+		}
+		Logger.Error("%v", err)
+		time.Sleep(time.Duration(delay) * time.Second)
+	}
+	return err
+}
+
+// SetResponseCB registers the callback that ResponseHandler invokes with each successfully decoded subscription response.
+func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) {
+	r.clientCB = c
+}
+
 // Subscription interface for xApp: REPORT
 func (r *Subscriber) SubscribeReport(p *apimodel.ReportParams) (*apimodel.SubscriptionResponse, error) {
 	params := apireport.NewSubscribeReportParamsWithTimeout(r.timeout).WithReportParams(p)