General capabilities for xapps
- rmrendpoint helper structs
- rmr SendRetry to retry sending for given time (seconds).
Tries every 500ms until given timeout is reached or message was successfully sent
- few String() functions
Change-Id: I6bb163d85bb35a5375cd0b053d8968ebb0042bc3
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
diff --git a/ci/Dockerfile b/ci/Dockerfile
index 01e768d..a37b995 100755
--- a/ci/Dockerfile
+++ b/ci/Dockerfile
@@ -17,7 +17,7 @@
#
#----------------------------------------------------------
-FROM nexus3.o-ran-sc.org:10004/o-ran-sc/bldr-ubuntu18-c-go:8-u18.04 as xapp-base
+FROM nexus3.o-ran-sc.org:10004/o-ran-sc/bldr-ubuntu18-c-go:9-u18.04 as xapp-base
RUN apt-get update -y \
&&apt-get install -y \
apt-utils \
diff --git a/pkg/xapp/metrics.go b/pkg/xapp/metrics.go
index aa4b0f7..c943db6 100644
--- a/pkg/xapp/metrics.go
+++ b/pkg/xapp/metrics.go
@@ -42,6 +42,18 @@
Gauges map[string]Gauge
}
+func (met *MetricGroupsCache) CInc(metric string) {
+ met.Counters[metric].Inc()
+}
+
+func (met *MetricGroupsCache) CAdd(metric string, val float64) {
+ met.Counters[metric].Add(val)
+}
+
+func (met *MetricGroupsCache) GSet(metric string, val float64) {
+ met.Gauges[metric].Set(val)
+}
+
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
diff --git a/pkg/xapp/rmr.go b/pkg/xapp/rmr.go
index 90bc64b..9b91e1a 100755
--- a/pkg/xapp/rmr.go
+++ b/pkg/xapp/rmr.go
@@ -64,6 +64,8 @@
import "C"
import (
+ "bytes"
+ "crypto/md5"
"fmt"
"github.com/spf13/viper"
"strings"
@@ -98,6 +100,9 @@
C.RMR_ERR_NOTSUPP: "the request is not supported, or RMr was not initialized for the request",
}
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
type RMRParams struct {
Mtype int
Payload []byte
@@ -113,6 +118,15 @@
status int
}
+func (params *RMRParams) String() string {
+ var b bytes.Buffer
+ fmt.Fprintf(&b, "params(Src=%s Mtype=%d SubId=%d Xid=%s Meid=%s Paylens=%d/%d Paymd5=%x)", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid, params.PayloadLen, len(params.Payload), md5.Sum(params.Payload))
+ return b.String()
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
func NewRMRClientWithParams(protPort string, maxSize int, threadType int, statDesc string) *RMRClient {
p := C.CString(protPort)
m := C.int(maxSize)
@@ -268,6 +282,25 @@
return m.Send(params, true)
}
+func (m *RMRClient) SendWithRetry(params *RMRParams, isRts bool, to time.Duration) (err error) {
+ status := m.Send(params, isRts)
+ i := 0
+ for ; i < int(to)*2 && status == false; i++ {
+ status = m.Send(params, isRts)
+ if status == false {
+ time.Sleep(500 * time.Millisecond)
+ }
+ }
+ if status == false {
+ err = fmt.Errorf("Failed with retries(%d) %s", i, params.String())
+ if params.Mbuf != nil {
+ m.Free(params.Mbuf)
+ params.Mbuf = nil
+ }
+ }
+ return
+}
+
func (m *RMRClient) CopyBuffer(params *RMRParams) *C.rmr_mbuf_t {
if params.Mbuf != nil {
m.Free(params.Mbuf)
diff --git a/pkg/xapp/rmrendpoint.go b/pkg/xapp/rmrendpoint.go
new file mode 100644
index 0000000..cddcc4a
--- /dev/null
+++ b/pkg/xapp/rmrendpoint.go
@@ -0,0 +1,75 @@
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+
+package xapp
+
+import (
+ "strconv"
+ "strings"
+)
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type RmrEndpoint struct {
+ Addr string // xapp addr
+ Port uint16 // xapp port
+}
+
+func (endpoint RmrEndpoint) String() string {
+ return endpoint.Addr + ":" + strconv.FormatUint(uint64(endpoint.Port), 10)
+}
+
+func (endpoint *RmrEndpoint) Equal(ep *RmrEndpoint) bool {
+ if (endpoint.Addr == ep.Addr) &&
+ (endpoint.Port == ep.Port) {
+ return true
+ }
+ return false
+}
+
+func (endpoint *RmrEndpoint) GetAddr() string {
+ return endpoint.Addr
+}
+
+func (endpoint *RmrEndpoint) GetPort() uint16 {
+ return endpoint.Port
+}
+
+func (endpoint *RmrEndpoint) Set(src string) bool {
+ elems := strings.Split(src, ":")
+ if len(elems) == 2 {
+ srcAddr := elems[0]
+ srcPort, err := strconv.ParseUint(elems[1], 10, 16)
+ if err == nil {
+ endpoint.Addr = srcAddr
+ endpoint.Port = uint16(srcPort)
+ return true
+ }
+ }
+ return false
+}
+
+func NewRmrEndpoint(src string) *RmrEndpoint {
+ ep := &RmrEndpoint{}
+ if ep.Set(src) == false {
+ return nil
+ }
+ return ep
+}
diff --git a/pkg/xapp/rmrendpoint_test.go b/pkg/xapp/rmrendpoint_test.go
new file mode 100644
index 0000000..bea0f0b
--- /dev/null
+++ b/pkg/xapp/rmrendpoint_test.go
@@ -0,0 +1,54 @@
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+
+package xapp
+
+import (
+ "testing"
+)
+
+func TestRmrEndpoint(t *testing.T) {
+ Logger.Info("CASE: TestRmrEndpoint")
+
+ testEp := func(t *testing.T, val string, expect *RmrEndpoint) {
+ res := NewRmrEndpoint(val)
+
+ if expect == nil && res == nil {
+ return
+ }
+ if res == nil {
+ t.Errorf("Endpoint elems for value %s expected addr %s port %d got nil", val, expect.GetAddr(), expect.GetPort())
+ return
+ }
+ if expect.GetAddr() != res.GetAddr() || expect.GetPort() != res.GetPort() {
+ t.Errorf("Endpoint elems for value %s expected addr %s port %d got addr %s port %d", val, expect.GetAddr(), expect.GetPort(), res.GetAddr(), res.GetPort())
+ }
+ if expect.String() != res.String() {
+ t.Errorf("Endpoint string for value %s expected %s got %s", val, expect.String(), res.String())
+ }
+
+ }
+
+ testEp(t, "localhost:8080", &RmrEndpoint{"localhost", 8080})
+ testEp(t, "127.0.0.1:8080", &RmrEndpoint{"127.0.0.1", 8080})
+ testEp(t, "localhost:70000", nil)
+ testEp(t, "localhost?8080", nil)
+ testEp(t, "abcdefghijklmnopqrstuvwxyz", nil)
+ testEp(t, "", nil)
+}
diff --git a/pkg/xapp/rmrendpointlist.go b/pkg/xapp/rmrendpointlist.go
new file mode 100644
index 0000000..2afc004
--- /dev/null
+++ b/pkg/xapp/rmrendpointlist.go
@@ -0,0 +1,94 @@
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+
+package xapp
+
+import (
+ "strings"
+)
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type RmrEndpointList struct {
+ Endpoints []RmrEndpoint
+}
+
+func (eplist *RmrEndpointList) String() string {
+ valuesText := eplist.StringList()
+ return strings.Join(valuesText, ",")
+}
+
+func (eplist *RmrEndpointList) StringList() []string {
+ tmpList := eplist.Endpoints
+ valuesText := []string{}
+ for i := range tmpList {
+ valuesText = append(valuesText, tmpList[i].String())
+ }
+ return valuesText
+}
+
+func (eplist *RmrEndpointList) Size() int {
+ return len(eplist.Endpoints)
+}
+
+func (eplist *RmrEndpointList) AddEndpoint(ep *RmrEndpoint) bool {
+ for i := range eplist.Endpoints {
+ if eplist.Endpoints[i].Equal(ep) {
+ return false
+ }
+ }
+ eplist.Endpoints = append(eplist.Endpoints, *ep)
+ return true
+}
+
+func (eplist *RmrEndpointList) DelEndpoint(ep *RmrEndpoint) bool {
+ for i := range eplist.Endpoints {
+ if eplist.Endpoints[i].Equal(ep) {
+ eplist.Endpoints[i] = eplist.Endpoints[len(eplist.Endpoints)-1]
+ eplist.Endpoints[len(eplist.Endpoints)-1] = RmrEndpoint{"", 0}
+ eplist.Endpoints = eplist.Endpoints[:len(eplist.Endpoints)-1]
+ return true
+ }
+ }
+ return false
+}
+
+func (eplist *RmrEndpointList) DelEndpoints(otheplist *RmrEndpointList) bool {
+ var retval bool = false
+ for i := range otheplist.Endpoints {
+ if eplist.DelEndpoint(&otheplist.Endpoints[i]) {
+ retval = true
+ }
+ }
+ return retval
+}
+
+func (eplist *RmrEndpointList) HasEndpoint(ep *RmrEndpoint) bool {
+ for i := range eplist.Endpoints {
+ if eplist.Endpoints[i].Equal(ep) {
+ return true
+ }
+ }
+ return false
+}
+
+func NewRmrEndpointList() *RmrEndpointList {
+ return &RmrEndpointList{}
+}
diff --git a/pkg/xapp/rmrendpointlist_test.go b/pkg/xapp/rmrendpointlist_test.go
new file mode 100644
index 0000000..cbef96e
--- /dev/null
+++ b/pkg/xapp/rmrendpointlist_test.go
@@ -0,0 +1,95 @@
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+
+package xapp
+
+import (
+ "testing"
+)
+
+func TestRmrEndpointList(t *testing.T) {
+ Logger.Info("CASE: TestRmrEndpointList")
+
+ epl := &RmrEndpointList{}
+
+ // Simple add / has / delete
+ if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false {
+ t.Errorf("RmrEndpointList: 8080 add failed")
+ }
+ if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == true {
+ t.Errorf("RmrEndpointList: 8080 duplicate add success")
+ }
+ if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+ t.Errorf("RmrEndpointList: 8081 add failed")
+ }
+ if epl.HasEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+ t.Errorf("RmrEndpointList: 8081 has failed")
+ }
+ if epl.DelEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+ t.Errorf("RmrEndpointList: 8081 del failed")
+ }
+ if epl.HasEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == true {
+ t.Errorf("RmrEndpointList: 8081 has non existing success")
+ }
+ if epl.DelEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == true {
+ t.Errorf("RmrEndpointList: 8081 del non existing success")
+ }
+ if epl.DelEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false {
+ t.Errorf("RmrEndpointList: 8080 del failed")
+ }
+
+ // list delete
+ if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false {
+ t.Errorf("RmrEndpointList: 8080 add failed")
+ }
+ if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+ t.Errorf("RmrEndpointList: 8081 add failed")
+ }
+ if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8082")) == false {
+ t.Errorf("RmrEndpointList: 8082 add failed")
+ }
+
+ epl2 := &RmrEndpointList{}
+ if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:9080")) == false {
+ t.Errorf("RmrEndpointList: othlist add 9080 failed")
+ }
+
+ if epl.DelEndpoints(epl2) == true {
+ t.Errorf("RmrEndpointList: delete list not existing successs")
+ }
+
+ if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false {
+ t.Errorf("RmrEndpointList: othlist add 8080 failed")
+ }
+ if epl.DelEndpoints(epl2) == false {
+ t.Errorf("RmrEndpointList: delete list 8080,9080 failed")
+ }
+
+ if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+ t.Errorf("RmrEndpointList: othlist add 8081 failed")
+ }
+ if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:8082")) == false {
+ t.Errorf("RmrEndpointList: othlist add 8082 failed")
+ }
+
+ if epl.DelEndpoints(epl2) == false {
+ t.Errorf("RmrEndpointList: delete list 8080,8081,8082,9080 failed")
+ }
+
+}
diff --git a/pkg/xapp/types.go b/pkg/xapp/types.go
index f3c47eb..571dcba 100755
--- a/pkg/xapp/types.go
+++ b/pkg/xapp/types.go
@@ -26,6 +26,9 @@
// To be removed ...
type RMRStatistics struct{}
+//
+//
+//
type RMRClient struct {
protPort string
contextMux sync.Mutex
@@ -39,12 +42,46 @@
readyCbParams interface{}
}
+//
+//
+//
type RMRMeid struct {
PlmnID string
EnbID string
RanName string
}
+func (meid *RMRMeid) String() string {
+ str := "meid("
+ pad := ""
+ if len(meid.PlmnID) > 0 {
+ str += pad + "PlmnID=" + meid.PlmnID
+ pad = " "
+ }
+ if len(meid.EnbID) > 0 {
+ str += pad + "EnbID=" + meid.EnbID
+ pad = " "
+ }
+ if len(meid.RanName) > 0 {
+ str += pad + "RanName=" + meid.RanName
+ pad = " "
+ }
+ str += ")"
+ return str
+}
+
+//
+//
+//
+type MessageConsumerFunc func(*RMRParams) error
+
+func (fn MessageConsumerFunc) Consume(params *RMRParams) error {
+ return fn(params)
+}
+
+//
+//
+//
type MessageConsumer interface {
Consume(params *RMRParams) error
}