Subscription manager v0.10.0

Contains v0.8.0-v0.10.0
Handle RICsubscriptionDeleteResponse message
Communicating RICsubscriptionDeleteResponse to routing manager
Updated transaction handling
Tracking Mbuf in transaction table

Change-Id: I0d4964b7bd717941a0e50ede3e9a878590079141
Signed-off-by: kalnagy <kalman.nagy@nokia.com>
diff --git a/Dockerfile b/Dockerfile
index d1f9737..4ae45f0 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -22,47 +22,67 @@
 #
 FROM nexus3.o-ran-sc.org:10004/bldr-ubuntu18-c-go:1-u18.04-nng1.1.1 as submgrbuild
 
-COPY . /opt/submgr
+WORKDIR /tmp
 
 # Install RMr shared library
-RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_1.6.0_amd64.deb/download.deb && dpkg -i rmr_1.6.0_amd64.deb
+RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_1.6.0_amd64.deb/download.deb && dpkg -i rmr_1.6.0_amd64.deb && rm -rf rmr_1.6.0_amd64.deb
 # Install RMr development header files
-RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_1.6.0_amd64.deb/download.deb && dpkg -i rmr-dev_1.6.0_amd64.deb
+RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_1.6.0_amd64.deb/download.deb && dpkg -i rmr-dev_1.6.0_amd64.deb && rm -rf rmr-dev_1.6.0_amd64.deb
 
 # "PULLING LOG and COMPILING LOG"
 RUN git clone "https://gerrit.o-ran-sc.org/r/com/log" /opt/log && cd /opt/log && \
  ./autogen.sh && ./configure && make install && ldconfig
 
-# "COMPILING E2AP Wrapper"
-RUN cd /opt/submgr/e2ap && \
- gcc -c -fPIC -Iheaders/ lib/*.c wrapper.c && \
- gcc *.o -shared -o libwrapper.so && \
- cp libwrapper.so /usr/local/lib/ && \
- cp wrapper.h headers/*.h /usr/local/include/ && \
- ldconfig
-
 # "Installing Swagger"
 RUN cd /usr/local/go/bin \
     && wget --quiet https://github.com/go-swagger/go-swagger/releases/download/v0.19.0/swagger_linux_amd64 \
     && mv swagger_linux_amd64 swagger \
     && chmod +x swagger
 
+
+WORKDIR /opt/submgr
+COPY e2ap e2ap
+
+# "COMPILING E2AP Wrapper"
+RUN cd e2ap && \
+    gcc -c -fPIC -Iheaders/ lib/*.c wrapper.c && \
+    gcc *.o -shared -o libwrapper.so && \
+    cp libwrapper.so /usr/local/lib/ && \
+    cp wrapper.h headers/*.h /usr/local/include/ && \
+    ldconfig
+
+COPY api api
+
 # "Getting and generating routing managers api client"
-RUN cd /opt/submgr \
-    && git clone "https://gerrit.o-ran-sc.org/r/ric-plt/rtmgr" \
+RUN git clone "https://gerrit.o-ran-sc.org/r/ric-plt/rtmgr" \
     && cp rtmgr/api/routing_manager.yaml api/ \
     && rm -rf rtmgr
 
-RUN cd /opt/submgr \
-    && mkdir -p /root/go \
-    && /usr/local/go/bin/swagger generate client -f api/routing_manager.yaml -t pkg/ -m rtmgr_models -c rtmgr_client
+RUN mkdir pkg
+
+COPY go.mod go.mod
+
+RUN mkdir -p /root/go && \
+    /usr/local/go/bin/swagger generate client -f api/routing_manager.yaml -t pkg/ -m rtmgr_models -c rtmgr_client
+
+
+COPY pkg pkg
+COPY cmd cmd
+
+RUN /usr/local/go/bin/go mod tidy
+
+RUN git clone -b v0.0.8 "https://gerrit.o-ran-sc.org/r/ric-plt/xapp-frame" /tmp/xapp-frame
+COPY tmp/rmr.go /tmp/xapp-frame/pkg/xapp/rmr.go
+
+RUN /usr/local/go/bin/go mod edit -replace "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame"="/tmp/xapp-frame"
 
 # "COMPILING Subscription manager"
-RUN mkdir -p /opt/bin && cd /opt/submgr && \
- /usr/local/go/bin/go get && \
-  /usr/local/go/bin/go build -o /opt/bin/submgr ./cmd/submgr.go && \
+RUN mkdir -p /opt/bin && \
+  /usr/local/go/bin/go build -o /opt/bin/submgr cmd/submgr.go && \
      mkdir -p /opt/build/container/usr/local
 
+COPY config config
+
 FROM ubuntu:18.04
 
 COPY --from=submgrbuild /opt/bin/submgr /opt/submgr/config/submgr.yaml /
@@ -72,3 +92,4 @@
 RUN ldconfig
 
 RUN chmod 755 /run_submgr.sh
+CMD /run_submgr.sh
diff --git a/RELNOTES b/RELNOTES
index 83b6eca..72ef0b3 100644
--- a/RELNOTES
+++ b/RELNOTES
@@ -1,3 +1,13 @@
+### v0.10.0
+* Tracking Mbuf in transaction table
+
+### v0.9.0
+* Communicating RICsubscriptionDeleteResponse to routing manager
+* Updated transaction handling
+
+### v0.8.0
+* Handle RICsubscriptionDeleteResponse message
+
 ### v0.7.2
 * Correction for E2AP PDU handling
 * Correction for Transaction Table handling
diff --git a/api/routing_manager.yaml b/api/routing_manager.yaml
index 8fabfb4..7bfbfea 100644
--- a/api/routing_manager.yaml
+++ b/api/routing_manager.yaml
@@ -18,13 +18,13 @@
 #
 #
 #   Abstract:   Routing Manager's RESTful API definition
-#   Date:	29 March 2019
+#   Date:       28 August 2019
 #
 swagger: "2.0"
 info:
   title: Routing Manager
   description: "This is the Swagger/OpenAPI 2.0 definition of Routing Manager's Northbound API."
-  version: "0.3.0"
+  version: "0.4.0"
   license:
     name: "Apache 2.0"
     url: "http://www.apache.org/licenses/LICENSE-2.0.html"
@@ -126,8 +126,58 @@
           description: "Invalid data"
         201:
           description: "Xapp Subscription data received"
+    delete:
+      tags:
+      - "handle"
+      summary: "API for deleting an xapp subscription"
+      description: "By performing the delete operation on xapp-subscription-handle resource, the API caller will be able to update routing manager about the deletion of an xapp's subscription"
+      operationId: "delete_xapp_subscription_handle"
+      consumes:
+      - "application/json"
+      parameters:
+      - in: "body"
+        name: "xapp-subscription-data"
+        description: "xApp related subscription data"
+        required: true
+        schema:
+          $ref: "#/definitions/xapp-subscription-data"
+      responses:
+        204:
+          description: "Content not found"
+        200:
+          description: "Xapp Subscription deleted"
+  /handles/xapp-subscription-handle/{subscription_id}:
+    put:
+      tags:
+      - "handle"
+      summary: "API for updating the subscriber xApp list"
+      description: "By performing a PUT method on a xapp-subscription-handle/{subscription_id} resource, the API caller is able to update the Routing manager about the list of subscriber xApps related to the subscription denoted by the {subsription_id}."
+      operationId: "update_xapp_subscription_handle"
+      consumes:
+      - "application/json"
+#      - "application/yaml"
+      produces:
+      - "application/json"
+#      - "application/yaml"
+      parameters:
+        - in: path
+          name: subscription_id
+          required: true
+          type: integer
+          format: "uint16"
+          description: "Subscription ID"
+        - in: body
+          name: xapp-list
+          description: "xApp list"
+          required: true
           schema:
-            $ref: "#/definitions/xapp-subscription-data-response"
+           $ref: "#/definitions/xapp-list"
+      responses:
+        400:
+          description: "Invalid data"
+        201:
+          description: "Xapp list received"
+  
 definitions:
   health-status:
     type: "object"
@@ -141,8 +191,7 @@
     type: "object"
     properties:
       id:
-        type: "integer"
-        format: "int64"
+        type: "string"
       event:
         type: "string"
       version:
@@ -166,16 +215,27 @@
         maximum: 65535
       subscription_id: #subscription sequence number
         type: "integer"
-        format: "int32" 
-  xapp-subscription-data-response:
+        format: "int32"
+  xapp-list:
+    type: "array"
+    items:
+      $ref: '#/definitions/xapp-element'
+  xapp-element:
     type: "object"
     required:
-      - "location"
+      - "address"
+      - "port"
     properties:
-      location:
-        type: "string"
+      address:
+        type: "string" #This is the xapp instance hostname or ip address
+      port: #xapp instance port address
+        type: "integer"
+        format: "uint16"
+        minimum: 0
+        maximum: 65535
 
 externalDocs:
   description: "Routing Manager"
   url: "http://placeholder"
 
+
diff --git a/container-tag.yaml b/container-tag.yaml
index 540b2fe..68891cd 100644
--- a/container-tag.yaml
+++ b/container-tag.yaml
@@ -2,4 +2,4 @@
 # By default this file is in the docker build directory,
 # but the location can configured in the JJB template.
 ---
-tag: 0.7.2
+tag: 0.10.0
diff --git a/e2ap/wrapper.c b/e2ap/wrapper.c
index 015693d..f31cd94 100644
--- a/e2ap/wrapper.c
+++ b/e2ap/wrapper.c
@@ -24,41 +24,22 @@
     }
 }
 
-ssize_t encode_RIC_subscription_request(RICsubscriptionRequest_t* pdu, void* buffer, size_t buf_size)
-{
-    asn_enc_rval_t encode_result;
-    encode_result = aper_encode_to_buffer(&asn_DEF_RICsubscriptionRequest, NULL, pdu, buffer, buf_size);
-    if(encode_result.encoded == -1) {
-        return -1;
-    }
-    return encode_result.encoded;
-}
-
-RICsubscriptionRequest_t* decode_RIC_subscription_request(const void *buffer, size_t buf_size)
-{
-    asn_dec_rval_t decode_result;
-    RICsubscriptionRequest_t *pdu = 0;
-    decode_result = aper_decode_complete(NULL, &asn_DEF_RICsubscriptionRequest, (void **)&pdu, buffer, buf_size);
-    if(decode_result.code == RC_OK) {
-        return pdu;
-    } else {
-        ASN_STRUCT_FREE(asn_DEF_RICsubscriptionRequest, pdu);
-        return 0;
-    }
-}
-
 long e2ap_get_ric_subscription_request_sequence_number(void *buffer, size_t buf_size)
 {
     E2AP_PDU_t *pdu = decode_E2AP_PDU(buffer, buf_size);
     if ( pdu != NULL && pdu->present == E2AP_PDU_PR_initiatingMessage)
     {
         InitiatingMessageE2_t* initiatingMessage = pdu->choice.initiatingMessage;
-        RICsubscriptionRequest_t *ric_subscription_request = &(initiatingMessage->value.choice.RICsubscriptionRequest);
-        for (int i = 0; i < ric_subscription_request->protocolIEs.list.count; ++i )
+        if ( initiatingMessage->procedureCode == ProcedureCode_id_ricSubscription
+            && initiatingMessage->value.present == InitiatingMessageE2__value_PR_RICsubscriptionRequest)
         {
-            if ( ric_subscription_request->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+            RICsubscriptionRequest_t *ric_subscription_request = &(initiatingMessage->value.choice.RICsubscriptionRequest);
+            for (int i = 0; i < ric_subscription_request->protocolIEs.list.count; ++i )
             {
-                return ric_subscription_request->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber;
+                if ( ric_subscription_request->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+                {
+                    return ric_subscription_request->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber;
+                }
             }
         }
     }
@@ -71,13 +52,17 @@
     if ( pdu != NULL && pdu->present == E2AP_PDU_PR_initiatingMessage)
     {
         InitiatingMessageE2_t* initiatingMessage = pdu->choice.initiatingMessage;
-        RICsubscriptionRequest_t *ric_subscription_request = &(initiatingMessage->value.choice.RICsubscriptionRequest);
-        for (int i = 0; i < ric_subscription_request->protocolIEs.list.count; ++i )
+        if ( initiatingMessage->procedureCode == ProcedureCode_id_ricSubscription
+            && initiatingMessage->value.present == InitiatingMessageE2__value_PR_RICsubscriptionRequest)
         {
-            if ( ric_subscription_request->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+            RICsubscriptionRequest_t *ricSubscriptionRequest = &initiatingMessage->value.choice.RICsubscriptionRequest;
+            for (int i = 0; i < ricSubscriptionRequest->protocolIEs.list.count; ++i )
             {
-                ric_subscription_request->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber = sequence_number;
-                return encode_E2AP_PDU(pdu, buffer, buf_size);
+                if ( ricSubscriptionRequest->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+                {
+                    ricSubscriptionRequest->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber = sequence_number;
+                    return encode_E2AP_PDU(pdu, buffer, buf_size);
+                }
             }
         }
     }
@@ -85,40 +70,22 @@
 }
 
 /* RICsubscriptionResponse */
-ssize_t encode_RIC_subscription_response(RICsubscriptionResponse_t* pdu, void* buffer, size_t buf_size)
-{
-    asn_enc_rval_t encode_result;
-    encode_result = aper_encode_to_buffer(&asn_DEF_RICsubscriptionResponse, NULL, pdu, buffer, buf_size);
-    if(encode_result.encoded == -1) {
-        return -1;
-    }
-    return encode_result.encoded;
-}
-
-RICsubscriptionResponse_t* decode_RIC_subscription_response(const void *buffer, size_t buf_size)
-{
-    asn_dec_rval_t decode_result;
-    RICsubscriptionResponse_t *pdu = 0;
-    decode_result = aper_decode_complete(NULL, &asn_DEF_RICsubscriptionResponse, (void **)&pdu, buffer, buf_size);
-    if(decode_result.code == RC_OK) {
-        fprintf(stdout, "decoded bytes: %ld\n", decode_result.consumed);
-        return pdu;
-    } else {
-        ASN_STRUCT_FREE(asn_DEF_RICsubscriptionResponse, pdu);
-        return 0;
-    }
-}
-
 long e2ap_get_ric_subscription_response_sequence_number(void *buffer, size_t buf_size)
 {
-    RICsubscriptionResponse_t *pdu = decode_RIC_subscription_response(buffer, buf_size);
-    if ( pdu != NULL )
+    E2AP_PDU_t *pdu = decode_E2AP_PDU(buffer, buf_size);
+    if ( pdu != NULL && pdu->present == E2AP_PDU_PR_successfulOutcome )
     {
-        for (int i = 0; i < pdu->protocolIEs.list.count; ++i )
+        SuccessfulOutcomeE2_t* successfulOutcome = pdu->choice.successfulOutcome;
+        if ( successfulOutcome->procedureCode == ProcedureCode_id_ricSubscription
+            && successfulOutcome->value.present == SuccessfulOutcomeE2__value_PR_RICsubscriptionResponse)
         {
-            if ( pdu->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+            RICsubscriptionResponse_t *ricSubscriptionResponse = &successfulOutcome->value.choice.RICsubscriptionResponse;
+            for (int i = 0; i < ricSubscriptionResponse->protocolIEs.list.count; ++i )
             {
-                return pdu->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber;
+                if ( ricSubscriptionResponse->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+                {
+                    return ricSubscriptionResponse->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber;
+                }
             }
         }
     }
@@ -127,15 +94,21 @@
 
 ssize_t  e2ap_set_ric_subscription_response_sequence_number(void *buffer, size_t buf_size, long sequence_number)
 {
-    RICsubscriptionResponse_t *pdu = decode_RIC_subscription_response(buffer, buf_size);
-    if ( pdu != NULL )
+    E2AP_PDU_t *pdu = decode_E2AP_PDU(buffer, buf_size);
+    if ( pdu != NULL && pdu->present == E2AP_PDU_PR_successfulOutcome )
     {
-        for (int i = 0; i < pdu->protocolIEs.list.count; ++i )
+        SuccessfulOutcomeE2_t* successfulOutcome = pdu->choice.successfulOutcome;
+        if ( successfulOutcome->procedureCode == ProcedureCode_id_ricSubscription
+            && successfulOutcome->value.present == SuccessfulOutcomeE2__value_PR_RICsubscriptionResponse)
         {
-            if ( pdu->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+            RICsubscriptionResponse_t *ricSubscriptionResponse = &successfulOutcome->value.choice.RICsubscriptionResponse;
+            for (int i = 0; i < ricSubscriptionResponse->protocolIEs.list.count; ++i )
             {
-                pdu->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber = sequence_number;
-                return encode_RIC_subscription_response(pdu, buffer, buf_size);
+                if ( ricSubscriptionResponse->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+                {
+                    ricSubscriptionResponse->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber = sequence_number;
+                    return encode_E2AP_PDU(pdu, buffer, buf_size);
+                }
             }
         }
     }
@@ -143,41 +116,22 @@
 }
 
 /* RICsubscriptionDeleteRequest */
-ssize_t encode_RIC_subscription_delete_request(RICsubscriptionDeleteRequest_t* pdu, void* buffer, size_t buf_size)
-{
-    asn_enc_rval_t encode_result;
-    encode_result = aper_encode_to_buffer(&asn_DEF_RICsubscriptionDeleteRequest, NULL, pdu, buffer, buf_size);
-    if(encode_result.encoded == -1) {
-        return -1;
-    }
-    return encode_result.encoded;
-}
-
-RICsubscriptionDeleteRequest_t* decode_RIC_subscription_delete_request(const void *buffer, size_t buf_size)
-{
-    asn_dec_rval_t decode_result;
-    RICsubscriptionDeleteRequest_t *pdu = 0;
-    decode_result = aper_decode_complete(NULL, &asn_DEF_RICsubscriptionDeleteRequest, (void **)&pdu, buffer, buf_size);
-    if(decode_result.code == RC_OK) {
-        fprintf(stdout, "decoded bytes: %ld\n", decode_result.consumed);
-        return pdu;
-    } else {
-        ASN_STRUCT_FREE(asn_DEF_RICsubscriptionDeleteRequest, pdu);
-        return 0;
-    }
-}
-
 long e2ap_get_ric_subscription_delete_request_sequence_number(void *buffer, size_t buf_size)
 {
-    RICsubscriptionDeleteRequest_t *pdu = decode_RIC_subscription_delete_request(buffer, buf_size);
-    if ( pdu != NULL )
+    E2AP_PDU_t *pdu = decode_E2AP_PDU(buffer, buf_size);
+    if ( pdu != NULL && pdu->present == E2AP_PDU_PR_initiatingMessage )
     {
-        for (int i = 0; i < pdu->protocolIEs.list.count; ++i )
+        InitiatingMessageE2_t* initiatingMessage = pdu->choice.initiatingMessage;
+        if ( initiatingMessage->procedureCode == ProcedureCode_id_ricSubscriptionDelete
+            && initiatingMessage->value.present == InitiatingMessageE2__value_PR_RICsubscriptionDeleteRequest )
         {
-            /* TODO */
-            if ( pdu->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+            RICsubscriptionDeleteRequest_t *subscriptionDeleteRequest = &initiatingMessage->value.choice.RICsubscriptionDeleteRequest;
+            for (int i = 0; i < subscriptionDeleteRequest->protocolIEs.list.count; ++i )
             {
-                return pdu->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber;
+                if ( subscriptionDeleteRequest->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+                {
+                    return subscriptionDeleteRequest->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber;
+                }
             }
         }
     }
@@ -186,16 +140,21 @@
 
 ssize_t  e2ap_set_ric_subscription_delete_request_sequence_number(void *buffer, size_t buf_size, long sequence_number)
 {
-    RICsubscriptionDeleteRequest_t *pdu = decode_RIC_subscription_delete_request(buffer, buf_size);
-    if ( pdu != NULL )
+    E2AP_PDU_t *pdu = decode_E2AP_PDU(buffer, buf_size);
+    if ( pdu != NULL && pdu->present == E2AP_PDU_PR_initiatingMessage )
     {
-        for (int i = 0; i < pdu->protocolIEs.list.count; ++i )
+        InitiatingMessageE2_t* initiatingMessage = pdu->choice.initiatingMessage;
+        if ( initiatingMessage->procedureCode == ProcedureCode_id_ricSubscriptionDelete
+            && initiatingMessage->value.present == InitiatingMessageE2__value_PR_RICsubscriptionDeleteRequest )
         {
-            /* TODO */
-            if ( pdu->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+            RICsubscriptionDeleteRequest_t* subscriptionDeleteRequest = &initiatingMessage->value.choice.RICsubscriptionDeleteRequest;
+            for (int i = 0; i < subscriptionDeleteRequest->protocolIEs.list.count; ++i )
             {
-                pdu->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber = sequence_number;
-                return encode_RIC_subscription_delete_request(pdu, buffer, buf_size);
+                if ( subscriptionDeleteRequest->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+                {
+                    subscriptionDeleteRequest->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber = sequence_number;
+                    return encode_E2AP_PDU(pdu, buffer, buf_size);
+                }
             }
         }
     }
@@ -203,41 +162,22 @@
 }
 
 /* RICsubscriptionDeleteResponse */
-ssize_t encode_RIC_subscription_delete_response(RICsubscriptionDeleteResponse_t* pdu, void* buffer, size_t buf_size)
-{
-    asn_enc_rval_t encode_result;
-    encode_result = aper_encode_to_buffer(&asn_DEF_RICsubscriptionDeleteResponse, NULL, pdu, buffer, buf_size);
-    if(encode_result.encoded == -1) {
-        return -1;
-    }
-    return encode_result.encoded;
-}
-
-RICsubscriptionDeleteResponse_t* decode_RIC_subscription_delete_response(const void *buffer, size_t buf_size)
-{
-    asn_dec_rval_t decode_result;
-    RICsubscriptionDeleteResponse_t *pdu = 0;
-    decode_result = aper_decode_complete(NULL, &asn_DEF_RICsubscriptionDeleteResponse, (void **)&pdu, buffer, buf_size);
-    if(decode_result.code == RC_OK) {
-        fprintf(stdout, "decoded bytes: %ld\n", decode_result.consumed);
-        return pdu;
-    } else {
-        ASN_STRUCT_FREE(asn_DEF_RICsubscriptionDeleteResponse, pdu);
-        return 0;
-    }
-}
-
 long e2ap_get_ric_subscription_delete_response_sequence_number(void *buffer, size_t buf_size)
 {
-    RICsubscriptionDeleteResponse_t *pdu = decode_RIC_subscription_delete_response(buffer, buf_size);
-    if ( pdu != NULL )
+    E2AP_PDU_t *pdu = decode_E2AP_PDU(buffer, buf_size);
+    if ( pdu != NULL && pdu->present == E2AP_PDU_PR_successfulOutcome )
     {
-        for (int i = 0; i < pdu->protocolIEs.list.count; ++i )
+        SuccessfulOutcomeE2_t* successfulOutcome = pdu->choice.successfulOutcome;
+        if ( successfulOutcome->procedureCode == ProcedureCode_id_ricSubscriptionDelete
+            && successfulOutcome->value.present == SuccessfulOutcomeE2__value_PR_RICsubscriptionDeleteResponse )
         {
-            if ( pdu->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+            RICsubscriptionDeleteResponse_t* subscriptionDeleteResponse = &successfulOutcome->value.choice.RICsubscriptionDeleteResponse;
+            for (int i = 0; i < subscriptionDeleteResponse->protocolIEs.list.count; ++i )
             {
-                /* TODO */
-                return pdu->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber;
+                if ( subscriptionDeleteResponse->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+                {
+                    return subscriptionDeleteResponse->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber;
+                }
             }
         }
     }
@@ -246,16 +186,21 @@
 
 ssize_t  e2ap_set_ric_subscription_delete_response_sequence_number(void *buffer, size_t buf_size, long sequence_number)
 {
-    RICsubscriptionDeleteResponse_t *pdu = decode_RIC_subscription_delete_response(buffer, buf_size);
-    if ( pdu != NULL )
+    E2AP_PDU_t *pdu = decode_E2AP_PDU(buffer, buf_size);
+    if ( pdu != NULL && pdu->present == E2AP_PDU_PR_successfulOutcome )
     {
-        for (int i = 0; i < pdu->protocolIEs.list.count; ++i )
+        SuccessfulOutcomeE2_t* successfulOutcome = pdu->choice.successfulOutcome;
+        if ( successfulOutcome->procedureCode == ProcedureCode_id_ricSubscriptionDelete
+            && successfulOutcome->value.present == SuccessfulOutcomeE2__value_PR_RICsubscriptionDeleteResponse )
         {
-            if ( pdu->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+            RICsubscriptionDeleteResponse_t* subscriptionDeleteResponse;
+            for (int i = 0; i < subscriptionDeleteResponse->protocolIEs.list.count; ++i )
             {
-                /* todo */
-                pdu->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber = sequence_number;
-                return encode_RIC_subscription_delete_response(pdu, buffer, buf_size);
+                if ( subscriptionDeleteResponse->protocolIEs.list.array[i]->id == ProtocolIE_ID_id_RICrequestID )
+                {
+                    subscriptionDeleteResponse->protocolIEs.list.array[i]->value.choice.RICrequestID.ricRequestSequenceNumber = sequence_number;
+                    return encode_E2AP_PDU(pdu, buffer, buf_size);
+                }
             }
         }
     }
diff --git a/e2ap/wrapper.h b/e2ap/wrapper.h
index 026e676..9bc1549 100644
--- a/e2ap/wrapper.h
+++ b/e2ap/wrapper.h
@@ -7,38 +7,26 @@
 #include "RICsubscriptionDeleteResponse.h"
 #include "E2AP-PDU.h"
 #include "InitiatingMessageE2.h"
+#include "SuccessfulOutcomeE2.h"
 #include "ProtocolIE-Container.h"
 #include "ProtocolIE-Field.h"
 
 size_t encode_E2AP_PDU(E2AP_PDU_t* pdu, void* buffer, size_t buf_size);
 E2AP_PDU_t* decode_E2AP_PDU(const void* buffer, size_t buf_size);
 
-/* RICsubscriptionRequest */
-ssize_t encode_RIC_subscription_request(RICsubscriptionRequest_t* pdu, void* buffer, size_t buf_size);
-RICsubscriptionRequest_t* decode_RIC_subscription_request(const void *buffer, size_t buf_size);
-
 long e2ap_get_ric_subscription_request_sequence_number(void *buffer, size_t buf_size);
 ssize_t  e2ap_set_ric_subscription_request_sequence_number(void *buffer, size_t buf_size, long sequence_number);
 RICsubscription_t* e2ap_get_ric_subscription_request_ric_subscription(void *buffer, size_t buffer_size);
 
 /* RICsubscriptionResponse */
-ssize_t encode_RIC_subscription_response(RICsubscriptionResponse_t* pdu, void* buffer, size_t buf_size);
-RICsubscriptionResponse_t* decode_RIC_subscription_response(const void *buffer, size_t buf_size);
-
 long e2ap_get_ric_subscription_response_sequence_number(void *buffer, size_t buf_size);
 ssize_t  e2ap_set_ric_subscription_response_sequence_number(void *buffer, size_t buf_size, long sequence_number);
 
 /* RICsubscriptionDeleteRequest */
-ssize_t encode_RIC_subscription_delete_request(RICsubscriptionDeleteRequest_t* pdu, void* buffer, size_t buf_size);
-RICsubscriptionDeleteRequest_t* decode_RIC_subscription_delete_request(const void *buffer, size_t buf_size);
-
 long e2ap_get_ric_subscription_delete_request_sequence_number(void *buffer, size_t buf_size);
 ssize_t  e2ap_set_ric_subscription_delete_request_sequence_number(void *buffer, size_t buf_size, long sequence_number);
 
 /* RICsubscriptionDeleteResponse */
-ssize_t encode_RIC_subscription_delete_response(RICsubscriptionDeleteResponse_t* pdu, void* buffer, size_t buf_size);
-RICsubscriptionDeleteResponse_t* decode_RIC_subscription_delete_response(const void *buffer, size_t buf_size);
-
 long e2ap_get_ric_subscription_delete_response_sequence_number(void *buffer, size_t buf_size);
 ssize_t  e2ap_set_ric_subscription_delete_response_sequence_number(void *buffer, size_t buf_size, long sequence_number);
 
diff --git a/pkg/control/client.go b/pkg/control/client.go
index 598c7ef..1c07ad4 100644
--- a/pkg/control/client.go
+++ b/pkg/control/client.go
@@ -32,6 +32,7 @@
 type RtmgrClient struct {
 	rtClient         *rtmgrclient.RoutingManager
 	xappHandleParams *rtmgrhandle.ProvideXappSubscriptionHandleParams
+	xappDeleteParams *rtmgrhandle.DeleteXappSubscriptionHandleParams
 }
 
 func (rc *RtmgrClient) SubscriptionRequestUpdate() error {
@@ -54,6 +55,15 @@
 			xapp.Logger.Info("Succesfully updated routing manager about the subscription: %d", subID)
 			return nil
 		}
+	case DELETE:
+		_, _, deleteErr := rc.rtClient.Handle.DeleteXappSubscriptionHandle(rc.xappDeleteParams.WithXappSubscriptionData(&xappSubReq))
+		if deleteErr != nil && !(strings.Contains(deleteErr.Error(), "status 200"))  {
+			xapp.Logger.Error("Deleting subscription id = %d  in routing manager, failed with error: %v", subID, deleteErr)
+			return deleteErr
+		} else {
+			xapp.Logger.Info("Succesfully deleted subscription: %d in routing manager.", subID)
+			return nil
+		}
 	default:
 		return nil
 	}
diff --git a/pkg/control/control.go b/pkg/control/control.go
index f6cd771..ec6419e 100644
--- a/pkg/control/control.go
+++ b/pkg/control/control.go
@@ -53,17 +53,6 @@
 	EnbID  string
 }
 
-type RMRParams struct {
-	Mtype           int
-	Payload         []byte
-	PayloadLen      int
-	Meid            *RMRMeid
-	Xid             string
-	SubId           int
-	Src             string
-	Mbuf            *C.rmr_mbuf_t
-}
-
 var SEEDSN uint16
 var SubscriptionReqChan = make(chan subRouteInfo, 10)
 
@@ -98,7 +87,8 @@
 	transport := httptransport.New(viper.GetString("rtmgr.HostAddr") + ":" + viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
 	client := rtmgrclient.New(transport, strfmt.Default)
 	handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
-	rtmgrClient := RtmgrClient{client, handle}
+	delete_handle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
+	rtmgrClient := RtmgrClient{client, handle, delete_handle}
 
 	return Control{new(E2ap), registry, &rtmgrClient, tracker}
 }
@@ -115,6 +105,8 @@
 		err = c.handleSubscriptionResponse(rp)
 	case C.RIC_SUB_DEL_REQ:
 		err = c.handleSubscriptionDeleteRequest(rp)
+	case C.RIC_SUB_DEL_RESP:
+		err = c.handleSubscriptionDeleteResponse(rp)
 	default:
 		err = errors.New("Message Type " + strconv.Itoa(rp.Mtype) + " is discarded")
 	}
@@ -128,6 +120,13 @@
 	return
 }
 
+func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) {
+	if !xapp.Rmr.Send(params, true) {
+		err = errors.New("rmr.Send() failed")
+	}
+	return
+}
+
 func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) {
 	payload_seq_num, err := c.e2ap.GetSubscriptionRequestSequenceNumber(params.Payload)
 	if err != nil {
@@ -153,7 +152,7 @@
 
 	/* Create transatcion records for every subscription request */
 	xact_key := Transaction_key{new_sub_id, CREATE}
-	xact_value := Transaction{*src_addr, *src_port, params.Payload}
+	xact_value := Transaction{*src_addr, *src_port, params.Payload, params.Mbuf}
 	err = c.tracker.Track_transaction(xact_key, xact_value)
 	if err != nil {
 		xapp.Logger.Error("Failed to create a transaction record due to %v", err)
@@ -170,6 +169,7 @@
 
 	xapp.Logger.Info("Generated ID: %v. Forwarding to E2 Termination...", int(new_sub_id))
 	c.rmrSend(params)
+	xapp.Logger.Info("--- Debugging transaction table = %v", c.tracker.transaction_table)
 	return
 }
 
@@ -186,7 +186,14 @@
 	}
 	c.registry.setSubscriptionToConfirmed(payload_seq_num)
 	xapp.Logger.Info("Subscription Response Registered. Forwarding to Requestor...")
-	c.rmrSend(params)
+	transaction, err := c.tracker.complete_transaction(payload_seq_num, DELETE)
+	if err != nil {
+		xapp.Logger.Error("Failed to create a transaction record due to %v", err)
+		return
+	}
+	xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payload_seq_num), transaction.Xapp_instance_address, transaction.Xapp_port)
+	params.Mbuf = transaction.Mbuf
+	c.rmrReplyToSender(params)
 	return
 }
 
@@ -221,8 +228,47 @@
 	xapp.Logger.Info("Subscription Delete Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
 	if c.registry.IsValidSequenceNumber(payload_seq_num) {
 		c.registry.deleteSubscription(payload_seq_num)
+		trackErr := c.trackDeleteTransaction(params, payload_seq_num)
+		if trackErr != nil {
+			xapp.Logger.Error("Failed to create a transaction record due to %v", err)
+			return trackErr
+		}
 	}
 	xapp.Logger.Info("Subscription ID: %v. Forwarding to E2 Termination...", int(payload_seq_num))
 	c.rmrSend(params)
 	return
 }
+
+func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payload_seq_num uint16) (err error) {
+	src_addr, src_port, err := c.rtmgrClient.SplitSource(params.Src)
+	xact_key := Transaction_key{payload_seq_num, DELETE}
+	xact_value := Transaction{*src_addr, *src_port, params.Payload, params.Mbuf}
+	err = c.tracker.Track_transaction(xact_key, xact_value)
+	return
+}
+
+func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
+	payload_seq_num, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
+	if err != nil {
+		err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
+		return
+	}
+    var transaction , _= c.tracker.Retrive_transaction(payload_seq_num, DELETE)
+	sub_route_action := subRouteInfo{DELETE, transaction.Xapp_instance_address, transaction.Xapp_port, payload_seq_num }
+	go c.rtmgrClient.SubscriptionRequestUpdate()
+	SubscriptionReqChan <- sub_route_action
+
+	xapp.Logger.Info("Subscription Delete Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
+	if c.registry.releaseSequenceNumber(payload_seq_num) {
+		transaction, err = c.tracker.complete_transaction(payload_seq_num, DELETE)
+		if err != nil {
+			xapp.Logger.Error("Failed to create a transaction record due to %v", err)
+			return
+		}
+		xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payload_seq_num), transaction.Xapp_instance_address, transaction.Xapp_port)
+		//params.Src = xAddress + ":" + strconv.Itoa(int(xPort))
+		params.Mbuf = transaction.Mbuf
+		c.rmrReplyToSender(params)
+	}
+	return
+}
diff --git a/pkg/control/registry.go b/pkg/control/registry.go
index c349921..2b04f38 100644
--- a/pkg/control/registry.go
+++ b/pkg/control/registry.go
@@ -71,6 +71,11 @@
 }
 
 //This function releases the given id as unused in the register
-//func (r *Registry) releaseSequenceNumber(sn uint16) {
-//	delete(r.register, sn)
-//}
\ No newline at end of file
+func (r *Registry) releaseSequenceNumber(sn uint16) bool {
+	if r.register[sn] {
+		return false
+	} else {
+		delete(r.register, sn)
+		return true
+	}
+}
\ No newline at end of file
diff --git a/pkg/control/tracker.go b/pkg/control/tracker.go
index fdbbeaf..e08f8db 100644
--- a/pkg/control/tracker.go
+++ b/pkg/control/tracker.go
@@ -21,7 +21,6 @@
 
 import (
 	"fmt"
-//	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
 )
 
 /*
@@ -53,6 +52,21 @@
 Retreives the transaction table entry for the given request.
 Returns error in case the transaction cannot be found.
 */
+func (t *Tracker) Update_transaction(SubID uint16, trans_type Action, xact Transaction) error{
+	key := Transaction_key{SubID, trans_type}
+	if _, ok := t.transaction_table[key]; ok {
+		// TODO: Implement merge related check here. If the key is same but the value is different.
+		err := fmt.Errorf("Transaction tracker: Similar transaction with sub id %d and type %s is ongoing", key.SubID, key.trans_type )
+		return err
+	}
+	t.transaction_table[key] = xact
+	return nil
+}
+
+/*
+Retreives the transaction table entry for the given request.
+Returns error in case the transaction cannot be found.
+*/
 func (t *Tracker) Retrive_transaction(subID uint16, act Action) (Transaction, error){
 	key := Transaction_key{subID, act}
 	var xact Transaction
@@ -67,14 +81,13 @@
 Deletes the transaction table entry for the given request and returns the deleted xapp's address and port for reference.
 Returns error in case the transaction cannot be found.
 */
-func (t *Tracker) complete_transaction(subID uint16, act Action) (string, uint16, error){
+func (t *Tracker) complete_transaction(subID uint16, act Action) (Transaction, error){
 	key := Transaction_key{subID, act}
-	var empty_address string
-	var empty_port uint16
+	var empty_transaction Transaction
 	if xact, ok := t.transaction_table[key]; ok {
 		delete(t.transaction_table, key)
-		return xact.Xapp_instance_address, xact.Xapp_port, nil
+		return xact, nil
 	}
 	err := fmt.Errorf("Tranaction record for Subscription ID %d and action %s does not exist", subID, act)
-	return empty_address, empty_port, err
+	return empty_transaction, err
 }
diff --git a/pkg/control/types.go b/pkg/control/types.go
index 1a2c92f..d12233c 100644
--- a/pkg/control/types.go
+++ b/pkg/control/types.go
@@ -19,6 +19,10 @@
 
 package control
 
+import (
+	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+)
+
 type RmrDatagram struct {
 	MessageType    int
 	SubscriptionId uint16
@@ -44,4 +48,5 @@
 	Xapp_instance_address string
 	Xapp_port             uint16
 	Ric_sub_req           []byte
+	Mbuf                  *xapp.RMRMbuf
 }
diff --git a/test/e2t/container/Dockerfile b/test/e2t/container/Dockerfile
index 0f726e7..7b9b3da 100644
--- a/test/e2t/container/Dockerfile
+++ b/test/e2t/container/Dockerfile
@@ -77,4 +77,4 @@
 
 RUN ldconfig
 
-RUN chmod 755 /run_e2t.sh
+RUN chmod 755 /run_e2t.sh
\ No newline at end of file
diff --git a/test/rco/container/Dockerfile b/test/rco/container/Dockerfile
index 328c85a..557afd2 100644
--- a/test/rco/container/Dockerfile
+++ b/test/rco/container/Dockerfile
@@ -76,4 +76,4 @@
 
 RUN ldconfig
 RUN chmod 755 /run_rco.sh
-RUN chmod 755 /rco
+RUN chmod 755 /rco
\ No newline at end of file
diff --git a/test/rco/manifests/rco-dep.yaml b/test/rco/manifests/rco-dep.yaml
index 25926b8..871c3a8 100644
--- a/test/rco/manifests/rco-dep.yaml
+++ b/test/rco/manifests/rco-dep.yaml
@@ -37,7 +37,7 @@
     spec:
       containers:
       - name: rco
-        image: rco:builder
+        image: jenkins:5000/rco:test
         command: ["/run_rco.sh"]
         env:
         - name: DBAAS_SERVICE_HOST
diff --git a/test/rco/rco.go b/test/rco/rco.go
index 6ba36d0..21062eb 100644
--- a/test/rco/rco.go
+++ b/test/rco/rco.go
@@ -59,7 +59,11 @@
 	if SEEDSN == 0 || SEEDSN > 65535 {
 		SEEDSN = 12345
 	}
-	DELETESEEDSN = SEEDSN
+	DELETESEEDSN = uint16(viper.GetInt("delete_seed_sn"))
+	if DELETESEEDSN == 0 || DELETESEEDSN > 65535 {
+		DELETESEEDSN = SEEDSN
+	}
+
 	xapp.Logger.Info("Initial SEQUENCE NUMBER: %v", SEEDSN)
 }
 
@@ -77,9 +81,7 @@
 	if err != nil {
 		return make([]byte, 0), errors.New("Unable to decode data provided in RCO_DELETE RAWDATA environment variable")
 	}
-	xapp.Logger.Info("SetSubscriptionDeleteRequestSequenceNumber1")
 	payload, err = r.SetSubscriptionDeleteRequestSequenceNumber(skeleton, sub_id)
-	xapp.Logger.Info("SetSubscriptionDeleteRequestSequenceNumber2")
 	return
 }
 
@@ -106,6 +108,7 @@
 	for {
 		time.Sleep(2 * time.Second)
 		c <- submgr.RmrDatagram{12010, SEEDSN, message}
+		SEEDSN++
 		time.Sleep(2 * time.Second)
 		c <- submgr.RmrDatagram{12020, DELETESEEDSN, deletemessage}
 		DELETESEEDSN++
@@ -118,7 +121,7 @@
 		message := <-c
 		payload_seq_num, err := r.GetSubscriptionRequestSequenceNumber(message.Payload)
 		if err != nil {
-			xapp.Logger.Debug("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())	
+			xapp.Logger.Debug("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
 		}
 		params.SubId = int(message.SubscriptionId)
 		params.Mtype = message.MessageType
diff --git a/tmp/rmr.go b/tmp/rmr.go
new file mode 100644
index 0000000..49cdf41
--- /dev/null
+++ b/tmp/rmr.go
@@ -0,0 +1,284 @@
+/*
+==================================================================================
+  Copyright (c) 2019 AT&T Intellectual Property.
+  Copyright (c) 2019 Nokia
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+package xapp
+
+/*
+#include <time.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <rmr/rmr.h>
+#include <rmr/RIC_message_types.h>
+
+void write_bytes_array(unsigned char *dst, void *data, int len) {
+    memcpy((void *)dst, (void *)data, len);
+}
+
+#cgo CFLAGS: -I../
+#cgo LDFLAGS: -lrmr_nng -lnng
+*/
+import "C"
+
+import (
+	"github.com/spf13/viper"
+	"strconv"
+	"strings"
+	"time"
+	"unsafe"
+)
+
+type RMRMbuf C.rmr_mbuf_t
+
+var RMRCounterOpts = []CounterOpts{
+	{Name: "Transmitted", Help: "The total number of transmited RMR messages"},
+	{Name: "Received", Help: "The total number of received RMR messages"},
+	{Name: "TransmitError", Help: "The total number of RMR transmission errors"},
+	{Name: "ReceiveError", Help: "The total number of RMR receive errors"},
+}
+
+type RMRParams struct {
+	Mtype 		int
+	Payload 	[]byte
+	PayloadLen 	int
+	Meid 		*RMRMeid
+	Xid  		string
+	SubId  		int
+	Src  		string
+	Mbuf 		*RMRMbuf
+}
+
+func NewRMRClient() *RMRClient {
+	p := C.CString(viper.GetString("rmr.protPort"))
+	m := C.int(viper.GetInt("rmr.maxSize"))
+	defer C.free(unsafe.Pointer(p))
+
+	ctx := C.rmr_init(p, m, C.int(0))
+	if ctx == nil {
+		Logger.Error("rmrClient: Initializing RMR context failed, bailing out!")
+	}
+
+	return &RMRClient{
+		context:   ctx,
+		consumers: make([]MessageConsumer, 0),
+		stat:      Metric.RegisterCounterGroup(RMRCounterOpts, "RMR"),
+	}
+}
+
+func (m *RMRClient) Start(c MessageConsumer) {
+	if c != nil {
+		m.consumers = append(m.consumers, c)
+	}
+
+	for {
+		Logger.Info("rmrClient: Waiting for RMR to be ready ...")
+
+		if m.ready = int(C.rmr_ready(m.context)); m.ready == 1 {
+			break
+		}
+		time.Sleep(10 * time.Second)
+	}
+	m.wg.Add(viper.GetInt("rmr.numWorkers"))
+
+	if m.readyCb != nil {
+		go m.readyCb(m.readyCbParams)
+	}
+
+	for w := 0; w < viper.GetInt("rmr.numWorkers"); w++ {
+		go m.Worker("worker-"+strconv.Itoa(w), 0)
+	}
+	m.Wait()
+}
+
+func (m *RMRClient) Worker(taskName string, msgSize int) {
+	p := viper.GetString("rmr.protPort")
+	Logger.Info("rmrClient: '%s': receiving messages on [%s]", taskName, p)
+
+	defer m.wg.Done()
+	for {
+		rxBuffer := (*RMRMbuf)(C.rmr_rcv_msg(m.context, nil))
+		if rxBuffer == nil {
+			m.UpdateStatCounter("ReceiveError")
+			continue
+		}
+		m.UpdateStatCounter("Received")
+
+		go m.parseMessage(rxBuffer)
+	}
+}
+
+func (m *RMRClient) parseMessage(rxBuffer *RMRMbuf) {
+	if len(m.consumers) == 0 {
+		Logger.Info("rmrClient: No message handlers defined, message discarded!")
+		return
+	}
+
+	params := &RMRParams{}
+	params.Mbuf = rxBuffer
+	params.Mtype = int(rxBuffer.mtype)
+	params.SubId = int(rxBuffer.sub_id)
+	params.Meid = &RMRMeid{}
+
+	meidBuf := make([]byte, int(C.RMR_MAX_MEID))
+	if meidCstr := C.rmr_get_meid((*C.rmr_mbuf_t)(rxBuffer), (*C.uchar)(unsafe.Pointer(&meidBuf[0]))); meidCstr != nil {
+		params.Meid.PlmnID = strings.TrimRight(string(meidBuf[0:16]), "\000")
+		params.Meid.EnbID = strings.TrimRight(string(meidBuf[16:32]), "\000")
+	}
+
+	xidBuf := make([]byte, int(C.RMR_MAX_XID))
+	if xidCstr := C.rmr_get_xact((*C.rmr_mbuf_t)(rxBuffer), (*C.uchar)(unsafe.Pointer(&xidBuf[0]))); xidCstr != nil {
+		params.Xid = strings.TrimRight(string(xidBuf[0:32]), "\000")
+	}
+
+	srcBuf := make([]byte, int(C.RMR_MAX_SRC))
+	if srcStr := C.rmr_get_src((*C.rmr_mbuf_t)(rxBuffer), (*C.uchar)(unsafe.Pointer(&srcBuf[0]))); srcStr != nil {
+		params.Src = strings.TrimRight(string(srcBuf[0:64]), "\000")
+	}
+
+	for _, c := range m.consumers {
+		cptr := unsafe.Pointer(rxBuffer.payload)
+		params.Payload = C.GoBytes(cptr, C.int(rxBuffer.len))
+		params.PayloadLen = int(rxBuffer.len)
+
+		err := c.Consume(params)
+		if err != nil {
+			Logger.Warn("rmrClient: Consumer returned error: %v", err)
+		}
+	}
+}
+
+func (m *RMRClient) Allocate() *RMRMbuf {
+	buf := C.rmr_alloc_msg(m.context, 0)
+	if buf == nil {
+		Logger.Error("rmrClient: Allocating message buffer failed!")
+	}
+
+	return (*RMRMbuf)(buf)
+}
+
+func (m *RMRClient) SendMsg(params *RMRParams) bool {
+	return m.Send(params, false)
+}
+
+func (m *RMRClient) SendRts(params *RMRParams) bool {
+	return m.Send(params, true)
+}
+
+func (m *RMRClient) Send(params *RMRParams, isRts bool) bool {
+	buf := params.Mbuf
+	if buf == nil {
+		buf = m.Allocate()
+	}
+
+	buf.mtype = C.int(params.Mtype)
+	buf.sub_id = C.int(params.SubId)
+	buf.len = C.int(len(params.Payload))
+	datap := C.CBytes(params.Payload)
+	defer C.free(datap)
+
+	if params != nil {
+		if params.Meid != nil {
+			b := make([]byte, int(C.RMR_MAX_MEID))
+			copy(b, []byte(params.Meid.PlmnID))
+			copy(b[16:], []byte(params.Meid.EnbID))
+			C.rmr_bytes2meid((*C.rmr_mbuf_t)(buf), (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
+		}
+		xidLen := len(params.Xid)
+		if xidLen > 0 && xidLen <= C.RMR_MAX_XID {
+			b := make([]byte, int(C.RMR_MAX_MEID))
+			copy(b, []byte(params.Xid))
+			C.rmr_bytes2xact((*C.rmr_mbuf_t)(buf), (*C.uchar)(unsafe.Pointer(&b[0])), C.int(len(b)))
+		}
+	}
+	C.write_bytes_array(buf.payload, datap, buf.len)
+
+	return m.SendBuf(buf, isRts)
+}
+
+func (m *RMRClient) SendBuf(txBuffer *RMRMbuf, isRts bool) bool {
+	for i := 0; i < 10; i++ {
+		txBuffer.state = 0
+		if isRts {
+			txBuffer = (*RMRMbuf)(C.rmr_rts_msg(m.context, (*C.rmr_mbuf_t)(txBuffer)))
+		} else {
+			txBuffer = (*RMRMbuf)(C.rmr_send_msg(m.context, (*C.rmr_mbuf_t)(txBuffer)))
+		}
+
+		if txBuffer == nil {
+			break
+		} else if txBuffer.state != C.RMR_OK {
+			if txBuffer.state != C.RMR_ERR_RETRY {
+				time.Sleep(100 * time.Microsecond)
+				m.UpdateStatCounter("TransmitError")
+			}
+			for j := 0; j < 100 && txBuffer.state == C.RMR_ERR_RETRY; j++ {
+				txBuffer = (*RMRMbuf)(C.rmr_send_msg(m.context, (*C.rmr_mbuf_t)(txBuffer)))
+			}
+		}
+
+		if txBuffer.state == C.RMR_OK {
+			m.UpdateStatCounter("Transmitted")
+			return true
+		}
+	}
+	m.UpdateStatCounter("TransmitError")
+	return false
+}
+
+func (m *RMRClient) UpdateStatCounter(name string) {
+	m.mux.Lock()
+	m.stat[name].Inc()
+	m.mux.Unlock()
+}
+
+func (m *RMRClient) RegisterMetrics() {
+	m.stat = Metric.RegisterCounterGroup(RMRCounterOpts, "RMR")
+}
+
+func (m *RMRClient) Wait() {
+	m.wg.Wait()
+}
+
+func (m *RMRClient) IsReady() bool {
+	return m.ready != 0
+}
+
+func (m *RMRClient) SetReadyCB(cb ReadyCB, params interface{}) {
+	m.readyCb = cb
+	m.readyCbParams = params
+}
+
+func (m *RMRClient) GetRicMessageId(name string) (int, bool) {
+	id, ok := RICMessageTypes[name]
+	return id, ok
+}
+
+func (m *RMRClient) GetRicMessageName(id int) (s string) {
+	for k, v := range RICMessageTypes {
+		if id == v {
+			return k
+		}
+	}
+	return
+}
+
+// To be removed ...
+func (m *RMRClient) GetStat() (r RMRStatistics) {
+	return
+}