Implementation of A1-EI data delivery API

Implementation of A1-EI data delivery API which gets data related to ei
job and sends to xapp

Signed-off-by: naman.gupta <naman.gupta@samsung.com>
Change-Id: I80c0f744519e29978c99262caaad6b5e968316e5
diff --git a/a1-go/pkg/restful/restful.go b/a1-go/pkg/restful/restful.go
index b0663dd..d8d1e92 100644
--- a/a1-go/pkg/restful/restful.go
+++ b/a1-go/pkg/restful/restful.go
@@ -28,6 +28,7 @@
 	"gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/models"
 	"gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/restapi"
 	"gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/restapi/operations"
+	"gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/restapi/operations/a1_e_i_data_delivery"
 	"gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/restapi/operations/a1_mediator"
 	"gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/resthooks"
 	"github.com/go-openapi/loads"
@@ -145,7 +146,15 @@
 		return a1_mediator.NewA1ControllerDeletePolicyInstanceAccepted()
 
 	})
-	
+
+	api.A1eiDataDeliveryA1ControllerDataDeliveryHandler = a1_e_i_data_delivery.A1ControllerDataDeliveryHandlerFunc(func(params a1_e_i_data_delivery.A1ControllerDataDeliveryParams) middleware.Responder {
+		a1.Logger.Debug("handler for EI data delivery")
+		if err = r.rh.DataDelivery(params.Body); err != nil {
+			return a1_e_i_data_delivery.NewA1ControllerDataDeliveryNotFound()
+		}
+		return a1_e_i_data_delivery.NewA1ControllerDataDeliveryOK()
+	})
+
 	return api
 
 }
diff --git a/a1-go/pkg/resthooks/resthooks.go b/a1-go/pkg/resthooks/resthooks.go
index e0060f1..c57c2e8 100644
--- a/a1-go/pkg/resthooks/resthooks.go
+++ b/a1-go/pkg/resthooks/resthooks.go
@@ -42,6 +42,8 @@
 	a1InstancePrefix         = "a1.policy_instance."
 	a1InstanceMetadataPrefix = "a1.policy_inst_metadata."
 	a1HandlerPrefix          = "a1.policy_handler."
+	a1PolicyRequest          = 20010
+	a1EIDataDelivery         = 20017
 )
 
 var typeAlreadyError = errors.New("Policy Type already exists")
@@ -382,7 +384,7 @@
 			a1.Logger.Error("error : %v", err)
 			return err
 		}
-		isSent := rh.iRmrSenderInst.RmrSendToXapp(rmrMessage)
+		isSent := rh.iRmrSenderInst.RmrSendToXapp(rmrMessage, a1PolicyRequest)
 		if isSent {
 			a1.Logger.Debug("rmrSendToXapp : message sent")
 		} else {
@@ -671,7 +673,7 @@
 		a1.Logger.Error("error : %v", err)
 		return err
 	}
-	isSent := rh.iRmrSenderInst.RmrSendToXapp(rmrMessage)
+	isSent := rh.iRmrSenderInst.RmrSendToXapp(rmrMessage, a1PolicyRequest)
 	if isSent {
 		a1.Logger.Debug("rmrSendToXapp : message sent")
 	} else {
@@ -681,3 +683,22 @@
 
 	return nil
 }
+
+func (rh *Resthook) DataDelivery(httpBody interface{}) error {
+	a1.Logger.Debug("httpbody : %+v", httpBody)
+	mymap := httpBody.(map[string]interface{})
+	message := rmr.Message{}
+	rmrMessage, err := message.A1EIMessage(mymap["job"].(string), mymap["payload"].(string))
+	if err != nil {
+		a1.Logger.Error("error : %v", err)
+		return err
+	}
+	a1.Logger.Debug("rmrSendToXapp :rmrMessage %+v", rmrMessage)
+	isSent := rh.iRmrSenderInst.RmrSendToXapp(rmrMessage, a1EIDataDelivery)
+	if isSent {
+		a1.Logger.Debug("rmrSendToXapp : message sent")
+	} else {
+		a1.Logger.Error("rmrSendToXapp : message not sent")
+	}
+	return nil
+}
diff --git a/a1-go/pkg/resthooks/resthooks_test.go b/a1-go/pkg/resthooks/resthooks_test.go
index deffc4b..8c6b59a 100644
--- a/a1-go/pkg/resthooks/resthooks_test.go
+++ b/a1-go/pkg/resthooks/resthooks_test.go
@@ -144,7 +144,7 @@
 	a1.Logger.Debug("metadatainstancekey   : %+v", metadatainstancekey)
 	metadatainstancearr := []interface{}{metadatainstancekey, string(metadata)}
 	sdlInst.On("Set", "A1m_ns", metadatainstancearr).Return(nil)
-	rmrSenderInst.On("RmrSendToXapp", "httpBodyString").Return(true)
+	rmrSenderInst.On("RmrSendToXapp", "httpBodyString", 20010).Return(true)
 
 	errresp := rh.CreatePolicyInstance(policyTypeId, policyInstanceID, instancedata)
 
@@ -287,13 +287,31 @@
 
 	httpBodyString := `{"operation":"DELETE","payload":"","policy_instance_id":"123456","policy_type_id":"20001"}`
 
-	rmrSenderInst.On("RmrSendToXapp", httpBodyString).Return(true)
+	rmrSenderInst.On("RmrSendToXapp", httpBodyString, 20010).Return(true)
 
 	errresp := rh.DeletePolicyInstance(policyTypeId, policyInstanceID)
 
 	assert.Nil(t, errresp)
 	sdlInst.AssertExpectations(t)
 }
+func TestDataDelivery(t *testing.T) {
+
+	httpBody := `{
+		"job":"1",
+		"payload":"payload"
+		}
+		`
+	var instancedata interface{}
+
+	json.Unmarshal([]byte(httpBody), &instancedata)
+	a1.Logger.Debug("Marshaled data : %+v", (instancedata))
+	httpBodyString := `{"ei_job_id":"1","payload":"payload"}`
+	rmrSenderInst.On("RmrSendToXapp", httpBodyString, 20017).Return(true)
+	errresp := rh.DataDelivery(instancedata)
+
+	assert.Nil(t, errresp)
+	sdlInst.AssertExpectations(t)
+}
 
 type SdlMock struct {
 	mock.Mock
@@ -353,9 +371,15 @@
 	return args.Bool(0), args.Error(1)
 }
 
-func (rmr *RmrSenderMock) RmrSendToXapp(httpBodyString string) bool {
-	args := rmr.MethodCalled("RmrSendToXapp", httpBodyString)
-	return args.Bool(0)
+func (rmr *RmrSenderMock) RmrSendToXapp(httpBodyString string, mtype int) bool {
+	if httpBodyString == `{"blocking_rate":20,"enforce":true,"trigger_threshold":10,"window_length":20}` {
+		args := rmr.MethodCalled("RmrSendToXapp", httpBodyString, mtype)
+		return args.Bool(0)
+	} else if httpBodyString == `{"ei_job_id":"1","payload":"payload"}` {
+		args := rmr.MethodCalled("RmrSendToXapp", httpBodyString, mtype)
+		return args.Bool(0)
+	}
+	return true
 }
 
 func (s *SdlMock) Remove(ns string, keys []string) error {
diff --git a/a1-go/pkg/rmr/messages.go b/a1-go/pkg/rmr/messages.go
index af4e6c2..36771ef 100644
--- a/a1-go/pkg/rmr/messages.go
+++ b/a1-go/pkg/rmr/messages.go
@@ -43,3 +43,17 @@
 	}
 	return string(data), nil
 }
+
+func (m *Message) A1EIMessage(eiJobId string, httpBody string) (string, error) {
+	var datajson interface{}
+	datajson = map[string]string{
+		"ei_job_id": eiJobId,
+		"payload":   httpBody}
+	data, err := json.Marshal(datajson)
+
+	if err != nil {
+		a1.Logger.Error("marshal error : %v", err)
+		return "", err
+	}
+	return string(data), nil
+}
diff --git a/a1-go/pkg/rmr/rmr.go b/a1-go/pkg/rmr/rmr.go
index 78adf46..660089b 100644
--- a/a1-go/pkg/rmr/rmr.go
+++ b/a1-go/pkg/rmr/rmr.go
@@ -27,8 +27,7 @@
 )
 
 const (
-	a1PolicyRequest = 20010
-	a1SourceName    = "service-ricplt-a1mediator-http"
+	a1SourceName = "service-ricplt-a1mediator-http"
 )
 
 type RmrSender struct {
@@ -36,7 +35,7 @@
 }
 
 type IRmrSender interface {
-	RmrSendToXapp(httpBodyString string) bool
+	RmrSendToXapp(httpBodyString string, messagetype int) bool
 }
 
 func NewRMRSender() IRmrSender {
@@ -56,10 +55,10 @@
 	}
 }
 
-func (rmr *RmrSender) RmrSendToXapp(httpBodyString string) bool {
+func (rmr *RmrSender) RmrSendToXapp(httpBodyString string, messagetype int) bool {
 
 	params := &xapp.RMRParams{}
-	params.Mtype = a1PolicyRequest
+	params.Mtype = messagetype
 	params.SubId = -1
 	params.Xid = ""
 	params.Meid = &xapp.RMRMeid{}