Added duplicate detection changes

Change-Id: I85582e38a4bbe6c722a59439da30784f8039de3d
Signed-off-by: Markku Virtanen <markku.virtanen@nokia.com>
diff --git a/pkg/control/control.go b/pkg/control/control.go
index c74b1a5..4df7676 100755
--- a/pkg/control/control.go
+++ b/pkg/control/control.go
@@ -46,8 +46,12 @@
 	var retval string = ""
 	var filler string = ""
 	for _, entry := range entries {
-		retval += filler + entry.String()
-		filler = " "
+		if entry != nil {
+			retval += filler + entry.String()
+			filler = " "
+		} else {
+			retval += filler + "(NIL)"
+		}
 	}
 	if err != nil {
 		retval += filler + "err(" + err.Error() + ")"
@@ -240,6 +244,63 @@
 //-------------------------------------------------------------------
 //
 //-------------------------------------------------------------------
+func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
+
+	var restSubId string
+	var restSubscription *RESTSubscription
+	var err error
+
+	prevRestSubsId, exists := restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
+	if p.SubscriptionID == "" {
+		if exists {
+			restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
+			if restSubscription != nil {
+				restSubId = prevRestSubsId
+				if err == nil {
+					xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
+				} else {
+					xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
+				}
+			} else {
+				xapp.Logger.Info("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
+				restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
+			}
+		}
+
+		if restSubscription == nil {
+			restSubId = ksuid.New().String()
+			restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
+			if err != nil {
+				xapp.Logger.Error("%s", err.Error())
+				c.UpdateCounter(cRestSubFailToXapp)
+				return nil, "", err
+			}
+		}
+	} else {
+		restSubId = p.SubscriptionID
+
+		xapp.Logger.Info("RestSubscription ID %s provided via REST request", restSubId)
+
+		restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
+		if err != nil {
+			xapp.Logger.Error("%s", err.Error())
+			c.UpdateCounter(cRestSubFailToXapp)
+			return nil, "", err
+		}
+
+		if !exists {
+			xapp.Logger.Info("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
+		} else {
+			xapp.Logger.Info("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
+		}
+	}
+
+	return restSubscription, restSubId, nil
+}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
 
 	c.CntRecvMsg++
@@ -264,25 +325,16 @@
 		c.UpdateCounter(cRestSubFailToXapp)
 		return nil, err
 	}
-	var restSubId string
-	var restSubscription *RESTSubscription
-	if p.SubscriptionID == "" {
-		restSubId = ksuid.New().String()
-		restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
-		if err != nil {
-			xapp.Logger.Error("%s", err.Error())
-			c.UpdateCounter(cRestSubFailToXapp)
-			return nil, err
-		}
 
-	} else {
-		restSubId = p.SubscriptionID
-		restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
-		if err != nil {
-			xapp.Logger.Error("%s", err.Error())
-			c.UpdateCounter(cRestSubFailToXapp)
-			return nil, err
-		}
+	md5sum, err := CalculateRequestMd5sum(params)
+	if err != nil {
+		xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
+	}
+
+	restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
+	if err != nil {
+		xapp.Logger.Error("Failed to get/allocate REST subscription")
+		return nil, err
 	}
 
 	subResp.SubscriptionID = &restSubId
@@ -290,103 +342,38 @@
 	err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
 	if err != nil {
 		xapp.Logger.Error("%s", err.Error())
+		restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
 		c.registry.DeleteRESTSubscription(&restSubId)
 		c.UpdateCounter(cRestSubFailToXapp)
 		return nil, err
 	}
 
-	err, duplicate, md5sum := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, params)
-
-	if err != nil {
-		// We were unable to detect whether this request was duplicate or not, proceed
-		xapp.Logger.Info("%s - proceeding with the request", err.Error())
-	} else {
-		if duplicate {
-			if *p.SubscriptionDetails[0].ActionToBeSetupList[0].ActionType == "report" {
-				xapp.Logger.Info("Retransmission blocker dropped for report typer of request")
-				c.UpdateCounter(cRestSubRespToXapp)
-				return &subResp, nil
-			}
-		}
-		restSubscription.Md5sum = md5sum
+	duplicate := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
+	if duplicate {
+		xapp.Logger.Info("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
+		c.UpdateCounter(cRestSubRespToXapp)
+		return &subResp, nil
 	}
 
-	c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
-
-	go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint)
+	go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum)
 
 	c.UpdateCounter(cRestSubRespToXapp)
 	return &subResp, nil
 }
 
-func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
-	clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
-
-	// Send notification to xApp that prosessing of a Subscription Request has failed.
-	e2EventInstanceID := (int64)(0)
-	errorCause := err.Error()
-	resp := &models.SubscriptionResponse{
-		SubscriptionID: restSubId,
-		SubscriptionInstances: []*models.SubscriptionInstance{
-			&models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
-				ErrorCause:          &errorCause,
-				XappEventInstanceID: &xAppEventInstanceID},
-		},
-	}
-	// Mark REST subscription request processed.
-	restSubscription.SetProcessed()
-	c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
-	if trans != nil {
-		xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
-			errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
-	} else {
-		xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
-			errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
-	}
-
-	c.UpdateCounter(cRestSubFailNotifToXapp)
-	xapp.Subscription.Notify(resp, *clientEndpoint)
-}
-
-func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
-	clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
-
-	// Store successfully processed InstanceId for deletion
-	restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
-	restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
-
-	// Send notification to xApp that a Subscription Request has been processed.
-	resp := &models.SubscriptionResponse{
-		SubscriptionID: restSubId,
-		SubscriptionInstances: []*models.SubscriptionInstance{
-			&models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
-				ErrorCause:          nil,
-				XappEventInstanceID: &xAppEventInstanceID},
-		},
-	}
-	// Mark REST subscription request processesd.
-	restSubscription.SetProcessed()
-	c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
-	xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
-		clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
-
-	c.UpdateCounter(cRestSubNotifToXapp)
-	xapp.Subscription.Notify(resp, *clientEndpoint)
-}
-
 //-------------------------------------------------------------------
 //
 //-------------------------------------------------------------------
 
 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
-	clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string) {
+	clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string) {
 
 	xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
 
 	var xAppEventInstanceID int64
 	var e2EventInstanceID int64
 
-	defer restDuplicateCtrl.TransactionComplete(restSubscription.Md5sum)
+	defer restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
 
 	for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
 		subReqMsg := subReqList.E2APSubscriptionRequests[index]
@@ -402,13 +389,17 @@
 
 		xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
 
-		subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
+		subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId)
+
+		xapp.Logger.Info("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
+
 		if err != nil {
 			c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
 		} else {
 			e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
-			xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
-				index, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
+			restSubscription.AddMd5Sum(md5sum)
+			xapp.Logger.Info("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
+				index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
 			c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
 		}
 		trans.Release()
@@ -419,7 +410,7 @@
 //
 //------------------------------------------------------------------
 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
-	restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
+	restSubId string) (*e2ap.E2APSubscriptionResponse, error) {
 
 	err := c.tracker.Track(trans)
 	if err != nil {
@@ -465,6 +456,67 @@
 //-------------------------------------------------------------------
 //
 //-------------------------------------------------------------------
+func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
+	clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
+
+	// Send notification to xApp that prosessing of a Subscription Request has failed.
+	e2EventInstanceID := (int64)(0)
+	errorCause := err.Error()
+	resp := &models.SubscriptionResponse{
+		SubscriptionID: restSubId,
+		SubscriptionInstances: []*models.SubscriptionInstance{
+			&models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
+				ErrorCause:          &errorCause,
+				XappEventInstanceID: &xAppEventInstanceID},
+		},
+	}
+	// Mark REST subscription request processed.
+	restSubscription.SetProcessed(err)
+	c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
+	if trans != nil {
+		xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
+			errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
+	} else {
+		xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
+			errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
+	}
+
+	c.UpdateCounter(cRestSubFailNotifToXapp)
+	xapp.Subscription.Notify(resp, *clientEndpoint)
+}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
+	clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
+
+	// Store successfully processed InstanceId for deletion
+	restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
+	restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
+
+	// Send notification to xApp that a Subscription Request has been processed.
+	resp := &models.SubscriptionResponse{
+		SubscriptionID: restSubId,
+		SubscriptionInstances: []*models.SubscriptionInstance{
+			&models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
+				ErrorCause:          nil,
+				XappEventInstanceID: &xAppEventInstanceID},
+		},
+	}
+	// Mark REST subscription request processesd.
+	restSubscription.SetProcessed(nil)
+	c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
+	xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
+		clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
+
+	c.UpdateCounter(cRestSubNotifToXapp)
+	xapp.Subscription.Notify(resp, *clientEndpoint)
+}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
 
 	c.CntRecvMsg++
@@ -492,6 +544,7 @@
 
 	xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
 	go func() {
+		xapp.Logger.Info("Deleteting instances = %v", restSubscription.InstanceIds)
 		for _, instanceId := range restSubscription.InstanceIds {
 			xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
 
@@ -503,6 +556,7 @@
 			restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
 			restSubscription.DeleteE2InstanceId(instanceId)
 		}
+		restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
 		c.registry.DeleteRESTSubscription(&restSubId)
 		c.RemoveRESTSubscriptionFromDb(restSubId)
 	}()
@@ -828,14 +882,14 @@
 		case *e2ap.E2APSubscriptionFailure:
 			removeSubscriptionFromDb = true
 			subRfMsg, valid = subs.SetCachedResponse(event, false)
-			xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
+			xapp.Logger.Info("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
 			c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
 		case *SubmgrRestartTestEvent:
 			// This simulates that no response has been received and after restart subscriptions are restored from db
 			xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
 			return
 		default:
-			xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
+			xapp.Logger.Info("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
 			removeSubscriptionFromDb = true
 			subRfMsg, valid = subs.SetCachedResponse(nil, false)
 			c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)