Added duplicate detection changes
Change-Id: I85582e38a4bbe6c722a59439da30784f8039de3d
Signed-off-by: Markku Virtanen <markku.virtanen@nokia.com>
diff --git a/pkg/control/duplicate.go b/pkg/control/duplicate.go
index c6d0c8c..6900941 100644
--- a/pkg/control/duplicate.go
+++ b/pkg/control/duplicate.go
@@ -36,64 +36,141 @@
}
type duplicateCtrl struct {
- mutex sync.Mutex
- retransMap map[string]retransEntry
- collCount int
+ mutex sync.Mutex
+ ongoingRequestMap map[string]retransEntry
+ previousRequestMap map[string]string
+ collCount int
}
func (d *duplicateCtrl) Init() {
- d.retransMap = make(map[string]retransEntry)
+ d.ongoingRequestMap = make(map[string]retransEntry)
+ d.previousRequestMap = make(map[string]string)
}
-func (d *duplicateCtrl) IsDuplicateToOngoingTransaction(restSubsId string, payload interface{}) (error, bool, string) {
+func (d *duplicateCtrl) SetMd5sumFromLastOkRequest(restSubsId string, md5sum string) {
+ d.mutex.Lock()
+ defer d.mutex.Unlock()
+
+ if md5sum == "" {
+ xapp.Logger.Error("Attempt to store empty md5sum for restubsId %s retransmission map skipped", restSubsId)
+ return
+ }
+
+ d.removeOngoingTransaction(md5sum)
+
+ prevRestSubsId, exists := d.previousRequestMap[md5sum]
+
+ if exists {
+ if prevRestSubsId != restSubsId {
+ xapp.Logger.Error("Storing md5sum for a processed request for restSubsId %s md5sum %s over a previous restSubsId %s", restSubsId, md5sum, prevRestSubsId)
+ } else {
+ return
+ }
+ } else {
+ xapp.Logger.Debug("Storing md5sum for a processed request for restSubsId %s md5sum %s", restSubsId, md5sum)
+ }
+
+ d.previousRequestMap[md5sum] = restSubsId
+}
+
+func (d *duplicateCtrl) GetLastKnownRestSubsIdBasedOnMd5sum(md5sum string) (string, bool) {
+
+ d.mutex.Lock()
+ defer d.mutex.Unlock()
+
+ if md5sum == "" {
+ return "", false
+ }
+
+ m, e := d.previousRequestMap[md5sum]
+
+ return m, e
+}
+
+func (d *duplicateCtrl) DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum string) {
+
+ d.mutex.Lock()
+ defer d.mutex.Unlock()
+
+ restSubsId, exists := d.previousRequestMap[md5sum]
+
+ if !exists {
+ if md5sum == "" {
+ xapp.Logger.Info("Attempted to delete a cached md5sum, md5sum not set yet")
+ } else {
+ xapp.Logger.Error("Attempted to delete a cached md5sum %s, but the value was not found", md5sum)
+ }
+ } else {
+ xapp.Logger.Debug("Deleted a cached md5sum %s for restSubsId %s", md5sum, restSubsId)
+ delete(d.previousRequestMap, md5sum)
+ }
+}
+
+func CalculateRequestMd5sum(payload interface{}) (string, error) {
var data bytes.Buffer
enc := gob.NewEncoder(&data)
if err := enc.Encode(payload); err != nil {
- xapp.Logger.Error("Failed to encode %v\n", payload)
- return err, false, ""
+ xapp.Logger.Error("%s", err.Error())
+ return "", err
}
hash := md5.Sum(data.Bytes())
- md5sum := hex.EncodeToString(hash[:])
+ return hex.EncodeToString(hash[:]), nil
+}
+
+func (d *duplicateCtrl) IsDuplicateToOngoingTransaction(restSubsId string, md5sum string) bool {
+
+ if md5sum == "" {
+ return false
+ }
d.mutex.Lock()
defer d.mutex.Unlock()
- entry, present := d.retransMap[md5sum]
+ entry, present := d.ongoingRequestMap[md5sum]
if present {
- xapp.Logger.Info("Collision detected. REST subs ID %s has ongoing transaction with MD5SUM : %s started at %s\n", entry.restSubsId, md5sum, entry.startTime.Format(time.ANSIC))
+ xapp.Logger.Info("Collision detected. REST subs ID %s has ongoing transaction with md5sum : %s started at %s\n", entry.restSubsId, md5sum, entry.startTime.Format(time.ANSIC))
d.collCount++
- return nil, true, md5sum
+ return true
}
entry = retransEntry{restSubsId: restSubsId, startTime: time.Now()}
- xapp.Logger.Debug("Added Md5SUM %s for restSubsId %s at %s\n", md5sum, entry.restSubsId, entry.startTime)
+ xapp.Logger.Debug("No collision detected against ongoing transaction. Added md5sum %s for restSubsId %s at %s\n", md5sum, entry.restSubsId, entry.startTime)
- d.retransMap[md5sum] = entry
+ d.ongoingRequestMap[md5sum] = entry
- return nil, false, md5sum
+ return false
}
func (d *duplicateCtrl) TransactionComplete(md5sum string) error {
+ if md5sum == "" {
+ return nil
+ }
+
d.mutex.Lock()
defer d.mutex.Unlock()
- entry, present := d.retransMap[md5sum]
+ return d.removeOngoingTransaction(md5sum)
+}
+
+func (d *duplicateCtrl) removeOngoingTransaction(md5sum string) error {
+
+ entry, present := d.ongoingRequestMap[md5sum]
if !present {
- xapp.Logger.Error("MD5SUM : %s NOT found from table (%v)\n", md5sum, entry)
- return fmt.Errorf("Retransmission entry not found for MD5SUM %s", md5sum)
+ xapp.Logger.Error("md5sum : %s NOT found from retransmission table", md5sum)
+ return fmt.Errorf("Retransmission entry not found for md5sum %s", md5sum)
}
- xapp.Logger.Debug("Releasing transaction duplicate blocker for %s, MD5SUM : %s\n", entry.restSubsId, md5sum)
+ xapp.Logger.Debug("Releasing transaction duplicate blocker for %s, md5sum : %s\n", entry.restSubsId, md5sum)
- delete(d.retransMap, md5sum)
+ delete(d.ongoingRequestMap, md5sum)
return nil
}