Support for subscription callback
Change-Id: If35eb7b2ee3e4841dfdcb9345172aa337afaf140
Signed-off-by: Mohamed Abukar <abukar.mohamed@nokia.com>
diff --git a/ci/Dockerfile b/ci/Dockerfile
index 7b95438..8303114 100755
--- a/ci/Dockerfile
+++ b/ci/Dockerfile
@@ -35,7 +35,7 @@
RUN curl -s https://packagecloud.io/install/repositories/o-ran-sc/master/script.deb.sh | bash
# RMR
-ARG RMRVERSION=3.6.0
+ARG RMRVERSION=3.6.5
#RUN apt-get install -y rmr=${RMRVERSION} rmr-dev=${RMRVERSION}
RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr_${RMRVERSION}_amd64.deb
RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr-dev_${RMRVERSION}_amd64.deb
diff --git a/config/config-file.yaml b/config/config-file.yaml
index 42236ec..6f163c3 100755
--- a/config/config-file.yaml
+++ b/config/config-file.yaml
@@ -26,6 +26,8 @@
"subscription":
"host": "localhost:8088"
"timeout": 2
+ "retryCount": 10
+ "retryDelay": 5
"db":
"namespace": "sdl"
"waitForSdl": false
diff --git a/pkg/xapp/subscription.go b/pkg/xapp/subscription.go
index 6ceabf9..cf8dc04 100755
--- a/pkg/xapp/subscription.go
+++ b/pkg/xapp/subscription.go
@@ -20,14 +20,17 @@
package xapp
import (
+ "bytes"
"encoding/json"
"fmt"
"github.com/go-openapi/loads"
httptransport "github.com/go-openapi/runtime/client"
"github.com/go-openapi/runtime/middleware"
"github.com/go-openapi/strfmt"
+ "github.com/spf13/viper"
"io/ioutil"
"net/http"
+ "strings"
"time"
apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
@@ -48,6 +51,7 @@
type SubscriptionHandler func(models.SubscriptionType, interface{}) (*models.SubscriptionResponse, error)
type SubscriptionQueryHandler func() (models.SubscriptionList, error)
type SubscriptionDeleteHandler func(string) error
+type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse)
type Subscriber struct {
localAddr string
@@ -56,6 +60,8 @@
remoteUrl string
remoteProt []string
timeout time.Duration
+ clientUrl string
+ clientCB SubscriptionResponseCallback
}
func NewSubscriber(host string, timo int) *Subscriber {
@@ -67,18 +73,34 @@
timo = 20
}
- return &Subscriber{
+ r := &Subscriber{
remoteHost: host,
remoteUrl: "/ric/v1",
remoteProt: []string{"http"},
timeout: time.Duration(timo) * time.Second,
localAddr: "0.0.0.0",
localPort: 8088,
+ clientUrl: "/ric/v1/subscriptions/response",
+ }
+ Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST")
+
+ return r
+}
+
+func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) {
+ if req.Body != nil {
+ var resp apimodel.SubscriptionResponse
+ if err := json.NewDecoder(req.Body).Decode(&resp); err == nil {
+ if r.clientCB != nil {
+ r.clientCB(&resp)
+ }
+ }
+ req.Body.Close()
}
}
// Server interface: listen and receive subscription requests
-func (r *Subscriber) Listen(add SubscriptionHandler, get SubscriptionQueryHandler, del SubscriptionDeleteHandler) error {
+func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error {
swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
if err != nil {
return err
@@ -86,10 +108,10 @@
api := operations.NewXappFrameworkAPI(swaggerSpec)
- // Subscription: query
+ // Subscription: Query
api.QueryGetAllSubscriptionsHandler = query.GetAllSubscriptionsHandlerFunc(
func(p query.GetAllSubscriptionsParams) middleware.Responder {
- if resp, err := get(); err == nil {
+ if resp, err := getSubscription(); err == nil {
return query.NewGetAllSubscriptionsOK().WithPayload(resp)
}
return query.NewGetAllSubscriptionsInternalServerError()
@@ -98,25 +120,25 @@
// SubscriptionType: Report
api.ReportSubscribeReportHandler = report.SubscribeReportHandlerFunc(
func(p report.SubscribeReportParams) middleware.Responder {
- if resp, err := add(models.SubscriptionTypeReport, p.ReportParams); err == nil {
+ if resp, err := createSubscription(models.SubscriptionTypeReport, p.ReportParams); err == nil {
return report.NewSubscribeReportCreated().WithPayload(resp)
}
return report.NewSubscribeReportInternalServerError()
})
- // SubscriptionType: policy
+ // SubscriptionType: Policy
api.PolicySubscribePolicyHandler = policy.SubscribePolicyHandlerFunc(
func(p policy.SubscribePolicyParams) middleware.Responder {
- if resp, err := add(models.SubscriptionTypePolicy, p.PolicyParams); err == nil {
+ if resp, err := createSubscription(models.SubscriptionTypePolicy, p.PolicyParams); err == nil {
return policy.NewSubscribePolicyCreated().WithPayload(resp)
}
return policy.NewSubscribePolicyInternalServerError()
})
- // SubscriptionType: delete
+ // SubscriptionType: Delete
api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc(
func(p common.UnsubscribeParams) middleware.Responder {
- if err := del(p.SubscriptionID); err == nil {
+ if err := delSubscription(p.SubscriptionID); err == nil {
return common.NewUnsubscribeNoContent()
}
return common.NewUnsubscribeInternalServerError()
@@ -134,6 +156,50 @@
return nil
}
+// Server interface: send notification to client
+func (r *Subscriber) Notify(resp *models.SubscriptionResponse, clientEndpoint string) (err error) {
+ respData, err := json.Marshal(resp)
+ if err != nil {
+ Logger.Error("json.Marshal failed: %v", err)
+ return err
+ }
+
+ port := strings.Split(viper.GetString("local.host"), ":")[1]
+ clientUrl := fmt.Sprintf("http://%s:%s%s", clientEndpoint, port, r.clientUrl)
+
+ retries := viper.GetInt("subscription.retryCount")
+ if retries == 0 {
+ retries = 10
+ }
+
+ delay := viper.GetInt("subscription.retryDelay")
+ if delay == 0 {
+ delay = 5
+ }
+
+ for i := 0; i < retries; i++ {
+ r, err := http.Post(clientUrl, "application/json", bytes.NewBuffer(respData))
+ if err == nil && (r != nil && r.StatusCode == http.StatusOK) {
+ break
+ }
+
+ if err != nil {
+ Logger.Error("%v", err)
+ }
+ if r != nil && r.StatusCode != http.StatusOK {
+ Logger.Error("clientUrl=%s statusCode=%d", clientUrl, r.StatusCode)
+ }
+ time.Sleep(time.Duration(delay) * time.Second)
+ }
+
+ return err
+}
+
+// Subscription interface for xApp: Response callback
+func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) {
+ r.clientCB = c
+}
+
// Subscription interface for xApp: REPORT
func (r *Subscriber) SubscribeReport(p *apimodel.ReportParams) (*apimodel.SubscriptionResponse, error) {
params := apireport.NewSubscribeReportParamsWithTimeout(r.timeout).WithReportParams(p)
diff --git a/pkg/xapp/subscription_test.go b/pkg/xapp/subscription_test.go
index b9713af..a5601cc 100755
--- a/pkg/xapp/subscription_test.go
+++ b/pkg/xapp/subscription_test.go
@@ -7,28 +7,29 @@
package xapp
import (
- apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
"github.com/stretchr/testify/assert"
"testing"
"time"
+ "fmt"
)
var suite *testing.T
var meid = "gnb123456"
var funId = int64(1)
-var clientEndpoint = "localhost:4561"
+var clientEndpoint = "localhost"
var direction = int64(0)
var procedureCode = int64(27)
var typeOfMessage = int64(1)
-var reportParams = apimodel.ReportParams{
- Meid: meid,
- RANFunctionID: &funId,
+var reportParams = clientmodel.ReportParams{
+ Meid: meid,
+ RANFunctionID: &funId,
ClientEndpoint: &clientEndpoint,
- EventTriggers: apimodel.EventTriggerList{
- &apimodel.EventTrigger{
+ EventTriggers: clientmodel.EventTriggerList{
+ &clientmodel.EventTrigger{
InterfaceDirection: direction,
ProcedureCode: procedureCode,
TypeOfMessage: typeOfMessage,
@@ -36,18 +37,34 @@
},
}
-var policyParams = apimodel.PolicyParams{
- Meid: &meid,
- RANFunctionID: &funId,
+var policyParams = clientmodel.PolicyParams{
+ Meid: &meid,
+ RANFunctionID: &funId,
ClientEndpoint: &clientEndpoint,
- EventTriggers: apimodel.EventTriggerList{
- &apimodel.EventTrigger{
+ EventTriggers: clientmodel.EventTriggerList{
+ &clientmodel.EventTrigger{
InterfaceDirection: direction,
ProcedureCode: procedureCode,
TypeOfMessage: typeOfMessage,
},
},
- PolicyActionDefinitions: &apimodel.PolicyActionDefinition{},
+ PolicyActionDefinitions: &clientmodel.PolicyActionDefinition{},
+}
+
+func processSubscriptions(subscriptionId string) {
+ // Generate requestorId, instanceId
+ reqId := int64(11)
+ instanceId := int64(22)
+
+ resp := &models.SubscriptionResponse{
+ SubscriptionID: &subscriptionId,
+ SubscriptionInstances: []*models.SubscriptionInstance{
+ &models.SubscriptionInstance{RequestorID: &reqId, InstanceID: &instanceId},
+ },
+ }
+
+ // Notify the client: don't worry about errors ... Notify() will handle retries, etc.
+ Subscription.Notify(resp, clientEndpoint)
}
func subscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
@@ -65,15 +82,15 @@
assert.Equal(suite, clientEndpoint, *p.ClientEndpoint)
}
- subId := "xapp-11"
- reqId := int64(11)
- instanceId := int64(22)
+ // Generate a unique subscriptionId
+ subscriptionId := fmt.Sprintf("%s-%s", meid, clientEndpoint)
+
+ // Process subscriptions on the background
+ go processSubscriptions(subscriptionId)
+
+ // and send response immediately
return &models.SubscriptionResponse{
- SubscriptionID: &subId,
- SubscriptionInstances: []*models.SubscriptionInstance{
- &models.SubscriptionInstance{RequestorID: &reqId, InstanceID: &instanceId},
- &models.SubscriptionInstance{RequestorID: &reqId, InstanceID: &instanceId},
- },
+ SubscriptionID: &subscriptionId,
}, nil
}
@@ -113,27 +130,29 @@
}
func TestSubscriptionReportHandling(t *testing.T) {
- resp, err := Subscription.SubscribeReport(&reportParams)
+ Subscription.SetResponseCB(func(resp *clientmodel.SubscriptionResponse) {
+ assert.Equal(t, len(resp.SubscriptionInstances), 1)
+ assert.Equal(t, *resp.SubscriptionInstances[0].RequestorID, int64(11))
+ assert.Equal(t, *resp.SubscriptionInstances[0].InstanceID, int64(22))
+ })
+ _, err := Subscription.SubscribeReport(&reportParams)
assert.Equal(t, err, nil)
- assert.Equal(t, len(resp.SubscriptionInstances), 2)
- assert.Equal(t, *resp.SubscriptionInstances[0].RequestorID, int64(11))
- assert.Equal(t, *resp.SubscriptionInstances[0].InstanceID, int64(22))
- assert.Equal(t, *resp.SubscriptionInstances[1].RequestorID, int64(11))
- assert.Equal(t, *resp.SubscriptionInstances[1].InstanceID, int64(22))
}
func TestSubscriptionPolicytHandling(t *testing.T) {
- resp, err := Subscription.SubscribePolicy(&policyParams)
+ Subscription.SetResponseCB(func(resp *clientmodel.SubscriptionResponse) {
+ assert.Equal(t, len(resp.SubscriptionInstances), 1)
+ assert.Equal(t, *resp.SubscriptionInstances[0].RequestorID, int64(11))
+ assert.Equal(t, *resp.SubscriptionInstances[0].InstanceID, int64(22))
+ })
+ _, err := Subscription.SubscribePolicy(&policyParams)
assert.Equal(t, err, nil)
- assert.Equal(t, len(resp.SubscriptionInstances), 2)
- assert.Equal(t, *resp.SubscriptionInstances[0].RequestorID, int64(11))
- assert.Equal(t, *resp.SubscriptionInstances[0].InstanceID, int64(22))
}
func TestSubscriptionDeleteHandling(t *testing.T) {
err := Subscription.UnSubscribe(clientEndpoint)
assert.Equal(t, err, nil)
-}
\ No newline at end of file
+}
diff --git a/pkg/xapp/xapp_test.go b/pkg/xapp/xapp_test.go
index e52c3ee..9696e6e 100755
--- a/pkg/xapp/xapp_test.go
+++ b/pkg/xapp/xapp_test.go
@@ -190,44 +190,44 @@
}
func TestMessagesReceivedSuccessfullyUsingWhCall(t *testing.T) {
- time.Sleep(time.Duration(5) * time.Second)
- whid := Rmr.Openwh("localhost:4560")
- params := &RMRParams{}
- params.Payload = []byte("newrt|start\nnewrt|end\n")
- params.Whid = int(whid)
- params.Callid = 4
- params.Timeout = 1000
- Rmr.SendCallMsg(params)
+ time.Sleep(time.Duration(5) * time.Second)
+ whid := Rmr.Openwh("localhost:4560")
+ params := &RMRParams{}
+ params.Payload = []byte("newrt|start\nnewrt|end\n")
+ params.Whid = int(whid)
+ params.Callid = 4
+ params.Timeout = 1000
+ Rmr.SendCallMsg(params)
// Allow time to process the messages
time.Sleep(time.Duration(2) * time.Second)
- waitForSdl := viper.GetBool("db.waitForSdl")
- stats := getMetrics(t)
- if !strings.Contains(stats, "ricxapp_RMR_Transmitted 200") {
- t.Errorf("Error: ricxapp_RMR_Transmitted value incorrect: %v", stats)
- }
+ waitForSdl := viper.GetBool("db.waitForSdl")
+ stats := getMetrics(t)
+ if !strings.Contains(stats, "ricxapp_RMR_Transmitted 200") {
+ t.Errorf("Error: ricxapp_RMR_Transmitted value incorrect: %v", stats)
+ }
- if !strings.Contains(stats, "ricxapp_RMR_Received 201") {
- t.Errorf("Error: ricxapp_RMR_Received value incorrect: %v", stats)
- }
+ if !strings.Contains(stats, "ricxapp_RMR_Received 201") {
+ t.Errorf("Error: ricxapp_RMR_Received value incorrect: %v", stats)
+ }
- if !strings.Contains(stats, "ricxapp_RMR_TransmitError 1") {
- t.Errorf("Error: ricxapp_RMR_TransmitError value incorrect")
- }
+ if !strings.Contains(stats, "ricxapp_RMR_TransmitError 1") {
+ t.Errorf("Error: ricxapp_RMR_TransmitError value incorrect")
+ }
- if !strings.Contains(stats, "ricxapp_RMR_ReceiveError 0") {
- t.Errorf("Error: ricxapp_RMR_ReceiveError value incorrect")
- }
+ if !strings.Contains(stats, "ricxapp_RMR_ReceiveError 0") {
+ t.Errorf("Error: ricxapp_RMR_ReceiveError value incorrect")
+ }
- if waitForSdl && !strings.Contains(stats, "ricxapp_SDL_Stored 201") {
- t.Errorf("Error: ricxapp_SDL_Stored value incorrect")
- }
+ if waitForSdl && !strings.Contains(stats, "ricxapp_SDL_Stored 201") {
+ t.Errorf("Error: ricxapp_SDL_Stored value incorrect")
+ }
- if waitForSdl && !strings.Contains(stats, "ricxapp_SDL_StoreError 0") {
- t.Errorf("Error: ricxapp_SDL_StoreError value incorrect")
- }
- Rmr.Closewh(int(whid))
+ if waitForSdl && !strings.Contains(stats, "ricxapp_SDL_StoreError 0") {
+ t.Errorf("Error: ricxapp_SDL_StoreError value incorrect")
+ }
+ Rmr.Closewh(int(whid))
}
func TestSubscribeChannels(t *testing.T) {