Merge "Refactor for more efficient job handling"
diff --git a/dmaap-adaptor-java/README.md b/dmaap-adaptor-java/README.md
index d9826ad..0378bc7 100644
--- a/dmaap-adaptor-java/README.md
+++ b/dmaap-adaptor-java/README.md
@@ -1,5 +1,4 @@
 # O-RAN-SC Non-RealTime RIC DMaaP Information Producer
-
 This product is a generic information producer (as defined by the Information Coordinator Service (ICS)). It can produce any information that can be retrieved from DMaaP. Its main tasks is to register information types and itself as a producer using the ICS Data Producer API.
 
 A data consumer may create information jobs through the ICS Data Producer API.
@@ -8,33 +7,33 @@
 
 The component is a springboot service and is configured as any springboot service through the file `config/application.yaml`. The component log can be retrieved and logging can be controled by means of REST call. See the API documentation (api/api.yaml).
 
-The file `config/application_configuration.json` contains the configuration of job types that the producer will support.
+The file `config/application_configuration.json` contains the configuration of job types that the producer will support. Here follows an example with one type:
 
+```sh
     {
        "types":
         [
           {
-            "id": The ID of the job type, e.g. "STD_Fault_Messages",
-            "dmaapTopicUrl": The topic URL to poll from DMaaP Message Router, e.g. "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages"
+             "id":  "STD_Fault_Messages",
+             "dmaapTopicUrl":  events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages",
+             "useHttpProxy": false
           }
-      ]
+        ]
     }
+```
+
+Each information has the following properties:
+ - id the information type identity as exposed in the Information Coordination Service data consumer API
+ - dmaapTopicUrl the URL to for fetching information from  DMaaP
+ - useHttpProxy if true, the received information will be delivered using a HTTP proxy (provided that one is setup in the application.yaml file). This might for instance be needed if the data consumer is in the RAN or outside the cluster.
 
 The service producer will constantly poll MR for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If no jobs for that type are registered, the messages will be discarded. If a consumer is unavailable for distribution, the messages will be discarded for that consumer.
 
 ## License
 
-Copyright (C) 2021 Nordix Foundation.
-Licensed under the Apache License, Version 2.0 (the "License")
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
+Copyright (C) 2021 Nordix Foundation. Licensed under the Apache License, Version 2.0 (the "License") you may not use this file except in compliance with the License. You may obtain a copy of the License at
 
-      http://www.apache.org/licenses/LICENSE-2.0
+http://www.apache.org/licenses/LICENSE-2.0
 
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
-For more information about license please see the [LICENSE](LICENSE.txt) file for details.
+Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
+For more information about license please see the [LICENSE](LICENSE.txt) file for details.
\ No newline at end of file
diff --git a/dmaap-adaptor-java/config/application_configuration.json b/dmaap-adaptor-java/config/application_configuration.json
index a8967d8..ae34c56 100644
--- a/dmaap-adaptor-java/config/application_configuration.json
+++ b/dmaap-adaptor-java/config/application_configuration.json
@@ -2,7 +2,8 @@
    "types": [
       {
          "id": "ExampleInformationType",
-         "dmaapTopicUrl": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
+         "dmaapTopicUrl": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12",
+         "useHttpProxy": true
       }
    ]
-}
+}
\ No newline at end of file
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java
index d19577d..9dda1e6 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java
@@ -30,9 +30,13 @@
     @Getter
     private final String dmaapTopicUrl;
 
-    public InfoType(String id, String dmaapTopicUrl) {
+    @Getter
+    private final boolean useHttpProxy;
+
+    public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy) {
         this.id = id;
         this.dmaapTopicUrl = dmaapTopicUrl;
+        this.useHttpProxy = useHttpProxy;
     }
 
 }
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java
index f82d7f6..b7c4ec6 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java
@@ -43,7 +43,8 @@
     private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10);
     private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class);
     private final ApplicationConfig applicationConfig;
-    private final AsyncRestClient restClient;
+    private final AsyncRestClient dmaapRestClient;
+    private final AsyncRestClient consumerRestClient;
     private final InfoType type;
     private final Jobs jobs;
     private final InfiniteFlux infiniteSubmitter = new InfiniteFlux();
@@ -82,7 +83,9 @@
     public DmaapMessageConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
         this.applicationConfig = applicationConfig;
         AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
-        this.restClient = restclientFactory.createRestClientNoHttpProxy("");
+        this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy("");
+        this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("")
+                : restclientFactory.createRestClientNoHttpProxy("");
         this.type = type;
         this.jobs = jobs;
     }
@@ -116,7 +119,7 @@
 
     protected Mono<String> getFromMessageRouter(String topicUrl) {
         logger.trace("getFromMessageRouter {}", topicUrl);
-        return restClient.get(topicUrl) //
+        return dmaapRestClient.get(topicUrl) //
                 .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
                 .doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) //
                 .onErrorResume(this::handleDmaapErrorResponse); //
@@ -129,7 +132,7 @@
         // Distibute the body to all jobs for this type
         return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
                 .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl()))
-                .flatMap(job -> restClient.post(job.getCallbackUrl(), body), CONCURRENCY) //
+                .flatMap(job -> consumerRestClient.post(job.getCallbackUrl(), body), CONCURRENCY) //
                 .onErrorResume(this::handleConsumerErrorResponse);
     }
 
diff --git a/dmaap-adaptor-java/src/test/resources/test_application_configuration.json b/dmaap-adaptor-java/src/test/resources/test_application_configuration.json
index 0db20cb..8d211b8 100644
--- a/dmaap-adaptor-java/src/test/resources/test_application_configuration.json
+++ b/dmaap-adaptor-java/src/test/resources/test_application_configuration.json
@@ -2,7 +2,8 @@
    "types": [
       {
          "id": "ExampleInformationType",
-         "dmaapTopicUrl": "/dmaap-topic-1"
+         "dmaapTopicUrl": "/dmaap-topic-1",
+         "useHttpProxy": true
       }
    ]
 }
\ No newline at end of file