Helm charts and apps for pm-setup
Helm charts and scripts for installation
Apps - https-server, pm-rapp and kafka-pm-producer
Issue-ID: NONRTRIC-854
Signed-off-by: BjornMagnussonXA <bjorn.magnusson@est.tech>
Change-Id: I167b377f7d1a54923a040b05c4177afbb87ad7ef
diff --git a/pm-rapp/.gitignore b/pm-rapp/.gitignore
new file mode 100644
index 0000000..d39c18f
--- /dev/null
+++ b/pm-rapp/.gitignore
@@ -0,0 +1 @@
+REM_*
diff --git a/pm-rapp/Dockerfile b/pm-rapp/Dockerfile
new file mode 100644
index 0000000..1d138c5
--- /dev/null
+++ b/pm-rapp/Dockerfile
@@ -0,0 +1,35 @@
+# ============LICENSE_START===============================================
+# Copyright (C) 2023 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+FROM golang:1.19-bullseye AS build
+WORKDIR /app
+COPY go.mod .
+COPY go.sum .
+RUN go mod download
+COPY main.go .
+RUN go build -o /pm-rapp
+
+#Replaced distroless image with ubuntu for debug purposes (seem to be cert problems with distroless image...?)
+FROM gcr.io/distroless/base-debian11
+#FROM ubuntu
+WORKDIR /
+## Copy from "build" stage
+COPY --from=build /pm-rapp .
+
+##Uncomment this when using distroless image
+USER nonroot:nonroot
+ENTRYPOINT ["/pm-rapp"]
diff --git a/pm-rapp/README.md b/pm-rapp/README.md
new file mode 100644
index 0000000..99a8260
--- /dev/null
+++ b/pm-rapp/README.md
@@ -0,0 +1,16 @@
+
+
+## Basic rAPP for demo purpose - starts a subscription and prints out received data on the topic to stdout
+
+
+### Manual build, tag and push to image repo
+
+Build for docker or local kubernetes\
+`./build.sh no-push [<image-tag>]`
+
+Build for remote kubernetes - an externally accessible image repo (e.g. docker hub) is needed \
+`./build.sh <external-image-repo> [<image-tag>]`
+
+
+### Configuration
+
diff --git a/pm-rapp/TODO.txt b/pm-rapp/TODO.txt
new file mode 100644
index 0000000..a4a0b45
--- /dev/null
+++ b/pm-rapp/TODO.txt
@@ -0,0 +1,10 @@
+
+pm-rapp
+=======
+Dockerfile
+- Remove ref to ubuntu image (if distroless image work ok)
+
+
+
+install/helm
+============
diff --git a/pm-rapp/build.sh b/pm-rapp/build.sh
new file mode 100755
index 0000000..581b5c2
--- /dev/null
+++ b/pm-rapp/build.sh
@@ -0,0 +1,78 @@
+#!/bin/bash
+
+# ============LICENSE_START===============================================
+# Copyright (C) 2023 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+# Build image from Dockerfile with/without custom image tag
+# Optionally push to external docker hub repo
+
+print_usage() {
+ echo "Usage: build.sh no-push|<docker-hub-repo-name> [<image-tag>]"
+ exit 1
+}
+
+if [ $# -ne 1 ] && [ $# -ne 2 ]; then
+ print_usage
+fi
+
+IMAGE_NAME="pm-rapp"
+IMAGE_TAG="latest"
+REPO=""
+if [ $1 == "no-push" ]; then
+ echo "Only local image build"
+else
+ REPO=$1
+ echo "Attempt to push built image to: "$REPO
+fi
+
+if [ "$2" != "" ]; then
+ IMAGE_TAG=$2
+fi
+ echo "Setting image tag to: "$IMAGE_TAG
+
+IMAGE=$IMAGE_NAME:$IMAGE_TAG
+echo "Building image $IMAGE"
+docker build -t $IMAGE_NAME:$IMAGE_TAG .
+if [ $? -ne 0 ]; then
+ echo "BUILD FAILED"
+ exit 1
+fi
+echo "BUILD OK"
+
+if [ "$REPO" != "" ]; then
+ echo "Tagging image"
+ NEW_IMAGE=$REPO/$IMAGE_NAME:$IMAGE_TAG
+ docker tag $IMAGE $NEW_IMAGE
+ if [ $? -ne 0 ]; then
+ echo "RE-TAGGING FAILED"
+ exit 1
+ fi
+ echo "RE-TAG OK"
+
+ echo "Pushing image $NEW_IMAGE"
+ docker push $NEW_IMAGE
+ if [ $? -ne 0 ]; then
+ echo "PUSHED FAILED"
+ echo " Perhaps not logged into docker-hub repo $REPO?"
+ exit 1
+ fi
+ IMAGE=$NEW_IMAGE
+ echo "PUSH OK"
+fi
+
+echo "IMAGE OK: $IMAGE"
+echo "DONE"
diff --git a/pm-rapp/container.yaml b/pm-rapp/container.yaml
new file mode 100644
index 0000000..658fb57
--- /dev/null
+++ b/pm-rapp/container.yaml
@@ -0,0 +1,22 @@
+# ============LICENSE_START===============================================
+# Copyright (C) 2023 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+# The Jenkins job requires a tag to build the Docker image.
+# By default this file is in the docker build directory,
+# but the location can configured in the JJB template.
+---
+tag: 1.0.0
\ No newline at end of file
diff --git a/pm-rapp/go.mod b/pm-rapp/go.mod
new file mode 100644
index 0000000..7b5edbe
--- /dev/null
+++ b/pm-rapp/go.mod
@@ -0,0 +1,21 @@
+module main
+
+go 1.19
+
+require (
+ github.com/confluentinc/confluent-kafka-go v1.9.3-RC3
+ github.com/gorilla/mux v1.8.0
+ github.com/json-iterator/go v1.1.11
+ github.com/sirupsen/logrus v1.9.0
+ golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
+)
+
+require (
+ github.com/golang/protobuf v1.5.2 // indirect
+ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
+ github.com/modern-go/reflect2 v1.0.1 // indirect
+ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
+ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
+ google.golang.org/appengine v1.4.0 // indirect
+ google.golang.org/protobuf v1.28.0 // indirect
+)
diff --git a/pm-rapp/go.sum b/pm-rapp/go.sum
new file mode 100644
index 0000000..79c006a
--- /dev/null
+++ b/pm-rapp/go.sum
@@ -0,0 +1,231 @@
+cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/actgardner/gogen-avro/v10 v10.1.0/go.mod h1:o+ybmVjEa27AAr35FRqU98DJu1fXES56uXniYFv4yDA=
+github.com/actgardner/gogen-avro/v10 v10.2.1/go.mod h1:QUhjeHPchheYmMDni/Nx7VB0RsT/ee8YIgGY/xpEQgQ=
+github.com/actgardner/gogen-avro/v9 v9.1.0/go.mod h1:nyTj6wPqDJoxM3qdnjcLv+EnMDSDFqE0qDpva2QRmKc=
+github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
+github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
+github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
+github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
+github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
+github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
+github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
+github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
+github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI=
+github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
+github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
+github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
+github.com/confluentinc/confluent-kafka-go v1.9.3-RC3 h1:urDeBIsNr0hgRf3nZn66SHtdBO/4DmEDSsMWquaolmo=
+github.com/confluentinc/confluent-kafka-go v1.9.3-RC3/go.mod h1:NlSjbxG73kJM0iKUC6/CDbMnY3H4WF6e+YFBlfLffi8=
+github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
+github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
+github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
+github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
+github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
+github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
+github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
+github.com/frankban/quicktest v1.2.2/go.mod h1:Qh/WofXFeiAFII1aEBu529AtJo6Zg2VHscnEsbBnJ20=
+github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o=
+github.com/frankban/quicktest v1.10.0/go.mod h1:ui7WezCLWMWxVWr1GETZY3smRy0G4KWq9vcPtJmFl7Y=
+github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og=
+github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
+github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
+github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
+github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
+github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
+github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
+github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
+github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
+github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
+github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
+github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
+github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
+github.com/google/go-cmp v0.2.1-0.20190312032427-6f77996f0c42/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
+github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
+github.com/google/pprof v0.0.0-20211008130755-947d60d73cc0/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg=
+github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
+github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
+github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
+github.com/hamba/avro v1.5.6/go.mod h1:3vNT0RLXXpFm2Tb/5KC71ZRJlOroggq1Rcitb6k4Fr8=
+github.com/heetch/avro v0.3.1/go.mod h1:4xn38Oz/+hiEUTpbVfGVLfvOg0yKLlRP7Q9+gJJILgA=
+github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0/go.mod h1:N0Wam8K1arqPXNWjMo21EXnBPOPp36vB07FNRdD2geA=
+github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
+github.com/invopop/jsonschema v0.4.0/go.mod h1:O9uiLokuu0+MGFlyiaqtWxwqJm41/+8Nj0lD7A36YH0=
+github.com/jhump/gopoet v0.0.0-20190322174617-17282ff210b3/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI=
+github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI=
+github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ=
+github.com/jhump/protoreflect v1.11.0/go.mod h1:U7aMIjN0NWq9swDP7xDdoMfRHb35uiuTd3Z9nFXJf5E=
+github.com/jhump/protoreflect v1.12.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI=
+github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ=
+github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
+github.com/juju/qthttptest v0.1.1/go.mod h1:aTlAv8TYaflIiTDIQYzxnl1QdPjAg8Q8qJMErpKy6A4=
+github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
+github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
+github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/linkedin/goavro v2.1.0+incompatible/go.mod h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM=
+github.com/linkedin/goavro/v2 v2.10.0/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
+github.com/linkedin/goavro/v2 v2.10.1/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
+github.com/linkedin/goavro/v2 v2.11.1/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
+github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/nrwiersma/avro-benchmarks v0.0.0-20210913175520-21aec48c8f76/go.mod h1:iKyFMidsk/sVYONJRE372sJuX/QTRPacU7imPqqsu7g=
+github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/rogpeppe/clock v0.0.0-20190514195947-2896927a307a/go.mod h1:4r5QyqhjIWCcK8DO4KMclc5Iknq5qVBAlbYYzAbUScQ=
+github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
+github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
+github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
+github.com/santhosh-tekuri/jsonschema/v5 v5.0.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0=
+github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
+github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.3.1-0.20190311161405-34c6fa2dc709/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
+golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
+golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200505041828-1ed23360d12c/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
+golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
+golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
+golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
+golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
+golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d h1:TzXSXBo42m9gQenoE3b9BGiEpg5IG2JkU5FkPIawgtw=
+golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
+golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
+golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20200505023115-26f46d2f7ef8/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
+google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508=
+google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
+google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
+google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
+google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
+google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
+google.golang.org/genproto v0.0.0-20220503193339-ba3ae3f07e29/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
+google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
+google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
+google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
+google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
+google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
+google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
+google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
+google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
+google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
+google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
+google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
+google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
+google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
+google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
+google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
+google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
+google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
+gopkg.in/avro.v0 v0.0.0-20171217001914-a730b5802183/go.mod h1:FvqrFXt+jCsyQibeRv4xxEJBL5iG2DDW5aeJwzDiq4A=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/errgo.v1 v1.0.0/go.mod h1:CxwszS/Xz1C49Ucd2i6Zil5UToP1EmyrFhKaMVbg1mk=
+gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
+gopkg.in/httprequest.v1 v1.2.1/go.mod h1:x2Otw96yda5+8+6ZeWwHIJTFkEHWP/qP8pJOzqEtWPM=
+gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
+gopkg.in/retry.v1 v1.0.3/go.mod h1:FJkXmWiMaAo7xB+xhvDF59zhfjDWyzmyAxiT4dB688g=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
+gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
diff --git a/pm-rapp/main.go b/pm-rapp/main.go
new file mode 100644
index 0000000..4113a42
--- /dev/null
+++ b/pm-rapp/main.go
@@ -0,0 +1,829 @@
+// ============LICENSE_START===============================================
+// Copyright (C) 2023 Nordix Foundation. All rights reserved.
+// ========================================================================
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ============LICENSE_END=================================================
+//
+
+package main
+
+import (
+ "bytes"
+ "compress/gzip"
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "net/http/pprof"
+ "os"
+ "os/signal"
+ "runtime"
+ "strconv"
+ "strings"
+ "syscall"
+ "time"
+
+ "github.com/confluentinc/confluent-kafka-go/kafka"
+ "github.com/gorilla/mux"
+ jsoniter "github.com/json-iterator/go"
+ log "github.com/sirupsen/logrus"
+ "golang.org/x/oauth2/clientcredentials"
+)
+
+type JobDefinition struct {
+ InfoTypeID string `json:"info_type_id"`
+ JobOwner string `json:"job_owner"`
+ JobResultURI string `json:"job_result_uri"`
+ JobDefinition struct {
+ KafkaOutputTopic string `json:"kafkaOutputTopic"`
+ FilterType string `json:"filterType"`
+ Filter json.RawMessage `json:"filter"`
+ DeliveryInfo struct {
+ Topic string `json:"topic"`
+ BootStrapServers string `json:"bootStrapServers"`
+ } `json:"deliveryInfo"`
+ } `json:"job_definition"`
+}
+
+const jobdef = "/config/jobDefinition.json"
+
+var rapp_id = os.Getenv("APPID")
+
+var rapp_ns = os.Getenv("APPNS")
+
+var bootstrapserver = os.Getenv("KAFKA_SERVER")
+
+var topic = os.Getenv("TOPIC")
+
+var ics_server = os.Getenv("ICS")
+
+var jwt_file = os.Getenv("JWT_FILE")
+
+var ssl_path = os.Getenv("SSLPATH")
+
+var gzipped_data = os.Getenv("GZIP")
+
+var log_payload = os.Getenv("LOG_PAYLOAD")
+
+// This are optional - if rapp is fethcing the token instead of the side car
+var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
+var creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
+var creds_client_id = os.Getenv("CREDS_CLIENT_ID")
+var creds_service_url = os.Getenv("AUTH_SERVICE_URL")
+
+var gid = ""
+var cid = "cid-0"
+
+var msg_count int = 0
+var msg_corrupted_count int = 0
+
+var jobid = "<not-set>"
+var consumer_type = "<not-set>"
+
+var currentToken = ""
+
+var appStatus = "INIT"
+
+var msg_per_sec int = 0
+
+var httpclient = &http.Client{}
+
+// == Main ==//
+func main() {
+
+ log.SetLevel(log.InfoLevel)
+ log.SetLevel(log.DebugLevel)
+
+ log.Info("Server starting...")
+
+ if creds_service_url != "" {
+ log.Warn("Disabling jwt retrieval from side car")
+ jwt_file = ""
+ }
+
+ if rapp_id == "" {
+ log.Error("Env APPID not set")
+ os.Exit(1)
+ }
+
+ if rapp_ns == "" {
+ log.Error("Env APPNS not set")
+ os.Exit(1)
+ }
+
+ if bootstrapserver == "" {
+ log.Error("Env KAFKA_SERVER not set")
+ os.Exit(1)
+ }
+
+ if topic == "" {
+ log.Error("Env TOPIC not set")
+ os.Exit(1)
+ }
+
+ if ics_server == "" {
+ log.Error("Env ICS not set")
+ os.Exit(1)
+ }
+
+ rtr := mux.NewRouter()
+ rtr.HandleFunc("/statistics", statistics)
+ rtr.HandleFunc("/status", status)
+ rtr.HandleFunc("/logging/{level}", logging_level)
+ rtr.HandleFunc("/logging", logging_level)
+ rtr.HandleFunc("/", alive)
+
+ //For perf/mem profiling
+ rtr.HandleFunc("/custom_debug_path/profile", pprof.Profile)
+
+ http.Handle("/", rtr)
+
+ fileBytes, err := os.ReadFile(jobdef)
+ if err != nil {
+ log.Error("Cannot read job defintion file: ", jobdef, err)
+ os.Exit(1)
+ }
+ fmt.Println("FROM FILE")
+ fmt.Println(string(fileBytes))
+
+ job_json := JobDefinition{}
+ err = jsoniter.Unmarshal([]byte(fileBytes), &job_json)
+ if err != nil {
+ log.Error("Cannot parse job defintion file: ", jobdef, err)
+ os.Exit(1)
+ }
+ job_type := job_json.InfoTypeID
+ job_json.JobDefinition.KafkaOutputTopic = topic
+ job_json.JobDefinition.DeliveryInfo.Topic = topic
+ job_json.JobDefinition.DeliveryInfo.BootStrapServers = bootstrapserver
+
+ gid = "pm-rapp-" + job_type + "-" + rapp_id
+
+ jobid = "rapp-job-" + job_type + "-" + rapp_id
+
+ json_bytes, err := json.Marshal(job_json)
+ if err != nil {
+ log.Error("Cannot marshal job json", err)
+ os.Exit(1)
+ }
+
+ json_str := string(json_bytes)
+
+ if strings.HasPrefix(bootstrapserver, "http://") {
+ if creds_service_url != "" {
+ consumer_type = "accesstoken strimzi bridge consumer"
+ retrive_token_strimzi()
+ }
+ } else {
+ go read_kafka_messages()
+ }
+
+ ok := false
+ if ics_server != "" {
+ for !ok {
+ log.Debug("Registring job: ", jobid, " json: ", json_str)
+ ok, _ = send_http_request([]byte(json_str), http.MethodPut, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+jobid, "", currentToken, 0, false)
+ if !ok {
+ log.Info("Failed to register job: ", jobid, " - retrying")
+ time.Sleep(time.Second)
+ }
+ }
+ } else {
+ log.Info("No job registered - read from topic only")
+ }
+ if strings.HasPrefix(bootstrapserver, "http://") {
+ go read_bridge_messages()
+ }
+
+ go calc_average()
+
+ http_port := "80"
+ http_server := &http.Server{Addr: ":" + http_port, Handler: nil}
+
+ sigs := make(chan os.Signal, 1)
+ signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
+ go func() {
+ fmt.Println("Setting handler for signal sigint and sigterm")
+ sig := <-sigs
+ appStatus = "TERMINATING"
+ fmt.Printf("Received signal %s - application will terminate\n", sig)
+
+ if strings.HasPrefix(bootstrapserver, "http://") {
+ log.Debug("stopping strimzi consumer for job: ", jobid)
+ ok, _ = send_http_request(nil, http.MethodDelete, bootstrapserver+"/consumers/"+gid+"/instances/"+cid, "", currentToken, 0, false)
+ if !ok {
+ log.Info("Failed to delete consumer "+cid+" in group: ", gid, " - retrying")
+ }
+ }
+
+ ok := false
+ if ics_server != "" {
+ for !ok {
+ log.Debug("stopping job: ", jobid, " json: ", json_str)
+ ok, _ = send_http_request(nil, http.MethodDelete, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+jobid, "", currentToken, 0, false)
+ if !ok {
+ log.Info("Failed to stop job: ", jobid, " - retrying")
+ time.Sleep(time.Second)
+ }
+ }
+ }
+ http_server.Shutdown(context.Background())
+ }()
+ appStatus = "RUNNING"
+ log.Info("Starting http service...")
+ err = http_server.ListenAndServe()
+ if err == http.ErrServerClosed { // graceful shutdown
+ log.Info("http server shutdown...")
+ os.Exit(1)
+ } else if err != nil {
+ log.Error("http server error: ", err)
+ log.Info("http server shutdown...")
+ os.Exit(1)
+ }
+
+ //Wait until all go routines has exited
+ runtime.Goexit()
+
+ log.Warn("main routine exit")
+ log.Warn("server is stopping...")
+}
+
+// Simple alive check
+func alive(w http.ResponseWriter, req *http.Request) {
+ //Alive check
+}
+
+// Get/Set logging level
+func logging_level(w http.ResponseWriter, req *http.Request) {
+ vars := mux.Vars(req)
+ if level, ok := vars["level"]; ok {
+ if req.Method == http.MethodPut {
+ switch level {
+ case "trace":
+ log.SetLevel(log.TraceLevel)
+ case "debug":
+ log.SetLevel(log.DebugLevel)
+ case "info":
+ log.SetLevel(log.InfoLevel)
+ case "warn":
+ log.SetLevel(log.WarnLevel)
+ case "error":
+ log.SetLevel(log.ErrorLevel)
+ case "fatal":
+ log.SetLevel(log.FatalLevel)
+ case "panic":
+ log.SetLevel(log.PanicLevel)
+ default:
+ w.WriteHeader(http.StatusNotFound)
+ }
+ } else {
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ }
+ } else {
+ if req.Method == http.MethodGet {
+ msg := "none"
+ if log.IsLevelEnabled(log.PanicLevel) {
+ msg = "panic"
+ } else if log.IsLevelEnabled(log.FatalLevel) {
+ msg = "fatal"
+ } else if log.IsLevelEnabled(log.ErrorLevel) {
+ msg = "error"
+ } else if log.IsLevelEnabled(log.WarnLevel) {
+ msg = "warn"
+ } else if log.IsLevelEnabled(log.InfoLevel) {
+ msg = "info"
+ } else if log.IsLevelEnabled(log.DebugLevel) {
+ msg = "debug"
+ } else if log.IsLevelEnabled(log.TraceLevel) {
+ msg = "trace"
+ }
+ w.Header().Set("Content-Type", "application/text")
+ w.Write([]byte(msg))
+ } else {
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ }
+ }
+}
+
+// Get app state
+func status(w http.ResponseWriter, req *http.Request) {
+ if req.Method != http.MethodGet {
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ return
+ }
+
+ _, err := w.Write([]byte(appStatus))
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ log.Error("Cannot send statistics json")
+ return
+ }
+}
+
+// producer statictics, all jobs
+func statistics(w http.ResponseWriter, req *http.Request) {
+ if req.Method != http.MethodGet {
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ return
+ }
+ m := make(map[string]interface{})
+ log.Debug("rapp statictics")
+
+ req.Header.Set("Content-Type", "application/json; charset=utf-8")
+ m["number-of-messages"] = strconv.Itoa(msg_count)
+ m["number-of-corrupted-messages"] = strconv.Itoa(msg_corrupted_count)
+ m["job id"] = jobid
+ m["group id"] = gid
+ m["client id"] = cid
+ m["kafka consumer type"] = consumer_type
+ m["server"] = bootstrapserver
+ m["topic"] = topic
+ m["messages per sec"] = msg_per_sec
+
+ json, err := json.Marshal(m)
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ log.Error("Cannot marshal statistics json")
+ return
+ }
+ _, err = w.Write(json)
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ log.Error("Cannot send statistics json")
+ return
+ }
+}
+
+func calc_average() {
+
+ for true {
+ v := msg_count
+ time.Sleep(60 * time.Second)
+ msg_per_sec = (msg_count - v) / 60
+ }
+}
+
+func send_http_request(jsonData []byte, method string, url string, contentType string, accessToken string, alt_ok_response int, returnJson bool) (bool, map[string]interface{}) {
+
+ var req *http.Request
+ var err error
+ if jsonData != nil {
+ req, err = http.NewRequest(method, url, bytes.NewBuffer(jsonData))
+ if err != nil {
+ log.Error("Cannot create http request method: ", method, " url: ", url)
+ return false, nil
+ }
+ if contentType == "" {
+ req.Header.Set("Content-Type", "application/json; charset=utf-8")
+ } else {
+ req.Header.Set("Content-Type", contentType)
+ }
+ } else {
+ req, err = http.NewRequest(method, url, nil)
+ if err != nil {
+ log.Error("Cannot create http request method: ", method, " url: ", url)
+ return false, nil
+ }
+ }
+ if jwt_file != "" || creds_service_url != "" {
+ if accessToken != "" {
+ req.Header.Add("authorization", accessToken)
+ } else {
+ log.Error("Cannot create http request for url: ", url, " - token missing")
+ return false, nil
+ }
+ }
+ log.Debug("Http request: ", req)
+ resp, err2 := httpclient.Do(req)
+ if err2 != nil {
+ log.Error("Cannot send http request, method: ", method, "url: ", url)
+ } else {
+ if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
+
+ if returnJson {
+ defer resp.Body.Close()
+ body, err3 := ioutil.ReadAll(resp.Body)
+ if err3 != nil {
+ log.Error("Cannot read body, method: ", method, ", url: ", url, " resp: ", resp.StatusCode)
+ return false, nil
+ } else {
+ var responseJson map[string]interface{}
+ err := json.Unmarshal(body, &responseJson)
+ if err != nil {
+ log.Error("Received msg not json? - cannot unmarshal")
+ return false, nil
+ }
+ fmt.Println(string(body))
+ log.Debug("Accepted response code: ", resp.StatusCode)
+ return true, responseJson
+ }
+ }
+
+ log.Debug("Accepted response code: ", resp.StatusCode)
+ return true, nil
+ } else {
+ if alt_ok_response != 0 && resp.StatusCode == alt_ok_response {
+
+ if returnJson {
+ defer resp.Body.Close()
+ body, err3 := ioutil.ReadAll(resp.Body)
+ if err3 != nil {
+ log.Error("Cannot read body, method: ", method, ", url: ", url, " resp: ", resp.StatusCode)
+ return false, nil
+ } else {
+ var responseJson map[string]interface{}
+ err := json.Unmarshal(body, &responseJson)
+ if err != nil {
+ log.Error("Received msg not json? - cannot unmarshal")
+ return false, nil
+ }
+ fmt.Println(string(body))
+ log.Debug("Accepted alternative response code: ", resp.StatusCode)
+ return true, responseJson
+ }
+ }
+ } else {
+ log.Error("Bad response, method: ", method, " url: ", url, " resp: ", resp.StatusCode)
+ }
+ }
+ }
+ return false, nil
+
+}
+
+func retrive_token_strimzi() {
+ log.Debug("Get token inline - strimzi comm")
+
+ conf := &clientcredentials.Config{
+ ClientID: creds_client_id,
+ ClientSecret: creds_client_secret,
+ TokenURL: creds_service_url,
+ }
+ var modExpiry = time.Now()
+ ok := false
+ for !ok {
+ token, err := conf.Token(context.Background())
+ if err != nil {
+ log.Warning("Cannot fetch access token: ", err, " - retrying ")
+ time.Sleep(time.Second)
+ continue
+ }
+ log.Debug("token: ", token)
+ log.Debug("TokenValue: ", token.AccessToken)
+ log.Debug("Expiration: ", token.Expiry)
+ modExpiry = token.Expiry.Add(-time.Minute)
+ log.Debug("Modified expiration: ", modExpiry)
+ currentToken = token.AccessToken
+ ok = true
+ }
+ log.Debug("Initial token ok")
+ diff := modExpiry.Sub(time.Now())
+ go func() {
+ select {
+ case <-time.After(diff):
+ for !ok {
+ token, err := conf.Token(context.Background())
+ if err != nil {
+ log.Warning("Cannot fetch access token: ", err, " - retrying ")
+ time.Sleep(time.Second)
+ continue
+ }
+ log.Debug("token: ", token)
+ log.Debug("TokenValue: ", token.AccessToken)
+ log.Debug("Expiration: ", token.Expiry)
+ modExpiry = token.Expiry.Add(-time.Minute)
+ log.Debug("Modified expiration: ", modExpiry)
+ currentToken = token.AccessToken
+ ok = true
+ }
+ diff = modExpiry.Sub(time.Now())
+ }
+ }()
+}
+
+func retrive_token(c *kafka.Consumer) {
+ log.Debug("Get token inline")
+ conf := &clientcredentials.Config{
+ ClientID: creds_client_id,
+ ClientSecret: creds_client_secret,
+ TokenURL: creds_service_url,
+ }
+ token, err := conf.Token(context.Background())
+ if err != nil {
+ log.Warning("Cannot fetch access token: ", err)
+ c.SetOAuthBearerTokenFailure(err.Error())
+ return
+ }
+ extensions := map[string]string{}
+ log.Debug("token: ", token)
+ log.Debug("TokenValue: ", token.AccessToken)
+ log.Debug("Expiration: ", token.Expiry)
+ t := token.Expiry.Add(-time.Minute)
+ log.Debug("Modified expiration: ", t)
+ oauthBearerToken := kafka.OAuthBearerToken{
+ TokenValue: token.AccessToken,
+ Expiration: t,
+ Extensions: extensions,
+ }
+ log.Debug("Setting new token to consumer")
+ setTokenError := c.SetOAuthBearerToken(oauthBearerToken)
+ currentToken = token.AccessToken
+ if setTokenError != nil {
+ log.Warning("Cannot cannot set token in client: ", setTokenError)
+ c.SetOAuthBearerTokenFailure(setTokenError.Error())
+ }
+}
+
+func gzipWrite(w io.Writer, data *[]byte) error {
+ gw, err1 := gzip.NewWriterLevel(w, gzip.BestSpeed)
+
+ if err1 != nil {
+ return err1
+ }
+ defer gw.Close()
+ _, err2 := gw.Write(*data)
+ return err2
+}
+
+func read_bridge_messages() {
+
+ consumer_type = "unsecure strimzi bridge consumer"
+ if creds_service_url != "" {
+ consumer_type = "accesstoken strimzi bridge consumer"
+ }
+ ok := false
+ log.Debug("Cleaning consumer "+cid+" in group: ", gid)
+ ok, _ = send_http_request(nil, http.MethodDelete, bootstrapserver+"/consumers/"+gid+"/instances/"+cid, "", currentToken, 0, false)
+ if !ok {
+ log.Info("Failed to delete consumer "+cid+" in group: ", gid, " - it may not exist - ok")
+ }
+ var bridge_base_url = ""
+ ok = false
+ json_str := "{\"name\": \"" + cid + "\", \"auto.offset.reset\": \"latest\",\"format\": \"json\"}"
+ for !ok {
+ log.Debug("Creating consumer "+cid+" in group: ", gid)
+ var respJson map[string]interface{}
+ ok, respJson = send_http_request([]byte(json_str), http.MethodPost, bootstrapserver+"/consumers/"+gid, "application/vnd.kafka.v2+json", currentToken, 409, true) //409 if consumer already exists
+ if ok {
+ bridge_base_url = fmt.Sprintf("%s", respJson["base_uri"])
+ } else {
+ log.Info("Failed create consumer "+cid+" in group: ", gid, " - retrying")
+ time.Sleep(time.Second)
+ }
+ }
+
+ ok = false
+ json_str = "{\"topics\": [\"" + topic + "\"]}"
+
+ for !ok {
+ log.Debug("Subscribing to topic: ", topic)
+ ok, _ = send_http_request([]byte(json_str), http.MethodPost, bridge_base_url+"/subscription", "application/vnd.kafka.v2+json", currentToken, 0, false)
+ if !ok {
+ log.Info("Failed subscribe to topic: ", topic, " - retrying")
+ time.Sleep(time.Second)
+ }
+ }
+
+ for true {
+ log.Debug("Reading messages on topic: ", topic)
+
+ var req *http.Request
+ var err error
+ url := bridge_base_url + "/records"
+
+ req, err = http.NewRequest(http.MethodGet, url, nil)
+ if err != nil {
+ log.Error("Cannot create http request method: GET, url: ", url)
+ time.Sleep(1 * time.Second)
+ continue
+ }
+ req.Header.Set("accept", "application/vnd.kafka.json.v2+json")
+
+ if creds_service_url != "" {
+ if currentToken != "" {
+ req.Header.Add("authorization", currentToken)
+ } else {
+ log.Error("Cannot create http request for url: ", url, " - token missing")
+ time.Sleep(1 * time.Second)
+ continue
+ }
+ }
+
+ values := req.URL.Query()
+ values.Add("timeout", "10000")
+ req.URL.RawQuery = values.Encode()
+
+ log.Debug(req)
+
+ resp, err2 := httpclient.Do(req)
+ if err2 != nil {
+ log.Error("Cannot send http request, method: GET, url: ", url)
+ time.Sleep(1 * time.Second)
+ continue
+ } else {
+ body, err := ioutil.ReadAll(resp.Body)
+ resp.Body.Close()
+ if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
+ log.Debug("Accepted response code: ", resp.StatusCode)
+
+ if err != nil {
+ log.Error("Cannot read body, method: GET, url: ", url, " resp: ", resp.StatusCode)
+ } else {
+ var responseJson []interface{}
+ err := json.Unmarshal(body, &responseJson)
+ if err != nil {
+ log.Error("Received msg not json? - cannot unmarshal")
+ msg_corrupted_count++
+ } else {
+ if len(responseJson) == 0 {
+ log.Debug("No message")
+ continue
+ }
+ for _, item := range responseJson {
+ j, err := json.MarshalIndent(item, "", " ")
+ if err != nil {
+ log.Error("Message in array not json? - cannot unmarshal")
+ msg_corrupted_count++
+ } else {
+ msg_count++
+ if log_payload != "" {
+ fmt.Println("Message: " + string(j))
+ }
+ }
+ }
+ }
+ }
+
+ log.Debug("Commiting message")
+ ok, _ = send_http_request(nil, http.MethodPost, bridge_base_url+"/offsets", "", currentToken, 0, false)
+ if !ok {
+ log.Info("Failed to commit message")
+ }
+
+ } else {
+ log.Error("Bad response, method: GET, url: ", url, " resp: ", resp.StatusCode)
+ log.Error("Bad response, data: ", string(body))
+ }
+ }
+ }
+
+}
+
+func read_kafka_messages() {
+ var c *kafka.Consumer = nil
+ log.Info("Creating kafka consumer...")
+ var err error
+ for c == nil {
+ if jwt_file == "" && creds_service_url == "" {
+ if ssl_path == "" {
+ log.Info("unsecure consumer")
+ consumer_type = "kafka unsecure consumer"
+ c, err = kafka.NewConsumer(&kafka.ConfigMap{
+ "bootstrap.servers": bootstrapserver,
+ "group.id": gid,
+ "client.id": cid,
+ "auto.offset.reset": "latest",
+ })
+ } else {
+ log.Info("ssl consumer")
+ consumer_type = "kafka ssl consumer"
+ c, err = kafka.NewConsumer(&kafka.ConfigMap{
+ "bootstrap.servers": bootstrapserver,
+ "group.id": gid,
+ "client.id": cid,
+ "auto.offset.reset": "latest",
+ "security.protocol": "SSL",
+ "ssl.key.location": ssl_path + "/clt.key",
+ "ssl.certificate.location": ssl_path + "/clt.crt",
+ "ssl.ca.location": ssl_path + "/ca.crt",
+ })
+ }
+ } else {
+ if ssl_path != "" {
+ panic("SSL cannot be configued with JWT_FILE or RAPP_AUTH_SERVICE_URL")
+ }
+ log.Info("sasl consumer")
+ consumer_type = "kafka sasl unsecure consumer"
+ c, err = kafka.NewConsumer(&kafka.ConfigMap{
+ "bootstrap.servers": bootstrapserver,
+ "group.id": gid,
+ "client.id": cid,
+ "auto.offset.reset": "latest",
+ "sasl.mechanism": "OAUTHBEARER",
+ "security.protocol": "SASL_PLAINTEXT",
+ })
+ }
+ if err != nil {
+ log.Warning("Cannot create kafka consumer - retrying, error: ", err)
+ time.Sleep(1 * time.Second)
+ }
+ }
+
+ log.Info("Creating kafka consumer - ok")
+ log.Info("Start subscribing to topic: ", topic)
+ topic_ok := false
+ for !topic_ok {
+ err = c.SubscribeTopics([]string{topic}, nil)
+ if err != nil {
+ log.Info("Topic reader cannot start subscribing on topic: ", topic, " - retrying -- error details: ", err)
+ } else {
+ log.Info("Topic reader subscribing on topic: ", topic)
+ topic_ok = true
+ }
+ }
+
+ fileModTime := time.Now()
+ for {
+ if jwt_file != "" {
+ fileInfo, err := os.Stat(jwt_file)
+ if err == nil {
+ if fileModTime != fileInfo.ModTime() {
+ log.Debug("JWT file is updated")
+ fileModTime = fileInfo.ModTime()
+ fileBytes, err := ioutil.ReadFile(jwt_file)
+ if err != nil {
+ log.Error("JWT file read error: ", err)
+ } else {
+ fileString := string(fileBytes)
+ log.Info("JWT: ", fileString)
+ t := time.Now()
+ t15 := time.Second * 300
+ t = t.Add(t15)
+ oauthBearerToken := kafka.OAuthBearerToken{
+ TokenValue: fileString,
+ Expiration: t,
+ }
+ log.Debug("Setting new token to consumer")
+ setTokenError := c.SetOAuthBearerToken(oauthBearerToken)
+ if setTokenError != nil {
+ log.Warning("Cannot cannot set token in client: ", setTokenError)
+ }
+ }
+ } else {
+ log.Debug("JWT file not updated - OK")
+ }
+ } else {
+ log.Error("JWT does not exist: ", err)
+ }
+ }
+ ev := c.Poll(1000)
+ if ev == nil {
+ log.Debug(" Nothing to consume on topic: ", topic)
+ continue
+ }
+ switch e := ev.(type) {
+ case *kafka.Message:
+ var pdata *[]byte = &e.Value
+ if gzipped_data != "" {
+ var buf bytes.Buffer
+ err = gzipWrite(&buf, pdata)
+ if err != nil {
+ log.Warning("Cannot unzip data")
+ pdata = nil
+ } else {
+ *pdata = buf.Bytes()
+ fmt.Println("Unzipped data")
+ }
+ }
+ if pdata != nil {
+ buf := &bytes.Buffer{}
+
+ if err := json.Indent(buf, *pdata, "", " "); err != nil {
+ log.Warning("Received msg not json?")
+ } else {
+ fmt.Println(buf.String())
+ msg_count++
+ fmt.Println("Number of received json msgs: " + strconv.Itoa(msg_count))
+ }
+ }
+ c.Commit()
+ case kafka.Error:
+ fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
+
+ case kafka.OAuthBearerTokenRefresh:
+ if jwt_file == "" {
+ oart, ok := ev.(kafka.OAuthBearerTokenRefresh)
+ fmt.Println(oart)
+ if !ok {
+ continue
+ }
+ retrive_token(c)
+ }
+ default:
+ fmt.Printf("Ignored %v\n", e)
+ }
+
+ }
+}