Add Kafka interface stub

Simplify HTTP proxy setup

Issue-ID: NONRTRIC-618

Signed-off-by: BjornMagnussonXA <bjorn.magnusson@est.tech>
Change-Id: I51ae1f81ba966f7fa570feac8953d8b14b8b2ab5
diff --git a/test/kafka-procon/.gitignore b/test/kafka-procon/.gitignore
new file mode 100644
index 0000000..6703e3c
--- /dev/null
+++ b/test/kafka-procon/.gitignore
@@ -0,0 +1,4 @@
+.tmp.json
+.dockererr
+.env
+.payload
diff --git a/test/kafka-procon/Dockerfile b/test/kafka-procon/Dockerfile
new file mode 100644
index 0000000..97a09cb
--- /dev/null
+++ b/test/kafka-procon/Dockerfile
@@ -0,0 +1,43 @@
+#==================================================================================
+#   Copyright (C) 2021: Nordix Foundation
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+#   This source code is part of the near-RT RIC (RAN Intelligent Controller)
+#   platform project (RICP).
+#==================================================================================
+
+ARG NEXUS_PROXY_REPO
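+# NEXUS_PROXY_REPO is prepended to the golang image reference below.
+# If set, it is assumed to end with a trailing '/'; if left empty, the image is pulled from the default registry.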
+
+##
+## Build
+##
+
+FROM ${NEXUS_PROXY_REPO}golang:1.17-bullseye AS build
+WORKDIR /app
+COPY go.mod .
+COPY go.sum .
+RUN go mod download
+COPY main.go .
+RUN go build -o /kafkaprocon
+
+##
+## Deploy
+##
+
+FROM gcr.io/distroless/base-debian11
+WORKDIR /
+## Copy from "build" stage
+COPY --from=build /kafkaprocon .
+USER nonroot:nonroot
+ENTRYPOINT ["/kafkaprocon"]
\ No newline at end of file
diff --git a/test/kafka-procon/basic_test.sh b/test/kafka-procon/basic_test.sh
new file mode 100755
index 0000000..f3a602e
--- /dev/null
+++ b/test/kafka-procon/basic_test.sh
@@ -0,0 +1,632 @@
+#!/bin/bash
+
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2020 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+# Automated test script for Kafka procon container
+
+# NOTE: Requires a running Kafka instance
+
+
+export PORT=8096
+export HTTPX="http"
+export REQ_CONTENT=""
+export RESP_CONTENT="text/plain"
+
+# source function to do curl and check result
+. ../common/do_curl_function.sh
+
+echo "Requires a running kafka"
+
+payload=".payload"
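+
+# Conventions used below (provided by ../common/do_curl_function.sh, summarized here from its usage in this script):
+#   do_curl <method> <url-path> <expected-http-status> [payload-file]
+#   PORT/HTTPX    - target port and scheme of the kafka-procon stub under test
+#   REQ_CONTENT   - content type of the request body, empty if no body is sent
+#   RESP_CONTENT  - expected content type of the response
+#   RESULT        - expected response body: "*" accepts any body, "json:<data>" compares as json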
+
+echo "=== hello world ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="OK"
+do_curl GET / 200
+
+echo "=== reset ==="
+REQ_CONTENT=""
+RESP_CONTENT=""
+RESULT="*"
+do_curl POST /reset 200
+
+echo "=== get topics ==="
+REQ_CONTENT=""
+RESP_CONTENT="application/json"
+RESULT="json:[]"
+do_curl GET /topics 200
+
+echo "=== get global counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /counters/sent 200
+
+echo "=== get global counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /counters/received 200
+
+echo "=== get topic ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/test-topic 404
+
+echo "=== get topic counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/test-topic/counters/sent 404
+
+echo "=== get topic counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/test-topic/counters/received 404
+
+echo "=== create a topic ==="
+REQ_CONTENT=""
+RESP_CONTENT=""
+RESULT="*"
+do_curl PUT /topics/test-topic 405
+
+echo "=== start to send on a topic ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/test-topic/startsend 404
+
+echo "=== start to receive from a  topic ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/test-topic/startreceive 404
+
+echo "=== send a msg on a  topic ==="
+echo "TEST1" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/test-topic/msg 404 $payload
+
+echo "=== receive a msg  from a  topic ==="
+echo "TEST1" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/test-topic/msg 404 $payload
+
+echo "=== stop to send on a  topic ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/test-topic/stopsend 404
+
+echo "=== stop to receive from a  topic ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/test-topic/stopreceive 404
+
+# Create 4 topics
+
+echo "=== create topic1 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl PUT /topics/topic1?type=text/plain 201
+
+echo "=== get topics ==="
+REQ_CONTENT=""
+RESP_CONTENT="application/json"
+RESULT="json:[\"topic1\"]"
+do_curl GET /topics 200
+
+echo "=== create topic2 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl PUT /topics/topic2?type=text/plain 201
+
+echo "=== get topics ==="
+REQ_CONTENT=""
+RESP_CONTENT="application/json"
+RESULT="json:[\"topic1\",\"topic2\"]"
+do_curl GET /topics 200
+
+echo "=== create topic3 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl PUT /topics/topic3?type=application/json 201
+
+echo "=== get topics ==="
+REQ_CONTENT=""
+RESP_CONTENT="application/json"
+RESULT="json:[\"topic1\",\"topic2\",\"topic3\"]"
+do_curl GET /topics 200
+
+echo "=== create topic4 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl PUT /topics/topic4?type=application/json 201
+
+echo "=== get topics ==="
+REQ_CONTENT=""
+RESP_CONTENT="application/json"
+RESULT="json:[\"topic1\",\"topic2\",\"topic3\",\"topic4\"]"
+do_curl GET /topics 200
+
+echo "=== get topic1 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="text/plain"
+do_curl GET /topics/topic1 200
+
+echo "=== get topic2 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="text/plain"
+do_curl GET /topics/topic2 200
+
+echo "=== get topic3 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="application/json"
+do_curl GET /topics/topic3 200
+
+echo "=== get topic4 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="application/json"
+do_curl GET /topics/topic4 200
+
+echo "=== send a msg on topic1 ==="
+echo "TEST11" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/msg 400  $payload
+
+echo "=== receive a msg  from topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/topic1/msg 400
+
+echo "=== send a msg on topic2 ==="
+echo "TEST22" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic2/msg 400 $payload
+
+echo "=== receive a msg  from topic2 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/topic2/msg 400
+
+
+
+echo "=== send a msg on topic3 ==="
+echo "{\"test\":\"33\"}" > $payload
+REQ_CONTENT="application/json"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic3/msg 400 $payload
+
+echo "=== receive a msg  from topic3 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/topic3/msg 400
+
+echo "=== send a msg on topic4 ==="
+echo "{\"test\":\"44\"}" > $payload
+REQ_CONTENT="application/json"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic4/msg 400 $payload
+
+echo "=== receive a msg  from topic4 ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/topic2/msg 400
+
+
+echo "=== get global counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /counters/sent 200
+
+echo "=== get global counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /counters/received 200
+
+echo "=== get topic1 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic1/counters/sent 200
+
+echo "=== get topic1 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic1/counters/received 200
+
+echo "=== get topic2 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic2/counters/sent 200
+
+echo "=== get topic2 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic2/counters/received 200
+
+echo "=== get topic3 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic3/counters/sent 200
+
+echo "=== get topic3 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic3/counters/received 200
+
+echo "=== get topic4 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic4/counters/sent 200
+
+echo "=== get topic4 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic4/counters/received 200
+
+# Begins send and receive
+
+echo "=== set topic1 start sending ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/startsend 200
+
+echo "=== send a msg on topic1 ==="
+echo "TEST11" > $payload
+REQ_CONTENT="application/json"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/msg 400  $payload
+
+echo "=== send a msg on topic1 ==="
+echo "TEST11" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/msg 200  $payload
+
+echo "sleep 2  to allow sending the msg to kafka"
+sleep 2
+
+echo "=== receive a msg  from topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl GET /topics/topic1/msg 400
+
+echo "=== get topic1 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="1"
+do_curl GET /topics/topic1/counters/sent 200
+
+echo "=== get topic1 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="0"
+do_curl GET /topics/topic1/counters/received 200
+
+echo "=== set topic1 start receiving ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/startreceive 200
+
+echo "sleep 60 to allow kafka to process the msg, unclear why first message takes a long time..."
+sleep 60
+
+echo "=== get topic1 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="1"
+do_curl GET /topics/topic1/counters/sent 200
+
+echo "=== get topic1 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="1"
+do_curl GET /topics/topic1/counters/received 200
+
+echo "=== get global counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="1"
+do_curl GET /counters/sent 200
+
+echo "=== get global counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="1"
+do_curl GET /counters/received 200
+
+echo "=== receive a msg from topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="TEST11"
+do_curl GET /topics/topic1/msg 200
+
+echo "=== receive a msg from topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT=""
+RESULT="*"
+do_curl GET /topics/topic1/msg 204
+
+
+echo "=== set topic1 start sending ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/startsend 200
+
+echo "=== set topic2 start sending ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic2/startsend 200
+
+echo "=== set topic3 start sending ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic3/startsend 200
+
+echo "=== set topic4 start sending ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic4/startsend 200
+
+echo "=== set topic1 start receiving ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/startreceive 200
+
+echo "=== set topic2 start receiving ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic2/startreceive 200
+
+echo "=== set topic3 start receiving ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic3/startreceive 200
+
+echo "=== set topic4 start receiving ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic4/startreceive 200
+
+
+# Send and receive on all topics
+
+echo "=== send a msg on topic1 ==="
+echo "TEST101" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic1/msg 200  $payload
+
+echo "=== send two msg on topic2 ==="
+echo "TEST201" > $payload
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic2/msg 200  $payload
+echo "TEST202" > $payload
+do_curl POST /topics/topic2/msg 200  $payload
+
+echo "=== send three msg on topic3 ==="
+echo "{\"a\":\"msg301\"}" > $payload
+REQ_CONTENT="application/json"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic3/msg 200  $payload
+echo "{\"a\":\"msg302\"}" > $payload
+do_curl POST /topics/topic3/msg 200  $payload
+echo "{\"a\":\"msg303\"}" > $payload
+do_curl POST /topics/topic3/msg 200  $payload
+
+
+echo "=== send four msg on topic4 ==="
+echo "{\"a\":\"msg401\"}" > $payload
+REQ_CONTENT="application/json"
+RESP_CONTENT="text/plain"
+RESULT="*"
+do_curl POST /topics/topic4/msg 200  $payload
+echo "{\"a\":\"msg402\"}" > $payload
+do_curl POST /topics/topic4/msg 200  $payload
+echo "{\"a\":\"msg403\"}" > $payload
+do_curl POST /topics/topic4/msg 200  $payload
+echo "{\"a\":\"msg404\"}" > $payload
+do_curl POST /topics/topic4/msg 200  $payload
+
+echo "sleep 10 to allow kafka to process msg"
+sleep 10
+
+echo "=== get global counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="11"
+do_curl GET /counters/sent 200
+
+echo "=== get global counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="11"
+do_curl GET /counters/received 200
+
+
+echo "=== get topic1 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="2"
+do_curl GET /topics/topic1/counters/sent 200
+
+echo "=== get topic1 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="2"
+do_curl GET /topics/topic1/counters/received 200
+
+
+echo "=== get topic2 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="2"
+do_curl GET /topics/topic2/counters/sent 200
+
+echo "=== get topic2 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="2"
+do_curl GET /topics/topic2/counters/received 200
+
+
+echo "=== get topic3 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="3"
+do_curl GET /topics/topic3/counters/sent 200
+
+echo "=== get topic3 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="3"
+do_curl GET /topics/topic3/counters/received 200
+
+
+echo "=== get topic4 counter sent ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="4"
+do_curl GET /topics/topic4/counters/sent 200
+
+echo "=== get topic4 counter received ==="
+REQ_CONTENT=""
+RESP_CONTENT="text/plain"
+RESULT="4"
+do_curl GET /topics/topic4/counters/received 200
+
+
+echo "=== get a msg on topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="TEST101"
+do_curl GET /topics/topic1/msg 200
+
+
+echo "=== attempt to receive a msg from topic1 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT=""
+RESULT="*"
+do_curl GET /topics/topic1/msg 204
+
+echo "=== get a two msg on topic2 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="text/plain"
+RESULT="TEST201"
+do_curl GET /topics/topic2/msg 200
+RESULT="TEST202"
+do_curl GET /topics/topic2/msg 200
+
+
+echo "=== attempt to receive a msg from topic2 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT=""
+RESULT="*"
+do_curl GET /topics/topic2/msg 204
+
+echo "=== get three msg on topic3 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="application/json"
+RESULT="json:{\"a\":\"msg301\"}"
+do_curl GET /topics/topic3/msg 200
+RESULT="json:{\"a\":\"msg302\"}"
+do_curl GET /topics/topic3/msg 200
+RESULT="json:{\"a\":\"msg303\"}"
+do_curl GET /topics/topic3/msg 200
+
+echo "=== attempt to receive a msg from topic3 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT=""
+RESULT="*"
+do_curl GET /topics/topic3/msg 204
+
+echo "=== send four msg on topic4 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT="application/json"
+RESULT="json:{\"a\":\"msg401\"}"
+do_curl GET /topics/topic4/msg 200
+RESULT="json:{\"a\":\"msg402\"}"
+do_curl GET /topics/topic4/msg 200
+RESULT="json:{\"a\":\"msg403\"}"
+do_curl GET /topics/topic4/msg 200
+RESULT="json:{\"a\":\"msg404\"}"
+do_curl GET /topics/topic4/msg 200
+
+echo "=== attempt to receive a msg from topic4 ==="
+REQ_CONTENT="text/plain"
+RESP_CONTENT=""
+RESULT="*"
+do_curl GET /topics/topic4/msg 204
+
+echo "********************"
+echo "*** All tests ok ***"
+echo "********************"
+
diff --git a/test/kafka-procon/build-and-start.sh b/test/kafka-procon/build-and-start.sh
new file mode 100755
index 0000000..4e4a550
--- /dev/null
+++ b/test/kafka-procon/build-and-start.sh
@@ -0,0 +1,40 @@
+#!/bin/bash
+
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2021 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+echo "This script requires running kafka instance in a docker private network"
+
+# Script to build and start the container
+if [ $# -ne 2 ]; then
+    echo "usage: ./build-and-start.sh <docker-network> <kafka-boostrapserver-host>:<kafka-boostrapserver-port>"
+    echo "example: ./build-and-start.sh nonrtric-docker-net message-router-kafka:9092"
+    exit 1
+fi
+IMAGE="kafka-procon:latest"
+#Build the image
+docker build -t $IMAGE .
+
+if [ $? -ne 0 ]; then
+    echo "Build failed, exiting..."
+    exit 1
+fi
+
+echo "Starting kafka-procon"
+# Run the container in interactive mode on port 8090.
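+# Once running, the stub should answer on http://localhost:8090/ - e.g. "curl localhost:8090/" from another terminal returns "OK".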
+docker run --rm -it -p "8090:8090" --network $1 -e KAFKA_BOOTSTRAP_SERVER=$2 --name kafka-procon $IMAGE
+
diff --git a/test/kafka-procon/go.mod b/test/kafka-procon/go.mod
new file mode 100644
index 0000000..31ccc7c
--- /dev/null
+++ b/test/kafka-procon/go.mod
@@ -0,0 +1,9 @@
+module kafkaprocon
+
+go 1.17
+
+require (
+	github.com/confluentinc/confluent-kafka-go v1.7.0 // indirect
+	github.com/enriquebris/goconcurrentqueue v0.6.0 // indirect
+	github.com/gorilla/mux v1.8.0 // indirect
+)
diff --git a/test/kafka-procon/go.sum b/test/kafka-procon/go.sum
new file mode 100644
index 0000000..34a6358
--- /dev/null
+++ b/test/kafka-procon/go.sum
@@ -0,0 +1,6 @@
+github.com/confluentinc/confluent-kafka-go v1.7.0 h1:tXh3LWb2Ne0WiU3ng4h5qiGA9XV61rz46w60O+cq8bM=
+github.com/confluentinc/confluent-kafka-go v1.7.0/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
+github.com/enriquebris/goconcurrentqueue v0.6.0 h1:DJ97cgoPVoqlC4tTGBokn/omaB3o16yIs5QdAm6YEjc=
+github.com/enriquebris/goconcurrentqueue v0.6.0/go.mod h1:wGJhQNFI4wLNHleZLo5ehk1puj8M6OIl0tOjs3kwJus=
+github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
+github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
diff --git a/test/kafka-procon/main.go b/test/kafka-procon/main.go
new file mode 100644
index 0000000..6f8bad2
--- /dev/null
+++ b/test/kafka-procon/main.go
@@ -0,0 +1,595 @@
+// Kafka message producer/consumer interface stub.
+// Exposes a REST API to create topics and to send and receive messages via a kafka cluster, for test purposes.
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"os"
+	"strconv"
+	"sync/atomic"
+	"time"
+
+	"github.com/confluentinc/confluent-kafka-go/kafka"
+	"github.com/enriquebris/goconcurrentqueue"
+	"github.com/gorilla/mux"
+)
+
+// Note: consumer 'group' and consumer 'user' both use hardcoded values specific to this interface
+//    globalCounters      var holding the "global counters"
+//      received          number of received messages from all topics                             (int)
+//      sent              number of sent messages to all topics                                   (int)
+//    topics              var holding all topic related info
+//      <topic-name>      name of a topic (present after topic is created)
+//        content-type    data type of the topic                                                  (string)
+//        counters
+//          received      number of received messages from the topic                              (int)
+//          sent          number of sent messages to the topic                                    (int)
+//        messages
+//          send          messages waiting to be sent (set when sending is started)               (fifo)
+//          received      received messages waiting to be fetched (set when reception is started) (fifo)
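+//
+// REST API overview (the routes are registered in main() below):
+//   GET  /                                         alive check, returns "OK"
+//   POST /reset                                    reset topics and counters in this stub (topics/messages in kafka are not removed)
+//   GET  /counters/{sent|received}                 global counters
+//   GET  /topics                                   json array of topics created via this stub
+//   PUT  /topics/<topic>?type=<content-type>       create a topic, type is 'application/json' or 'text/plain'
+//   GET  /topics/<topic>                           get the content type of a topic
+//   GET  /topics/<topic>/counters/{sent|received}  per-topic counters
+//   POST /topics/<topic>/startsend                 start producing queued messages to kafka
+//   POST /topics/<topic>/startreceive              start consuming messages from kafka
+//   POST /topics/<topic>/stopsend                  stop producing
+//   POST /topics/<topic>/stopreceive               stop consuming
+//   POST /topics/<topic>/msg                       queue a message for sending
+//   GET  /topics/<topic>/msg                       fetch one received message (204 if none is available)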
+
+type counter struct {
+	c uint64
+}
+
+func (c *counter) step() {
+	atomic.AddUint64(&c.c, 1)
+}
+
+func (c *counter) get() uint64 {
+	return atomic.LoadUint64(&c.c)
+}
+
+type counters struct {
+	received counter
+	sent     counter
+}
+
+func newCounters() counters {
+	return counters{
+		received: counter{},
+		sent:     counter{},
+	}
+}
+
+type messages struct {
+	send     *goconcurrentqueue.FIFO
+	received *goconcurrentqueue.FIFO
+}
+
+func (m *messages) startSend() bool {
+	if m.send == nil {
+		m.send = goconcurrentqueue.NewFIFO()
+		return true
+	}
+	return false
+}
+
+func (m *messages) stopSend() {
+	m.send = nil
+}
+
+func (m *messages) addToSend(msg string) error {
+	if m.send == nil {
+		return fmt.Errorf("sending not started")
+	}
+	// The FIFO is already concurrency safe, no explicit locking needed around Enqueue
+	return m.send.Enqueue(msg)
+}
+
+func (m *messages) getToSend() (interface{}, error) {
+	if m.send == nil {
+		return "", fmt.Errorf("sending not started")
+	}
+	// The FIFO is already concurrency safe, no explicit locking needed around Dequeue
+	return m.send.Dequeue()
+}
+
+func (m *messages) startReceive() bool {
+	if m.received == nil {
+		m.received = goconcurrentqueue.NewFIFO()
+		return true
+	}
+	return false
+}
+
+func (m *messages) stopReceive() {
+	m.received = nil
+}
+
+type topic struct {
+	contentType string
+	counters    counters
+	messages    messages
+}
+
+func newTopic(ct string) *topic {
+	return &topic{
+		contentType: ct,
+		counters:    counters{},
+		messages:    messages{},
+	}
+}
+
+var globalCounters counters
+var topics = make(map[string]*topic)
+
+var bootstrapserver = ""
+
+func initApp() {
+	bootstrapserver = os.Getenv("KAFKA_BOOTSTRAP_SERVER")
+	if len(bootstrapserver) == 0 {
+		fmt.Println("Fatal error: env var KAFKA_BOOTSTRAP_SERVER not set")
+		fmt.Println("Exiting... ")
+		os.Exit(1)
+	}
+	fmt.Println("Using KAFKA_BOOTSTRAP_SERVER=" + bootstrapserver)
+}
+
+// Helper function to get a created topic, if it exists
+func getTopicFromRequest(w http.ResponseWriter, req *http.Request) (*topic, string, bool) {
+	topicId := mux.Vars(req)["topic"]
+	t, exist := topics[topicId]
+	if !exist {
+		w.WriteHeader(http.StatusNotFound)
+		fmt.Fprintf(w, "Topic %v does not exist", topicId)
+		return nil, "", false
+	}
+	return t, topicId, true
+}
+
+// Alive check
+// GET on /
+func healthCheck(w http.ResponseWriter, req *http.Request) {
+	fmt.Fprintf(w, "OK")
+}
+
+// Deep reset of this interface stub - no removal of msgs or topics in kafka
+// POST on /reset
+func allreset(w http.ResponseWriter, req *http.Request) {
+	for _, v := range topics {
+		v.messages.stopSend()
+		v.messages.stopReceive()
+	}
+	time.Sleep(5 * time.Second) //Allow producers/consumers to shut down
+	globalCounters = newCounters()
+	topics = make(map[string]*topic)
+	fmt.Fprintf(w, "OK")
+}
+
+// Get topics, return json array of strings of topics created by this interface stub
+// Returns json array of strings, array is empty if no topics exist
+// GET on /topics
+
+func getTopics(w http.ResponseWriter, req *http.Request) {
+	topicKeys := make([]string, 0, len(topics))
+	fmt.Printf("Number of topics: %v\n", len(topics))
+	for k := range topics {
+		topicKeys = append(topicKeys, k)
+	}
+	j, err := json.Marshal(topicKeys)
+	if err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		fmt.Fprintf(w, "Cannot convert list of topics to json, error details: %v", err)
+		return
+	}
+	w.Header().Set("Content-Type", "application/json")
+	w.WriteHeader(http.StatusOK)
+	w.Write(j)
+}
+
+func writeOkResponse(w http.ResponseWriter, httpStatus int, msg string) {
+	// Set the content type before writing the status code - headers set afterwards are ignored
+	w.Header().Set("Content-Type", "text/plain")
+	w.WriteHeader(httpStatus)
+	fmt.Fprint(w, msg)
+}
+
+// Get a counter value
+// GET on /counters/{counter}
+func getCounter(w http.ResponseWriter, req *http.Request) {
+	ctr := mux.Vars(req)["counter"]
+	var ctrvalue = -1
+	if ctr == "received" {
+		ctrvalue = int(globalCounters.received.get())
+	} else if ctr == "sent" {
+		ctrvalue = int(globalCounters.sent.get())
+	}
+
+	if ctrvalue == -1 {
+		w.WriteHeader(http.StatusBadRequest)
+		fmt.Fprintf(w, "Counter %v does not exist", ctr)
+		return
+	}
+	writeOkResponse(w, http.StatusOK, strconv.Itoa(ctrvalue))
+}
+
+// Create a topic
+// PUT on /topics/<topic>?type=<type>    type shall be 'application/json' or 'text/plain'
+func createTopic(w http.ResponseWriter, req *http.Request) {
+	topicId := mux.Vars(req)["topic"]
+	topicType := mux.Vars(req)["type"]
+
+	fmt.Printf("Creating topic: %v, content type: %v\n", topicId, topicType)
+
+	if len(topicType) == 0 {
+		w.WriteHeader(http.StatusBadRequest)
+		fmt.Fprintf(w, "Type not specified")
+		return
+	}
+
+	tid, exist := topics[topicId]
+	if exist {
+		if tid.contentType != topicType {
+			w.WriteHeader(http.StatusBadRequest)
+			fmt.Fprintf(w, "Topic exists but with a different content type, queue content type: %v, requested content type: %v", tid.contentType, topicType)
+			return
+		}
+		writeOkResponse(w, http.StatusOK, "Topic exists")
+		return
+	}
+
+	t := newTopic(topicType)
+
+	a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
+	if err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		fmt.Fprintf(w, "Cannot create client to bootstrap server: %v, error details: %v", bootstrapserver, err)
+		return
+	}
+	// Check the error before deferring Close - Close on a nil client would panic
+	defer a.Close()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	maxDur := 10 * time.Second
+
+	_, err = a.CreateTopics(
+		ctx,
+		[]kafka.TopicSpecification{{
+			Topic:             topicId,
+			NumPartitions:     1,
+			ReplicationFactor: 1}},
+		kafka.SetAdminOperationTimeout(maxDur))
+
+	if err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		fmt.Fprintf(w, "Failed to create topic: %v, error details: %v", topicId, err)
+		return
+	}
+	topics[topicId] = t
+	w.WriteHeader(http.StatusCreated)
+	fmt.Fprintf(w, "Topic created")
+}
+
+// Get topic type
+// GET on /topics/<topic>
+func getTopic(w http.ResponseWriter, req *http.Request) {
+	t, _, exist := getTopicFromRequest(w, req)
+	if !exist {
+		return
+	}
+	w.WriteHeader(http.StatusOK)
+	fmt.Fprint(w, t.contentType)
+}
+
+// Get a topics counter value
+// GET /topics/{topic}/counters/{counter}
+func getTopicCounter(w http.ResponseWriter, req *http.Request) {
+	t, _, exist := getTopicFromRequest(w, req)
+	if !exist {
+		return
+	}
+	ctr := mux.Vars(req)["counter"]
+
+	var ctrvalue = -1
+	if ctr == "received" {
+		ctrvalue = int(t.counters.received.get())
+	} else if ctr == "sent" {
+		ctrvalue = int(t.counters.sent.get())
+	}
+
+	if ctrvalue == -1 {
+		w.WriteHeader(http.StatusBadRequest)
+		fmt.Fprintf(w, "Counter %v does not exist", ctr)
+		return
+	}
+	w.WriteHeader(http.StatusOK)
+	fmt.Fprint(w, strconv.Itoa(ctrvalue))
+}
+
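+// Start sending on a topic - queued messages are produced to kafka
+// POST on /topics/<topic>/startsend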
+func startToSend(w http.ResponseWriter, req *http.Request) {
+	t, topicId, exist := getTopicFromRequest(w, req)
+	if !exist {
+		return
+	}
+
+	if !t.messages.startSend() {
+		w.WriteHeader(http.StatusOK)
+		fmt.Fprintf(w, "Already started sending")
+		return
+	}
+	go func() {
+		p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
+		if err != nil {
+			fmt.Printf("Cannot create producer for topic: %v, error details: %v\n", topicId, err)
+			return
+		}
+		defer func() { p.Close() }()
+		for {
+			q := t.messages.send
+			if q == nil {
+				return
+			}
+			m, err := q.Get(0)
+			if err == nil {
+				err = p.Produce(&kafka.Message{
+					TopicPartition: kafka.TopicPartition{Topic: &topicId, Partition: kafka.PartitionAny},
+					Value:          []byte(fmt.Sprintf("%v", m)),
+				}, nil)
+				if err == nil {
+					q.Remove(0)
+					t.counters.sent.step()
+					globalCounters.sent.step()
+					msg := fmt.Sprintf("%v", m)
+					if len(msg) < 500 {
+						fmt.Printf("Message sent on topic: %v, len: %v, msg: %v\n", topicId, len(msg), msg)
+					} else {
+						fmt.Printf("Message sent on topic: %v, len: %v, is larger than 500...not printed\n", topicId, len(msg))
+					}
+				} else {
+					fmt.Printf("Failed to send message on topic: %v. Discarded. Error details: %v\n", topicId, err)
+					q.Remove(0)
+				}
+			} else {
+				time.Sleep(10 * time.Millisecond)
+			}
+		}
+	}()
+
+	w.WriteHeader(http.StatusOK)
+	fmt.Fprintf(w, "Sending started")
+}
+
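+// Start receiving on a topic - consumed messages are queued for fetching via GET on /topics/<topic>/msg
+// POST on /topics/<topic>/startreceive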
+func startToReceive(w http.ResponseWriter, req *http.Request) {
+	t, topicId, exist := getTopicFromRequest(w, req)
+	if !exist {
+		return
+	}
+
+	if !t.messages.startReceive() {
+		w.WriteHeader(http.StatusOK)
+		fmt.Fprintf(w, "Already started receiving")
+		return
+	}
+
+	go func() {
+
+		defer func() { t.messages.stopReceive() }()
+
+		groupId := "kafkaprocon"
+
+		c, err := kafka.NewConsumer(&kafka.ConfigMap{
+			"bootstrap.servers":       bootstrapserver,
+			"group.id":                groupId,
+			"auto.offset.reset":       "earliest",
+			"enable.auto.commit":      true,
+			"auto.commit.interval.ms": 5000,
+		})
+		if err != nil {
+			fmt.Printf("Cannot create consumer for topic: %v, error details: %v\n", topicId, err)
+			t.messages.stopReceive()
+			return
+		}
+		c.Commit()
+		defer func() { c.Close() }()
+		for {
+			que := t.messages.received
+			if que == nil {
+				fmt.Printf("Cannot start receiving on topic: %v, queue does not exist\n", topicId)
+				return
+			}
+			fmt.Printf("Start subscribing on topic: %v\n", topicId)
+			err = c.SubscribeTopics([]string{topicId}, nil)
+			if err != nil {
+				fmt.Printf("Cannot start subscribing on topic: %v, error details: %v\n", topicId, err)
+				return
+			}
+			maxDur := 1 * time.Second
+			for {
+				// Re-read the receive queue each iteration so that stopreceive/reset ends this consumer
+				que := t.messages.received
+				if que == nil {
+					fmt.Printf("Receiving stopped on topic: %v\n", topicId)
+					return
+				}
+				msg, err := c.ReadMessage(maxDur)
+				if err == nil {
+					if len(msg.Value) < 500 {
+						fmt.Printf("Message received on topic: %v, partition: %v, len: %v, msg: %v\n", topicId, msg.TopicPartition, len(msg.Value), string(msg.Value))
+					} else {
+						fmt.Printf("Message received on topic: %v, partition: %v, len: %v, is larger than 500...not printed\n", topicId, msg.TopicPartition, len(msg.Value))
+					}
+					// The http handler has already returned - log errors instead of writing to the ResponseWriter
+					if err := que.Enqueue(string(msg.Value)); err != nil {
+						fmt.Printf("Received message on topic: %v cannot be put in queue, error details: %v\n", topicId, err)
+						return
+					}
+					t.counters.received.step()
+					globalCounters.received.step()
+				} else {
+					fmt.Printf("Nothing to consume on topic: %v, reason: %v\n", topicId, err)
+				}
+			}
+		}
+	}()
+
+	w.WriteHeader(http.StatusOK)
+	fmt.Fprintf(w, "Receiving started")
+}
+
+// Post a message to a topic
+// POST on /topics/<topic>/msg    the request content type must match the topic's content type
+func sendToTopic(w http.ResponseWriter, req *http.Request) {
+	t, topicId, exist := getTopicFromRequest(w, req)
+	if !exist {
+		return
+	}
+	q := t.messages.send
+	if q == nil {
+		w.WriteHeader(http.StatusBadRequest)
+		fmt.Fprintf(w, "Sending not initiated on topic: %v", topicId)
+		return
+	}
+	ct := req.Header.Get("Content-Type")
+	if ct != t.contentType {
+		w.WriteHeader(http.StatusBadRequest)
+		fmt.Fprintf(w, "Message to send content type: %v on topic: %v does not match queue content type: %v", ct, topicId, t.contentType)
+		return
+	}
+
+	if ct == "application/json" {
+		if err := req.ParseForm(); err != nil {
+			w.WriteHeader(http.StatusBadRequest)
+			fmt.Fprintf(w, "Json payload cannot be decoded on topic: %v, error details %v", topicId, err)
+			return
+		}
+		b, err := ioutil.ReadAll(req.Body)
+		if err == nil {
+			if len(b) < 500 {
+				fmt.Printf("Json payload to send on topic: %v, msg: %v", topicId, string(b))
+			} else {
+				fmt.Printf("Json payload to send on topic: %v larger than 500 bytes, not printed...", topicId)
+			}
+		}
+		err = q.Enqueue(string(b))
+		if err != nil {
+			w.WriteHeader(http.StatusInternalServerError)
+			fmt.Fprintf(w, "Json message to send cannot be put in queue")
+			return
+		}
+	} else if ct == "text/plain" {
+		if err := req.ParseForm(); err != nil {
+			w.WriteHeader(http.StatusBadRequest)
+			fmt.Fprintf(w, "Text payload to send on topic: %v cannot be decoded, error details %v\n", topicId, err)
+			return
+		}
+		b, err := ioutil.ReadAll(req.Body)
+		if err == nil {
+			if len(b) < 500 {
+				fmt.Printf("Text payload to send on topic: %v, msg: %v", topicId, string(b))
+			} else {
+				fmt.Printf("Text payload to send on topic: %v larger than 500 bytes, not printed...", topicId)
+			}
+		}
+		err = q.Enqueue(string(b))
+		if err != nil {
+			w.WriteHeader(http.StatusInternalServerError)
+			fmt.Fprintf(w, "Text message to send cannot be put in queue")
+			return
+		}
+	} else {
+		w.WriteHeader(http.StatusBadRequest)
+		fmt.Fprintf(w, "Message to send, unknown content type %v", ct)
+		return
+	}
+
+	w.Header().Set("Content-Type", "text/plain")
+	w.WriteHeader(http.StatusOK)
+	fmt.Fprintf(w, "Message to send put in queue")
+}
+
+// Get zero or one message from a topic
+// GET on /topics/<topic>/msg
+func receiveFromTopic(w http.ResponseWriter, req *http.Request) {
+	t, topicId, exist := getTopicFromRequest(w, req)
+	if !exist {
+		return
+	}
+	if t.messages.received == nil {
+		w.WriteHeader(http.StatusBadRequest)
+		fmt.Fprintf(w, "Receiving not initiated on topic %v", topicId)
+		return
+	}
+
+	m, err := t.messages.received.Dequeue()
+	if err != nil {
+		w.WriteHeader(http.StatusNoContent)
+		return
+	}
+
+	w.Header().Set("Content-Type", t.contentType)
+	w.WriteHeader(http.StatusOK)
+	fmt.Fprintf(w, "%v", m)
+}
+
+// Remove the send queue to stop sending
+func stopToSend(w http.ResponseWriter, req *http.Request) {
+	fmt.Printf("Stop sending\n")
+	t, _, exist := getTopicFromRequest(w, req)
+	if !exist {
+		return
+	}
+	t.messages.stopSend()
+	w.WriteHeader(http.StatusNoContent)
+}
+
+// Remove the receive queue to stop receiving
+func stopToReceive(w http.ResponseWriter, req *http.Request) {
+	fmt.Printf("Stop receiving\n")
+	t, _, exist := getTopicFromRequest(w, req)
+	if !exist {
+		return
+	}
+	t.messages.stopReceive()
+	w.WriteHeader(http.StatusNoContent)
+}
+
+func HelloServer(w http.ResponseWriter, r *http.Request) {
+	fmt.Fprintf(w, "Hello, %s!", r.URL.Path[1:])
+}
+
+func main() {
+
+	initApp()
+
+	r := mux.NewRouter()
+
+	r.HandleFunc("/", healthCheck).Methods("GET")
+	r.HandleFunc("/reset", allreset).Methods("POST")
+	r.HandleFunc("/counters/{counter}", getCounter).Methods("GET")
+	r.HandleFunc("/topics", getTopics).Methods("GET")
+	r.HandleFunc("/topics/{topic}/counters/{counter}", getTopicCounter).Methods("GET")
+	r.HandleFunc("/topics/{topic}", createTopic).Methods("PUT").Queries("type", "{type}")
+	r.HandleFunc("/topics/{topic}", getTopic).Methods("GET")
+	r.HandleFunc("/topics/{topic}/startsend", startToSend).Methods("POST")
+	r.HandleFunc("/topics/{topic}/startreceive", startToReceive).Methods("POST")
+	r.HandleFunc("/topics/{topic}/stopsend", stopToSend).Methods("POST")
+	r.HandleFunc("/topics/{topic}/stopreceive", stopToReceive).Methods("POST")
+	r.HandleFunc("/topics/{topic}/msg", sendToTopic).Methods("POST")
+	r.HandleFunc("/topics/{topic}/msg", receiveFromTopic).Methods("GET")
+
+	port := "8090"
+	srv := &http.Server{
+		Handler:      r,
+		Addr:         ":" + port,
+		WriteTimeout: 15 * time.Second,
+		ReadTimeout:  15 * time.Second,
+	}
+	fmt.Println("Running on port: " + port)
+	fmt.Println(srv.ListenAndServe().Error())
+}
diff --git a/test/kafka-procon/start_local.sh b/test/kafka-procon/start_local.sh
new file mode 100755
index 0000000..bfc1a1b
--- /dev/null
+++ b/test/kafka-procon/start_local.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2021 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+echo "This script requires golang to be installed and a running kafka instance on (or availble to) localhost"
+
+# Script to build and start app locally
+if [ $# -ne 1 ]; then
+    echo "usage: ./start-local.sh <kafka-boostrapserver-port>"
+    echo "example: ./start-local.sh 30098"
+    exit 1
+fi
+
+export KAFKA_BOOTSTRAP_SERVER=localhost:$1
+
+echo "Starting kafka-procon on local machine"
+go run main.go