Status handler for DMaaP mediator producer
Issue-ID: NONRTRIC-595
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: Ia97075ba6532c040659e7781ee6e753511d7b91a
diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go
index a3e3a11..764e89c 100644
--- a/dmaap-mediator-producer/internal/config/config.go
+++ b/dmaap-mediator-producer/internal/config/config.go
@@ -25,10 +25,12 @@
)
type Config struct {
- LogLevel string
- InfoJobCallbackUrl string
- InfoCoordinatorAddress string
- InfoProducerSupervisionCallbackUrl string
+ LogLevel string
+ InfoProducerSupervisionCallbackHost string
+ InfoProducerSupervisionCallbackPort string
+ InfoJobCallbackHost string
+ InfoJobCallbackPort string
+ InfoCoordinatorAddress string
}
type ProducerRegistrationInfo struct {
@@ -39,10 +41,12 @@
func New() *Config {
return &Config{
- LogLevel: getEnv("LOG_LEVEL", "Info"),
- InfoJobCallbackUrl: getEnv("INFO_JOB_CALLBACK_URL", ""),
- InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
- InfoProducerSupervisionCallbackUrl: getEnv("INFO_PRODUCER_SUPERVISION_CALLBACK_URL", ""),
+ LogLevel: getEnv("LOG_LEVEL", "Info"),
+ InfoProducerSupervisionCallbackHost: getEnv("INFO_PRODUCER_SUPERVISION_CALLBACK_HOST", ""),
+ InfoProducerSupervisionCallbackPort: getEnv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "8085"),
+ InfoJobCallbackHost: getEnv("INFO_JOB_CALLBACK_HOST", ""),
+ InfoJobCallbackPort: getEnv("INFO_JOB_CALLBACK_PORT", "8086"),
+ InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
}
}
diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go
index 88ba1d8..6b02e42 100644
--- a/dmaap-mediator-producer/internal/config/config_test.go
+++ b/dmaap-mediator-producer/internal/config/config_test.go
@@ -28,15 +28,19 @@
func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
os.Setenv("LOG_LEVEL", "Debug")
- os.Setenv("INFO_JOB_CALLBACK_URL", "jobCallbackUrl")
+ os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_HOST", "supervisionCallbackHost")
+ os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "supervisionCallbackPort")
+ os.Setenv("INFO_JOB_CALLBACK_HOST", "jobCallbackHost")
+ os.Setenv("INFO_JOB_CALLBACK_PORT", "jobCallbackPort")
os.Setenv("INFO_COORD_ADDR", "infoCoordAddr")
- os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_URL", "supervisionCallbackUrl")
defer os.Clearenv()
wantConfig := Config{
- LogLevel: "Debug",
- InfoJobCallbackUrl: "jobCallbackUrl",
- InfoCoordinatorAddress: "infoCoordAddr",
- InfoProducerSupervisionCallbackUrl: "supervisionCallbackUrl",
+ LogLevel: "Debug",
+ InfoProducerSupervisionCallbackHost: "supervisionCallbackHost",
+ InfoProducerSupervisionCallbackPort: "supervisionCallbackPort",
+ InfoJobCallbackHost: "jobCallbackHost",
+ InfoJobCallbackPort: "jobCallbackPort",
+ InfoCoordinatorAddress: "infoCoordAddr",
}
if got := New(); !reflect.DeepEqual(got, &wantConfig) {
t.Errorf("New() = %v, want %v", got, &wantConfig)
@@ -45,10 +49,12 @@
func TestNew_envVarsNotSetConfigContainDefaultValues(t *testing.T) {
wantConfig := Config{
- LogLevel: "Info",
- InfoJobCallbackUrl: "",
- InfoCoordinatorAddress: "http://enrichmentservice:8083",
- InfoProducerSupervisionCallbackUrl: "",
+ LogLevel: "Info",
+ InfoProducerSupervisionCallbackHost: "",
+ InfoProducerSupervisionCallbackPort: "8085",
+ InfoJobCallbackHost: "",
+ InfoJobCallbackPort: "8086",
+ InfoCoordinatorAddress: "http://enrichmentservice:8083",
}
if got := New(); !reflect.DeepEqual(got, &wantConfig) {
t.Errorf("New() = %v, want %v", got, &wantConfig)
diff --git a/dmaap-mediator-producer/internal/server/server.go b/dmaap-mediator-producer/internal/server/server.go
new file mode 100644
index 0000000..ca30d73
--- /dev/null
+++ b/dmaap-mediator-producer/internal/server/server.go
@@ -0,0 +1,40 @@
+// -
+// ========================LICENSE_START=================================
+// O-RAN-SC
+// %%
+// Copyright (C) 2021: Nordix Foundation
+// %%
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package server
+
+import (
+ "fmt"
+ "net/http"
+)
+
+func StatusHandler(w http.ResponseWriter, r *http.Request) {
+ if r.URL.Path != "/" {
+ http.Error(w, "404 not found.", http.StatusNotFound)
+ return
+ }
+
+ if r.Method != "GET" {
+ http.Error(w, "Method is not supported.", http.StatusNotFound)
+ return
+ }
+
+ fmt.Fprintf(w, "All is well!")
+}
diff --git a/dmaap-mediator-producer/internal/server/server_test.go b/dmaap-mediator-producer/internal/server/server_test.go
new file mode 100644
index 0000000..9213216
--- /dev/null
+++ b/dmaap-mediator-producer/internal/server/server_test.go
@@ -0,0 +1,90 @@
+// -
+// ========================LICENSE_START=================================
+// O-RAN-SC
+// %%
+// Copyright (C) 2021: Nordix Foundation
+// %%
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package server
+
+import (
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestStatusHandler(t *testing.T) {
+ assertions := require.New(t)
+ type args struct {
+ responseRecorder *httptest.ResponseRecorder
+ r *http.Request
+ }
+ tests := []struct {
+ name string
+ args args
+ wantedStatus int
+ wantedBody string
+ }{
+ {
+ name: "StatusHandler with correct path and method, should return OK",
+ args: args{
+ responseRecorder: httptest.NewRecorder(),
+ r: newRequest("GET", "/", nil, t),
+ },
+ wantedStatus: http.StatusOK,
+ wantedBody: "All is well!",
+ },
+ {
+ name: "StatusHandler with incorrect path, should return NotFound",
+ args: args{
+ responseRecorder: httptest.NewRecorder(),
+ r: newRequest("GET", "/wrong", nil, t),
+ },
+ wantedStatus: http.StatusNotFound,
+ wantedBody: "404 not found.\n",
+ },
+ {
+ name: "StatusHandler with incorrect method, should return NotFound",
+ args: args{
+ responseRecorder: httptest.NewRecorder(),
+ r: newRequest("PUT", "/", nil, t),
+ },
+ wantedStatus: http.StatusNotFound,
+ wantedBody: "Method is not supported.\n",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ handler := http.HandlerFunc(StatusHandler)
+ handler.ServeHTTP(tt.args.responseRecorder, tt.args.r)
+ assertions.Equal(tt.wantedStatus, tt.args.responseRecorder.Code)
+
+ assertions.Equal(tt.wantedBody, tt.args.responseRecorder.Body.String())
+ })
+ }
+}
+
+func newRequest(method string, url string, body io.Reader, t *testing.T) *http.Request {
+ if req, err := http.NewRequest(method, url, body); err == nil {
+ return req
+ } else {
+ t.Fatalf("Could not create request due to: %v", err)
+ return nil
+ }
+}
diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go
index d38496f..b357f69 100644
--- a/dmaap-mediator-producer/main.go
+++ b/dmaap-mediator-producer/main.go
@@ -21,14 +21,18 @@
package main
import (
- "time"
+ "fmt"
+ "net/http"
log "github.com/sirupsen/logrus"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobtypes"
+ "oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
)
var configuration *config.Config
+var supervisionCallbackAddress string
+var jobInfoCallbackAddress string
func init() {
configuration = config.New()
@@ -39,12 +43,15 @@
}
log.Debug("Initializing DMaaP Mediator Producer")
- if configuration.InfoJobCallbackUrl == "" {
- log.Fatal("Missing INFO_JOB_CALLBACK_URL")
+ if configuration.InfoProducerSupervisionCallbackHost == "" {
+ log.Fatal("Missing INFO_PRODUCER_SUPERVISION_CALLBACK_HOST")
}
- if configuration.InfoProducerSupervisionCallbackUrl == "" {
- log.Fatal("Missing INFO_PRODUCER_SUPERVISION_CALLBACK_URL")
+ supervisionCallbackAddress = fmt.Sprintf("%v:%v", configuration.InfoProducerSupervisionCallbackHost, configuration.InfoProducerSupervisionCallbackPort)
+
+ if configuration.InfoJobCallbackHost == "" {
+ log.Fatal("Missing INFO_JOB_CALLBACK_HOST")
}
+ jobInfoCallbackAddress = fmt.Sprintf("%v:%v", configuration.InfoJobCallbackHost, configuration.InfoJobCallbackPort)
registrator := config.NewRegistratorImpl(configuration.InfoCoordinatorAddress)
if types, err := jobtypes.GetTypes(); err == nil {
@@ -55,9 +62,9 @@
log.Fatalf("Unable to get types to register due to: %v", err)
}
producer := config.ProducerRegistrationInfo{
- InfoProducerSupervisionCallbackUrl: configuration.InfoProducerSupervisionCallbackUrl,
+ InfoProducerSupervisionCallbackUrl: supervisionCallbackAddress,
SupportedInfoTypes: jobtypes.GetSupportedTypes(),
- InfoJobCallbackUrl: configuration.InfoJobCallbackUrl,
+ InfoJobCallbackUrl: jobInfoCallbackAddress,
}
if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
log.Fatalf("Unable to register producer due to: %v", err)
@@ -66,5 +73,11 @@
func main() {
log.Debug("Starting DMaaP Mediator Producer")
- time.Sleep(1000 * time.Millisecond)
+ log.Debugf("Starting status callback server at port %v", configuration.InfoProducerSupervisionCallbackPort)
+ http.HandleFunc("/", server.StatusHandler)
+
+ if err := http.ListenAndServe(":"+configuration.InfoProducerSupervisionCallbackPort, nil); err != nil {
+ log.Fatal(err)
+ }
+ log.Debug("Stopping DMaaP Mediator Producer")
}