NONRTRIC - Implement DMaaP mediator producer service in Java

Improved documentation.

Using random port in unittest

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-597
Change-Id: Ic2de6fd0efec7b04f513db9e3ff3b797b47308fc
diff --git a/dmaap-adaptor-java/README.md b/dmaap-adaptor-java/README.md
index 9b35fe5..162bfb2 100644
--- a/dmaap-adaptor-java/README.md
+++ b/dmaap-adaptor-java/README.md
@@ -1,9 +1,9 @@
 # O-RAN-SC Non-RealTime RIC DMaaP Information Producer
-This product is a generic information producer (as defined by the Information Coordinator Service (ICS)). It can produce any information that can be retrieved from DMaaP. Its main tasks is to register information types and itself as a producer using the ICS Data Producer API.
+This product is a generic information producer (as defined by the Information Coordinator Service (ICS)). It can produce any information that can be retrieved from DMaaP or Kafka. Its main tasks is to register information types and itself as a producer using the ICS Data Producer API.
 
 A data consumer may create information jobs through the ICS Data Producer API.
 
-This service will retrieve data from the DMaaP Message Router (MR) and distribute it further to the data consumers (information job owners).
+This service will retrieve data from the DMaaP Message Router (MR) or from the Kafka streaming platform and will distribute it further to the data consumers (information job owners).
 
 The component is a springboot service and is configured as any springboot service through the file `config/application.yaml`. The component log can be retrieved and logging can be controled by means of REST call. See the API documentation (api/api.yaml).
 
@@ -14,20 +14,91 @@
        "types":
         [
           {
-             "id":  "STD_Fault_Messages",
-             "dmaapTopicUrl":  events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD-Fault-Messages_1.0.0",
+             "id":  "ExampleInformationType1_1.0.0",
+             "dmaapTopicUrl":  "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD-Fault-Messages_1.0.0",
+             "useHttpProxy": true
+          },
+          {
+             "id": "ExampleInformationType2_2.0.0",
+             "kafkaInputTopic": "KafkaInputTopic",
              "useHttpProxy": false
           }
         ]
     }
 ```
 
-Each information has the following properties:
+Each information type has the following properties:
  - id the information type identity as exposed in the Information Coordination Service data consumer API
  - dmaapTopicUrl the URL to for fetching information from  DMaaP
+ - kafkaInputTopic a Kafka topic to get input from
  - useHttpProxy if true, the received information will be delivered using a HTTP proxy (provided that one is setup in the application.yaml file). This might for instance be needed if the data consumer is in the RAN or outside the cluster.
 
-The service producer will constantly poll MR for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If no jobs for that type are registered, the messages will be discarded. If a consumer is unavailable for distribution, the messages will be discarded for that consumer.
+The service producer will poll MR and/or listen to Kafka topics for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If a consumer is unavailable for distribution, the messages will be discarded for that consumer.
+
+When an Information Job is created in the Information Coordinator Service Consumer API, it is possible to define a number of job specific properties. For an Information type that has a Kafka topic defined, the following Json schema defines the properties that can be used:
+
+
+```sh
+{
+  "$schema": "http://json-schema.org/draft-04/schema#",
+  "type": "object",
+  "properties": {
+    "filter": {
+      "type": "string"
+    },
+    "maxConcurrency": {
+      "type": "integer"
+    },
+    "bufferTimeout": {
+      "type": "object",
+      "properties": {
+        "maxSize": {
+          "type": "integer"
+        },
+        "maxTimeMiliseconds": {
+          "type": "integer"
+        }
+      },
+      "additionalProperties": false,
+      "required": [
+        "maxSize",
+        "maxTimeMiliseconds"
+      ]
+    }
+  },
+  "additionalProperties": false
+}
+```
+-filter is a regular expression. Only strings that matches the expression will be pushed further to the consumer.
+-maxConcurrency the maximum number of concurrent REST session for the data delivery to the consumer. 
+ The default is 1 and that is the number that must be used to guarantee that the object sequence is maintained. 
+ A higher number will give higher throughtput. 
+-bufferTimeout, can be used to reduce the number of REST calls to the consumer. If defined, a number of objects will be 
+ buffered and sent in one REST call to the consumer.
+ The buffered objects will be put in a Json array and quoted. Example; 
+   Object1 and Object2 may be posted in one call -->  ["Object1", "Object2"]
+ The bufferTimeout is a Json object and the parameters in the object are:
+   - maxSize the maximum number of buffered objects before posting
+   - maxTimeMiliseconds the maximum delay time to buffer before posting
+ If no bufferTimeout is specified, each object will be posted as received in separate calls (not quoted and put in a Json array).
+
+
+For an information type that only has a DMaaP topic, the following Json schema defines the possible parameters to use when creating an information job:
+
+```sh
+{
+  "$schema": "http://json-schema.org/draft-04/schema#",
+  "type": "object",
+  "properties": {
+    "filter": {
+       "type": "string"
+     }
+  },
+  "additionalProperties": false
+}
+```
+-filter is a regular expression. Only strings that matches the expression will be pushed furter to the consumer. This
+ has a similar meaning as in jobs that receives data from Kafka.
 
 ## License
 
diff --git a/dmaap-adaptor-java/api/api.json b/dmaap-adaptor-java/api/api.json
index 6cd3525..04c4ab0 100644
--- a/dmaap-adaptor-java/api/api.json
+++ b/dmaap-adaptor-java/api/api.json
@@ -107,7 +107,53 @@
     }},
     "openapi": "3.0.1",
     "paths": {
-        "/dmaap_dataproducer/info_job": {
+        "/actuator/threaddump": {"get": {
+            "summary": "Actuator web endpoint 'threaddump'",
+            "operationId": "handle_2_1_3",
+            "responses": {"200": {
+                "description": "OK",
+                "content": {"*/*": {"schema": {"type": "object"}}}
+            }},
+            "tags": ["Actuator"]
+        }},
+        "/actuator/info": {"get": {
+            "summary": "Actuator web endpoint 'info'",
+            "operationId": "handle_9",
+            "responses": {"200": {
+                "description": "OK",
+                "content": {"*/*": {"schema": {"type": "object"}}}
+            }},
+            "tags": ["Actuator"]
+        }},
+        "/data-producer/v1/info-types/{infoTypeId}": {"put": {
+            "requestBody": {
+                "content": {"application/json": {"schema": {"$ref": "#/components/schemas/producer_info_type_info"}}},
+                "required": true
+            },
+            "operationId": "putInfoType",
+            "responses": {"200": {
+                "description": "OK",
+                "content": {"application/json": {"schema": {"type": "object"}}}
+            }},
+            "parameters": [{
+                "schema": {"type": "string"},
+                "in": "path",
+                "name": "infoTypeId",
+                "required": true
+            }],
+            "tags": ["Information Coordinator Service Simulator (exists only in test)"]
+        }},
+        "/generic_dataproducer/health_check": {"get": {
+            "summary": "Producer supervision",
+            "description": "The endpoint is provided by the Information Producer and is used for supervision of the producer.",
+            "operationId": "producerSupervision",
+            "responses": {"200": {
+                "description": "The producer is OK",
+                "content": {"application/json": {"schema": {"type": "string"}}}
+            }},
+            "tags": ["Producer job control API"]
+        }},
+        "/generic_dataproducer/info_job": {
             "post": {
                 "summary": "Callback for Information Job creation/modification",
                 "requestBody": {
@@ -146,52 +192,6 @@
                 "tags": ["Producer job control API"]
             }
         },
-        "/dmaap_dataproducer/health_check": {"get": {
-            "summary": "Producer supervision",
-            "description": "The endpoint is provided by the Information Producer and is used for supervision of the producer.",
-            "operationId": "producerSupervision",
-            "responses": {"200": {
-                "description": "The producer is OK",
-                "content": {"application/json": {"schema": {"type": "string"}}}
-            }},
-            "tags": ["Producer job control API"]
-        }},
-        "/actuator/threaddump": {"get": {
-            "summary": "Actuator web endpoint 'threaddump'",
-            "operationId": "handle_2_1_3",
-            "responses": {"200": {
-                "description": "OK",
-                "content": {"*/*": {"schema": {"type": "object"}}}
-            }},
-            "tags": ["Actuator"]
-        }},
-        "/actuator/info": {"get": {
-            "summary": "Actuator web endpoint 'info'",
-            "operationId": "handle_9",
-            "responses": {"200": {
-                "description": "OK",
-                "content": {"*/*": {"schema": {"type": "object"}}}
-            }},
-            "tags": ["Actuator"]
-        }},
-        "/data-producer/v1/info-types/{infoTypeId}": {"put": {
-            "requestBody": {
-                "content": {"application/json": {"schema": {"$ref": "#/components/schemas/producer_info_type_info"}}},
-                "required": true
-            },
-            "operationId": "putInfoType",
-            "responses": {"200": {
-                "description": "OK",
-                "content": {"application/json": {"schema": {"type": "object"}}}
-            }},
-            "parameters": [{
-                "schema": {"type": "string"},
-                "in": "path",
-                "name": "infoTypeId",
-                "required": true
-            }],
-            "tags": ["Information Coordinator Service Simulator (exists only in test)"]
-        }},
         "/actuator/loggers": {"get": {
             "summary": "Actuator web endpoint 'loggers'",
             "operationId": "handle_6",
@@ -244,6 +244,22 @@
                 "tags": ["Information Coordinator Service Simulator (exists only in test)"]
             }
         },
+        "/generic_dataproducer/info_job/{infoJobId}": {"delete": {
+            "summary": "Callback for Information Job deletion",
+            "description": "The call is invoked to terminate a data subscription. The endpoint is provided by the Information Producer.",
+            "operationId": "jobDeletedCallback",
+            "responses": {"200": {
+                "description": "OK",
+                "content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}}
+            }},
+            "parameters": [{
+                "schema": {"type": "string"},
+                "in": "path",
+                "name": "infoJobId",
+                "required": true
+            }],
+            "tags": ["Producer job control API"]
+        }},
         "/actuator/metrics/{requiredMetricName}": {"get": {
             "summary": "Actuator web endpoint 'metrics-requiredMetricName'",
             "operationId": "handle_5",
@@ -315,22 +331,6 @@
                 "tags": ["Actuator"]
             }
         },
-        "/dmaap_dataproducer/info_job/{infoJobId}": {"delete": {
-            "summary": "Callback for Information Job deletion",
-            "description": "The call is invoked to terminate a data subscription. The endpoint is provided by the Information Producer.",
-            "operationId": "jobDeletedCallback",
-            "responses": {"200": {
-                "description": "OK",
-                "content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}}
-            }},
-            "parameters": [{
-                "schema": {"type": "string"},
-                "in": "path",
-                "name": "infoJobId",
-                "required": true
-            }],
-            "tags": ["Producer job control API"]
-        }},
         "/actuator/health": {"get": {
             "summary": "Actuator web endpoint 'health'",
             "operationId": "handle_11",
@@ -388,8 +388,8 @@
             "name": "Copyright (C) 2021 Nordix Foundation. Licensed under the Apache License.",
             "url": "http://www.apache.org/licenses/LICENSE-2.0"
         },
-        "description": "Reads data from DMAAP and sends it further to information consumers",
-        "title": "Generic Dmaap Information Producer",
+        "description": "Reads data from DMaaP and Kafka and posts it further to information consumers",
+        "title": "Generic Dmaap and Kafka Information Producer",
         "version": "1.0"
     },
     "tags": [{
diff --git a/dmaap-adaptor-java/api/api.yaml b/dmaap-adaptor-java/api/api.yaml
index b3acfda..1fb78fa 100644
--- a/dmaap-adaptor-java/api/api.yaml
+++ b/dmaap-adaptor-java/api/api.yaml
@@ -1,7 +1,8 @@
 openapi: 3.0.1
 info:
-  title: Generic Dmaap Information Producer
-  description: Reads data from DMAAP and sends it further to information consumers
+  title: Generic Dmaap and Kafka Information Producer
+  description: Reads data from DMaaP and Kafka and posts it further to information
+    consumers
   license:
     name: Copyright (C) 2021 Nordix Foundation. Licensed under the Apache License.
     url: http://www.apache.org/licenses/LICENSE-2.0
@@ -15,69 +16,6 @@
     description: Spring Boot Actuator Web API Documentation
     url: https://docs.spring.io/spring-boot/docs/current/actuator-api/html/
 paths:
-  /dmaap_dataproducer/info_job:
-    get:
-      tags:
-      - Producer job control API
-      summary: Get all jobs
-      description: Returns all info jobs, can be used for trouble shooting
-      operationId: getJobs
-      responses:
-        200:
-          description: Information jobs
-          content:
-            application/json:
-              schema:
-                type: array
-                items:
-                  $ref: '#/components/schemas/producer_info_job_request'
-    post:
-      tags:
-      - Producer job control API
-      summary: Callback for Information Job creation/modification
-      description: The call is invoked to activate or to modify a data subscription.
-        The endpoint is provided by the Information Producer.
-      operationId: jobCreatedCallback
-      requestBody:
-        content:
-          application/json:
-            schema:
-              type: string
-        required: true
-      responses:
-        200:
-          description: OK
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/void'
-        400:
-          description: Other error in the request
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/error_information'
-        404:
-          description: Information type is not found
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/error_information'
-  /dmaap_dataproducer/health_check:
-    get:
-      tags:
-      - Producer job control API
-      summary: Producer supervision
-      description: The endpoint is provided by the Information Producer and is used
-        for supervision of the producer.
-      operationId: producerSupervision
-      responses:
-        200:
-          description: The producer is OK
-          content:
-            application/json:
-              schema:
-                type: string
   /actuator/threaddump:
     get:
       tags:
@@ -130,6 +68,69 @@
             application/json:
               schema:
                 type: object
+  /generic_dataproducer/health_check:
+    get:
+      tags:
+      - Producer job control API
+      summary: Producer supervision
+      description: The endpoint is provided by the Information Producer and is used
+        for supervision of the producer.
+      operationId: producerSupervision
+      responses:
+        200:
+          description: The producer is OK
+          content:
+            application/json:
+              schema:
+                type: string
+  /generic_dataproducer/info_job:
+    get:
+      tags:
+      - Producer job control API
+      summary: Get all jobs
+      description: Returns all info jobs, can be used for trouble shooting
+      operationId: getJobs
+      responses:
+        200:
+          description: Information jobs
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  $ref: '#/components/schemas/producer_info_job_request'
+    post:
+      tags:
+      - Producer job control API
+      summary: Callback for Information Job creation/modification
+      description: The call is invoked to activate or to modify a data subscription.
+        The endpoint is provided by the Information Producer.
+      operationId: jobCreatedCallback
+      requestBody:
+        content:
+          application/json:
+            schema:
+              type: string
+        required: true
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/void'
+        400:
+          description: Other error in the request
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/error_information'
+        404:
+          description: Information type is not found
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/error_information'
   /actuator/loggers:
     get:
       tags:
@@ -201,6 +202,29 @@
             application/json:
               schema:
                 type: object
+  /generic_dataproducer/info_job/{infoJobId}:
+    delete:
+      tags:
+      - Producer job control API
+      summary: Callback for Information Job deletion
+      description: The call is invoked to terminate a data subscription. The endpoint
+        is provided by the Information Producer.
+      operationId: jobDeletedCallback
+      parameters:
+      - name: infoJobId
+        in: path
+        required: true
+        style: simple
+        explode: false
+        schema:
+          type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/void'
   /actuator/metrics/{requiredMetricName}:
     get:
       tags:
@@ -293,29 +317,6 @@
             '*/*':
               schema:
                 type: object
-  /dmaap_dataproducer/info_job/{infoJobId}:
-    delete:
-      tags:
-      - Producer job control API
-      summary: Callback for Information Job deletion
-      description: The call is invoked to terminate a data subscription. The endpoint
-        is provided by the Information Producer.
-      operationId: jobDeletedCallback
-      parameters:
-      - name: infoJobId
-        in: path
-        required: true
-        style: simple
-        explode: false
-        schema:
-          type: string
-      responses:
-        200:
-          description: OK
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/void'
   /actuator/health:
     get:
       tags:
diff --git a/dmaap-adaptor-java/config/application.yaml b/dmaap-adaptor-java/config/application.yaml
index 6a2d68a..bd260ea 100644
--- a/dmaap-adaptor-java/config/application.yaml
+++ b/dmaap-adaptor-java/config/application.yaml
@@ -53,7 +53,8 @@
   dmaap-base-url: http://dradmin:dradmin@localhost:2222
   # The url used to adress this component. This is used as a callback url sent to other components.
   dmaap-adapter-base-url: https://localhost:8435
-  # KAFKA boostrap server. This is only needed if there are Information Types that uses a kafkaInputTopic
+  # KAFKA boostrap servers. This is only needed if there are Information Types that uses a kafkaInputTopic
+  # several redundant boostrap servers can be specified, separated by a comma ','.
   kafka:
     bootstrap-servers: localhost:9092
 
diff --git a/dmaap-adaptor-java/config/application_configuration.json b/dmaap-adaptor-java/config/application_configuration.json
index ae34c56..6aaffd1 100644
--- a/dmaap-adaptor-java/config/application_configuration.json
+++ b/dmaap-adaptor-java/config/application_configuration.json
@@ -1,9 +1,15 @@
 {
    "types": [
       {
-         "id": "ExampleInformationType",
+         "id": "ExampleInformationType1",
          "dmaapTopicUrl": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12",
          "useHttpProxy": true
+      },
+      {
+         "id": "ExampleInformationType2",
+         "kafkaInputTopic": "TutorialTopic",
+         "useHttpProxy": false
       }
+      
    ]
-}
\ No newline at end of file
+}
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/SwaggerConfig.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/SwaggerConfig.java
index 8f33377..6128d2e 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/SwaggerConfig.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/SwaggerConfig.java
@@ -38,6 +38,6 @@
 public class SwaggerConfig {
     private SwaggerConfig() {}
 
-    static final String API_TITLE = "Generic Dmaap Information Producer";
-    static final String DESCRIPTION = "Reads data from DMAAP and sends it further to information consumers";
+    static final String API_TITLE = "Generic Dmaap and Kafka Information Producer";
+    static final String DESCRIPTION = "Reads data from DMaaP and Kafka and posts it further to information consumers";
 }
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java
index f17a9c0..b40c606 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java
@@ -28,6 +28,7 @@
 import java.util.Collections;
 
 import lombok.Getter;
+import lombok.Setter;
 
 import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
 import org.oran.dmaapadapter.repository.InfoType;
@@ -73,6 +74,7 @@
     private int httpProxyPort = 0;
 
     @Getter
+    @Setter
     @Value("${server.port}")
     private int localServerHttpPort;
 
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java
index 094ead7..94f9f8d 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java
@@ -59,8 +59,8 @@
 
     public static final String API_NAME = "Producer job control API";
     public static final String API_DESCRIPTION = "";
-    public static final String JOB_URL = "/dmaap_dataproducer/info_job";
-    public static final String SUPERVISION_URL = "/dmaap_dataproducer/health_check";
+    public static final String JOB_URL = "/generic_dataproducer/info_job";
+    public static final String SUPERVISION_URL = "/generic_dataproducer/health_check";
     private static Gson gson = new GsonBuilder().create();
     private final Jobs jobs;
     private final InfoTypes types;
@@ -91,7 +91,7 @@
             return new ResponseEntity<>(HttpStatus.OK);
         } catch (ServiceException e) {
             logger.warn("jobCreatedCallback failed: {}", e.getMessage());
-            return ErrorResponse.create(e, HttpStatus.NOT_FOUND);
+            return ErrorResponse.create(e, e.getHttpStatus());
         } catch (Exception e) {
             logger.warn("jobCreatedCallback failed: {}", e.getMessage());
             return ErrorResponse.create(e, HttpStatus.BAD_REQUEST);
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ConsumerJobInfo.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ConsumerJobInfo.java
index ce4a3b7..c1737db 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ConsumerJobInfo.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/r1/ConsumerJobInfo.java
@@ -28,7 +28,7 @@
 import org.immutables.gson.Gson;
 
 @Gson.TypeAdapters
-@Schema(name = "consumer_job", description = "Information for an Enrichment Information Job")
+@Schema(name = "consumer_job", description = "Information for an Information Job")
 public class ConsumerJobInfo {
 
     @Schema(name = "info_type_id", description = "Information type Idenitifier of the subscription job",
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoTypes.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoTypes.java
index 558fc46..baa998b 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoTypes.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoTypes.java
@@ -28,6 +28,7 @@
 import org.oran.dmaapadapter.exceptions.ServiceException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
 
 public class InfoTypes {
     private static final Logger logger = LoggerFactory.getLogger(InfoTypes.class);
@@ -47,7 +48,7 @@
     public synchronized InfoType getType(String id) throws ServiceException {
         InfoType type = allTypes.get(id);
         if (type == null) {
-            throw new ServiceException("Could not find type: " + id);
+            throw new ServiceException("Could not find type: " + id, HttpStatus.NOT_FOUND);
         }
         return type;
     }
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
index 3a81f39..306cc6b 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
@@ -144,25 +144,8 @@
     }
 
     private Object jsonSchemaObject(InfoType type) throws IOException, ServiceException {
-
-        if (type.isKafkaTopicDefined()) {
-            String schemaStrKafka = readSchemaFile("/typeSchemaKafka.json");
-            return jsonObject(schemaStrKafka);
-        } else {
-            // An object with no properties
-            String schemaStr = "{" //
-                    + "\"type\": \"object\"," //
-                    + "\"properties\": {" //
-                    + "   \"filter\": { \"type\": \"string\" }" //
-                    + "}," //
-                    + "\"additionalProperties\": false" //
-                    + "}"; //
-
-            return
-
-            jsonObject(schemaStr);
-        }
-
+        String schemaFile = type.isKafkaTopicDefined() ? "/typeSchemaKafka.json" : "/typeSchemaDmaap.json";
+        return jsonObject(readSchemaFile(schemaFile));
     }
 
     private String readSchemaFile(String filePath) throws IOException, ServiceException {
@@ -174,12 +157,13 @@
         return CharStreams.toString(new InputStreamReader(in, StandardCharsets.UTF_8));
     }
 
+    @SuppressWarnings("java:S2139") // Log exception
     private Object jsonObject(String json) {
         try {
             return JsonParser.parseString(json).getAsJsonObject();
         } catch (Exception e) {
-            logger.error("Bug, error in JSON: {}", json);
-            throw new NullPointerException(e.toString());
+            logger.error("Bug, error in JSON: {} {}", json, e.getMessage());
+            throw new NullPointerException(e.getMessage());
         }
     }
 
@@ -190,7 +174,6 @@
     }
 
     private ProducerRegistrationInfo producerRegistrationInfo() {
-
         return ProducerRegistrationInfo.builder() //
                 .jobCallbackUrl(baseUrl() + ProducerCallbacksController.JOB_URL) //
                 .producerSupervisionCallbackUrl(baseUrl() + ProducerCallbacksController.SUPERVISION_URL) //
diff --git a/dmaap-adaptor-java/src/main/resources/typeSchemaDmaap.json b/dmaap-adaptor-java/src/main/resources/typeSchemaDmaap.json
new file mode 100644
index 0000000..a50b236
--- /dev/null
+++ b/dmaap-adaptor-java/src/main/resources/typeSchemaDmaap.json
@@ -0,0 +1,10 @@
+{
+  "$schema": "http://json-schema.org/draft-04/schema#",
+  "type": "object",
+  "properties": {
+    "filter": {
+       "type": "string"
+     }
+  },
+  "additionalProperties": false
+}
diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
index c4c9602..e78313c 100644
--- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
+++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
@@ -34,6 +34,7 @@
 
 import org.json.JSONObject;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.oran.dmaapadapter.clients.AsyncRestClient;
@@ -68,7 +69,7 @@
 import reactor.test.StepVerifier;
 
 @ExtendWith(SpringExtension.class)
-@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
+@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
 @TestPropertySource(properties = { //
         "server.ssl.key-store=./config/keystore.jks", //
         "app.webclient.trust-store=./config/truststore.jks", //
@@ -138,6 +139,11 @@
         }
     }
 
+    @BeforeEach
+    void setPort() {
+        this.applicationConfig.setLocalServerHttpPort(this.localServerHttpPort);
+    }
+
     @AfterEach
     void reset() {
         this.consumerController.testResults.reset();