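Add duplicate detection against previously processed requests

Rename retransMap to ongoingRequestMap and add previousRequestMap,
which maps the md5sum of a processed request to its restSubsId, with
accessors to set, look up and delete entries. Move the md5sum
calculation into CalculateRequestMd5sum so that
IsDuplicateToOngoingTransaction takes a precomputed md5sum and
returns only a bool.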

Change-Id: I85582e38a4bbe6c722a59439da30784f8039de3d
Signed-off-by: Markku Virtanen <markku.virtanen@nokia.com>
diff --git a/pkg/control/duplicate.go b/pkg/control/duplicate.go
index c6d0c8c..6900941 100644
--- a/pkg/control/duplicate.go
+++ b/pkg/control/duplicate.go
@@ -36,64 +36,141 @@
 }
 
 type duplicateCtrl struct {
-	mutex      sync.Mutex
-	retransMap map[string]retransEntry
-	collCount  int
+	mutex              sync.Mutex
+	ongoingRequestMap  map[string]retransEntry
+	previousRequestMap map[string]string
+	collCount          int
 }
 
 func (d *duplicateCtrl) Init() {
-	d.retransMap = make(map[string]retransEntry)
+	d.ongoingRequestMap = make(map[string]retransEntry)
+	d.previousRequestMap = make(map[string]string)
 }
 
-func (d *duplicateCtrl) IsDuplicateToOngoingTransaction(restSubsId string, payload interface{}) (error, bool, string) {
+func (d *duplicateCtrl) SetMd5sumFromLastOkRequest(restSubsId string, md5sum string) {
 
+	d.mutex.Lock()
+	defer d.mutex.Unlock()
+
+	if md5sum == "" {
+		xapp.Logger.Error("Attempt to store empty md5sum for restubsId %s retransmission map skipped", restSubsId)
+		return
+	}
+
+	d.removeOngoingTransaction(md5sum)
+
+	prevRestSubsId, exists := d.previousRequestMap[md5sum]
+
+	if exists {
+		if prevRestSubsId != restSubsId {
+			xapp.Logger.Error("Storing md5sum for a processed request for restSubsId %s md5sum %s over a previous restSubsId %s", restSubsId, md5sum, prevRestSubsId)
+		} else {
+			return
+		}
+	} else {
+		xapp.Logger.Debug("Storing md5sum for a processed request for restSubsId %s md5sum %s", restSubsId, md5sum)
+	}
+
+	d.previousRequestMap[md5sum] = restSubsId
+}
+
+func (d *duplicateCtrl) GetLastKnownRestSubsIdBasedOnMd5sum(md5sum string) (string, bool) {
+
+	d.mutex.Lock()
+	defer d.mutex.Unlock()
+
+	if md5sum == "" {
+		return "", false
+	}
+
+	m, e := d.previousRequestMap[md5sum]
+
+	return m, e
+}
+
+func (d *duplicateCtrl) DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum string) {
+
+	d.mutex.Lock()
+	defer d.mutex.Unlock()
+
+	restSubsId, exists := d.previousRequestMap[md5sum]
+
+	if !exists {
+		if md5sum == "" {
+			xapp.Logger.Info("Attempted to delete a cached md5sum, md5sum not set yet")
+		} else {
+			xapp.Logger.Error("Attempted to delete a cached md5sum %s, but the value was not found", md5sum)
+		}
+	} else {
+		xapp.Logger.Debug("Deleted a cached md5sum %s for restSubsId %s", md5sum, restSubsId)
+		delete(d.previousRequestMap, md5sum)
+	}
+}
+
+func CalculateRequestMd5sum(payload interface{}) (string, error) {
 	var data bytes.Buffer
 	enc := gob.NewEncoder(&data)
 
 	if err := enc.Encode(payload); err != nil {
-		xapp.Logger.Error("Failed to encode %v\n", payload)
-		return err, false, ""
+		xapp.Logger.Error("%s", err.Error())
+		return "", err
 	}
 
 	hash := md5.Sum(data.Bytes())
 
-	md5sum := hex.EncodeToString(hash[:])
+	return hex.EncodeToString(hash[:]), nil
+}
+
+func (d *duplicateCtrl) IsDuplicateToOngoingTransaction(restSubsId string, md5sum string) bool {
+
+	if md5sum == "" {
+		return false
+	}
 
 	d.mutex.Lock()
 	defer d.mutex.Unlock()
 
-	entry, present := d.retransMap[md5sum]
+	entry, present := d.ongoingRequestMap[md5sum]
 
 	if present {
-		xapp.Logger.Info("Collision detected. REST subs ID %s has ongoing transaction with MD5SUM : %s started at %s\n", entry.restSubsId, md5sum, entry.startTime.Format(time.ANSIC))
+		xapp.Logger.Info("Collision detected. REST subs ID %s has ongoing transaction with md5sum : %s started at %s\n", entry.restSubsId, md5sum, entry.startTime.Format(time.ANSIC))
 		d.collCount++
-		return nil, true, md5sum
+		return true
 	}
 
 	entry = retransEntry{restSubsId: restSubsId, startTime: time.Now()}
 
-	xapp.Logger.Debug("Added Md5SUM %s for restSubsId %s at %s\n", md5sum, entry.restSubsId, entry.startTime)
+	xapp.Logger.Debug("No collision detected against ongoing transaction. Added md5sum %s for restSubsId %s at %s\n", md5sum, entry.restSubsId, entry.startTime)
 
-	d.retransMap[md5sum] = entry
+	d.ongoingRequestMap[md5sum] = entry
 
-	return nil, false, md5sum
+	return false
 }
 
 func (d *duplicateCtrl) TransactionComplete(md5sum string) error {
 
+	if md5sum == "" {
+		return nil
+	}
+
 	d.mutex.Lock()
 	defer d.mutex.Unlock()
 
-	entry, present := d.retransMap[md5sum]
+	return d.removeOngoingTransaction(md5sum)
+}
+
+func (d *duplicateCtrl) removeOngoingTransaction(md5sum string) error {
+
+	entry, present := d.ongoingRequestMap[md5sum]
 
 	if !present {
-		xapp.Logger.Error("MD5SUM : %s NOT found from table (%v)\n", md5sum, entry)
-		return fmt.Errorf("Retransmission entry not found for MD5SUM %s", md5sum)
+		xapp.Logger.Error("md5sum : %s NOT found from retransmission table", md5sum)
+		return fmt.Errorf("Retransmission entry not found for md5sum %s", md5sum)
 	}
 
-	xapp.Logger.Debug("Releasing transaction duplicate blocker for %s, MD5SUM : %s\n", entry.restSubsId, md5sum)
+	xapp.Logger.Debug("Releasing transaction duplicate blocker for %s, md5sum : %s\n", entry.restSubsId, md5sum)
 
-	delete(d.retransMap, md5sum)
+	delete(d.ongoingRequestMap, md5sum)
 
 	return nil
 }