Helm charts and apps for pm-setup

Helm charts and scripts for installation
Apps - https-server, pm-rapp and kafka-pm-producer

Issue-ID: NONRTRIC-854
Signed-off-by: BjornMagnussonXA <bjorn.magnusson@est.tech>
Change-Id: I167b377f7d1a54923a040b05c4177afbb87ad7ef
diff --git a/pm-rapp/.gitignore b/pm-rapp/.gitignore
new file mode 100644
index 0000000..d39c18f
--- /dev/null
+++ b/pm-rapp/.gitignore
@@ -0,0 +1 @@
+REM_*
diff --git a/pm-rapp/Dockerfile b/pm-rapp/Dockerfile
new file mode 100644
index 0000000..1d138c5
--- /dev/null
+++ b/pm-rapp/Dockerfile
@@ -0,0 +1,35 @@
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2023 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+FROM golang:1.19-bullseye AS build
+WORKDIR /app
+COPY go.mod .
+COPY go.sum .
+RUN go mod download
+COPY main.go .
+RUN go build -o /pm-rapp
+
+# Runtime stage uses the distroless image; swap in the ubuntu line below only for debugging (e.g. suspected cert problems with distroless)
+FROM gcr.io/distroless/base-debian11
+#FROM ubuntu
+WORKDIR /
+## Copy the binary produced by the "build" stage
+COPY --from=build /pm-rapp .
+
+## Run as non-root; this USER line goes with the distroless image - comment it out when switching to ubuntu
+USER nonroot:nonroot
+ENTRYPOINT ["/pm-rapp"]
diff --git a/pm-rapp/README.md b/pm-rapp/README.md
new file mode 100644
index 0000000..99a8260
--- /dev/null
+++ b/pm-rapp/README.md
@@ -0,0 +1,16 @@
+
+
+## Basic rAPP for demo purposes - starts a subscription and prints the data received on the topic to stdout
+
+
+### Manual build, tag and push to image repo
+
+Build for docker or local kubernetes\
+`./build.sh no-push [<image-tag>]`
+
+Build for remote kubernetes - an externally accessible image repo (e.g. docker hub) is needed  \
+`./build.sh <external-image-repo> [<image-tag>]`
+
+
+### Configuration
+
diff --git a/pm-rapp/TODO.txt b/pm-rapp/TODO.txt
new file mode 100644
index 0000000..a4a0b45
--- /dev/null
+++ b/pm-rapp/TODO.txt
@@ -0,0 +1,10 @@
+
+pm-rapp
+=======
+Dockerfile
+- Remove the reference to the ubuntu image (if the distroless image works OK)
+
+
+
+install/helm
+============
diff --git a/pm-rapp/build.sh b/pm-rapp/build.sh
new file mode 100755
index 0000000..581b5c2
--- /dev/null
+++ b/pm-rapp/build.sh
@@ -0,0 +1,78 @@
+#!/bin/bash
+
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2023 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+# Build image from Dockerfile with/without custom image tag
+# Optionally push to external docker hub repo
+
+print_usage() {  # Print the command-line synopsis, then terminate the script.
+    echo "Usage: build.sh no-push|<docker-hub-repo-name> [<image-tag>]"
+    exit 1  # non-zero status signals failure to the caller
+}
+
+if [ $# -ne 1 ] && [ $# -ne 2 ]; then  # exactly one or two arguments are accepted
+    print_usage
+fi
+
+IMAGE_NAME="pm-rapp"
+IMAGE_TAG="latest"  # default tag, overridden by the optional second argument
+REPO=""  # stays empty for a local-only build (first argument "no-push")
+if [ "$1" = "no-push" ]; then
+    echo "Only local image build"
+else
+    REPO="$1"
+    echo "Attempt to push built image to: $REPO"
+fi
+
+if [ -n "$2" ]; then
+    IMAGE_TAG="$2"
+fi
+echo "Setting image tag to: $IMAGE_TAG"
+
+IMAGE="$IMAGE_NAME:$IMAGE_TAG"
+echo "Building image $IMAGE"
+# Test the command directly instead of inspecting $? afterwards.
+if ! docker build -t "$IMAGE" .; then
+    echo "BUILD FAILED"
+    exit 1
+fi
+echo "BUILD OK"
+
+if [ -n "$REPO" ]; then
+    echo "Tagging image"
+    NEW_IMAGE="$REPO/$IMAGE_NAME:$IMAGE_TAG"
+    # Re-tag the local image with the repo prefix so that it can be pushed.
+    if ! docker tag "$IMAGE" "$NEW_IMAGE"; then
+        echo "RE-TAGGING FAILED"
+        exit 1
+    fi
+    echo "RE-TAG OK"
+
+    echo "Pushing image $NEW_IMAGE"
+    # The push is the only step that needs access to the remote repo.
+    if ! docker push "$NEW_IMAGE"; then
+        echo "PUSH FAILED"
+        echo " Perhaps not logged into docker-hub repo $REPO?"
+        exit 1
+    fi
+    IMAGE="$NEW_IMAGE"  # report the pushed (repo-qualified) name below
+    echo "PUSH OK"
+fi
+
+echo "IMAGE OK: $IMAGE"
+echo "DONE"
diff --git a/pm-rapp/container.yaml b/pm-rapp/container.yaml
new file mode 100644
index 0000000..658fb57
--- /dev/null
+++ b/pm-rapp/container.yaml
@@ -0,0 +1,22 @@
+#  ============LICENSE_START===============================================
+#  Copyright (C) 2023 Nordix Foundation. All rights reserved.
+#  ========================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=================================================
+#
+
+# The Jenkins job requires a tag to build the Docker image.
+# By default this file is in the docker build directory,
+# but the location can be configured in the JJB template.
+---
+tag: 1.0.0
\ No newline at end of file
diff --git a/pm-rapp/go.mod b/pm-rapp/go.mod
new file mode 100644
index 0000000..7b5edbe
--- /dev/null
+++ b/pm-rapp/go.mod
@@ -0,0 +1,21 @@
+module main
+
+go 1.19
+
+require (
+	github.com/confluentinc/confluent-kafka-go v1.9.3-RC3
+	github.com/gorilla/mux v1.8.0
+	github.com/json-iterator/go v1.1.11
+	github.com/sirupsen/logrus v1.9.0
+	golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
+)
+
+require (
+	github.com/golang/protobuf v1.5.2 // indirect
+	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
+	github.com/modern-go/reflect2 v1.0.1 // indirect
+	golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
+	golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
+	google.golang.org/appengine v1.4.0 // indirect
+	google.golang.org/protobuf v1.28.0 // indirect
+)
diff --git a/pm-rapp/go.sum b/pm-rapp/go.sum
new file mode 100644
index 0000000..79c006a
--- /dev/null
+++ b/pm-rapp/go.sum
@@ -0,0 +1,231 @@
+cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/actgardner/gogen-avro/v10 v10.1.0/go.mod h1:o+ybmVjEa27AAr35FRqU98DJu1fXES56uXniYFv4yDA=
+github.com/actgardner/gogen-avro/v10 v10.2.1/go.mod h1:QUhjeHPchheYmMDni/Nx7VB0RsT/ee8YIgGY/xpEQgQ=
+github.com/actgardner/gogen-avro/v9 v9.1.0/go.mod h1:nyTj6wPqDJoxM3qdnjcLv+EnMDSDFqE0qDpva2QRmKc=
+github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
+github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
+github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
+github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
+github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
+github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
+github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
+github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
+github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI=
+github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
+github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
+github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
+github.com/confluentinc/confluent-kafka-go v1.9.3-RC3 h1:urDeBIsNr0hgRf3nZn66SHtdBO/4DmEDSsMWquaolmo=
+github.com/confluentinc/confluent-kafka-go v1.9.3-RC3/go.mod h1:NlSjbxG73kJM0iKUC6/CDbMnY3H4WF6e+YFBlfLffi8=
+github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
+github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
+github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
+github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
+github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
+github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
+github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
+github.com/frankban/quicktest v1.2.2/go.mod h1:Qh/WofXFeiAFII1aEBu529AtJo6Zg2VHscnEsbBnJ20=
+github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o=
+github.com/frankban/quicktest v1.10.0/go.mod h1:ui7WezCLWMWxVWr1GETZY3smRy0G4KWq9vcPtJmFl7Y=
+github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og=
+github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
+github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
+github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
+github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
+github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
+github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
+github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
+github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
+github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
+github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
+github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
+github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
+github.com/google/go-cmp v0.2.1-0.20190312032427-6f77996f0c42/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
+github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
+github.com/google/pprof v0.0.0-20211008130755-947d60d73cc0/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg=
+github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
+github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
+github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
+github.com/hamba/avro v1.5.6/go.mod h1:3vNT0RLXXpFm2Tb/5KC71ZRJlOroggq1Rcitb6k4Fr8=
+github.com/heetch/avro v0.3.1/go.mod h1:4xn38Oz/+hiEUTpbVfGVLfvOg0yKLlRP7Q9+gJJILgA=
+github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0/go.mod h1:N0Wam8K1arqPXNWjMo21EXnBPOPp36vB07FNRdD2geA=
+github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
+github.com/invopop/jsonschema v0.4.0/go.mod h1:O9uiLokuu0+MGFlyiaqtWxwqJm41/+8Nj0lD7A36YH0=
+github.com/jhump/gopoet v0.0.0-20190322174617-17282ff210b3/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI=
+github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI=
+github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ=
+github.com/jhump/protoreflect v1.11.0/go.mod h1:U7aMIjN0NWq9swDP7xDdoMfRHb35uiuTd3Z9nFXJf5E=
+github.com/jhump/protoreflect v1.12.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI=
+github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ=
+github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
+github.com/juju/qthttptest v0.1.1/go.mod h1:aTlAv8TYaflIiTDIQYzxnl1QdPjAg8Q8qJMErpKy6A4=
+github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
+github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
+github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/linkedin/goavro v2.1.0+incompatible/go.mod h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM=
+github.com/linkedin/goavro/v2 v2.10.0/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
+github.com/linkedin/goavro/v2 v2.10.1/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
+github.com/linkedin/goavro/v2 v2.11.1/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
+github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/nrwiersma/avro-benchmarks v0.0.0-20210913175520-21aec48c8f76/go.mod h1:iKyFMidsk/sVYONJRE372sJuX/QTRPacU7imPqqsu7g=
+github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/rogpeppe/clock v0.0.0-20190514195947-2896927a307a/go.mod h1:4r5QyqhjIWCcK8DO4KMclc5Iknq5qVBAlbYYzAbUScQ=
+github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
+github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
+github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
+github.com/santhosh-tekuri/jsonschema/v5 v5.0.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0=
+github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
+github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.3.1-0.20190311161405-34c6fa2dc709/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
+golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
+golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200505041828-1ed23360d12c/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
+golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
+golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
+golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
+golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
+golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d h1:TzXSXBo42m9gQenoE3b9BGiEpg5IG2JkU5FkPIawgtw=
+golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
+golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
+golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20200505023115-26f46d2f7ef8/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
+google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508=
+google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
+google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
+google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
+google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
+google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
+google.golang.org/genproto v0.0.0-20220503193339-ba3ae3f07e29/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
+google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
+google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
+google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
+google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
+google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
+google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
+google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
+google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
+google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
+google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
+google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
+google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
+google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
+google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
+google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
+google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
+google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
+gopkg.in/avro.v0 v0.0.0-20171217001914-a730b5802183/go.mod h1:FvqrFXt+jCsyQibeRv4xxEJBL5iG2DDW5aeJwzDiq4A=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/errgo.v1 v1.0.0/go.mod h1:CxwszS/Xz1C49Ucd2i6Zil5UToP1EmyrFhKaMVbg1mk=
+gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
+gopkg.in/httprequest.v1 v1.2.1/go.mod h1:x2Otw96yda5+8+6ZeWwHIJTFkEHWP/qP8pJOzqEtWPM=
+gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
+gopkg.in/retry.v1 v1.0.3/go.mod h1:FJkXmWiMaAo7xB+xhvDF59zhfjDWyzmyAxiT4dB688g=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
+gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
diff --git a/pm-rapp/main.go b/pm-rapp/main.go
new file mode 100644
index 0000000..4113a42
--- /dev/null
+++ b/pm-rapp/main.go
@@ -0,0 +1,829 @@
+//  ============LICENSE_START===============================================
+//  Copyright (C) 2023 Nordix Foundation. All rights reserved.
+//  ========================================================================
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+//  ============LICENSE_END=================================================
+//
+
+package main
+
+import (
+	"bytes"
+	"compress/gzip"
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"net/http/pprof"
+	"os"
+	"os/signal"
+	"runtime"
+	"strconv"
+	"strings"
+	"syscall"
+	"time"
+
+	"github.com/confluentinc/confluent-kafka-go/kafka"
+	"github.com/gorilla/mux"
+	jsoniter "github.com/json-iterator/go"
+	log "github.com/sirupsen/logrus"
+	"golang.org/x/oauth2/clientcredentials"
+)
+
+type JobDefinition struct { // Job definition as read from the jobdef file and sent as json body when main() registers the job
+	InfoTypeID    string `json:"info_type_id"`
+	JobOwner      string `json:"job_owner"`
+	JobResultURI  string `json:"job_result_uri"`
+	JobDefinition struct { // main() overwrites the topic and bootstrap server fields in this part with the values from the environment
+		KafkaOutputTopic string          `json:"kafkaOutputTopic"`
+		FilterType       string          `json:"filterType"`
+		Filter           json.RawMessage `json:"filter"` // Kept as raw json - never inspected in this file
+		DeliveryInfo     struct {
+			Topic            string `json:"topic"`
+			BootStrapServers string `json:"bootStrapServers"`
+		} `json:"deliveryInfo"`
+	} `json:"job_definition"`
+}
+
+const jobdef = "/config/jobDefinition.json" // Path of the job definition file, read once by main()
+
+var rapp_id = os.Getenv("APPID") // Mandatory - part of the consumer group id (gid) and the job id (jobid)
+
+var rapp_ns = os.Getenv("APPNS") // Mandatory - main() terminates the app if it is not set
+
+var bootstrapserver = os.Getenv("KAFKA_SERVER") // Mandatory - a value starting with "http://" selects the bridge (rest) consumer instead of the kafka consumer
+
+var topic = os.Getenv("TOPIC") // Mandatory - topic to read from, also written into the job definition by main()
+
+var ics_server = os.Getenv("ICS") // Mandatory - host part of the url where the job is registered and deleted
+
+var jwt_file = os.Getenv("JWT_FILE") // Optional - file with the token for the kafka consumer, cleared by main() when AUTH_SERVICE_URL is set
+
+var ssl_path = os.Getenv("SSLPATH") // Optional - dir with clt.key, clt.crt and ca.crt, selects the ssl kafka consumer
+
+var gzipped_data = os.Getenv("GZIP") // Optional - any non-empty value enables the gzip step in read_kafka_messages
+
+var log_payload = os.Getenv("LOG_PAYLOAD") // Optional - any non-empty value makes read_bridge_messages print each message
+
+// These are optional - used when the rapp is fetching the token itself instead of the side car
+var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
+var creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
+var creds_client_id = os.Getenv("CREDS_CLIENT_ID")
+var creds_service_url = os.Getenv("AUTH_SERVICE_URL")
+
+var gid = ""      // Consumer group id - set by main()
+var cid = "cid-0" // Consumer instance name in the bridge and client id in the kafka consumer
+
+var msg_count int = 0           // Number of received messages that were handled as json
+var msg_corrupted_count int = 0 // Number of messages from the bridge that could not be handled as json
+
+var jobid = "<not-set>"         // Id of the job registered in ICS - set by main()
+var consumer_type = "<not-set>" // Description of the consumer in use - reported by the statistics handler
+
+var currentToken = "" // Latest fetched access token - sent in the authorization header of outgoing requests
+
+var appStatus = "INIT" // INIT, RUNNING or TERMINATING - returned by the status handler
+
+var msg_per_sec int = 0 // Average message rate - recalculated once per minute by calc_average
+
+var httpclient = &http.Client{} // Shared client for all outgoing http requests
+
+// main checks the mandatory environment, registers the job, starts the topic consumer and serves the admin http api until SIGINT/SIGTERM.
+func main() {
+
+	// Debug is the startup level - it can be changed at runtime with PUT /logging/{level}
+	log.SetLevel(log.DebugLevel)
+
+	log.Info("Server starting...")
+
+	if creds_service_url != "" {
+		log.Warn("Disabling jwt retrieval from side car")
+		jwt_file = ""
+	}
+
+	if rapp_id == "" {
+		log.Error("Env APPID not set")
+		os.Exit(1)
+	}
+
+	if rapp_ns == "" {
+		log.Error("Env APPNS not set")
+		os.Exit(1)
+	}
+
+	if bootstrapserver == "" {
+		log.Error("Env KAFKA_SERVER not set")
+		os.Exit(1)
+	}
+
+	if topic == "" {
+		log.Error("Env TOPIC not set")
+		os.Exit(1)
+	}
+
+	if ics_server == "" {
+		log.Error("Env ICS not set")
+		os.Exit(1)
+	}
+
+	rtr := mux.NewRouter()
+	rtr.HandleFunc("/statistics", statistics)
+	rtr.HandleFunc("/status", status)
+	rtr.HandleFunc("/logging/{level}", logging_level)
+	rtr.HandleFunc("/logging", logging_level)
+	rtr.HandleFunc("/", alive)
+
+	//For perf/mem profiling
+	rtr.HandleFunc("/custom_debug_path/profile", pprof.Profile)
+
+	http.Handle("/", rtr)
+
+	fileBytes, err := os.ReadFile(jobdef)
+	if err != nil {
+		log.Error("Cannot read job defintion file: ", jobdef, " error: ", err)
+		os.Exit(1)
+	}
+	fmt.Println("FROM FILE")
+	fmt.Println(string(fileBytes))
+
+	job_json := JobDefinition{}
+	err = jsoniter.Unmarshal([]byte(fileBytes), &job_json)
+	if err != nil {
+		log.Error("Cannot parse job defintion file: ", jobdef, " error: ", err)
+		os.Exit(1)
+	}
+	job_type := job_json.InfoTypeID
+	job_json.JobDefinition.KafkaOutputTopic = topic // topic and bootstrap server from the env replace the values in the file
+	job_json.JobDefinition.DeliveryInfo.Topic = topic
+	job_json.JobDefinition.DeliveryInfo.BootStrapServers = bootstrapserver
+
+	gid = "pm-rapp-" + job_type + "-" + rapp_id
+
+	jobid = "rapp-job-" + job_type + "-" + rapp_id
+
+	json_bytes, err := json.Marshal(job_json)
+	if err != nil {
+		log.Error("Cannot marshal job json: ", err)
+		os.Exit(1)
+	}
+
+	json_str := string(json_bytes)
+
+	if strings.HasPrefix(bootstrapserver, "http://") { // bridge consumer - started further down, after the job registration
+		if creds_service_url != "" {
+			consumer_type = "accesstoken strimzi bridge consumer"
+			retrive_token_strimzi()
+		}
+	} else {
+		go read_kafka_messages()
+	}
+
+	ok := false
+	if ics_server != "" {
+		for !ok {
+			log.Debug("Registring job: ", jobid, " json: ", json_str)
+			ok, _ = send_http_request([]byte(json_str), http.MethodPut, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+jobid, "", currentToken, 0, false)
+			if !ok {
+				log.Info("Failed to register job: ", jobid, " - retrying")
+				time.Sleep(time.Second)
+			}
+		}
+	} else { // not reached as long as an unset ICS terminates the app in the checks above
+		log.Info("No job registered - read from topic only")
+	}
+	if strings.HasPrefix(bootstrapserver, "http://") {
+		go read_bridge_messages()
+	}
+
+	go calc_average()
+
+	http_port := "80"
+	http_server := &http.Server{Addr: ":" + http_port, Handler: nil}
+
+	sigs := make(chan os.Signal, 1)
+	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
+	go func() {
+		fmt.Println("Setting handler for signal sigint and sigterm")
+		sig := <-sigs
+		appStatus = "TERMINATING"
+		fmt.Printf("Received signal %s - application will terminate\n", sig)
+
+		if strings.HasPrefix(bootstrapserver, "http://") {
+			log.Debug("stopping strimzi consumer for job: ", jobid)
+			ok, _ := send_http_request(nil, http.MethodDelete, bootstrapserver+"/consumers/"+gid+"/instances/"+cid, "", currentToken, 0, false)
+			if !ok {
+				log.Info("Failed to delete consumer "+cid+" in group: ", gid, " - retrying")
+			}
+		}
+
+		ok := false
+		if ics_server != "" {
+			for !ok {
+				log.Debug("stopping job: ", jobid, " json: ", json_str)
+				ok, _ = send_http_request(nil, http.MethodDelete, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+jobid, "", currentToken, 0, false)
+				if !ok {
+					log.Info("Failed to stop job: ", jobid, " - retrying")
+					time.Sleep(time.Second)
+				}
+			}
+		}
+		http_server.Shutdown(context.Background())
+	}()
+	appStatus = "RUNNING"
+	log.Info("Starting http service...")
+	err = http_server.ListenAndServe()
+	if err == http.ErrServerClosed { // graceful shutdown - the job is already deleted, so this is a normal exit
+		log.Info("http server shutdown...")
+		os.Exit(0)
+	} else if err != nil {
+		log.Error("http server error: ", err)
+		log.Info("http server shutdown...")
+		os.Exit(1)
+	}
+
+	//Not reached in practice - ListenAndServe always returns a non-nil error which is handled above
+	runtime.Goexit()
+
+	log.Warn("main routine exit")
+	log.Warn("server is stopping...")
+}
+
+// alive is the handler registered for "/" - it writes nothing, so the reply is an empty body with the default status of net/http.
+func alive(w http.ResponseWriter, req *http.Request) {
+	// Intentionally empty - any reply at all shows that the app is up
+}
+
+// logging_level sets the log level with PUT /logging/{level} or returns the current level with GET /logging.
+func logging_level(w http.ResponseWriter, req *http.Request) {
+	vars := mux.Vars(req)
+	if level, ok := vars["level"]; ok {
+		if req.Method == http.MethodPut {
+			switch level {
+			case "trace":
+				log.SetLevel(log.TraceLevel)
+			case "debug":
+				log.SetLevel(log.DebugLevel)
+			case "info":
+				log.SetLevel(log.InfoLevel)
+			case "warn":
+				log.SetLevel(log.WarnLevel)
+			case "error":
+				log.SetLevel(log.ErrorLevel)
+			case "fatal":
+				log.SetLevel(log.FatalLevel)
+			case "panic":
+				log.SetLevel(log.PanicLevel)
+			default:
+				w.WriteHeader(http.StatusNotFound)
+			}
+		} else {
+			w.WriteHeader(http.StatusMethodNotAllowed)
+		}
+	} else {
+		if req.Method == http.MethodGet {
+			msg := "none"
+			if log.IsLevelEnabled(log.TraceLevel) { // most verbose first - the less verbose levels are enabled at every setting
+				msg = "trace"
+			} else if log.IsLevelEnabled(log.DebugLevel) {
+				msg = "debug"
+			} else if log.IsLevelEnabled(log.InfoLevel) {
+				msg = "info"
+			} else if log.IsLevelEnabled(log.WarnLevel) {
+				msg = "warn"
+			} else if log.IsLevelEnabled(log.ErrorLevel) {
+				msg = "error"
+			} else if log.IsLevelEnabled(log.FatalLevel) {
+				msg = "fatal"
+			} else if log.IsLevelEnabled(log.PanicLevel) {
+				msg = "panic"
+			}
+			w.Header().Set("Content-Type", "application/text")
+			w.Write([]byte(msg))
+		} else {
+			w.WriteHeader(http.StatusMethodNotAllowed)
+		}
+	}
+}
+
+// status is the handler for GET /status - it replies with the current app state (appStatus) as text.
+func status(w http.ResponseWriter, req *http.Request) {
+	if req.Method != http.MethodGet {
+		w.WriteHeader(http.StatusMethodNotAllowed)
+		return
+	}
+
+	_, err := w.Write([]byte(appStatus))
+	if err != nil {
+		// Write implies the 200 status - a WriteHeader call after a failed Write would have no effect
+		log.Error("Cannot send app status: ", err)
+		return
+	}
+}
+
+// statistics is the handler for GET /statistics - it replies with the message counters and the consumer settings as a json object.
+func statistics(w http.ResponseWriter, req *http.Request) {
+	if req.Method != http.MethodGet {
+		w.WriteHeader(http.StatusMethodNotAllowed)
+		return
+	}
+	m := make(map[string]interface{})
+	log.Debug("rapp statictics")
+
+	w.Header().Set("Content-Type", "application/json; charset=utf-8") // response header - must be set before the first Write
+	m["number-of-messages"] = strconv.Itoa(msg_count)
+	m["number-of-corrupted-messages"] = strconv.Itoa(msg_corrupted_count)
+	m["job id"] = jobid
+	m["group id"] = gid
+	m["client id"] = cid
+	m["kafka consumer type"] = consumer_type
+	m["server"] = bootstrapserver
+	m["topic"] = topic
+	m["messages per sec"] = msg_per_sec
+
+	payload, err := json.Marshal(m)
+	if err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		log.Error("Cannot marshal statistics json")
+		return
+	}
+	_, err = w.Write(payload)
+	if err != nil {
+		// Write implies the 200 status - a WriteHeader call after a failed Write would have no effect
+		log.Error("Cannot send statistics json")
+		return
+	}
+}
+
+// calc_average updates msg_per_sec once per minute with the average message rate of that minute - it never returns.
+func calc_average() {
+	for {
+		countBefore := msg_count
+		time.Sleep(time.Minute)
+		msg_per_sec = (msg_count - countBefore) / 60
+	}
+}
+
+// send_http_request sends one http request and reports whether the response code was accepted.
+//
+// Parameters:
+//   - jsonData: request body - nil means that no body is sent
+//   - method: http method of the request
+//   - url: complete url of the request
+//   - contentType: content type of the body - "" means "application/json; charset=utf-8"
+//   - accessToken: value of the authorization header - must be set when JWT_FILE or AUTH_SERVICE_URL is in use
+//   - alt_ok_response: response code accepted in addition to 200, 201 and 204 - 0 means none
+//   - returnJson: true to parse the response body as a json object and return it
+//
+// Returns true when the response code was accepted and - if requested - the body was parsed, otherwise false.
+// The returned map is nil unless returnJson is true and the request succeeded.
+//
+// The request is sent with the shared httpclient and the response body is always closed before returning.
+func send_http_request(jsonData []byte, method string, url string, contentType string, accessToken string, alt_ok_response int, returnJson bool) (bool, map[string]interface{}) {
+
+	// Build the request - the body reader stays nil when there is no body to send
+	var reqBody io.Reader
+	if jsonData != nil {
+		reqBody = bytes.NewBuffer(jsonData)
+	}
+	req, err := http.NewRequest(method, url, reqBody)
+	if err != nil {
+		log.Error("Cannot create http request method: ", method, " url: ", url)
+		return false, nil
+	}
+	if jsonData != nil {
+		if contentType == "" {
+			contentType = "application/json; charset=utf-8"
+		}
+		req.Header.Set("Content-Type", contentType)
+	}
+
+	// A token is mandatory as soon as one of the token sources is configured
+	if jwt_file != "" || creds_service_url != "" {
+		if accessToken == "" {
+			log.Error("Cannot create http request for url: ", url, " - token missing")
+			return false, nil
+		}
+		req.Header.Add("authorization", accessToken)
+	}
+
+	log.Debug("Http request: ", req)
+	resp, err := httpclient.Do(req)
+	if err != nil {
+		log.Error("Cannot send http request, method: ", method, ", url: ", url, " error: ", err)
+		return false, nil
+	}
+	// The body must be closed on every path - otherwise the connection is leaked and cannot be reused
+	defer resp.Body.Close()
+
+	// Check the response code
+	accepted_msg := "Accepted response code: "
+	switch {
+	case resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusCreated || resp.StatusCode == http.StatusNoContent:
+		// Default set of accepted codes
+	case alt_ok_response != 0 && resp.StatusCode == alt_ok_response:
+		accepted_msg = "Accepted alternative response code: "
+	default:
+		log.Error("Bad response, method: ", method, " url: ", url, " resp: ", resp.StatusCode)
+		return false, nil
+	}
+
+	if !returnJson {
+		log.Debug(accepted_msg, resp.StatusCode)
+		return true, nil
+	}
+
+	// The caller wants the response body - it must be a json object
+	respBody, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		log.Error("Cannot read body, method: ", method, ", url: ", url, " resp: ", resp.StatusCode)
+		return false, nil
+	}
+
+	var responseJson map[string]interface{}
+	err = json.Unmarshal(respBody, &responseJson)
+	if err != nil {
+		log.Error("Received msg not json? - cannot unmarshal")
+		return false, nil
+	}
+	// Print the received json on stdout
+	fmt.Println(string(respBody))
+	log.Debug(accepted_msg, resp.StatusCode)
+	return true, responseJson
+}
+
+// retrive_token_strimzi fetches the access token used towards the bridge and keeps currentToken up to date.
+//
+// The call blocks until the first token is fetched - a failing fetch is retried once per second.
+// After that a go routine fetches a new token one minute before the current one expires, for as long
+// as the app runs. A token without expiry time is never refreshed.
+func retrive_token_strimzi() {
+	log.Debug("Get token inline - strimzi comm")
+
+	conf := &clientcredentials.Config{
+		ClientID:     creds_client_id,
+		ClientSecret: creds_client_secret,
+		TokenURL:     creds_service_url,
+	}
+
+	// fetch_token retries until a token is received, stores it in currentToken and returns its expiry time
+	fetch_token := func() time.Time {
+		for {
+			token, err := conf.Token(context.Background())
+			if err != nil {
+				log.Warning("Cannot fetch access token: ", err, " - retrying ")
+				time.Sleep(time.Second)
+				continue
+			}
+			// NOTE(review): the token value ends up in the log at debug level - consider removing these lines
+			log.Debug("token: ", token)
+			log.Debug("TokenValue: ", token.AccessToken)
+			log.Debug("Expiration: ", token.Expiry)
+			log.Debug("Modified expiration: ", token.Expiry.Add(-time.Minute))
+			currentToken = token.AccessToken
+			return token.Expiry
+		}
+	}
+
+	expiry := fetch_token()
+	log.Debug("Initial token ok")
+
+	go func() {
+		// A zero expiry means that the token does not expire - then there is nothing to refresh
+		for !expiry.IsZero() {
+			// Refresh one minute ahead of the expiry
+			wait := time.Until(expiry.Add(-time.Minute))
+			if wait < time.Second {
+				wait = time.Second // never spin when the token lifetime is shorter than the margin
+			}
+			time.Sleep(wait)
+			expiry = fetch_token()
+		}
+	}()
+}
+
+// retrive_token fetches an access token with the client credentials grant and sets it in the kafka consumer.
+//
+// A failure to fetch or to set the token is reported to the consumer with SetOAuthBearerTokenFailure.
+func retrive_token(c *kafka.Consumer) {
+	log.Debug("Get token inline")
+	credentials := clientcredentials.Config{
+		ClientID:     creds_client_id,
+		ClientSecret: creds_client_secret,
+		TokenURL:     creds_service_url,
+	}
+	fetched, fetchErr := credentials.Token(context.Background())
+	if fetchErr != nil {
+		log.Warning("Cannot fetch access token: ", fetchErr)
+		c.SetOAuthBearerTokenFailure(fetchErr.Error())
+		return
+	}
+	log.Debug("token: ", fetched)
+	log.Debug("TokenValue: ", fetched.AccessToken)
+	log.Debug("Expiration: ", fetched.Expiry)
+
+	// The consumer is given an expiry time one minute ahead of the real one
+	earlyExpiry := fetched.Expiry.Add(-time.Minute)
+	log.Debug("Modified expiration: ", earlyExpiry)
+	log.Debug("Setting new token to consumer")
+	setErr := c.SetOAuthBearerToken(kafka.OAuthBearerToken{TokenValue: fetched.AccessToken, Expiration: earlyExpiry, Extensions: map[string]string{}})
+	currentToken = fetched.AccessToken
+	if setErr == nil {
+		return
+	}
+	log.Warning("Cannot cannot set token in client: ", setErr)
+	c.SetOAuthBearerTokenFailure(setErr.Error())
+}
+// gzipWrite writes data gzip compressed (level BestSpeed) to w - an error from the write or from the final flush is returned.
+func gzipWrite(w io.Writer, data *[]byte) error {
+	gw, err := gzip.NewWriterLevel(w, gzip.BestSpeed)
+	if err != nil {
+		return err
+	}
+	if _, err = gw.Write(*data); err != nil {
+		return err
+	}
+	return gw.Close() // Close flushes the remaining data - its error must not be dropped
+}
+
+// read_bridge_messages reads messages from the topic via the kafka bridge (rest api) - the function never returns.
+//
+// A consumer instance is created in the bridge and subscribed to the topic, both steps are retried until they
+// succeed. Then the records are polled in an endless loop, counted (optionally printed) and the offsets committed.
+func read_bridge_messages() {
+	consumer_type = "unsecure strimzi bridge consumer"
+	if creds_service_url != "" {
+		consumer_type = "accesstoken strimzi bridge consumer"
+	}
+
+	// Remove a left-over consumer instance, if any, before a new one is created
+	log.Debug("Cleaning consumer "+cid+" in group: ", gid)
+	ok, _ := send_http_request(nil, http.MethodDelete, bootstrapserver+"/consumers/"+gid+"/instances/"+cid, "", currentToken, 0, false)
+	if !ok {
+		log.Info("Failed to delete consumer "+cid+" in group: ", gid, " - it may not exist - ok")
+	}
+
+	var bridge_base_url = ""
+	ok = false
+	json_str := "{\"name\": \"" + cid + "\", \"auto.offset.reset\": \"latest\",\"format\": \"json\"}"
+	for !ok {
+		log.Debug("Creating consumer "+cid+" in group: ", gid)
+		var respJson map[string]interface{}
+		ok, respJson = send_http_request([]byte(json_str), http.MethodPost, bootstrapserver+"/consumers/"+gid, "application/vnd.kafka.v2+json", currentToken, 409, true) //409 if consumer already exists
+		if !ok {
+			log.Info("Failed create consumer "+cid+" in group: ", gid, " - retrying")
+			time.Sleep(time.Second)
+		} else if uri, found := respJson["base_uri"].(string); found {
+			bridge_base_url = uri
+		} else {
+			// No base_uri in the response (e.g. a 409 reply) - use the instance url, the same as in the delete above
+			bridge_base_url = bootstrapserver + "/consumers/" + gid + "/instances/" + cid
+		}
+	}
+
+	ok = false
+	json_str = "{\"topics\": [\"" + topic + "\"]}"
+	for !ok {
+		log.Debug("Subscribing to topic: ", topic)
+		ok, _ = send_http_request([]byte(json_str), http.MethodPost, bridge_base_url+"/subscription", "application/vnd.kafka.v2+json", currentToken, 0, false)
+		if !ok {
+			log.Info("Failed subscribe to topic: ", topic, " - retrying")
+			time.Sleep(time.Second)
+		}
+	}
+
+	for {
+		log.Debug("Reading messages on topic: ", topic)
+		url := bridge_base_url + "/records"
+		req, err := http.NewRequest(http.MethodGet, url, nil)
+		if err != nil {
+			log.Error("Cannot create http request method: GET, url: ", url)
+			time.Sleep(1 * time.Second)
+			continue
+		}
+		req.Header.Set("accept", "application/vnd.kafka.json.v2+json")
+
+		if creds_service_url != "" {
+			if currentToken != "" {
+				req.Header.Add("authorization", currentToken)
+			} else {
+				log.Error("Cannot create http request for url: ", url, " - token missing")
+				time.Sleep(1 * time.Second)
+				continue
+			}
+		}
+
+		values := req.URL.Query()
+		values.Add("timeout", "10000") // presumably the max time, in ms, the bridge waits for records - confirm with the bridge api
+		req.URL.RawQuery = values.Encode()
+
+		log.Debug(req)
+
+		resp, err2 := httpclient.Do(req)
+		if err2 != nil {
+			log.Error("Cannot send http request, method: GET, url: ", url)
+			time.Sleep(1 * time.Second)
+			continue
+		} else {
+			body, err := ioutil.ReadAll(resp.Body)
+			resp.Body.Close()
+			if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
+				log.Debug("Accepted response code: ", resp.StatusCode)
+				if err != nil {
+					log.Error("Cannot read body, method: GET, url: ", url, " resp: ", resp.StatusCode)
+				} else {
+					var responseJson []interface{}
+					err := json.Unmarshal(body, &responseJson)
+					if err != nil {
+						log.Error("Received msg not json? - cannot unmarshal")
+						msg_corrupted_count++
+					} else {
+						if len(responseJson) == 0 {
+							log.Debug("No message")
+							continue
+						}
+						for _, item := range responseJson {
+							j, err := json.MarshalIndent(item, "", " ")
+							if err != nil {
+								log.Error("Message in array not json? - cannot unmarshal")
+								msg_corrupted_count++
+							} else {
+								msg_count++
+								if log_payload != "" {
+									fmt.Println("Message: " + string(j))
+								}
+							}
+						}
+					}
+				}
+
+				log.Debug("Commiting message")
+				ok, _ = send_http_request(nil, http.MethodPost, bridge_base_url+"/offsets", "", currentToken, 0, false)
+				if !ok {
+					log.Info("Failed to commit message")
+				}
+			} else {
+				log.Error("Bad response, method: GET, url: ", url, " resp: ", resp.StatusCode)
+				log.Error("Bad response, data: ", string(body))
+			}
+		}
+	}
+}
+
+// read_kafka_messages creates a kafka consumer, subscribes to the topic and then polls for messages - it never returns.
+func read_kafka_messages() {
+	var c *kafka.Consumer = nil
+	log.Info("Creating kafka consumer...")
+	var err error
+	for c == nil {
+		if jwt_file == "" && creds_service_url == "" {
+			if ssl_path == "" {
+				log.Info("unsecure consumer")
+				consumer_type = "kafka unsecure consumer"
+				c, err = kafka.NewConsumer(&kafka.ConfigMap{
+					"bootstrap.servers": bootstrapserver,
+					"group.id":          gid,
+					"client.id":         cid,
+					"auto.offset.reset": "latest",
+				})
+			} else {
+				log.Info("ssl consumer")
+				consumer_type = "kafka ssl consumer"
+				c, err = kafka.NewConsumer(&kafka.ConfigMap{
+					"bootstrap.servers":        bootstrapserver,
+					"group.id":                 gid,
+					"client.id":                cid,
+					"auto.offset.reset":        "latest",
+					"security.protocol":        "SSL",
+					"ssl.key.location":         ssl_path + "/clt.key",
+					"ssl.certificate.location": ssl_path + "/clt.crt",
+					"ssl.ca.location":          ssl_path + "/ca.crt",
+				})
+			}
+		} else {
+			if ssl_path != "" {
+				panic("SSL cannot be configued with JWT_FILE or AUTH_SERVICE_URL")
+			}
+			log.Info("sasl consumer")
+			consumer_type = "kafka sasl unsecure consumer"
+			c, err = kafka.NewConsumer(&kafka.ConfigMap{
+				"bootstrap.servers": bootstrapserver,
+				"group.id":          gid,
+				"client.id":         cid,
+				"auto.offset.reset": "latest",
+				"sasl.mechanism":    "OAUTHBEARER",
+				"security.protocol": "SASL_PLAINTEXT",
+			})
+		}
+		if err != nil {
+			log.Warning("Cannot create kafka consumer - retrying, error: ", err)
+			time.Sleep(1 * time.Second)
+		}
+	}
+
+	log.Info("Creating kafka consumer - ok")
+	log.Info("Start subscribing to topic: ", topic)
+	for {
+		err = c.SubscribeTopics([]string{topic}, nil)
+		if err == nil {
+			break
+		}
+		log.Info("Topic reader cannot start subscribing on topic: ", topic, " - retrying --  error details: ", err)
+		time.Sleep(1 * time.Second) // do not spin while the subscription fails
+	}
+	log.Info("Topic reader subscribing on topic: ", topic)
+
+	fileModTime := time.Now()
+	for {
+		if jwt_file != "" {
+			fileInfo, err := os.Stat(jwt_file)
+			if err == nil {
+				if !fileModTime.Equal(fileInfo.ModTime()) {
+					log.Debug("JWT file is updated")
+					fileModTime = fileInfo.ModTime()
+					fileBytes, err := ioutil.ReadFile(jwt_file)
+					if err != nil {
+						log.Error("JWT file read error: ", err)
+					} else {
+						fileString := string(fileBytes)
+						log.Info("JWT: ", fileString)
+						oauthBearerToken := kafka.OAuthBearerToken{
+							TokenValue: fileString,
+							Expiration: time.Now().Add(300 * time.Second),
+						}
+						log.Debug("Setting new token to consumer")
+						setTokenError := c.SetOAuthBearerToken(oauthBearerToken)
+						if setTokenError != nil {
+							log.Warning("Cannot cannot set token in client: ", setTokenError)
+						}
+					}
+				} else {
+					log.Debug("JWT file not updated - OK")
+				}
+			} else {
+				log.Error("JWT does not exist: ", err)
+			}
+		}
+		ev := c.Poll(1000)
+		if ev == nil {
+			log.Debug(" Nothing to consume on topic: ", topic)
+			continue
+		}
+		switch e := ev.(type) {
+		case *kafka.Message:
+			var pdata *[]byte = &e.Value
+			if gzipped_data != "" {
+				// GZIP set means that the payload is gzip compressed - unpack it before the json check
+				unzipped, err := gunzip(e.Value)
+				if err != nil {
+					log.Warning("Cannot unzip data")
+					pdata = nil
+				} else {
+					pdata = &unzipped
+					fmt.Println("Unzipped data")
+				}
+			}
+			if pdata != nil {
+				buf := &bytes.Buffer{}
+				if err := json.Indent(buf, *pdata, "", " "); err != nil {
+					log.Warning("Received msg not json?")
+				} else {
+					fmt.Println(buf.String())
+					msg_count++
+					fmt.Println("Number of received json msgs: " + strconv.Itoa(msg_count))
+				}
+			}
+			c.Commit()
+		case kafka.Error:
+			fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
+		case kafka.OAuthBearerTokenRefresh:
+			if jwt_file == "" {
+				fmt.Println(e)
+				retrive_token(c)
+			}
+		default:
+			fmt.Printf("Ignored %v\n", e)
+		}
+	}
+}
+
+// gunzip returns the unpacked content of gzip compressed data.
+func gunzip(data []byte) ([]byte, error) {
+	zr, err := gzip.NewReader(bytes.NewReader(data))
+	if err != nil {
+		return nil, err
+	}
+	defer zr.Close()
+	return ioutil.ReadAll(zr)
+}