Added kafka interface stub
Simplification of http proxy setup
Issue-ID: NONRTRIC-618
Signed-off-by: BjornMagnussonXA <bjorn.magnusson@est.tech>
Change-Id: I51ae1f81ba966f7fa570feac8953d8b14b8b2ab5
diff --git a/test/kafka-procon/.gitignore b/test/kafka-procon/.gitignore
new file mode 100644
index 0000000..6703e3c
--- /dev/null
+++ b/test/kafka-procon/.gitignore
@@ -0,0 +1,4 @@
+.tmp.json
+.dockererr
+.env
+.payload
diff --git a/test/kafka-procon/Dockerfile b/test/kafka-procon/Dockerfile
new file mode 100644
index 0000000..97a09cb
--- /dev/null
+++ b/test/kafka-procon/Dockerfile
@@ -0,0 +1,43 @@
+#==================================================================================
+# Copyright (C) 2021: Nordix Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This source code is part of the near-RT RIC (RAN Intelligent Controller)
+# platform project (RICP).
+#==================================================================================
+
+ARG NEXUS_PROXY_REPO
+
+##
+## Build
+##
+
+FROM ${NEXUS_PROXY_REPO}golang:1.17-bullseye AS build
+WORKDIR /app
+COPY go.mod .
+COPY go.sum .
+RUN go mod download
+COPY main.go .
+RUN go build -o /kafkaprocon
+
+##
+## Deploy
+##
+
+FROM gcr.io/distroless/base-debian11:nonroot
+WORKDIR /
+## Copy the binary from the "build" stage; the base image tag is set explicitly instead of relying on the implicit "latest"
+COPY --from=build /kafkaprocon .
+USER nonroot:nonroot
+ENTRYPOINT ["/kafkaprocon"]
\ No newline at end of file
diff --git a/test/kafka-procon/basic_test.sh b/test/kafka-procon/basic_test.sh
new file mode 100755
index 0000000..f3a602e
--- /dev/null
+++ b/test/kafka-procon/basic_test.sh
@@ -0,0 +1,632 @@
+#!/bin/bash
+
+# ============LICENSE_START===============================================
+# Copyright (C) 2020 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+# Automated test script for Kafka procon container
+
+# NOTE: Need a running instance of kafka
+
+
+export PORT=8096
+export HTTPX="http"
+export REQ_CONTENT=""
+export RESP_CONTENT="text/plain"
+
+# source function to do curl and check result
+. ../common/do_curl_function.sh
+
+echo "Requires a running kafka"
+
+payload=".payload"
+
+echo "=== hello world ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="OK"
+do_curl GET / 200
+
+echo "=== reset ==="
+REQ_CONTENT=""
+RESP_CONTENT=""
+RESULT="*"
+do_curl POST /reset 200
+
+echo "=== get topics ==="
+REQ_CONTENT=""
+RESP_CONTENT="application/json"
+RESULT="json:[]"
+do_curl GET /topics 200
+
+echo "=== get global counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /counters/sent 200
+
+echo "=== get global counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /counters/received 200
+
+echo "=== get topic ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/test-topic 404
+
+echo "=== get topic counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/test-topic/counters/sent 404
+
+echo "=== get topic counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/test-topic/counters/received 404
+
+echo "=== create a topic ==="
+REQ_CONTENT=""
+RESP_CONTENT=""
+RESULT="*"
+do_curl PUT /topics/test-topic 405
+
+echo "=== start to send on a topic ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/test-topic/startsend 404
+
+echo "=== start to receive from a topic ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/test-topic/startreceive 404
+
+echo "=== send a msg on a topic ==="
+echo "TEST1" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/test-topic/msg 404 $payload
+
+echo "=== receive a msg from a topic ==="
+echo "TEST1" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/test-topic/msg 404 $payload
+
+echo "=== stop to send on a topic ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/test-topic/stopsend 404
+
+echo "=== stop to receive from a topic ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/test-topic/stopreceive 404
+
+# Create 4 topics
+
+echo "=== create topic1 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl PUT /topics/topic1?type=text/plain 201
+
+echo "=== get topics ==="
+REQ_CONTENT=""
+RESP_CONTENT="application/json"
+RESULT="json:[\"topic1\"]"
+do_curl GET /topics 200
+
+echo "=== create topic2 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl PUT /topics/topic2?type=text/plain 201
+
+echo "=== get topics ==="
+REQ_CONTENT=""
+RESP_CONTENT="application/json"
+RESULT="json:[\"topic1\",\"topic2\"]"
+do_curl GET /topics 200
+
+echo "=== create topic3 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl PUT /topics/topic3?type=application/json 201
+
+echo "=== get topics ==="
+REQ_CONTENT=""
+RESP_CONTENT="application/json"
+RESULT="json:[\"topic1\",\"topic2\",\"topic3\"]"
+do_curl GET /topics 200
+
+echo "=== create topic4 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl PUT /topics/topic4?type=application/json 201
+
+echo "=== get topics ==="
+REQ_CONTENT=""
+RESP_CONTENT="application/json"
+RESULT="json:[\"topic1\",\"topic2\",\"topic3\",\"topic4\"]"
+do_curl GET /topics 200
+
+echo "=== get topic1 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="text/plain"
+do_curl GET /topics/topic1 200
+
+echo "=== get topic2 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="text/plain"
+do_curl GET /topics/topic2 200
+
+echo "=== get topic3 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="application/json"
+do_curl GET /topics/topic3 200
+
+echo "=== get topic4 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="application/json"
+do_curl GET /topics/topic4 200
+
+echo "=== send a msg on topic1 ==="
+echo "TEST11" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/msg 400 $payload
+
+echo "=== receive a msg from topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/topic1/msg 400
+
+echo "=== send a msg on topic2 ==="
+echo "TEST22" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic2/msg 400 $payload
+
+echo "=== receive a msg from topic2 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/topic2/msg 400
+
+
+
+echo "=== send a msg on topic3 ==="
+echo "{\"test\":\"33\"}" > $payload
+REQ_CONTENT="application/json"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic3/msg 400 $payload
+
+echo "=== receive a msg from topic3 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/topic3/msg 400
+
+echo "=== send a msg on topic4 ==="
+echo "{\"test\":\"44\"}" > $payload
+REQ_CONTENT="application/json"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic4/msg 400 $payload
+
+echo "=== receive a msg from topic4 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/topic4/msg 400 # topic4, as announced above - this step previously queried topic2 a second time
+
+
+echo "=== get global counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /counters/sent 200
+
+echo "=== get global counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /counters/received 200
+
+echo "=== get topic1 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic1/counters/sent 200
+
+echo "=== get topic1 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic1/counters/received 200
+
+echo "=== get topic2 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic2/counters/sent 200
+
+echo "=== get topic2 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic2/counters/received 200
+
+echo "=== get topic3 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic3/counters/sent 200
+
+echo "=== get topic3 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic3/counters/received 200
+
+echo "=== get topic4 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic4/counters/sent 200
+
+echo "=== get topic4 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic4/counters/received 200
+
+# Begins send and receive
+
+echo "=== set topic1 start sending ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/startsend 200
+
+echo "=== send a msg on topic1 ==="
+echo "TEST11" > $payload
+REQ_CONTENT="application/json"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/msg 400 $payload
+
+echo "=== send a msg on topic1 ==="
+echo "TEST11" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/msg 200 $payload
+
+echo "sleep 2 to allow sending the msg to kafka"
+sleep 2
+
+echo "=== receive a msg from topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/topic1/msg 400
+
+echo "=== get topic1 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="1"
+do_curl GET /topics/topic1/counters/sent 200
+
+echo "=== get topic1 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic1/counters/received 200
+
+echo "=== set topic1 start receiving ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/startreceive 200
+
+echo "sleep 60 to allow kafka to process the msg, unclear why first message takes a long time..."
+sleep 60
+
+echo "=== get topic1 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="1"
+do_curl GET /topics/topic1/counters/sent 200
+
+echo "=== get topic1 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="1"
+do_curl GET /topics/topic1/counters/received 200
+
+echo "=== get global counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="1"
+do_curl GET /counters/sent 200
+
+echo "=== get global counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="1"
+do_curl GET /counters/received 200
+
+echo "=== receive a msg from topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="TEST11"
+do_curl GET /topics/topic1/msg 200
+
+echo "=== receive a msg from topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT=""
+RESULT="*"
+do_curl GET /topics/topic1/msg 204
+
+
+echo "=== set topic1 start sending ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/startsend 200
+
+echo "=== set topic2 start sending ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic2/startsend 200
+
+echo "=== set topic3 start sending ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic3/startsend 200
+
+echo "=== set topic4 start sending ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic4/startsend 200
+
+echo "=== set topic1 start receiving ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/startreceive 200
+
+echo "=== set topic2 start receiving ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic2/startreceive 200
+
+echo "=== set topic3 start receiving ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic3/startreceive 200
+
+echo "=== set topic4 start receiving ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic4/startreceive 200
+
+
+# Send and receive on all topics
+
+echo "=== send a msg on topic1 ==="
+echo "TEST101" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/msg 200 $payload
+
+echo "=== send two msg on topic2 ==="
+echo "TEST201" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic2/msg 200 $payload
+echo "TEST202" > $payload
+do_curl POST /topics/topic2/msg 200 $payload
+
+echo "=== send three msg on topic3 ==="
+echo "{\"a\":\"msg301\"}" > $payload
+REQ_CONTENT="application/json"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic3/msg 200 $payload
+echo "{\"a\":\"msg302\"}" > $payload
+do_curl POST /topics/topic3/msg 200 $payload
+echo "{\"a\":\"msg303\"}" > $payload
+do_curl POST /topics/topic3/msg 200 $payload
+
+
+echo "=== send four msg on topic4 ==="
+echo "{\"a\":\"msg401\"}" > $payload
+REQ_CONTENT="application/json"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic4/msg 200 $payload
+echo "{\"a\":\"msg402\"}" > $payload
+do_curl POST /topics/topic4/msg 200 $payload
+echo "{\"a\":\"msg403\"}" > $payload
+do_curl POST /topics/topic4/msg 200 $payload
+echo "{\"a\":\"msg404\"}" > $payload
+do_curl POST /topics/topic4/msg 200 $payload
+
+echo "sleep 10 to allow kafka to process msg"
+sleep 10
+
+echo "=== get global counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="11"
+do_curl GET /counters/sent 200
+
+echo "=== get global counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="11"
+do_curl GET /counters/received 200
+
+
+echo "=== get topic1 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="2"
+do_curl GET /topics/topic1/counters/sent 200
+
+echo "=== get topic1 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="2"
+do_curl GET /topics/topic1/counters/received 200
+
+
+echo "=== get topic2 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="2"
+do_curl GET /topics/topic2/counters/sent 200
+
+echo "=== get topic2 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="2"
+do_curl GET /topics/topic2/counters/received 200
+
+
+echo "=== get topic3 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="3"
+do_curl GET /topics/topic3/counters/sent 200
+
+echo "=== get topic3 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="3"
+do_curl GET /topics/topic3/counters/received 200
+
+
+echo "=== get topic4 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="4"
+do_curl GET /topics/topic4/counters/sent 200
+
+echo "=== get topic4 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="4"
+do_curl GET /topics/topic4/counters/received 200
+
+
+echo "=== get a msg on topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="TEST101"
+do_curl GET /topics/topic1/msg 200
+
+
+echo "=== attempt to receive a msg from topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT=""
+RESULT="*"
+do_curl GET /topics/topic1/msg 204
+
+echo "=== get a two msg on topic2 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="TEST201"
+do_curl GET /topics/topic2/msg 200
+RESULT="TEST202"
+do_curl GET /topics/topic2/msg 200
+
+
+echo "=== attempt to receive a msg from topic2 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT=""
+RESULT="*"
+do_curl GET /topics/topic2/msg 204
+
+echo "=== get three msg on topic3 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="application/json"
+RESULT="json:{\"a\":\"msg301\"}"
+do_curl GET /topics/topic3/msg 200
+RESULT="json:{\"a\":\"msg302\"}"
+do_curl GET /topics/topic3/msg 200
+RESULT="json:{\"a\":\"msg303\"}"
+do_curl GET /topics/topic3/msg 200
+
+echo "=== attempt to receive a msg from topic3 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT=""
+RESULT="*"
+do_curl GET /topics/topic3/msg 204
+
+echo "=== get four msg on topic4 ===" # the messages were sent earlier, this step fetches them in order
+REQ_CONTENT="text/plain"
+RESP_CONTENT="application/json"
+RESULT="json:{\"a\":\"msg401\"}"
+do_curl GET /topics/topic4/msg 200
+RESULT="json:{\"a\":\"msg402\"}"
+do_curl GET /topics/topic4/msg 200
+RESULT="json:{\"a\":\"msg403\"}"
+do_curl GET /topics/topic4/msg 200
+RESULT="json:{\"a\":\"msg404\"}"
+do_curl GET /topics/topic4/msg 200
+
+echo "=== attempt to receive a msg from topic4 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT=""
+RESULT="*"
+do_curl GET /topics/topic4/msg 204
+
+echo "********************"
+echo "*** All tests ok ***"
+echo "********************"
+
diff --git a/test/kafka-procon/build-and-start.sh b/test/kafka-procon/build-and-start.sh
new file mode 100755
index 0000000..4e4a550
--- /dev/null
+++ b/test/kafka-procon/build-and-start.sh
@@ -0,0 +1,40 @@
+#!/bin/bash
+
+# ============LICENSE_START===============================================
+# Copyright (C) 2021 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+echo "This script requires running kafka instance in a docker private network"
+
+# Script to build and start the container
+if [ $# -ne 2 ]; then
+ echo "usage: ./build-and-start.sh <docker-network> <kafka-boostrapserver-host>:<kafka-boostrapserver-port>"
+ echo "example: ./build-and-start.sh nonrtric-docker-net message-router-kafka:9092"
+ exit 1
+fi
+IMAGE="kafka-procon:latest"
+# Build the image from the Dockerfile in the current directory
+docker build -t "$IMAGE" .
+
+if [ $? -ne 0 ]; then
+    echo "Build failed, exiting..."
+    exit 1
+fi
+
+echo "Starting kafka-procon"
+# Run the container in interactive mode on port 8090. $1 is the docker network, $2 the kafka bootstrap server (quoted to avoid word splitting)
+docker run --rm -it -p "8090:8090" --network "$1" -e KAFKA_BOOTSTRAP_SERVER="$2" --name kafka-procon "$IMAGE"
+
diff --git a/test/kafka-procon/go.mod b/test/kafka-procon/go.mod
new file mode 100644
index 0000000..31ccc7c
--- /dev/null
+++ b/test/kafka-procon/go.mod
@@ -0,0 +1,9 @@
+module kafkaprocon
+
+go 1.17
+
+require (
+	github.com/confluentinc/confluent-kafka-go v1.7.0
+	github.com/enriquebris/goconcurrentqueue v0.6.0
+	github.com/gorilla/mux v1.8.0
+)
diff --git a/test/kafka-procon/go.sum b/test/kafka-procon/go.sum
new file mode 100644
index 0000000..34a6358
--- /dev/null
+++ b/test/kafka-procon/go.sum
@@ -0,0 +1,6 @@
+github.com/confluentinc/confluent-kafka-go v1.7.0 h1:tXh3LWb2Ne0WiU3ng4h5qiGA9XV61rz46w60O+cq8bM=
+github.com/confluentinc/confluent-kafka-go v1.7.0/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
+github.com/enriquebris/goconcurrentqueue v0.6.0 h1:DJ97cgoPVoqlC4tTGBokn/omaB3o16yIs5QdAm6YEjc=
+github.com/enriquebris/goconcurrentqueue v0.6.0/go.mod h1:wGJhQNFI4wLNHleZLo5ehk1puj8M6OIl0tOjs3kwJus=
+github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
+github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
diff --git a/test/kafka-procon/main.go b/test/kafka-procon/main.go
new file mode 100644
index 0000000..6f8bad2
--- /dev/null
+++ b/test/kafka-procon/main.go
@@ -0,0 +1,595 @@
+// Kafka producer/consumer ("procon") interface stub, implemented as a basic
+// HTTP server using the `net/http` package.
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "os"
+ "strconv"
+ "sync/atomic"
+ "time"
+
+ "github.com/confluentinc/confluent-kafka-go/kafka"
+ "github.com/enriquebris/goconcurrentqueue"
+ "github.com/gorilla/mux"
+)
+
+// Note: consumer 'group' and consumer 'user' both use hardcoded values specific to this interface
+//    globalCounters      var holding the "global counters"
+//      received          number of received messages from all topics (int)
+//      sent              number of sent messages to all topics (int)
+//    topics              var holding all topic related info
+//      <topic-name>      name of a topic (present after topic is created)
+//        content-type    data type of the topic (string)
+//        counters
+//          received      number of received messages from the topic (int)
+//          sent          number of sent messages to the topic (int)
+//        messages
+//          send          messages waiting to be sent (set when sending is started) (fifo)
+//          received      received messages waiting to be fetched (set when reception is started) (fifo)
+
+type counter struct {
+ c uint64
+}
+
+func (c *counter) step() {
+ atomic.AddUint64(&c.c, 1)
+}
+
+func (c *counter) get() uint64 { // pointer receiver: a value receiver would copy the struct non-atomically before the load
+	return atomic.LoadUint64(&c.c) // atomic read, pairs with the atomic add in step()
+}
+
+type counters struct {
+ received counter
+ sent counter
+}
+
+func newCounters() counters {
+ return counters{
+ received: counter{},
+ sent: counter{},
+ }
+}
+
+type messages struct {
+ send *goconcurrentqueue.FIFO
+ received *goconcurrentqueue.FIFO
+}
+
+func (m *messages) startSend() bool {
+ if m.send == nil {
+ m.send = goconcurrentqueue.NewFIFO()
+ return true
+ }
+ return false
+}
+
+func (m *messages) stopSend() {
+ m.send = nil
+}
+
+func (m *messages) addToSend(msg string) error {
+ if m.send == nil {
+ return fmt.Errorf("sending not started")
+ }
+ m.send.Lock()
+ defer m.send.Unlock()
+ return m.send.Enqueue(msg)
+}
+
+func (m *messages) getToSend() (interface{}, error) {
+ if m.send == nil {
+ return "", fmt.Errorf("sending not started")
+ }
+ m.send.Lock()
+ defer m.send.Unlock()
+ return m.send.Dequeue()
+}
+
+func (m *messages) startReceive() bool {
+ if m.received == nil {
+ m.received = goconcurrentqueue.NewFIFO()
+ return true
+ }
+ return false
+}
+
+func (m *messages) stopReceive() { // drops the receive queue so that startReceive can create a new one
+	m.received = nil // previously cleared m.send, which stopped sending and left reception marked as started
+}
+
+type topic struct {
+ contentType string
+ counters counters
+ messages messages
+}
+
+func newTopic(ct string) *topic {
+ return &topic{
+ contentType: ct,
+ counters: counters{},
+ messages: messages{},
+ }
+}
+
+var globalCounters counters
+var topics map[string]*topic = make(map[string]*topic)
+
+var bootstrapserver = ""
+
+func initApp() {
+ bootstrapserver = os.Getenv("KAFKA_BOOTSTRAP_SERVER")
+ if len(bootstrapserver) == 0 {
+ fmt.Println("Fatal error: env var KAFKA_BOOTSTRAP_SERVER not set")
+ fmt.Println("Exiting... ")
+ os.Exit(1)
+ }
+ fmt.Println("Using KAFKA_BOOTSTRAP_SERVER=" + bootstrapserver)
+}
+
+//Helper function to get a created topic, if it exists
+func getTopicFromRequest(w http.ResponseWriter, req *http.Request) (*topic, string, bool) {
+	name := mux.Vars(req)["topic"]
+	if found, ok := topics[name]; ok {
+		return found, name, true
+	}
+	// Unknown topic: the 404 response is written here, callers only need to return
+	w.WriteHeader(http.StatusNotFound)
+	fmt.Fprintf(w, "Topic %v does not exist", name)
+	return nil, "", false
+}
+
+// Alive check
+// GET on /
+func healthCheck(w http.ResponseWriter, req *http.Request) {
+ fmt.Fprintf(w, "OK")
+}
+
+// Deep reset of this interface stub - no removal of msgs or topics in kafka
+// POST on /reset
+func allreset(w http.ResponseWriter, req *http.Request) {
+ for _, v := range topics {
+ v.messages.stopSend()
+ v.messages.stopReceive()
+ }
+ time.Sleep(5 * time.Second) //Allow producers/consumers to shut down
+ globalCounters = newCounters()
+ topics = make(map[string]*topic)
+ fmt.Fprintf(w, "OK")
+}
+
+// Get topics, return json array of strings of topics created by this interface stub
+// Returns json array of strings, array is empty if no topics exist
+// GET on /topics
+
+func getTopics(w http.ResponseWriter, req *http.Request) {
+ topicKeys := make([]string, 0, len(topics))
+ fmt.Printf("len topics: %v\n", len(topics))
+ for k := range topics {
+ topicKeys = append(topicKeys, k)
+ }
+ var j, err = json.Marshal(topicKeys)
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ fmt.Fprintf(w, "Cannot convert list of topics to json, error details: %v", err)
+ return
+ } else {
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ w.Write(j)
+ }
+}
+
+func writeOkRepsonse(w http.ResponseWriter, httpStatus int, msg string) { // writes msg as a text/plain response with status httpStatus
+	w.Header().Set("Content-Type", "text/plain") // must precede WriteHeader, headers changed afterwards are not sent
+	w.WriteHeader(httpStatus)
+	fmt.Fprint(w, msg) // Fprint, not Fprintf: msg is payload and must not be interpreted as a format string
+}
+
+// Get a global counter value, i.e. a counter covering all topics
+// GET /counters/{counter}
+func getCounter(w http.ResponseWriter, req *http.Request) {
+ ctr := mux.Vars(req)["counter"]
+ var ctrvalue = -1
+ if ctr == "received" {
+ ctrvalue = int(globalCounters.received.get())
+ } else if ctr == "sent" {
+ ctrvalue = int(globalCounters.sent.get())
+ }
+
+ if ctrvalue == -1 {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Counter %v does not exist", ctr)
+ return
+ }
+ writeOkRepsonse(w, http.StatusOK, strconv.Itoa(ctrvalue))
+ return
+
+}
+
+// Create a topic
+// PUT on /topics/<topic>?type=<type> type shall be 'application/json' or 'text/plain'
+func createTopic(w http.ResponseWriter, req *http.Request) {
+	topicId := mux.Vars(req)["topic"]
+	topicType := mux.Vars(req)["type"]
+
+	fmt.Printf("Creating topic: %v, content type: %v\n", topicId, topicType)
+
+	if len(topicType) == 0 {
+		w.WriteHeader(http.StatusBadRequest)
+		fmt.Fprintf(w, "Type not specified")
+		return
+	}
+
+	tid, exist := topics[topicId] // a topic already registered here is not created again in kafka
+	if exist {
+		if tid.contentType != topicType {
+			w.WriteHeader(http.StatusBadRequest)
+			fmt.Fprintf(w, "Topic type exist but type is different, queue content type: %v, requested content type: %v", tid.contentType, topicType)
+			return
+		}
+		writeOkRepsonse(w, http.StatusOK, "Topic exist")
+		return
+	}
+
+	t := newTopic(topicType)
+
+	a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
+	if err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		fmt.Fprintf(w, "Cannot create client to bootstrap server: %v, error details: %v", bootstrapserver, err)
+		return
+	}
+	defer a.Close() // registered only after the error check, so Close never runs on a client that failed to be created
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	maxDur := 10 * time.Second
+
+	_, err = a.CreateTopics(
+		ctx,
+		[]kafka.TopicSpecification{{
+			Topic:             topicId,
+			NumPartitions:     1,
+			ReplicationFactor: 1}},
+		kafka.SetAdminOperationTimeout(maxDur))
+
+	if err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		fmt.Fprintf(w, "Failed to create topic: %v, error details: %v", topicId, err)
+		return
+	}
+	topics[topicId] = t // register locally only once the create request went through
+	w.WriteHeader(http.StatusCreated)
+	fmt.Fprintf(w, "Topic created")
+}
+
+// Get topic type
+// GET on /topics/<topic>
+func getTopic(w http.ResponseWriter, req *http.Request) { // responds 200 with the content type stored for the topic
+	t, _, exist := getTopicFromRequest(w, req)
+	if !exist {
+		return // the 404 response has already been written by getTopicFromRequest
+	}
+	w.WriteHeader(http.StatusOK)
+	fmt.Fprint(w, t.contentType) // Fprint: the type was supplied by a client and must not be interpreted as a format string
+}
+
+// Get a topics counter value
+// GET /topics/{topic}/counters/{counter}
+func getTopicCounter(w http.ResponseWriter, req *http.Request) {
+ t, _, exist := getTopicFromRequest(w, req)
+ if !exist {
+ return
+ }
+ ctr := mux.Vars(req)["counter"]
+
+ var ctrvalue = -1
+ if ctr == "received" {
+ ctrvalue = int(t.counters.received.get())
+ } else if ctr == "sent" {
+ ctrvalue = int(t.counters.sent.get())
+ }
+
+ if ctrvalue == -1 {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Counter %v does not exist", ctr)
+ return
+ }
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprintf(w, strconv.Itoa(ctrvalue))
+ return
+}
+
+func startToSend(w http.ResponseWriter, req *http.Request) {
+ t, topicId, exist := getTopicFromRequest(w, req)
+ if !exist {
+ return
+ }
+
+ if !t.messages.startSend() {
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprintf(w, "Already started sending")
+ return
+ }
+ go func() {
+ p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
+ if err != nil {
+ fmt.Printf("Cannot create producer for topic: %v, error details: %v\n", topicId, err)
+ return
+ }
+ defer func() { p.Close() }()
+ for {
+ q := t.messages.send
+ if q == nil {
+ return
+ }
+ m, err := q.Get(0)
+ if err == nil {
+ err = p.Produce(&kafka.Message{
+ TopicPartition: kafka.TopicPartition{Topic: &topicId, Partition: kafka.PartitionAny},
+ Value: []byte(fmt.Sprintf("%v", m)),
+ }, nil)
+ if err == nil {
+ q.Remove(0)
+ t.counters.sent.step()
+ globalCounters.sent.step()
+ msg := fmt.Sprintf("%v", m)
+ if len(msg) < 500 {
+ fmt.Printf("Message sent on topic: %v, len: %v, msg: %v", topicId, len(msg), msg)
+ } else {
+ fmt.Printf("Message sent on topic: %v, len: %v, is larger than 500...not printed", topicId, len(msg))
+ }
+ } else {
+ fmt.Printf("Failed to send message on topic: %v. Discarded. Error details: %v", topicId, err)
+ q.Remove(0)
+ }
+ } else {
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+ }()
+
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprintf(w, "Sending started")
+}
+
+func startToReceive(w http.ResponseWriter, req *http.Request) { // handler starting a background kafka consumer for the topic
+	t, topicId, exist := getTopicFromRequest(w, req)
+	if !exist {
+		return
+	}
+
+	if !t.messages.startReceive() {
+		w.WriteHeader(http.StatusOK)
+		fmt.Fprintf(w, "Already started receiving")
+		return
+	}
+
+	// The consumer goroutine outlives this request, so it must never touch w or req;
+	// it reports problems on stdout only.
+	go func() {
+		// Run stopReceive whenever the consumer goroutine ends, whatever the reason
+		defer func() { t.messages.stopReceive() }()
+		groupId := "kafkaprocon"
+		c, err := kafka.NewConsumer(&kafka.ConfigMap{
+			"bootstrap.servers":       bootstrapserver,
+			"group.id":                groupId,
+			"auto.offset.reset":       "earliest",
+			"enable.auto.commit":      true,
+			"auto.commit.interval.ms": 5000,
+		})
+		if err != nil {
+			fmt.Printf("Cannot create consumer for topic: %v, error details: %v\n", topicId, err)
+			return
+		}
+		c.Commit()
+		defer func() { c.Close() }()
+		que := t.messages.received
+		if que == nil {
+			fmt.Printf("Cannot start receiving on topic: %v, queue does not exist\n", topicId)
+			return
+		}
+		fmt.Printf("Start subscribing on topic: %v\n", topicId)
+		err = c.SubscribeTopics([]string{topicId}, nil)
+		if err != nil {
+			fmt.Printf("Cannot start subscribing on topic: %v, error details: %v\n", topicId, err)
+			return
+		}
+		maxDur := 1 * time.Second
+		for {
+			// A nil queue means reception is no longer started: end the consumer
+			if t.messages.received == nil {
+				return
+			}
+			msg, err := c.ReadMessage(maxDur)
+			if err == nil {
+				if len(msg.Value) < 500 {
+					fmt.Printf("Message received on topic: %v, partion: %v, len: %v, msg: %v", topicId, msg.TopicPartition, len(msg.Value), string(msg.Value))
+				} else {
+					fmt.Printf("Message received on topic: %v, partion: %v, len: %v is larger than 500...not printed", topicId, msg.TopicPartition, len(msg.Value))
+				}
+				err = que.Enqueue(string(msg.Value)) // local reference, the field may be cleared concurrently
+				if err != nil {
+					fmt.Printf("Received message topic: %v, cannot be put in queue, %v\n", topicId, err)
+					return
+				}
+				t.counters.received.step()
+				globalCounters.received.step()
+			} else {
+				fmt.Printf("Nothing to consume on topic: %v, reason: %v\n", topicId, err)
+			}
+		}
+	}()
+
+	w.WriteHeader(http.StatusOK)
+	fmt.Fprintf(w, "Receiving started")
+}
+
+// Post a message to a topic
+// POST on /topics/<topic>/msg, the payload type is given by the Content-Type header and must match the topic type
+func sendToTopic(w http.ResponseWriter, req *http.Request) {
+ t, topicId, exist := getTopicFromRequest(w, req)
+ if !exist {
+ return
+ }
+ q := t.messages.send
+ if q == nil {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Sending not initiated on topic: %v", topicId)
+ return
+ }
+ ct := req.Header.Get("Content-Type")
+ if ct != t.contentType {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Message to send content type: %v on topic: %v does not match queue content type: %v", ct, topicId, t.contentType)
+ return
+ }
+
+ if ct == "application/json" {
+ // decoder := json.NewDecoder(req.Body)
+ // var j :=
+ // err := decoder.Decode(&j)
+ // if err != nil {
+ // w.WriteHeader(http.StatusBadRequest)
+ // w.Header().Set("Content-Type", "text/plain")
+ // fmt.Fprintf(w, "Json payload cannot be decoded, error details %v\n", err)
+ // return
+ // }
+ //m = mux.Vars(req)[""]
+ if err := req.ParseForm(); err != nil {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Json payload cannot be decoded on topic: %v, error details %v", topicId, err)
+ return
+ }
+ b, err := ioutil.ReadAll(req.Body)
+ if err == nil {
+ if len(b) < 500 {
+ fmt.Printf("Json payload to send on topic: %v, msg: %v", topicId, string(b))
+ } else {
+ fmt.Printf("Json payload to send on topic: %v larger than 500 bytes, not printed...", topicId)
+ }
+ }
+ err = q.Enqueue(string(b))
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ fmt.Fprintf(w, "Json message to send cannot be put in queue")
+ return
+ }
+ } else if ct == "text/plain" {
+ if err := req.ParseForm(); err != nil {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Text payload to send on topic: %v cannot be decoded, error details %v\n", topicId, err)
+ return
+ }
+ b, err := ioutil.ReadAll(req.Body)
+ if err == nil {
+ if len(b) < 500 {
+ fmt.Printf("Text payload to send on topic: %v, msg: %v", topicId, string(b))
+ } else {
+ fmt.Printf("Text payload to send on topic: %v larger than 500 bytes, not printed...", topicId)
+ }
+ }
+ err = q.Enqueue(string(b))
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ fmt.Fprintf(w, "Text message to send cannot be put in queue")
+ return
+ }
+ } else {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Message to send, unknown content type %v", ct)
+ return
+ }
+
+ w.WriteHeader(http.StatusOK)
+ w.Header().Set("Content-Type", "text/plain")
+ fmt.Fprintf(w, "Message to send put in queue")
+}
+
+// Get zero or one message from a topic
+// GET /topics/{topic}/msg, 200 with one message, 204 if none can be dequeued, 400 if receiving is not initiated
+func receiveFromTopic(w http.ResponseWriter, req *http.Request) {
+	t, topicId, exist := getTopicFromRequest(w, req)
+	if !exist {
+		return
+	}
+	// Use a local reference in the same way as sendToTopic, the field is not read again after the nil check
+	que := t.messages.received
+	if que == nil {
+		w.WriteHeader(http.StatusBadRequest)
+		fmt.Fprintf(w, "Receiving not initiated on topic %v", topicId)
+		return
+	}
+	m, err := que.Dequeue()
+	if err != nil {
+		w.WriteHeader(http.StatusNoContent)
+		return
+	}
+	w.Header().Set("Content-Type", t.contentType)
+	w.WriteHeader(http.StatusOK)
+	fmt.Fprintf(w, "%v", m)
+}
+
+// Stop sending on a topic, stopSend is expected to remove the send queue of the topic
+// POST /topics/{topic}/stopsend
+func stopToSend(w http.ResponseWriter, req *http.Request) {
+	fmt.Print("Stop sending\n")
+	if target, _, found := getTopicFromRequest(w, req); found {
+		target.messages.stopSend()
+		// Success is reported without a response body
+		w.WriteHeader(http.StatusNoContent)
+	}
+}
+
+// Stop receiving on a topic, stopReceive is expected to remove the receive queue of the topic
+// POST /topics/{topic}/stopreceive
+func stopToReceive(w http.ResponseWriter, req *http.Request) {
+	fmt.Print("Stop receiving\n")
+	if target, _, found := getTopicFromRequest(w, req); found {
+		target.messages.stopReceive()
+		// Success is reported without a response body
+		w.WriteHeader(http.StatusNoContent)
+	}
+}
+
+func HelloServer(w http.ResponseWriter, r *http.Request) { // Writes a greeting built from the request path to the response
+ fmt.Fprintf(w, "Hello, %s!", r.URL.Path[1:]) // [1:] drops the first character of the path; NOTE(review): panics if the path is empty
+}
+
+// main calls initApp, registers the REST routes of the kafka stub on a mux
+// router and serves them on port 8090 until ListenAndServe returns.
+func main() {
+	initApp()
+	r := mux.NewRouter()
+	r.HandleFunc("/", healthCheck).Methods("GET")
+	r.HandleFunc("/reset", allreset).Methods("POST")
+	r.HandleFunc("/counters/{counter}", getCounter).Methods("GET")
+	r.HandleFunc("/topics", getTopics).Methods("GET")
+	r.HandleFunc("/topics/{topic}/counters/{counter}", getTopicCounter).Methods("GET")
+	r.HandleFunc("/topics/{topic}", createTopic).Methods("PUT").Queries("type", "{type}")
+	r.HandleFunc("/topics/{topic}", getTopic).Methods("GET")
+	r.HandleFunc("/topics/{topic}/startsend", startToSend).Methods("POST")
+	r.HandleFunc("/topics/{topic}/startreceive", startToReceive).Methods("POST")
+	r.HandleFunc("/topics/{topic}/stopsend", stopToSend).Methods("POST")
+	r.HandleFunc("/topics/{topic}/stopreceive", stopToReceive).Methods("POST")
+	r.HandleFunc("/topics/{topic}/msg", sendToTopic).Methods("POST")
+	r.HandleFunc("/topics/{topic}/msg", receiveFromTopic).Methods("GET")
+
+	port := "8090"
+	srv := &http.Server{
+		Handler:      r,
+		Addr:         ":" + port,
+		WriteTimeout: 15 * time.Second,
+		ReadTimeout:  15 * time.Second,
+	}
+	fmt.Println("Running on port: " + port)
+	// ListenAndServe always returns a non-nil error, print it as a value and not as a format string
+	fmt.Println(srv.ListenAndServe())
+}
diff --git a/test/kafka-procon/start_local.sh b/test/kafka-procon/start_local.sh
new file mode 100755
index 0000000..bfc1a1b
--- /dev/null
+++ b/test/kafka-procon/start_local.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+# ============LICENSE_START===============================================
+# Copyright (C) 2021 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+echo "This script requires golang to be installed and a running kafka instance on (or available to) localhost"
+
+# Script to build and start app locally, takes the port of the kafka bootstrap server on localhost as the only argument
+if [ $# -ne 1 ]; then
+    echo "usage: $0 <kafka-bootstrapserver-port>"
+    echo "example: $0 30098"
+    exit 1
+fi
+
+# The argument is quoted as a precaution so that it is always passed on as one word
+export KAFKA_BOOTSTRAP_SERVER="localhost:$1"
+echo "Starting kafka-procon on local machine"
+go run main.go