General capabilities for xapps

- rmrendpoint helper structs
- rmr SendRetry to retry sending for given time (seconds).
  Tries every 500ms until given timeout is reached or message was successfully sent
- few String() functions

Change-Id: I6bb163d85bb35a5375cd0b053d8968ebb0042bc3
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
diff --git a/ci/Dockerfile b/ci/Dockerfile
index 01e768d..a37b995 100755
--- a/ci/Dockerfile
+++ b/ci/Dockerfile
@@ -17,7 +17,7 @@
 #
 #----------------------------------------------------------
 
-FROM nexus3.o-ran-sc.org:10004/o-ran-sc/bldr-ubuntu18-c-go:8-u18.04 as xapp-base
+FROM nexus3.o-ran-sc.org:10004/o-ran-sc/bldr-ubuntu18-c-go:9-u18.04 as xapp-base
 RUN apt-get update -y \
     &&apt-get install -y \
     apt-utils \
diff --git a/pkg/xapp/metrics.go b/pkg/xapp/metrics.go
index aa4b0f7..c943db6 100644
--- a/pkg/xapp/metrics.go
+++ b/pkg/xapp/metrics.go
@@ -42,6 +42,18 @@
 	Gauges   map[string]Gauge
 }
 
+func (met *MetricGroupsCache) CInc(metric string) {
+	met.Counters[metric].Inc()
+}
+
+func (met *MetricGroupsCache) CAdd(metric string, val float64) {
+	met.Counters[metric].Add(val)
+}
+
+func (met *MetricGroupsCache) GSet(metric string, val float64) {
+	met.Gauges[metric].Set(val)
+}
+
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
diff --git a/pkg/xapp/rmr.go b/pkg/xapp/rmr.go
index 90bc64b..9b91e1a 100755
--- a/pkg/xapp/rmr.go
+++ b/pkg/xapp/rmr.go
@@ -64,6 +64,8 @@
 import "C"
 
 import (
+	"bytes"
+	"crypto/md5"
 	"fmt"
 	"github.com/spf13/viper"
 	"strings"
@@ -98,6 +100,9 @@
 	C.RMR_ERR_NOTSUPP:    "the request is not supported, or RMr was not initialized for the request",
 }
 
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
 type RMRParams struct {
 	Mtype      int
 	Payload    []byte
@@ -113,6 +118,15 @@
 	status     int
 }
 
+func (params *RMRParams) String() string {
+	var b bytes.Buffer
+	fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
+	return b.String()
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
 func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient {
 	p := C.CString(protPort)
 	m := C.int(maxSize)
@@ -268,6 +282,25 @@
 	return m.Send(params, true)
 }
 
+func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
+	status := m.Send(params, isRts)
+	i := 0
+	for ; i < int(to)*2 && status == false; i++ {
+		status = m.Send(params, isRts)
+		if status == false {
+			time.Sleep(500 * time.Millisecond)
+		}
+	}
+	if status == false {
+		err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
+		if params.Mbuf != nil {
+			m.Free(params.Mbuf)
+			params.Mbuf = nil
+		}
+	}
+	return
+}
+
 func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
 	if params.Mbuf != nil {
 		m.Free(params.Mbuf)
diff --git a/pkg/xapp/rmrendpoint.go b/pkg/xapp/rmrendpoint.go
new file mode 100644
index 0000000..cddcc4a
--- /dev/null
+++ b/pkg/xapp/rmrendpoint.go
@@ -0,0 +1,75 @@
+/*
+==================================================================================
+  Copyright (c) 2019 AT&T Intellectual Property.
+  Copyright (c) 2019 Nokia
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+package xapp
+
+import (
+	"strconv"
+	"strings"
+)
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type RmrEndpoint struct {
+	Addr string // xapp addr
+	Port uint16 // xapp port
+}
+
+func (endpoint RmrEndpoint) String() string {
+	return endpoint.Addr + ":" + strconv.FormatUint(uint64(endpoint.Port), 10)
+}
+
+func (endpoint *RmrEndpoint) Equal(ep *RmrEndpoint) bool {
+	if (endpoint.Addr == ep.Addr) &&
+		(endpoint.Port == ep.Port) {
+		return true
+	}
+	return false
+}
+
+func (endpoint *RmrEndpoint) GetAddr() string {
+	return endpoint.Addr
+}
+
+func (endpoint *RmrEndpoint) GetPort() uint16 {
+	return endpoint.Port
+}
+
+func (endpoint *RmrEndpoint) Set(src string) bool {
+	elems := strings.Split(src, ":")
+	if len(elems) == 2 {
+		srcAddr := elems[0]
+		srcPort, err := strconv.ParseUint(elems[1], 10, 16)
+		if err == nil {
+			endpoint.Addr = srcAddr
+			endpoint.Port = uint16(srcPort)
+			return true
+		}
+	}
+	return false
+}
+
+func NewRmrEndpoint(src string) *RmrEndpoint {
+	ep := &RmrEndpoint{}
+	if ep.Set(src) == false {
+		return nil
+	}
+	return ep
+}
diff --git a/pkg/xapp/rmrendpoint_test.go b/pkg/xapp/rmrendpoint_test.go
new file mode 100644
index 0000000..bea0f0b
--- /dev/null
+++ b/pkg/xapp/rmrendpoint_test.go
@@ -0,0 +1,54 @@
+/*
+==================================================================================
+  Copyright (c) 2019 AT&T Intellectual Property.
+  Copyright (c) 2019 Nokia
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+package xapp
+
+import (
+	"testing"
+)
+
+func TestRmrEndpoint(t *testing.T) {
+	Logger.Info("CASE: TestRmrEndpoint")
+
+	testEp := func(t *testing.T, val string, expect *RmrEndpoint) {
+		res := NewRmrEndpoint(val)
+
+		if expect == nil && res == nil {
+			return
+		}
+		if res == nil {
+			t.Errorf("Endpoint elems for value %s expected addr %s port %d got nil", val, expect.GetAddr(), expect.GetPort())
+			return
+		}
+		if expect.GetAddr() != res.GetAddr() || expect.GetPort() != res.GetPort() {
+			t.Errorf("Endpoint elems for value %s expected addr %s port %d got addr %s port %d", val, expect.GetAddr(), expect.GetPort(), res.GetAddr(), res.GetPort())
+		}
+		if expect.String() != res.String() {
+			t.Errorf("Endpoint string for value %s expected %s got %s", val, expect.String(), res.String())
+		}
+
+	}
+
+	testEp(t, "localhost:8080", &RmrEndpoint{"localhost", 8080})
+	testEp(t, "127.0.0.1:8080", &RmrEndpoint{"127.0.0.1", 8080})
+	testEp(t, "localhost:70000", nil)
+	testEp(t, "localhost?8080", nil)
+	testEp(t, "abcdefghijklmnopqrstuvwxyz", nil)
+	testEp(t, "", nil)
+}
diff --git a/pkg/xapp/rmrendpointlist.go b/pkg/xapp/rmrendpointlist.go
new file mode 100644
index 0000000..2afc004
--- /dev/null
+++ b/pkg/xapp/rmrendpointlist.go
@@ -0,0 +1,94 @@
+/*
+==================================================================================
+  Copyright (c) 2019 AT&T Intellectual Property.
+  Copyright (c) 2019 Nokia
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+package xapp
+
+import (
+	"strings"
+)
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type RmrEndpointList struct {
+	Endpoints []RmrEndpoint
+}
+
+func (eplist *RmrEndpointList) String() string {
+	valuesText := eplist.StringList()
+	return strings.Join(valuesText, ",")
+}
+
+func (eplist *RmrEndpointList) StringList() []string {
+	tmpList := eplist.Endpoints
+	valuesText := []string{}
+	for i := range tmpList {
+		valuesText = append(valuesText, tmpList[i].String())
+	}
+	return valuesText
+}
+
+func (eplist *RmrEndpointList) Size() int {
+	return len(eplist.Endpoints)
+}
+
+func (eplist *RmrEndpointList) AddEndpoint(ep *RmrEndpoint) bool {
+	for i := range eplist.Endpoints {
+		if eplist.Endpoints[i].Equal(ep) {
+			return false
+		}
+	}
+	eplist.Endpoints = append(eplist.Endpoints, *ep)
+	return true
+}
+
+func (eplist *RmrEndpointList) DelEndpoint(ep *RmrEndpoint) bool {
+	for i := range eplist.Endpoints {
+		if eplist.Endpoints[i].Equal(ep) {
+			eplist.Endpoints[i] = eplist.Endpoints[len(eplist.Endpoints)-1]
+			eplist.Endpoints[len(eplist.Endpoints)-1] = RmrEndpoint{"", 0}
+			eplist.Endpoints = eplist.Endpoints[:len(eplist.Endpoints)-1]
+			return true
+		}
+	}
+	return false
+}
+
+func (eplist *RmrEndpointList) DelEndpoints(otheplist *RmrEndpointList) bool {
+	var retval bool = false
+	for i := range otheplist.Endpoints {
+		if eplist.DelEndpoint(&otheplist.Endpoints[i]) {
+			retval = true
+		}
+	}
+	return retval
+}
+
+func (eplist *RmrEndpointList) HasEndpoint(ep *RmrEndpoint) bool {
+	for i := range eplist.Endpoints {
+		if eplist.Endpoints[i].Equal(ep) {
+			return true
+		}
+	}
+	return false
+}
+
+func NewRmrEndpointList() *RmrEndpointList {
+	return &RmrEndpointList{}
+}
diff --git a/pkg/xapp/rmrendpointlist_test.go b/pkg/xapp/rmrendpointlist_test.go
new file mode 100644
index 0000000..cbef96e
--- /dev/null
+++ b/pkg/xapp/rmrendpointlist_test.go
@@ -0,0 +1,95 @@
+/*
+==================================================================================
+  Copyright (c) 2019 AT&T Intellectual Property.
+  Copyright (c) 2019 Nokia
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+package xapp
+
+import (
+	"testing"
+)
+
+func TestRmrEndpointList(t *testing.T) {
+	Logger.Info("CASE: TestRmrEndpointList")
+
+	epl := &RmrEndpointList{}
+
+	// Simple add / has / delete
+	if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false {
+		t.Errorf("RmrEndpointList: 8080 add failed")
+	}
+	if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == true {
+		t.Errorf("RmrEndpointList: 8080 duplicate add success")
+	}
+	if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+		t.Errorf("RmrEndpointList: 8081 add failed")
+	}
+	if epl.HasEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+		t.Errorf("RmrEndpointList: 8081 has failed")
+	}
+	if epl.DelEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+		t.Errorf("RmrEndpointList: 8081 del failed")
+	}
+	if epl.HasEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == true {
+		t.Errorf("RmrEndpointList: 8081 has non existing success")
+	}
+	if epl.DelEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == true {
+		t.Errorf("RmrEndpointList: 8081 del non existing success")
+	}
+	if epl.DelEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false {
+		t.Errorf("RmrEndpointList: 8080 del failed")
+	}
+
+	// list delete
+	if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false {
+		t.Errorf("RmrEndpointList: 8080 add failed")
+	}
+	if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+		t.Errorf("RmrEndpointList: 8081 add failed")
+	}
+	if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8082")) == false {
+		t.Errorf("RmrEndpointList: 8082 add failed")
+	}
+
+	epl2 := &RmrEndpointList{}
+	if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:9080")) == false {
+		t.Errorf("RmrEndpointList: othlist add 9080 failed")
+	}
+
+	if epl.DelEndpoints(epl2) == true {
+		t.Errorf("RmrEndpointList: delete list not existing successs")
+	}
+
+	if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false {
+		t.Errorf("RmrEndpointList: othlist add 8080 failed")
+	}
+	if epl.DelEndpoints(epl2) == false {
+		t.Errorf("RmrEndpointList: delete list 8080,9080 failed")
+	}
+
+	if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+		t.Errorf("RmrEndpointList: othlist add 8081 failed")
+	}
+	if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:8082")) == false {
+		t.Errorf("RmrEndpointList: othlist add 8082 failed")
+	}
+
+	if epl.DelEndpoints(epl2) == false {
+		t.Errorf("RmrEndpointList: delete list 8080,8081,8082,9080 failed")
+	}
+
+}
diff --git a/pkg/xapp/types.go b/pkg/xapp/types.go
index f3c47eb..571dcba 100755
--- a/pkg/xapp/types.go
+++ b/pkg/xapp/types.go
@@ -26,6 +26,9 @@
 // To be removed ...
 type RMRStatistics struct{}
 
+//
+//
+//
 type RMRClient struct {
 	protPort      string
 	contextMux    sync.Mutex
@@ -39,12 +42,46 @@
 	readyCbParams interface{}
 }
 
+//
+//
+//
 type RMRMeid struct {
 	PlmnID  string
 	EnbID   string
 	RanName string
 }
 
+func (meid *RMRMeid) String() string {
+	str := "meid("
+	pad := ""
+	if len(meid.PlmnID) > 0 {
+		str += pad + "PlmnID=" + meid.PlmnID
+		pad = " "
+	}
+	if len(meid.EnbID) > 0 {
+		str += pad + "EnbID=" + meid.EnbID
+		pad = " "
+	}
+	if len(meid.RanName) > 0 {
+		str += pad + "RanName=" + meid.RanName
+		pad = " "
+	}
+	str += ")"
+	return str
+}
+
+//
+//
+//
+type MessageConsumerFunc func(*RMRParams) error
+
+func (fn MessageConsumerFunc) Consume(params *RMRParams) error {
+	return fn(params)
+}
+
+//
+//
+//
 type MessageConsumer interface {
 	Consume(params *RMRParams) error
 }