NONRTRIC - Implement DMaaP mediator producer service in Java
Added API documentation.
Added some unit tests.
Improved some logging.
Simplified the registration task somewhat.
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-597
Change-Id: I2afa2fbf1073459cf560212f2d2e601446352a25
diff --git a/dmaap-adaptor-java/api/api.json b/dmaap-adaptor-java/api/api.json
new file mode 100644
index 0000000..39056e9
--- /dev/null
+++ b/dmaap-adaptor-java/api/api.json
@@ -0,0 +1,399 @@
+{
+ "components": {"schemas": {
+ "producer_info_job_request": {
+ "description": "The body of the Information Producer callbacks for Information Job creation and deletion",
+ "type": "object",
+ "required": ["info_job_identity"],
+ "properties": {
+ "owner": {
+ "description": "The owner of the job",
+ "type": "string"
+ },
+ "last_updated": {
+ "description": "The time when the job was last updated or created (ISO-8601)",
+ "type": "string"
+ },
+ "info_job_identity": {
+ "description": "Identity of the Information Job",
+ "type": "string"
+ },
+ "target_uri": {
+ "description": "URI for the target of the produced Information",
+ "type": "string"
+ },
+ "info_job_data": {
+ "description": "Json for the job data",
+ "type": "object"
+ },
+ "info_type_identity": {
+ "description": "Type identity for the job",
+ "type": "string"
+ }
+ }
+ },
+ "error_information": {
+ "description": "Problem as defined in https://tools.ietf.org/html/rfc7807",
+ "type": "object",
+ "properties": {
+ "detail": {
+ "description": " A human-readable explanation specific to this occurrence of the problem.",
+ "type": "string",
+ "example": "Policy type not found"
+ },
+ "status": {
+ "format": "int32",
+ "description": "The HTTP status code generated by the origin server for this occurrence of the problem. ",
+ "type": "integer",
+ "example": 503
+ }
+ }
+ },
+ "void": {
+ "description": "Void/empty",
+ "type": "object"
+ },
+ "producer_registration_info": {
+ "description": "Information for an Information Producer",
+ "type": "object",
+ "required": [
+ "info_job_callback_url",
+ "info_producer_supervision_callback_url",
+ "supported_info_types"
+ ],
+ "properties": {
+ "info_producer_supervision_callback_url": {
+ "description": "callback for producer supervision",
+ "type": "string"
+ },
+ "supported_info_types": {
+ "description": "Supported Information Type IDs",
+ "type": "array",
+ "items": {
+ "description": "Supported Information Type IDs",
+ "type": "string"
+ }
+ },
+ "info_job_callback_url": {
+ "description": "callback for Information Job",
+ "type": "string"
+ }
+ }
+ },
+ "Link": {
+ "type": "object",
+ "properties": {
+ "templated": {"type": "boolean"},
+ "href": {"type": "string"}
+ }
+ },
+ "producer_info_type_info": {
+ "description": "Information for an Information Type",
+ "type": "object",
+ "required": [
+ "info_job_data_schema",
+ "info_type_information"
+ ],
+ "properties": {
+ "info_type_information": {
+ "description": "Type specific information for the information type",
+ "type": "object"
+ },
+ "info_job_data_schema": {
+ "description": "Json schema for the job data",
+ "type": "object"
+ }
+ }
+ }
+ }},
+ "openapi": "3.0.1",
+ "paths": {
+ "/dmaap_dataproducer/info_job": {
+ "post": {
+ "summary": "Callback for Information Job creation/modification",
+ "requestBody": {
+ "content": {"application/json": {"schema": {"type": "string"}}},
+ "required": true
+ },
+ "description": "The call is invoked to activate or to modify a data subscription. The endpoint is provided by the Information Producer.",
+ "operationId": "jobCreatedCallback",
+ "responses": {
+ "200": {
+ "description": "OK",
+ "content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}}
+ },
+ "404": {
+ "description": "Information type is not found",
+ "content": {"application/json": {"schema": {"$ref": "#/components/schemas/error_information"}}}
+ }
+ },
+ "tags": ["Producer job control API"]
+ },
+ "get": {
+ "summary": "Get all jobs",
+ "description": "Returns all info jobs, can be used for trouble shooting",
+ "operationId": "getJobs",
+ "responses": {"200": {
+ "description": "Information jobs",
+ "content": {"application/json": {"schema": {
+ "type": "array",
+ "items": {"$ref": "#/components/schemas/producer_info_job_request"}
+ }}}
+ }},
+ "tags": ["Producer job control API"]
+ }
+ },
+ "/dmaap_dataproducer/health_check": {"get": {
+ "summary": "Producer supervision",
+ "description": "The endpoint is provided by the Information Producer and is used for supervision of the producer.",
+ "operationId": "producerSupervision",
+ "responses": {"200": {
+ "description": "The producer is OK",
+ "content": {"application/json": {"schema": {"type": "string"}}}
+ }},
+ "tags": ["Producer job control API"]
+ }},
+ "/actuator/threaddump": {"get": {
+ "summary": "Actuator web endpoint 'threaddump'",
+ "operationId": "handle_2_1_3",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"*/*": {"schema": {"type": "object"}}}
+ }},
+ "tags": ["Actuator"]
+ }},
+ "/actuator/info": {"get": {
+ "summary": "Actuator web endpoint 'info'",
+ "operationId": "handle_9",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"*/*": {"schema": {"type": "object"}}}
+ }},
+ "tags": ["Actuator"]
+ }},
+ "/data-producer/v1/info-types/{infoTypeId}": {"put": {
+ "requestBody": {
+ "content": {"application/json": {"schema": {"$ref": "#/components/schemas/producer_info_type_info"}}},
+ "required": true
+ },
+ "operationId": "putInfoType",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"application/json": {"schema": {"type": "object"}}}
+ }},
+ "parameters": [{
+ "schema": {"type": "string"},
+ "in": "path",
+ "name": "infoTypeId",
+ "required": true
+ }],
+ "tags": ["Information Coordinator Service Simulator (exists only in test)"]
+ }},
+ "/actuator/loggers": {"get": {
+ "summary": "Actuator web endpoint 'loggers'",
+ "operationId": "handle_6",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"*/*": {"schema": {"type": "object"}}}
+ }},
+ "tags": ["Actuator"]
+ }},
+ "/actuator/health/**": {"get": {
+ "summary": "Actuator web endpoint 'health-path'",
+ "operationId": "handle_12",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"*/*": {"schema": {"type": "object"}}}
+ }},
+ "tags": ["Actuator"]
+ }},
+ "/data-producer/v1/info-producers/{infoProducerId}": {
+ "get": {
+ "operationId": "getInfoProducer",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"application/json": {"schema": {"type": "object"}}}
+ }},
+ "parameters": [{
+ "schema": {"type": "string"},
+ "in": "path",
+ "name": "infoProducerId",
+ "required": true
+ }],
+ "tags": ["Information Coordinator Service Simulator (exists only in test)"]
+ },
+ "put": {
+ "requestBody": {
+ "content": {"application/json": {"schema": {"$ref": "#/components/schemas/producer_registration_info"}}},
+ "required": true
+ },
+ "operationId": "putInfoProducer",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"application/json": {"schema": {"type": "object"}}}
+ }},
+ "parameters": [{
+ "schema": {"type": "string"},
+ "in": "path",
+ "name": "infoProducerId",
+ "required": true
+ }],
+ "tags": ["Information Coordinator Service Simulator (exists only in test)"]
+ }
+ },
+ "/actuator/metrics/{requiredMetricName}": {"get": {
+ "summary": "Actuator web endpoint 'metrics-requiredMetricName'",
+ "operationId": "handle_5",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"*/*": {"schema": {"type": "object"}}}
+ }},
+ "parameters": [{
+ "schema": {"type": "string"},
+ "in": "path",
+ "name": "requiredMetricName",
+ "required": true
+ }],
+ "tags": ["Actuator"]
+ }},
+ "/actuator": {"get": {
+ "summary": "Actuator root web endpoint",
+ "operationId": "links_1",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"*/*": {"schema": {
+ "additionalProperties": {
+ "additionalProperties": {"$ref": "#/components/schemas/Link"},
+ "type": "object"
+ },
+ "type": "object"
+ }}}
+ }},
+ "tags": ["Actuator"]
+ }},
+ "/actuator/logfile": {"get": {
+ "summary": "Actuator web endpoint 'logfile'",
+ "operationId": "handle_8",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"*/*": {"schema": {"type": "object"}}}
+ }},
+ "tags": ["Actuator"]
+ }},
+ "/actuator/loggers/{name}": {
+ "post": {
+ "summary": "Actuator web endpoint 'loggers-name'",
+ "operationId": "handle_0",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"*/*": {"schema": {"type": "object"}}}
+ }},
+ "parameters": [{
+ "schema": {"type": "string"},
+ "in": "path",
+ "name": "name",
+ "required": true
+ }],
+ "tags": ["Actuator"]
+ },
+ "get": {
+ "summary": "Actuator web endpoint 'loggers-name'",
+ "operationId": "handle_7",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"*/*": {"schema": {"type": "object"}}}
+ }},
+ "parameters": [{
+ "schema": {"type": "string"},
+ "in": "path",
+ "name": "name",
+ "required": true
+ }],
+ "tags": ["Actuator"]
+ }
+ },
+ "/dmaap_dataproducer/info_job/{infoJobId}": {"delete": {
+ "summary": "Callback for Information Job deletion",
+ "description": "The call is invoked to terminate a data subscription. The endpoint is provided by the Information Producer.",
+ "operationId": "jobDeletedCallback",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}}
+ }},
+ "parameters": [{
+ "schema": {"type": "string"},
+ "in": "path",
+ "name": "infoJobId",
+ "required": true
+ }],
+ "tags": ["Producer job control API"]
+ }},
+ "/actuator/health": {"get": {
+ "summary": "Actuator web endpoint 'health'",
+ "operationId": "handle_11",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"*/*": {"schema": {"type": "object"}}}
+ }},
+ "tags": ["Actuator"]
+ }},
+ "/consumer": {"post": {
+ "summary": "Consume data",
+ "requestBody": {
+ "content": {"application/json": {"schema": {"type": "string"}}},
+ "required": true
+ },
+ "description": "The call is invoked to push data to consumer",
+ "operationId": "postData",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}}
+ }},
+ "tags": ["Test Consumer Simulator (exists only in test)"]
+ }},
+ "/dmaap-topic-1": {"get": {
+ "summary": "GET from topic",
+ "description": "The call is invoked to activate or to modify a data subscription. The endpoint is provided by the Information Producer.",
+ "operationId": "getFromTopic",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}}
+ }},
+ "tags": ["DMAAP Simulator (exists only in test)"]
+ }},
+ "/actuator/metrics": {"get": {
+ "summary": "Actuator web endpoint 'metrics'",
+ "operationId": "handle_4",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"*/*": {"schema": {"type": "object"}}}
+ }},
+ "tags": ["Actuator"]
+ }},
+ "/actuator/heapdump": {"get": {
+ "summary": "Actuator web endpoint 'heapdump'",
+ "operationId": "handle_10",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"*/*": {"schema": {"type": "object"}}}
+ }},
+ "tags": ["Actuator"]
+ }}
+ },
+ "info": {
+ "license": {
+ "name": "Copyright (C) 2021 Nordix Foundation. Licensed under the Apache License.",
+ "url": "http://www.apache.org/licenses/LICENSE-2.0"
+ },
+ "description": "Reads data from DMAAP and sends it further to information consumers",
+ "title": "Generic Dmaap Information Producer",
+ "version": "1.0"
+ },
+ "tags": [{
+ "name": "Actuator",
+ "description": "Monitor and interact",
+ "externalDocs": {
+ "description": "Spring Boot Actuator Web API Documentation",
+ "url": "https://docs.spring.io/spring-boot/docs/current/actuator-api/html/"
+ }
+ }]
+}
\ No newline at end of file
diff --git a/dmaap-adaptor-java/api/api.yaml b/dmaap-adaptor-java/api/api.yaml
new file mode 100644
index 0000000..3c9fb59
--- /dev/null
+++ b/dmaap-adaptor-java/api/api.yaml
@@ -0,0 +1,471 @@
+openapi: 3.0.1
+info:
+ title: Generic Dmaap Information Producer
+ description: Reads data from DMAAP and sends it further to information consumers
+ license:
+ name: Copyright (C) 2021 Nordix Foundation. Licensed under the Apache License.
+ url: http://www.apache.org/licenses/LICENSE-2.0
+ version: "1.0"
+servers:
+- url: /
+tags:
+- name: Actuator
+ description: Monitor and interact
+ externalDocs:
+ description: Spring Boot Actuator Web API Documentation
+ url: https://docs.spring.io/spring-boot/docs/current/actuator-api/html/
+paths:
+ /dmaap_dataproducer/info_job:
+ get:
+ tags:
+ - Producer job control API
+ summary: Get all jobs
+ description: Returns all info jobs, can be used for trouble shooting
+ operationId: getJobs
+ responses:
+ 200:
+ description: Information jobs
+ content:
+ application/json:
+ schema:
+ type: array
+ items:
+ $ref: '#/components/schemas/producer_info_job_request'
+ post:
+ tags:
+ - Producer job control API
+ summary: Callback for Information Job creation/modification
+ description: The call is invoked to activate or to modify a data subscription.
+ The endpoint is provided by the Information Producer.
+ operationId: jobCreatedCallback
+ requestBody:
+ content:
+ application/json:
+ schema:
+ type: string
+ required: true
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/void'
+ 404:
+ description: Information type is not found
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/error_information'
+ /dmaap_dataproducer/health_check:
+ get:
+ tags:
+ - Producer job control API
+ summary: Producer supervision
+ description: The endpoint is provided by the Information Producer and is used
+ for supervision of the producer.
+ operationId: producerSupervision
+ responses:
+ 200:
+ description: The producer is OK
+ content:
+ application/json:
+ schema:
+ type: string
+ /actuator/threaddump:
+ get:
+ tags:
+ - Actuator
+ summary: Actuator web endpoint 'threaddump'
+ operationId: handle_2_1_3
+ responses:
+ 200:
+ description: OK
+ content:
+ '*/*':
+ schema:
+ type: object
+ /actuator/info:
+ get:
+ tags:
+ - Actuator
+ summary: Actuator web endpoint 'info'
+ operationId: handle_9
+ responses:
+ 200:
+ description: OK
+ content:
+ '*/*':
+ schema:
+ type: object
+ /data-producer/v1/info-types/{infoTypeId}:
+ put:
+ tags:
+ - Information Coordinator Service Simulator (exists only in test)
+ operationId: putInfoType
+ parameters:
+ - name: infoTypeId
+ in: path
+ required: true
+ style: simple
+ explode: false
+ schema:
+ type: string
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/producer_info_type_info'
+ required: true
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ type: object
+ /actuator/loggers:
+ get:
+ tags:
+ - Actuator
+ summary: Actuator web endpoint 'loggers'
+ operationId: handle_6
+ responses:
+ 200:
+ description: OK
+ content:
+ '*/*':
+ schema:
+ type: object
+ /actuator/health/**:
+ get:
+ tags:
+ - Actuator
+ summary: Actuator web endpoint 'health-path'
+ operationId: handle_12
+ responses:
+ 200:
+ description: OK
+ content:
+ '*/*':
+ schema:
+ type: object
+ /data-producer/v1/info-producers/{infoProducerId}:
+ get:
+ tags:
+ - Information Coordinator Service Simulator (exists only in test)
+ operationId: getInfoProducer
+ parameters:
+ - name: infoProducerId
+ in: path
+ required: true
+ style: simple
+ explode: false
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ type: object
+ put:
+ tags:
+ - Information Coordinator Service Simulator (exists only in test)
+ operationId: putInfoProducer
+ parameters:
+ - name: infoProducerId
+ in: path
+ required: true
+ style: simple
+ explode: false
+ schema:
+ type: string
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/producer_registration_info'
+ required: true
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ type: object
+ /actuator/metrics/{requiredMetricName}:
+ get:
+ tags:
+ - Actuator
+ summary: Actuator web endpoint 'metrics-requiredMetricName'
+ operationId: handle_5
+ parameters:
+ - name: requiredMetricName
+ in: path
+ required: true
+ style: simple
+ explode: false
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
+ content:
+ '*/*':
+ schema:
+ type: object
+ /actuator:
+ get:
+ tags:
+ - Actuator
+ summary: Actuator root web endpoint
+ operationId: links_1
+ responses:
+ 200:
+ description: OK
+ content:
+ '*/*':
+ schema:
+ type: object
+ additionalProperties:
+ type: object
+ additionalProperties:
+ $ref: '#/components/schemas/Link'
+ /actuator/logfile:
+ get:
+ tags:
+ - Actuator
+ summary: Actuator web endpoint 'logfile'
+ operationId: handle_8
+ responses:
+ 200:
+ description: OK
+ content:
+ '*/*':
+ schema:
+ type: object
+ /actuator/loggers/{name}:
+ get:
+ tags:
+ - Actuator
+ summary: Actuator web endpoint 'loggers-name'
+ operationId: handle_7
+ parameters:
+ - name: name
+ in: path
+ required: true
+ style: simple
+ explode: false
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
+ content:
+ '*/*':
+ schema:
+ type: object
+ post:
+ tags:
+ - Actuator
+ summary: Actuator web endpoint 'loggers-name'
+ operationId: handle_0
+ parameters:
+ - name: name
+ in: path
+ required: true
+ style: simple
+ explode: false
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
+ content:
+ '*/*':
+ schema:
+ type: object
+ /dmaap_dataproducer/info_job/{infoJobId}:
+ delete:
+ tags:
+ - Producer job control API
+ summary: Callback for Information Job deletion
+ description: The call is invoked to terminate a data subscription. The endpoint
+ is provided by the Information Producer.
+ operationId: jobDeletedCallback
+ parameters:
+ - name: infoJobId
+ in: path
+ required: true
+ style: simple
+ explode: false
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/void'
+ /actuator/health:
+ get:
+ tags:
+ - Actuator
+ summary: Actuator web endpoint 'health'
+ operationId: handle_11
+ responses:
+ 200:
+ description: OK
+ content:
+ '*/*':
+ schema:
+ type: object
+ /consumer:
+ post:
+ tags:
+ - Test Consumer Simulator (exists only in test)
+ summary: Consume data
+ description: The call is invoked to push data to consumer
+ operationId: postData
+ requestBody:
+ content:
+ application/json:
+ schema:
+ type: string
+ required: true
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/void'
+ /dmaap-topic-1:
+ get:
+ tags:
+ - DMAAP Simulator (exists only in test)
+ summary: GET from topic
+ description: The call is invoked to activate or to modify a data subscription.
+ The endpoint is provided by the Information Producer.
+ operationId: getFromTopic
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/void'
+ /actuator/metrics:
+ get:
+ tags:
+ - Actuator
+ summary: Actuator web endpoint 'metrics'
+ operationId: handle_4
+ responses:
+ 200:
+ description: OK
+ content:
+ '*/*':
+ schema:
+ type: object
+ /actuator/heapdump:
+ get:
+ tags:
+ - Actuator
+ summary: Actuator web endpoint 'heapdump'
+ operationId: handle_10
+ responses:
+ 200:
+ description: OK
+ content:
+ '*/*':
+ schema:
+ type: object
+components:
+ schemas:
+ producer_info_job_request:
+ required:
+ - info_job_identity
+ type: object
+ properties:
+ owner:
+ type: string
+ description: The owner of the job
+ last_updated:
+ type: string
+ description: The time when the job was last updated or created (ISO-8601)
+ info_job_identity:
+ type: string
+ description: Identity of the Information Job
+ target_uri:
+ type: string
+ description: URI for the target of the produced Information
+ info_job_data:
+ type: object
+ description: Json for the job data
+ info_type_identity:
+ type: string
+ description: Type identity for the job
+ description: The body of the Information Producer callbacks for Information
+ Job creation and deletion
+ error_information:
+ type: object
+ properties:
+ detail:
+ type: string
+ description: ' A human-readable explanation specific to this occurrence
+ of the problem.'
+ example: Policy type not found
+ status:
+ type: integer
+ description: 'The HTTP status code generated by the origin server for this
+ occurrence of the problem. '
+ format: int32
+ example: 503
+ description: Problem as defined in https://tools.ietf.org/html/rfc7807
+ void:
+ type: object
+ description: Void/empty
+ producer_registration_info:
+ required:
+ - info_job_callback_url
+ - info_producer_supervision_callback_url
+ - supported_info_types
+ type: object
+ properties:
+ info_producer_supervision_callback_url:
+ type: string
+ description: callback for producer supervision
+ supported_info_types:
+ type: array
+ description: Supported Information Type IDs
+ items:
+ type: string
+ description: Supported Information Type IDs
+ info_job_callback_url:
+ type: string
+ description: callback for Information Job
+ description: Information for an Information Producer
+ Link:
+ type: object
+ properties:
+ templated:
+ type: boolean
+ href:
+ type: string
+ producer_info_type_info:
+ required:
+ - info_job_data_schema
+ - info_type_information
+ type: object
+ properties:
+ info_type_information:
+ type: object
+ description: Type specific information for the information type
+ info_job_data_schema:
+ type: object
+ description: Json schema for the job data
+ description: Information for an Information Type
diff --git a/dmaap-adaptor-java/config/application.yaml b/dmaap-adaptor-java/config/application.yaml
index b327a0b..5733ea7 100644
--- a/dmaap-adaptor-java/config/application.yaml
+++ b/dmaap-adaptor-java/config/application.yaml
@@ -11,7 +11,8 @@
exposure:
# Enabling of springboot actuator features. See springboot documentation.
include: "loggers,logfile,health,info,metrics,threaddump,heapdump"
-
+springdoc:
+ show-actuator: true
logging:
# Configuration of logging
level:
diff --git a/dmaap-adaptor-java/pom.xml b/dmaap-adaptor-java/pom.xml
index 01f51e2..1fbd83c 100644
--- a/dmaap-adaptor-java/pom.xml
+++ b/dmaap-adaptor-java/pom.xml
@@ -3,7 +3,7 @@
* ========================LICENSE_START=================================
* O-RAN-SC
* %%
-* Copyright (C) 2019 Nordix Foundation
+* Copyright (C) 2021 Nordix Foundation
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -301,7 +301,7 @@
<language>openapi-yaml</language>
<output>${project.basedir}/api</output>
<configOptions>
- <outputFile>ecs-api.yaml</outputFile>
+ <outputFile>api.yaml</outputFile>
</configOptions>
</configuration>
</execution>
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/SwaggerConfig.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/SwaggerConfig.java
new file mode 100644
index 0000000..8f33377
--- /dev/null
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/SwaggerConfig.java
@@ -0,0 +1,43 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter;
+
+import io.swagger.v3.oas.annotations.OpenAPIDefinition;
+import io.swagger.v3.oas.annotations.info.Info;
+import io.swagger.v3.oas.annotations.info.License;
+
+/**
+ * Swagger configuration class that uses the Swagger documentation type and scans
+ * all the controllers. To access the Swagger UI, go to
+ * http://ip:port/swagger-ui.html
+ */
+@OpenAPIDefinition( //
+ info = @Info(title = SwaggerConfig.API_TITLE, //
+ version = "1.0", //
+ description = SwaggerConfig.DESCRIPTION, //
+ license = @License(name = "Copyright (C) 2021 Nordix Foundation. Licensed under the Apache License.",
+ url = "http://www.apache.org/licenses/LICENSE-2.0")))
+public class SwaggerConfig {
+ private SwaggerConfig() {}
+
+ static final String API_TITLE = "Generic Dmaap Information Producer";
+ static final String DESCRIPTION = "Reads data from DMAAP and sends it further to information consumers";
+}
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java
index d4fe95c..ca7c96c 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java
@@ -24,12 +24,16 @@
import com.google.gson.GsonBuilder;
import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.media.ArraySchema;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import io.swagger.v3.oas.annotations.tags.Tag;
+import java.util.ArrayList;
+import java.util.Collection;
+
import org.oran.dmaapadapter.r1.ProducerJobInfo;
import org.oran.dmaapadapter.repository.InfoTypes;
import org.oran.dmaapadapter.repository.Job;
@@ -52,7 +56,7 @@
public class ProducerCallbacksController {
private static final Logger logger = LoggerFactory.getLogger(ProducerCallbacksController.class);
- public static final String API_NAME = "Management of configuration";
+ public static final String API_NAME = "Producer job control API";
public static final String API_DESCRIPTION = "";
public static final String JOB_URL = "/dmaap_dataproducer/info_job";
public static final String SUPERVISION_URL = "/dmaap_dataproducer/health_check";
@@ -70,7 +74,9 @@
description = "The call is invoked to activate or to modify a data subscription. The endpoint is provided by the Information Producer.")
@ApiResponses(value = { //
@ApiResponse(responseCode = "200", description = "OK", //
- content = @Content(schema = @Schema(implementation = VoidResponse.class))) //
+ content = @Content(schema = @Schema(implementation = VoidResponse.class))), //
+ @ApiResponse(responseCode = "404", description = "Information type is not found", //
+ content = @Content(schema = @Schema(implementation = ErrorResponse.ErrorInfo.class))), //
})
public ResponseEntity<Object> jobCreatedCallback( //
@RequestBody String body) {
@@ -78,7 +84,8 @@
ProducerJobInfo request = gson.fromJson(body, ProducerJobInfo.class);
logger.info("Job started callback {}", request.id);
- Job job = new Job(request.id, request.targetUri, types.getType(request.typeId));
+ Job job = new Job(request.id, request.targetUri, types.getType(request.typeId), request.owner,
+ request.lastUpdated);
this.jobs.put(job);
return new ResponseEntity<>(HttpStatus.OK);
} catch (Exception e) {
@@ -86,6 +93,21 @@
}
}
+ @GetMapping(path = JOB_URL, produces = MediaType.APPLICATION_JSON_VALUE)
+ @Operation(summary = "Get all jobs", description = "Returns all info jobs, can be used for trouble shooting")
+ @ApiResponse(responseCode = "200", //
+ description = "Information jobs", //
+ content = @Content(array = @ArraySchema(schema = @Schema(implementation = ProducerJobInfo.class)))) //
+ public ResponseEntity<Object> getJobs() {
+
+ Collection<ProducerJobInfo> producerJobs = new ArrayList<>();
+ for (Job j : this.jobs.getAll()) {
+ producerJobs.add(new ProducerJobInfo(null, j.getId(), j.getType().getId(), j.getCallbackUrl(), j.getOwner(),
+ j.getLastUpdated()));
+ }
+ return new ResponseEntity<>(gson.toJson(producerJobs), HttpStatus.OK);
+ }
+
@DeleteMapping(path = JOB_URL + "/{infoJobId}", produces = MediaType.APPLICATION_JSON_VALUE)
@Operation(summary = "Callback for Information Job deletion",
description = "The call is invoked to terminate a data subscription. The endpoint is provided by the Information Producer.")
@@ -109,7 +131,7 @@
content = @Content(schema = @Schema(implementation = String.class))) //
})
public ResponseEntity<Object> producerSupervision() {
- logger.info("Producer supervision");
+ logger.debug("Producer supervision");
return new ResponseEntity<>(HttpStatus.OK);
}
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java
index 690e465..0da94a6 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java
@@ -33,10 +33,18 @@
@Getter
private final InfoType type;
- public Job(String id, String callbackUrl, InfoType type) {
+ @Getter
+ private final String owner;
+
+ @Getter
+ private final String lastUpdated;
+
+ public Job(String id, String callbackUrl, InfoType type, String owner, String lastUpdated) {
this.id = id;
this.callbackUrl = callbackUrl;
this.type = type;
+ this.owner = owner;
+ this.lastUpdated = lastUpdated;
}
}
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
index b9a50b3..837ca32 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
@@ -22,11 +22,12 @@
import com.google.gson.JsonParser;
+import lombok.Getter;
+
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
-import org.oran.dmaapadapter.exceptions.ServiceException;
import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo;
import org.oran.dmaapadapter.r1.ProducerRegistrationInfo;
import org.oran.dmaapadapter.repository.InfoType;
@@ -56,6 +57,7 @@
private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
private static final String PRODUCER_ID = "DmaapGenericInfoProducer";
+ @Getter
private boolean isRegisteredInEcs = false;
private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 5;
@@ -68,47 +70,58 @@
@Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
public void supervisionTask() {
- logger.debug("Checking producers starting");
- createTask().subscribe(null, null, () -> logger.debug("Producer registration completed"));
+ checkRegistration() //
+ .filter(isRegisterred -> !isRegisterred) //
+ .flatMap(isRegisterred -> registerTypesAndProducer()) //
+ .subscribe( //
+ null, //
+ this::handleRegistrationFailure, //
+ this::handleRegistrationCompleted);
}
- public Mono<Object> createTask() {
- return checkProducerRegistration() //
- .doOnError(t -> isRegisteredInEcs = false) //
- .onErrorResume(t -> registerTypesAndProducer());
+ private void handleRegistrationCompleted() {
+ logger.debug("Registering types and producer succeeded");
+ isRegisteredInEcs = true;
}
- public boolean isRegisteredInEcs() {
- return this.isRegisteredInEcs;
+ private void handleRegistrationFailure(Throwable t) {
+ logger.warn("Registration failed {}", t.getMessage());
+ isRegisteredInEcs = false;
}
- private Mono<Object> checkProducerRegistration() {
+ private Mono<Boolean> checkRegistration() {
final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
return restClient.get(url) //
- .flatMap(this::checkRegistrationInfo) //
- ;
+ .flatMap(this::isRegisterredInfoCorrect) //
+ .onErrorResume(t -> Mono.just(Boolean.FALSE));
+ }
+
+ private Mono<Boolean> isRegisterredInfoCorrect(String registerredInfoStr) {
+ ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class);
+ if (isEqual(producerRegistrationInfo(), registerredInfo)) {
+ logger.trace("Already registered");
+ return Mono.just(Boolean.TRUE);
+ } else {
+ return Mono.just(Boolean.FALSE);
+ }
}
private String registerTypeUrl(InfoType type) {
- String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
- return url;
+ return applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
}
private Mono<String> registerTypesAndProducer() {
+ final int CONCURRENCY = 20;
final String producerUrl =
applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
return Flux.fromIterable(this.types.getAll()) //
.doOnNext(type -> logger.info("Registering type {}", type.getId())) //
- .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo()))) //
+ .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo())),
+ CONCURRENCY) //
.collectList() //
- .flatMap(resp -> restClient.put(producerUrl, gson.toJson(producerRegistrationInfo()))) //
- .onErrorResume(t -> {
- logger.warn("Registration failed {}", t.getMessage());
- isRegisteredInEcs = false;
- return Mono.empty();
- }) //
- .doOnNext(x -> logger.debug("Registering types and producer completed"));
+ .doOnNext(type -> logger.info("Registering producer")) //
+ .flatMap(resp -> restClient.put(producerUrl, gson.toJson(producerRegistrationInfo())));
}
private Object typeSpecifcInfoObject() {
@@ -138,17 +151,6 @@
}
}
- private Mono<String> checkRegistrationInfo(String resp) {
- ProducerRegistrationInfo info = gson.fromJson(resp, ProducerRegistrationInfo.class);
- if (isEqual(producerRegistrationInfo(), info)) {
- logger.debug("Already registered");
- this.isRegisteredInEcs = true;
- return Mono.empty();
- } else {
- return Mono.error(new ServiceException("Producer registration will be started"));
- }
- }
-
private boolean isEqual(ProducerRegistrationInfo a, ProducerRegistrationInfo b) {
return a.jobCallbackUrl.equals(b.jobCallbackUrl) //
&& a.producerSupervisionCallbackUrl.equals(b.producerSupervisionCallbackUrl) //
@@ -160,7 +162,7 @@
return ProducerRegistrationInfo.builder() //
.jobCallbackUrl(baseUrl() + ProducerCallbacksController.JOB_URL) //
.producerSupervisionCallbackUrl(baseUrl() + ProducerCallbacksController.SUPERVISION_URL) //
- .supportedTypeIds(types.typeIds()) //
+ .supportedTypeIds(this.types.typeIds()) //
.build();
}
diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
index cbaa59f..b1c1780 100644
--- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
+++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
@@ -147,7 +147,6 @@
this.consumerController.testResults.reset();
this.ecsSimulatorController.testResults.reset();
this.jobs.clear();
- this.types.clear();
}
private AsyncRestClient restClient(boolean useTrustValidation) {
@@ -240,7 +239,8 @@
final String JOB_ID = "ID";
// Register producer, Register types
- await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue());
+ await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
// Create a job
this.ecsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient());
@@ -254,9 +254,32 @@
await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(2));
assertThat(consumer.receivedBodies.get(0)).isEqualTo("DmaapResponse1");
+ String jobUrl = baseUrl() + ProducerCallbacksController.JOB_URL;
+ String jobs = restClient().get(jobUrl).block();
+ assertThat(jobs).contains("ExampleInformationType");
+
// Delete the job
this.ecsSimulatorController.deleteJob(JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+
+ }
+
+ @Test
+ void testReRegister() throws Exception {
+ // Wait for the types and the producer to be registered
+ await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
+
+ // Clear the registration, should trigger a re-register
+ ecsSimulatorController.testResults.reset();
+ await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
+
+ // Just clear the registered types, should trigger a re-register
+ ecsSimulatorController.testResults.types.clear();
+ await().untilAsserted(
+ () -> assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1));
+
}
private void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ConsumerController.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ConsumerController.java
index 1dbe83f..4b6d901 100644
--- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ConsumerController.java
+++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/ConsumerController.java
@@ -43,7 +43,7 @@
import org.springframework.web.bind.annotation.RestController;
@RestController("ConsumerSimulatorController")
-@Tag(name = "Consts.PRODUCER_API_CALLBACKS_NAME")
+@Tag(name = "Test Consumer Simulator (exists only in test)")
public class ConsumerController {
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -64,7 +64,7 @@
final TestResults testResults = new TestResults();
@PostMapping(path = CONSUMER_TARGET_URL, produces = MediaType.APPLICATION_JSON_VALUE)
- @Operation(summary = "GET from topic", description = "The call is invoked to push data to consumer")
+ @Operation(summary = "Consume data", description = "The call is invoked to push data to consumer")
@ApiResponses(value = { //
@ApiResponse(responseCode = "200", description = "OK", //
content = @Content(schema = @Schema(implementation = VoidResponse.class))) //
diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java
index fbb600f..5259ee1 100644
--- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java
+++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/DmaapSimulatorController.java
@@ -42,8 +42,8 @@
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
-@RestController("ProducerSimulatorController")
-@Tag(name = "ProducerConsts.PRODUCER_API_CALLBACKS_NAME")
+@RestController("DmaapSimulatorController")
+@Tag(name = "DMAAP Simulator (exists only in test)")
public class DmaapSimulatorController {
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java
index 7542e0b..828b027 100644
--- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java
+++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java
@@ -45,8 +45,8 @@
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
-@RestController("EcsSimulatorController")
-@Tag(name = "EcsSimulator")
+@RestController("IcsSimulatorController")
+@Tag(name = "Information Coordinator Service Simulator (exists only in test)")
public class EcsSimulatorController {
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -54,14 +54,16 @@
public static class TestResults {
- ProducerRegistrationInfo registrationInfo;
+ ProducerRegistrationInfo registrationInfo = null;
Map<String, ProducerInfoTypeInfo> types = new HashMap<>();
+ String infoProducerId = null;
public TestResults() {}
public void reset() {
registrationInfo = null;
types.clear();
+ infoProducerId = null;
}
}
@@ -86,6 +88,7 @@
@PathVariable("infoProducerId") String infoProducerId, //
@RequestBody ProducerRegistrationInfo registrationInfo) {
testResults.registrationInfo = registrationInfo;
+ testResults.infoProducerId = infoProducerId;
return new ResponseEntity<>(HttpStatus.OK);
}