Creating datafile
Datafile collector, which is moved from the prototype repo.
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-852
Change-Id: I1b91a51e328dc8cd11c14b290fe7296ed165ddf4
diff --git a/datafilecollector/.gitignore b/datafilecollector/.gitignore
new file mode 100644
index 0000000..9ec364a
--- /dev/null
+++ b/datafilecollector/.gitignore
@@ -0,0 +1,54 @@
+# Compiled class file
+*.class
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Log file
+*.log
+
+# BlueJ files
+*.ctxt
+
+# Package Files #
+*.jar
+*.war
+*.nar
+*.ear
+*.zip
+*.tar.gz
+*.rar
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
+
+# Intellij IDE
+.idea
+*.iml
+
+# Eclipse IDE
+.project
+.classpath
+.settings
+bin
+
+# Maven
+target
+target/
+pom.xml.tag
+pom.xml.releaseBackup
+pom.xml.versionsBackup
+pom.xml.next
+release.properties
+dependency-reduced-pom.xml
+buildNumber.properties
+.mvn/timing.properties
+.mvn/wrapper/maven-wrapper.jar
+
+# CheckStyle files
+.checkstyle
+
+opt/
+
+# Visual Studio Code
+.factorypath
diff --git a/datafilecollector/Dockerfile b/datafilecollector/Dockerfile
new file mode 100755
index 0000000..cee8d75
--- /dev/null
+++ b/datafilecollector/Dockerfile
@@ -0,0 +1,51 @@
+#
+# ============LICENSE_START=======================================================
+# Copyright (C) 2023 Nordix Foundation.
+# Copyright (C) 2020 Nokia.
+# Copyright (C) 2021 Samsung Electronics.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END=========================================================
+#
+FROM openjdk:17-jdk-slim
+
+EXPOSE 8100 8433
+
+ARG user=datafile
+ARG group=datafile
+
+USER root
+WORKDIR /opt/app/datafile
+
+# Plain local files: COPY is enough, ADD's tar extraction / URL fetch is not needed.
+COPY /config/application.yaml /opt/app/datafile/config/
+COPY /config/ftps_keystore.pass /opt/app/datafile/config/
+COPY /config/ftps_keystore.p12 /opt/app/datafile/config/
+COPY /config/keystore.jks /opt/app/datafile/config/
+COPY /config/truststore.jks /opt/app/datafile/config/
+COPY /config/truststore.pass /opt/app/datafile/config/
+
+# Create the log and certificate directories and the service account (still as root),
+# then give that account ownership of the log and config directories.
+RUN mkdir -p /var/log/ONAP /opt/app/datafile/etc/cert/ && \
+    addgroup "$group" && adduser --system --disabled-password --no-create-home --ingroup "$group" "$user" && \
+    chown -R "$user:$group" /var/log/ONAP /opt/app/datafile/config && \
+    chmod -R u+rw /opt/app/datafile/config/
+
+# Drop root privileges: everything below, including the application, runs as the service account.
+USER $user
+
+COPY --chown=$user:$group /target/datafile-app-server.jar /opt/app/datafile/
+ENTRYPOINT ["java", "-jar", "/opt/app/datafile/datafile-app-server.jar"]
diff --git a/datafilecollector/LICENSE.txt b/datafilecollector/LICENSE.txt
new file mode 100644
index 0000000..581378e
--- /dev/null
+++ b/datafilecollector/LICENSE.txt
@@ -0,0 +1,36 @@
+/*
+* ============LICENSE_START==========================================
+* Copyright (C) 2018-2023 Nordix Foundation. All rights reserved.
+* Copyright (c) 2018 NOKIA Intellectual Property. All rights reserved.
+* ===================================================================
+*
+* Unless otherwise specified, all software contained herein is licensed
+* under the Apache License, Version 2.0 (the "License");
+* you may not use this software except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+*
+*
+* Unless otherwise specified, all documentation contained herein is licensed
+* under the Creative Commons License, Attribution 4.0 Intl. (the "License");
+* you may not use this documentation except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* https://creativecommons.org/licenses/by/4.0/
+*
+* Unless required by applicable law or agreed to in writing, documentation
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+* ============LICENSE_END============================================
+*/
diff --git a/datafilecollector/README.md b/datafilecollector/README.md
new file mode 100644
index 0000000..785a46d
--- /dev/null
+++ b/datafilecollector/README.md
@@ -0,0 +1,37 @@
+# DFC (DataFile Collector)
+
+Datafile Collector is responsible for collecting PM counter files from traffic-handling network functions.
+The files are stored in a persistent volume or in an S3 object store.
+
+DFC originates from ONAP. This variant uses Kafka and an S3 object store and does not use DMaaP.
+
+## Introduction
+
+DFC is delivered as one **Docker container** which hosts the application server and can be started with `docker-compose`.
+
+## Compiling DFC
+
+The whole project (top level of the DFC directory) and each module (submodule directory) can be compiled using
+the `mvn clean install` command.
+
+## Build image
+```
+mvn install docker:build
+```
+
+## Main API Endpoints
+
+Running with dev-mode of DFC
+
+- **Heartbeat**: http://<container_address>:8100/**heartbeat** or https://<container_address>:8433/**heartbeat**
+
+- **Start DFC**: http://<container_address>:8100/**start** or https://<container_address>:8433/**start**
+
+- **Stop DFC**: http://<container_address>:8100/**stopDatafile** or https://<container_address>:8433/**stopDatafile**
+
+
+
+## License
+
+Copyright (C) 2018-2019 NOKIA Intellectual Property, 2018-2023 Nordix Foundation. All rights reserved.
+[License](http://www.apache.org/licenses/LICENSE-2.0)
diff --git a/datafilecollector/config/README b/datafilecollector/config/README
new file mode 100644
index 0000000..cfde02e
--- /dev/null
+++ b/datafilecollector/config/README
@@ -0,0 +1,43 @@
+The keystore.jks and truststore.jks files are created by using the following commands (note that this is an example):
+
+1) Create a CA certificate and a private key:
+
+openssl genrsa -des3 -out CA-key.pem 2048
+openssl req -new -key CA-key.pem -x509 -days 3600 -out CA-cert.pem
+
+2) Create a keystore with a private key entry that is signed by the CA:
+
+Note: when keytool prompts for "your first and last name" (the certificate CN), answer "localhost"
+
+keytool -genkeypair -alias policy_agent -keyalg RSA -keysize 2048 -keystore keystore.jks -validity 3650 -storepass policy_agent
+keytool -certreq -alias policy_agent -file request.csr -keystore keystore.jks -ext san=dns:your.domain.com -storepass policy_agent
+openssl x509 -req -days 3650 -in request.csr -CA CA-cert.pem -CAkey CA-key.pem -CAcreateserial -out ca_signed-cert.pem
+keytool -importcert -alias ca_cert -file CA-cert.pem -keystore keystore.jks -trustcacerts -storepass policy_agent
+keytool -importcert -alias policy_agent -file ca_signed-cert.pem -keystore keystore.jks -trustcacerts -storepass policy_agent
+
+
+3) Create a trust store containing the CA cert (to trust all certs signed by the CA):
+
+keytool -genkeypair -alias not_used -keyalg RSA -keysize 2048 -keystore truststore.jks -validity 3650 -storepass policy_agent
+keytool -importcert -alias ca_cert -file CA-cert.pem -keystore truststore.jks -trustcacerts -storepass policy_agent
+
+
+4) Commands for listing the contents of the jks files, for example:
+keytool -list -v -keystore keystore.jks -storepass policy_agent
+keytool -list -v -keystore truststore.jks -storepass policy_agent
+
+## License
+
+Copyright (C) 2022 Nordix Foundation. All rights reserved.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
diff --git a/datafilecollector/config/application.yaml b/datafilecollector/config/application.yaml
new file mode 100644
index 0000000..71f3172
--- /dev/null
+++ b/datafilecollector/config/application.yaml
@@ -0,0 +1,56 @@
+spring:
+ profiles:
+ active: prod
+management:
+ endpoints:
+ web:
+ exposure:
+ include: "loggers,logfile,health,info,metrics"
+server:
+ port: 8433
+ ssl:
+ key-store-type: JKS
+ key-store-password: policy_agent
+ key-store: config/keystore.jks
+ key-password: policy_agent
+ key-alias: policy_agent
+logging:
+ level:
+ ROOT: WARN
+ org.onap: WARN
+ org.springframework: WARN
+ org.springframework.data: WARN
+ org.springframework.web.reactive.function.client.ExchangeFunctions: WARN
+ org.onap.dcaegen2.collectors.datafile: INFO
+
+ file:
+ name: /var/log/ONAP/application.log
+app:
+ filepath: config/datafile_endpoints_test.json
+ collected-files-path: "/tmp/onap_datafile/"
+ # Kafka bootstrap servers. This is only needed if there are Information Types that use a kafkaInputTopic.
+ # Several redundant bootstrap servers can be specified, separated by a comma ','.
+ kafka:
+ bootstrap-servers: localhost:9092
+ # output topic
+ collected-file-topic: collected-file
+ client-id: datafile-1
+ # input topic
+ file-ready-event-topic: file-ready
+ sftp:
+ known-hosts-file-path:
+ strict-host-key-checking: false
+ ssl:
+ key-store-password-file: /opt/app/datafile/config/ftps_keystore.pass
+ key-store: /opt/app/datafile/config/ftps_keystore.p12
+ trust-store-password-file: /opt/app/datafile/config/truststore.pass
+ trust-store: /opt/app/datafile/config/truststore.jks
+ s3:
+ endpointOverride:
+ accessKeyId:
+ secretAccessKey:
+ bucket:
+ locksBucket:
+springdoc:
+ show-actuator: true
+ swagger-ui.disable-swagger-default-url: true
\ No newline at end of file
diff --git a/datafilecollector/config/ftps_keystore.p12 b/datafilecollector/config/ftps_keystore.p12
new file mode 100755
index 0000000..b847707
--- /dev/null
+++ b/datafilecollector/config/ftps_keystore.p12
Binary files differ
diff --git a/datafilecollector/config/ftps_keystore.pass b/datafilecollector/config/ftps_keystore.pass
new file mode 100755
index 0000000..1e7befc
--- /dev/null
+++ b/datafilecollector/config/ftps_keystore.pass
@@ -0,0 +1 @@
+HVpAf0kHGl4P#fdpblJLka6b
\ No newline at end of file
diff --git a/datafilecollector/config/keystore.jks b/datafilecollector/config/keystore.jks
new file mode 100644
index 0000000..563c67b
--- /dev/null
+++ b/datafilecollector/config/keystore.jks
Binary files differ
diff --git a/datafilecollector/config/truststore.jks b/datafilecollector/config/truststore.jks
new file mode 100644
index 0000000..50a0f9e
--- /dev/null
+++ b/datafilecollector/config/truststore.jks
Binary files differ
diff --git a/datafilecollector/config/truststore.pass b/datafilecollector/config/truststore.pass
new file mode 100755
index 0000000..b915b0f
--- /dev/null
+++ b/datafilecollector/config/truststore.pass
@@ -0,0 +1 @@
+policy_agent
\ No newline at end of file
diff --git a/datafilecollector/onap-java-formatter.xml b/datafilecollector/onap-java-formatter.xml
new file mode 100644
index 0000000..ccd07d9
--- /dev/null
+++ b/datafilecollector/onap-java-formatter.xml
@@ -0,0 +1,296 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<profiles version="13">
+<profile kind="CodeFormatterProfile" name="onap-java-formatter" version="12">
+<setting id="org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode" value="enabled"/>
+<setting id="org.eclipse.jdt.core.compiler.codegen.targetPlatform" value="1.8"/>
+<setting id="org.eclipse.jdt.core.compiler.compliance" value="1.8"/>
+<setting id="org.eclipse.jdt.core.compiler.problem.assertIdentifier" value="error"/>
+<setting id="org.eclipse.jdt.core.compiler.problem.enumIdentifier" value="error"/>
+<setting id="org.eclipse.jdt.core.compiler.source" value="1.8"/>
+<setting id="org.eclipse.jdt.core.formatter.align_type_members_on_columns" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_allocation_expression" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_annotation" value="48"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_enum_constant" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_call" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_method_invocation" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_arguments_in_qualified_allocation_expression" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_assignment" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_binary_expression" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_compact_if" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_conditional_expression" value="80"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_enum_constants" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_expressions_in_array_initializer" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_method_declaration" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_multiple_fields" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_parameters_in_method_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_resources_in_try" value="80"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_selector_in_method_invocation" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superclass_in_type_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_enum_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_type_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_constructor_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_method_declaration" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.alignment_for_union_type_in_multicatch" value="16"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_after_imports" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_after_package" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_field" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_first_class_body_declaration" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_imports" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_member_type" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_method" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_new_chunk" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_before_package" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_between_import_groups" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.blank_lines_between_type_declarations" value="2"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_annotation_type_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_anonymous_type_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_array_initializer" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_block" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_block_in_case" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_constructor_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_enum_constant" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_enum_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_lambda_body" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_method_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_switch" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.brace_position_for_type_declaration" value="end_of_line"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_block_comment" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_block_comments" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_html" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_javadoc_comments" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_line_comments" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.format_source_code" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.indent_parameter_description" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.indent_root_tags" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.insert_new_line_before_root_tags" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.insert_new_line_for_parameter" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.line_length" value="120"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.new_lines_at_block_boundaries" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.new_lines_at_javadoc_boundaries" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.comment.preserve_white_space_between_code_and_line_comments" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.compact_else_if" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.continuation_indentation" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.continuation_indentation_for_array_initializer" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.disabling_tag" value="@formatter:off"/>
+<setting id="org.eclipse.jdt.core.formatter.enabling_tag" value="@formatter:on"/>
+<setting id="org.eclipse.jdt.core.formatter.format_guardian_clause_on_one_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.format_line_comment_starting_on_first_column" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_annotation_declaration_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_constant_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_declaration_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_type_header" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_breaks_compare_to_cases" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_empty_lines" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_statements_compare_to_block" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_statements_compare_to_body" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_cases" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_switch" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.indentation.size" value="4"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_field" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_local_variable" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_method" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_package" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_parameter" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_type" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_label" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_opening_brace_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_after_type_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_at_end_of_file_if_missing" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_catch_in_try_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_closing_brace_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_else_in_if_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_finally_in_try_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_before_while_in_do_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_annotation_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_anonymous_type_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_block" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_method_body" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_new_line_in_empty_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_and_in_type_parameter" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_assignment_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation_type_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_binary_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_brace_in_block" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_closing_paren_in_cast" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_assert" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_case" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_colon_in_labeled_statement" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_allocation_expression" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_annotation" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_array_initializer" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_throws" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_constant_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_declarations" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_explicitconstructorcall_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_increments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_inits" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_throws" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_invocation_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_field_declarations" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_local_declarations" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_parameterized_type_reference" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_superinterfaces" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_arguments" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_parameters" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_ellipsis" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_lambda_arrow" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_brace_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_cast" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_catch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_for" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_if" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_parenthesized_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_switch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_synchronized" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_try" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_while" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_postfix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_prefix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_question_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_question_in_wildcard" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_try_resources" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_after_unary_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_and_in_type_parameter" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_assignment_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_at_in_annotation_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_binary_operator" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_brace_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_cast" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_catch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_for" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_if" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_parenthesized_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_switch" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_synchronized" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_try" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_while" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_assert" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_case" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_default" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_colon_in_labeled_statement" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_throws" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_constant_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_declarations" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_explicitconstructorcall_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_increments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_inits" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_throws" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_invocation_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_field_declarations" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_local_declarations" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_superinterfaces" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_ellipsis" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_lambda_arrow" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_parameterized_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_arguments" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_parameters" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_annotation_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_anonymous_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_array_initializer" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_block" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_constructor_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_constant" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_method_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_switch" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_type_declaration" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation_type_member_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_catch" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_for" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_if" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_parenthesized_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_switch" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_synchronized" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_try" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_while" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_return" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_throw" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_postfix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_prefix_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_question_in_conditional" value="insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_question_in_wildcard" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_semicolon" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_for" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_try_resources" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_before_unary_operator" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_brackets_in_array_type_reference" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_braces_in_array_initializer" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_brackets_in_array_allocation_expression" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_annotation_type_member_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_constructor_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_enum_constant" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_declaration" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_invocation" value="do not insert"/>
+<setting id="org.eclipse.jdt.core.formatter.join_lines_in_comments" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.join_wrapped_lines" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_else_statement_on_same_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_empty_array_initializer_on_one_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_imple_if_on_one_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.keep_then_statement_on_same_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.lineSplit" value="120"/>
+<setting id="org.eclipse.jdt.core.formatter.never_indent_block_comments_on_first_column" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.never_indent_line_comments_on_first_column" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.number_of_blank_lines_at_beginning_of_method_body" value="0"/>
+<setting id="org.eclipse.jdt.core.formatter.number_of_empty_lines_to_preserve" value="1"/>
+<setting id="org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.tabulation.char" value="space"/>
+<setting id="org.eclipse.jdt.core.formatter.tabulation.size" value="4"/>
+<setting id="org.eclipse.jdt.core.formatter.use_on_off_tags" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations" value="false"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_before_binary_operator" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_before_or_operator_multicatch" value="true"/>
+<setting id="org.eclipse.jdt.core.formatter.wrap_outer_expressions_when_nested" value="true"/>
+</profile>
+</profiles>
+
diff --git a/datafilecollector/pom.xml b/datafilecollector/pom.xml
new file mode 100644
index 0000000..c122253
--- /dev/null
+++ b/datafilecollector/pom.xml
@@ -0,0 +1,334 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ ============LICENSE_START=======================================================
+ ~ Copyright (C) 2023 Nordix Foundation
+ ~ ================================================================================
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~ ============LICENSE_END=========================================================
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-parent</artifactId>
+ <version>3.0.4</version>
+ </parent>
+ <groupId>org.o-ran-sc.nonrtric.plt.ranpm</groupId>
+ <artifactId>datafile-app-server</artifactId> <!-- NOTE(review): no <version> element is declared, so ${project.version} resolves to the version inherited from the parent (3.0.4); confirm this is intended -->
+ <packaging>jar</packaging>
+ <properties>
+ <java.version>17</java.version>
+ <maven.build.timestamp.format>yyyyMMdd'T'HHmmss</maven.build.timestamp.format>
+ <docker-client.version>8.7.1</docker-client.version>
+ <springfox.version>3.0.0</springfox.version>
+ <gson.version>2.9.1</gson.version> <!-- kept equal to the version pinned on the gson dependency, so managed and declared versions agree -->
+ <docker-maven-plugin>0.30.0</docker-maven-plugin> <!-- version of the fabric8 plugin; referenced as ${docker-maven-plugin} -->
+ <jacoco-maven-plugin.version>0.8.8</jacoco-maven-plugin.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.9.1</version>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.kafka</groupId>
+ <artifactId>reactor-kafka</artifactId>
+ <version>1.3.9</version>
+ </dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpasyncclient</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-configuration-processor</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-web</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-webmvc</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.spotify</groupId>
+ <artifactId>docker-client</artifactId>
+ <version>${docker-client.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-webflux</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-autoconfigure</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents.core5</groupId>
+ <artifactId>httpcore5</artifactId>
+ </dependency>
+ <!-- Actuator dependencies -->
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-actuator</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>s3</artifactId>
+ <version>2.17.292</version>
+ </dependency>
+ <!-- Test dependencies
+ (every entry from here to the documentation section is test scoped) -->
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.vintage</groupId>
+ <artifactId>junit-vintage-engine</artifactId>
+ <!-- No explicit version: use the one managed through spring-boot-starter-parent so it matches junit-jupiter-engine. -->
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
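+ <!-- Required to generate documentation.
+ NOTE(review): Springfox 3.0.0 predates Spring Boot 3 (jakarta namespace); verify these artifacts still work with the parent declared above. -->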
+ <dependency>
+ <groupId>io.springfox</groupId>
+ <artifactId>springfox-spring-web</artifactId>
+ <version>${springfox.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.springfox</groupId>
+ <artifactId>springfox-spi</artifactId>
+ <version>${springfox.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.springfox</groupId>
+ <artifactId>springfox-core</artifactId>
+ <version>${springfox.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springdoc</groupId>
+ <artifactId>springdoc-openapi-ui</artifactId>
+ <version>1.6.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.springfox</groupId>
+ <artifactId>springfox-swagger2</artifactId>
+ <version>${springfox.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.springfox</groupId>
+ <artifactId>springfox-swagger-ui</artifactId>
+ <version>${springfox.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-net</groupId>
+ <artifactId>commons-net</artifactId>
+ <version>3.6</version>
+ </dependency>
+ <dependency>
+ <groupId>com.jcraft</groupId>
+ <artifactId>jsch</artifactId>
+ <version>0.1.55</version>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.netty</groupId>
+ <artifactId>reactor-netty</artifactId>
+ <!-- No explicit version: use the Reactor release managed through spring-boot-starter-parent so it matches its sub-modules. -->
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ <configuration>
+ <finalName>${project.artifactId}</finalName>
+ <mainClass>org.onap.dcaegen2.collectors.datafile.MainApp</mainClass>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>build-info</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>${docker-maven-plugin}</version>
+ <inherited>false</inherited>
+ <executions>
+ <execution>
+ <id>generate-image</id>
+ <phase>package</phase>
+ <goals>
+ <goal>build</goal>
+ </goals>
+ <configuration>
+ <pullRegistry>${env.CONTAINER_PULL_REGISTRY}</pullRegistry>
+ <images>
+ <image>
+ <name>
+ o-ran-sc/nonrtric-plt-ranpm-datafilecollector:${project.version}</name>
+ <build>
+ <cleanup>try</cleanup>
+ <contextDir>${basedir}</contextDir>
+ <dockerFile>Dockerfile</dockerFile>
+ <args>
+ <JAR>${project.build.finalName}.jar</JAR>
+ </args>
+ <tags>
+ <tag>${project.version}</tag>
+ </tags>
+ </build>
+ </image>
+ </images>
+ </configuration>
+ </execution>
+ <execution>
+ <id>push-image</id>
+ <goals>
+ <goal>build</goal>
+ <goal>push</goal>
+ </goals>
+ <configuration>
+ <pullRegistry>${env.CONTAINER_PULL_REGISTRY}</pullRegistry>
+ <pushRegistry>${env.CONTAINER_PUSH_REGISTRY}</pushRegistry>
+ <images>
+ <image>
+ <name>
+ o-ran-sc/nonrtric-plt-ranpm-datafilecollector:${project.version}</name>
+ <build>
+ <contextDir>${basedir}</contextDir>
+ <dockerFile>Dockerfile</dockerFile>
+ <args>
+ <JAR>${project.build.finalName}.jar</JAR>
+ </args>
+ <tags>
+ <tag>${project.version}</tag>
+ <tag>latest</tag>
+ </tags>
+ </build>
+ </image>
+ </images>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>pl.project13.maven</groupId>
+ <artifactId>git-commit-id-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>get-the-git-infos</id>
+ <goals>
+ <goal>revision</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <verbose>true</verbose>
+ <dotGitDirectory>${project.basedir}/.git</dotGitDirectory>
+ <dateFormat>MM-dd-yyyy '@' HH:mm:ss Z</dateFormat>
+ <generateGitPropertiesFile>true</generateGitPropertiesFile>
+ <generateGitPropertiesFilename>${project.build.outputDirectory}/git.properties
+ </generateGitPropertiesFilename>
+ <failOnNoGitDirectory>true</failOnNoGitDirectory>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>com.diffplug.spotless</groupId>
+ <artifactId>spotless-maven-plugin</artifactId>
+ <version>1.18.0</version>
+ <configuration>
+ <java>
+ <removeUnusedImports />
+ <importOrder>
+ <order>com,java,javax,org</order>
+ </importOrder>
+ </java>
+ </configuration>
+ <!-- See https://github.com/diffplug/spotless/tree/master/plugin-maven.
+ Use "mvn spotless:apply" to rewrite source files and "mvn spotless:check"
+ to validate them. -->
+ </plugin>
+ <plugin>
+ <groupId>net.revelc.code.formatter</groupId>
+ <artifactId>formatter-maven-plugin</artifactId>
+ <version>2.8.1</version>
+ <configuration>
+ <configFile>${project.basedir}/onap-java-formatter.xml</configFile>
+ </configuration>
+ <!-- See https://code.revelc.net/formatter-maven-plugin/. Use "mvn formatter:format"
+ to rewrite source files and "mvn formatter:validate" to validate them. -->
+ </plugin>
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ <version>${jacoco-maven-plugin.version}</version>
+ <executions>
+ <execution>
+ <id>default-prepare-agent</id>
+ <goals>
+ <goal>prepare-agent</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>default-report</id>
+ <phase>prepare-package</phase>
+ <goals>
+ <goal>report</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java
new file mode 100644
index 0000000..851db32
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java
@@ -0,0 +1,47 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2021 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
+
+/**
+ * Spring Boot entry point of DFC; it also enables scheduling and supplies the scheduler bean.
+ *
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+@EnableScheduling
+@SpringBootApplication
+public class MainApp {
+
+    @Bean // exposes a ConcurrentTaskScheduler as the application's TaskScheduler
+    TaskScheduler taskScheduler() {
+        return new ConcurrentTaskScheduler();
+    }
+
+    public static void main(String[] args) {
+        SpringApplication.run(MainApp.class, args);
+    }
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/FileCollectClient.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/FileCollectClient.java
new file mode 100644
index 0000000..517e382
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/FileCollectClient.java
@@ -0,0 +1,32 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.commons;
+
+import java.nio.file.Path;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+
+/**
+ * A closeable file client: it is opened with {@code open()}, collects files with {@code collectFile}, and is
+ * released through {@link AutoCloseable#close()}. Both operations report failure as DatafileTaskException.
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public interface FileCollectClient extends AutoCloseable {
+    void open() throws DatafileTaskException; // presumably called before collectFile; confirm in the implementations
+
+    void collectFile(String remoteFile, Path localFile) throws DatafileTaskException; // remote source, local target
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/FileServerData.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/FileServerData.java
new file mode 100644
index 0000000..5896fe6
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/FileServerData.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * Modifications copyright (C) 2021 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.commons;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import lombok.Builder;
+import lombok.ToString;
+
+import org.apache.hc.core5.http.NameValuePair;
+
+/**
+ * Data about the file server to collect a file from.
+ * In case of http protocol it also contains data required to recreate target
+ * uri. Instances are created through the Lombok-generated builder; all fields are public and mutable.
+ */
+@Builder
+@ToString
+public class FileServerData {
+
+ public String serverAddress;
+ public String userId;
+
+ @ToString.Exclude // the password is left out of the generated toString()
+ public String password;
+
+ @Builder.Default // makes the builder use the empty-list initializer when the field is not set
+ @ToString.Exclude
+ public List<NameValuePair> queryParameters = new ArrayList<>();
+
+ @Builder.Default // makes the builder use the empty-string initializer when the field is not set
+ public String uriRawFragment = "";
+
+ public Integer port;
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/Scheme.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/Scheme.java
new file mode 100644
index 0000000..613fa39
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/Scheme.java
@@ -0,0 +1,70 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2020-2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.commons;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+
+/**
+ * The file transfer schemes that DFC is able to download files with.
+ *
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ *
+ */
+public enum Scheme {
+    FTPES, SFTP, HTTP, HTTPS;
+
+    public static final String DFC_DOES_NOT_SUPPORT_PROTOCOL_ERROR_MSG = "DFC does not support protocol ";
+    public static final String SUPPORTED_PROTOCOLS_ERROR_MESSAGE =
+        ". Supported protocols are FTPeS, sFTP, HTTP and HTTPS";
+
+    /**
+     * Resolves a scheme name to its <code>Scheme</code> constant.
+     * <p>
+     * The comparison ignores case, so <code>"sftp"</code> and <code>"SFTP"</code> both give
+     * <code>Scheme.SFTP</code>. A <code>null</code> argument matches nothing and is rejected like any
+     * other unknown name.
+     *
+     * @param schemeString the scheme name to look up.
+     * @return the constant whose name equals <code>schemeString</code>, ignoring case.
+     * @throws DatafileTaskException if no constant matches; the message is
+     *         <code>DFC_DOES_NOT_SUPPORT_PROTOCOL_ERROR_MSG + schemeString +
+     *         SUPPORTED_PROTOCOLS_ERROR_MESSAGE</code>.
+     */
+    public static Scheme getSchemeFromString(String schemeString) throws DatafileTaskException {
+        // The constant names are exactly the accepted scheme names, so match on name().
+        for (Scheme candidate : values()) {
+            if (candidate.name().equalsIgnoreCase(schemeString)) {
+                return candidate;
+            }
+        }
+        throw new DatafileTaskException(
+            DFC_DOES_NOT_SUPPORT_PROTOCOL_ERROR_MSG + schemeString + SUPPORTED_PROTOCOLS_ERROR_MESSAGE);
+    }
+
+    /**
+     * Tells whether a scheme belongs to the FTP family (SFTP or FTPES).
+     *
+     * @param scheme the <code>Scheme</code> to classify; may be <code>null</code>.
+     * @return true for SFTP and FTPES, false for every other value, including <code>null</code>.
+     */
+    public static boolean isFtpScheme(Scheme scheme) {
+        return scheme == FTPES || scheme == SFTP;
+    }
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/SecurityUtil.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/SecurityUtil.java
new file mode 100644
index 0000000..9d6b7f9
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/SecurityUtil.java
@@ -0,0 +1,52 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+package org.onap.dcaegen2.collectors.datafile.commons;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class containing functions used for certificates configuration
+ *
+ * @author <a href="mailto:krzysztof.gajewski@nokia.com">Krzysztof Gajewski</a>
+ */
+public final class SecurityUtil {
+ private SecurityUtil() {
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(SecurityUtil.class);
+
+ public static String getKeystorePasswordFromFile(String passwordPath) {
+ return getPasswordFromFile(passwordPath, "Keystore");
+ }
+
+ public static String getTruststorePasswordFromFile(String passwordPath) {
+ return getPasswordFromFile(passwordPath, "Truststore");
+ }
+
+ public static String getPasswordFromFile(String passwordPath, String element) {
+ try {
+ return new String(Files.readAllBytes(Paths.get(passwordPath)));
+ } catch (IOException e) {
+ logger.error("{} password file at path: {} cannot be opened ", element, passwordPath);
+ }
+ return "";
+ }
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
new file mode 100644
index 0000000..f8be04d
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -0,0 +1,126 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018, 2020-2022 Nokia. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+import java.util.Properties;
+
+import lombok.Getter;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+/**
+ * Holds all configuration for the DFC.
+ *
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on
+ * 3/23/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+
+@Component
+@EnableConfigurationProperties
+public class AppConfig {
+
+    @Value("#{systemEnvironment}")
+    Properties systemEnvironment;
+
+    // No default is given, so this property has to be present in the configuration.
+    @Value("${app.filepath}")
+    String filepath;
+
+    @Value("${app.kafka.bootstrap-servers:}")
+    private String kafkaBootStrapServers;
+
+    @Value("${app.kafka.collected-file-topic:}")
+    public String collectedFileTopic;
+
+    @Value("${app.kafka.file-ready-event-topic:}")
+    public String fileReadyEventTopic;
+
+    @Value("${app.kafka.client-id:undefined}")
+    public String kafkaClientId;
+
+    @Value("${app.collected-files-path:}")
+    public String collectedFilesPath;
+
+    @Value("${app.sftp.strict-host-key-checking:false}")
+    public boolean strictHostKeyChecking;
+
+    @Value("${app.sftp.known-hosts-file-path:}")
+    public String knownHostsFilePath;
+
+    // The trailing ':' gives the placeholder an empty default, like every other app.ssl.*
+    // setting. Without it the property was mandatory although the key store itself is optional.
+    @Value("${app.ssl.key-store-password-file:}")
+    private String clientKeyStorePassword = "";
+
+    @Value("${app.ssl.key-store:}")
+    private String clientKeyStore = "";
+
+    @Value("${app.ssl.trust-store:}")
+    private String clientTrustStore = "";
+
+    @Value("${app.ssl.trust-store-password-file:}")
+    private String clientTrustStorePassword;
+
+    @Getter
+    @Value("${app.s3.endpointOverride:}")
+    private String s3EndpointOverride;
+
+    @Getter
+    @Value("${app.s3.accessKeyId:}")
+    private String s3AccessKeyId;
+
+    @Getter
+    @Value("${app.s3.secretAccessKey:}")
+    private String s3SecretAccessKey;
+
+    @Getter
+    @Value("${app.s3.bucket:}")
+    private String s3Bucket;
+
+    @Value("${app.s3.locksBucket:}")
+    private String s3LocksBucket;
+
+    /**
+     * Returns the name of the bucket used for locks.
+     *
+     * @return the configured locks bucket, or the files bucket when no locks bucket is configured.
+     */
+    public String getS3LocksBucket() {
+        return s3LocksBucket.isEmpty() ? s3Bucket : s3LocksBucket;
+    }
+
+    /**
+     * Tells whether S3 storage is configured.
+     *
+     * @return <code>true</code> when both the S3 endpoint override and the S3 bucket are non-empty.
+     */
+    public boolean isS3Enabled() {
+        return !s3EndpointOverride.isEmpty() && !s3Bucket.isEmpty();
+    }
+
+    public String getKafkaBootStrapServers() {
+        return kafkaBootStrapServers;
+    }
+
+    /**
+     * Builds a new {@link CertificateConfig} from the app.ssl.* settings.
+     *
+     * @return the certificate configuration; the password entries are paths of password files,
+     *         not the passwords themselves.
+     */
+    public synchronized CertificateConfig getCertificateConfiguration() {
+        return CertificateConfig.builder() //
+            .trustedCa(this.clientTrustStore) //
+            .trustedCaPasswordPath(this.clientTrustStorePassword) //
+            .keyCert(this.clientKeyStore) //
+            .keyPasswordPath(this.clientKeyStorePassword) //
+            .build();
+    }
+
+    /**
+     * Builds a new {@link SftpConfig} from the app.sftp.* settings.
+     *
+     * @return the SFTP configuration.
+     */
+    public synchronized SftpConfig getSftpConfiguration() {
+        return SftpConfig.builder() //
+            .knownHostsFilePath(this.knownHostsFilePath) //
+            .strictHostKeyChecking(this.strictHostKeyChecking) //
+            .build();
+    }
+
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CertificateConfig.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CertificateConfig.java
new file mode 100644
index 0000000..938d322
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CertificateConfig.java
@@ -0,0 +1,36 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018-2022 Nokia. All rights reserved.
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+import lombok.Builder;
+
+/**
+ * Holder for the locations of the key store, the trust store and the files containing their
+ * passwords. Instances are created through the Lombok-generated builder.
+ */
+@Builder
+public class CertificateConfig {
+
+ // Key store location; AppConfig fills it from the app.ssl.key-store property.
+ public String keyCert;
+
+ // Path of the key store password file (app.ssl.key-store-password-file in AppConfig).
+ public String keyPasswordPath;
+
+ // Trust store location; AppConfig fills it from the app.ssl.trust-store property.
+ public String trustedCa;
+
+ // Path of the trust store password file (app.ssl.trust-store-password-file in AppConfig).
+ public String trustedCaPasswordPath;
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SftpConfig.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SftpConfig.java
new file mode 100644
index 0000000..182a59e
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SftpConfig.java
@@ -0,0 +1,31 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2020 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+import lombok.Builder;
+
+/**
+ * Holder for the SFTP related settings. Instances are created through the Lombok-generated
+ * builder.
+ */
+@Builder
+public class SftpConfig {
+
+ // Value of app.sftp.strict-host-key-checking in AppConfig (defaults to false there).
+ public boolean strictHostKeyChecking;
+
+ // Value of app.sftp.known-hosts-file-path in AppConfig (defaults to an empty string there).
+ public String knownHostsFilePath;
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
new file mode 100644
index 0000000..b7dc521
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
@@ -0,0 +1,42 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2021 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+import io.swagger.v3.oas.annotations.OpenAPIDefinition;
+import io.swagger.v3.oas.annotations.info.Info;
+import io.swagger.v3.oas.annotations.info.License;
+
+/**
+ * Carries the OpenAPI definition (title, version, description and license) of the REST API.
+ * The class only hosts the annotation and its constants and cannot be instantiated.
+ */
+@OpenAPIDefinition(
+ info = @Info(
+ title = SwaggerConfig.API_TITLE,
+ version = SwaggerConfig.VERSION,
+ description = SwaggerConfig.DESCRIPTION,
+ license = @License(
+ name = "Copyright (C) 2020 Nordix Foundation. Licensed under the Apache License.",
+ url = "http://www.apache.org/licenses/LICENSE-2.0")))
+public class SwaggerConfig {
+
+ // Values referenced by the @OpenAPIDefinition annotation above.
+ public static final String VERSION = "1.0";
+ public static final String API_TITLE = "DATAFILE App Server";
+ // Package-private, unlike the two constants above.
+ static final String DESCRIPTION = "<p>This page lists all the rest apis for DATAFILE app server.</p>";
+
+ // Private constructor: the class is never instantiated.
+ private SwaggerConfig() {
+ }
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java
new file mode 100644
index 0000000..d1f615b
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java
@@ -0,0 +1,99 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2021 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.controllers;
+
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.tags.Tag;
+
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
+import org.onap.dcaegen2.collectors.datafile.tasks.CollectAndReportFiles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestHeader;
+import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
+
+/**
+ * REST Controller to check the heart beat and status of the DFC.
+ */
+@RestController
+@Tag(name = "StatusController")
+public class StatusController {
+
+    private static final Logger logger = LoggerFactory.getLogger(StatusController.class);
+
+    private final CollectAndReportFiles collectAndReportFiles;
+
+    public StatusController(CollectAndReportFiles task) {
+        this.collectAndReportFiles = task;
+    }
+
+    /**
+     * Liveness probe of DFC.
+     *
+     * @param headers the request headers (not inspected).
+     * @return a 200 OK response with a fixed text body.
+     */
+    @GetMapping("/heartbeat")
+    @Operation(summary = "Returns liveness of DATAFILE service")
+    @ApiResponses(
+        value = { //
+            @ApiResponse(code = 200, message = "DATAFILE service is living"),
+            @ApiResponse(code = 401, message = "You are not authorized to view the resource"),
+            @ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"),
+            @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")})
+    public Mono<ResponseEntity<String>> heartbeat(@RequestHeader HttpHeaders headers) {
+        final String operation = "Heartbeat request";
+        logger.info("ENTRY {}", operation);
+
+        final Mono<ResponseEntity<String>> reply = okReply("I'm living!");
+
+        logger.info("EXIT {}", operation);
+        return reply;
+    }
+
+    /**
+     * Returns diagnostics and statistics information. It is intended for testing and
+     * troubleshooting.
+     *
+     * @param headers the request headers (not inspected).
+     * @return a 200 OK response whose body is the textual form of the current counters.
+     */
+    @GetMapping("/status")
+    @Operation(summary = "Returns status and statistics of DATAFILE service")
+    @ApiResponses(
+        value = { //
+            @ApiResponse(code = 200, message = "DATAFILE service is living"),
+            @ApiResponse(code = 401, message = "You are not authorized to view the resource"),
+            @ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"),
+            @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")})
+    public Mono<ResponseEntity<String>> status(@RequestHeader HttpHeaders headers) {
+        final String operation = "Status request";
+        logger.info("ENTRY {}", operation);
+
+        // The counters are rendered before the EXIT line is logged.
+        final Counters snapshot = collectAndReportFiles.getCounters();
+        final Mono<ResponseEntity<String>> reply = okReply(snapshot.toString());
+
+        logger.info("EXIT {}", operation);
+        return reply;
+    }
+
+    /** Wraps a text body in an already completed 200 OK response. */
+    private static Mono<ResponseEntity<String>> okReply(String body) {
+        return Mono.just(new ResponseEntity<>(body, HttpStatus.OK));
+    }
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/DataStore.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/DataStore.java
new file mode 100644
index 0000000..af0512e
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/DataStore.java
@@ -0,0 +1,57 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.datastore;
+
+import java.nio.file.Path;
+
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Abstraction of the storage used for collected files and for locks. Members of an interface
+ * are implicitly public, so the modifier is not repeated on them.
+ */
+public interface DataStore {
+
+    /** Selects the storage area an operation works on. */
+    enum Bucket {
+        FILES, LOCKS
+    }
+
+    /** Lists the names of the stored objects whose name starts with the given prefix. */
+    Flux<String> listObjects(Bucket bucket, String prefix);
+
+    /** Reads the content of the named object. */
+    Mono<byte[]> readObject(Bucket bucket, String name);
+
+    /** Tries to create the named lock and emits whether that succeeded. */
+    Mono<Boolean> createLock(String name);
+
+    /** Removes the named lock. */
+    Mono<Boolean> deleteLock(String name);
+
+    /** Removes the named object. */
+    Mono<Boolean> deleteObject(Bucket bucket, String name);
+
+    /** Copies the local file <code>from</code> into the store under the name <code>to</code>. */
+    Mono<String> copyFileTo(Path from, String to);
+
+    /** Prepares the given storage area for use. */
+    Mono<String> create(DataStore.Bucket bucket);
+
+    /** Deletes the given storage area together with its content. */
+    Mono<String> deleteBucket(Bucket bucket);
+
+    /** Emits whether an object with the given key exists. */
+    Mono<Boolean> fileExists(Bucket bucket, String key);
+
+    /**
+     * Creates the store matching the configuration.
+     *
+     * @param config the application configuration.
+     * @return an {@link S3ObjectStore} when S3 is enabled in the configuration, otherwise a
+     *         {@link FileStore}.
+     */
+    static DataStore create(AppConfig config) {
+        if (config.isS3Enabled()) {
+            return new S3ObjectStore(config);
+        }
+        return new FileStore(config);
+    }
+
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/FileStore.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/FileStore.java
new file mode 100644
index 0000000..7f497be
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/FileStore.java
@@ -0,0 +1,160 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.datastore;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.FileSystemUtils;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class FileStore implements DataStore {
+ private static final Logger logger = LoggerFactory.getLogger(java.lang.invoke.MethodHandles.lookup().lookupClass());
+
+ AppConfig applicationConfig;
+
+ public FileStore(AppConfig applicationConfig) {
+ this.applicationConfig = applicationConfig;
+ }
+
+ @Override
+ public Flux<String> listObjects(Bucket bucket, String prefix) {
+ Path root = Path.of(applicationConfig.collectedFilesPath, prefix);
+ if (!root.toFile().exists()) {
+ root = root.getParent();
+ }
+
+ logger.debug("Listing files in: {}", root);
+
+ List<String> result = new ArrayList<>();
+ try (Stream<Path> stream = Files.walk(root, Integer.MAX_VALUE)) {
+
+ stream.forEach(path -> filterListFiles(path, prefix, result));
+
+ return Flux.fromIterable(result);
+ } catch (Exception e) {
+ return Flux.error(e);
+ }
+ }
+
+ private void filterListFiles(Path path, String prefix, List<String> result) {
+ if (path.toFile().isFile() && externalName(path).startsWith(prefix)) {
+ result.add(externalName(path));
+ } else {
+ logger.debug("Ignoring file {} that does not start with: {}", path, prefix);
+ }
+ }
+
+ private String externalName(Path path) {
+ String fullName = path.toString();
+ String externalName = fullName.substring(applicationConfig.collectedFilesPath.length());
+ if (externalName.startsWith("/")) {
+ externalName = externalName.substring(1);
+ }
+ return externalName;
+ }
+
+ @Override
+ public Mono<byte[]> readObject(Bucket bucket, String fileName) {
+ try {
+ byte[] contents = Files.readAllBytes(path(fileName));
+ return Mono.just(contents);
+ } catch (Exception e) {
+ return Mono.error(e);
+ }
+ }
+
+ @Override
+ public Mono<Boolean> createLock(String name) {
+ File file = path(name).toFile();
+ try {
+ Files.createDirectories(path(name).getParent());
+ boolean res = file.createNewFile();
+ return Mono.just(res);
+ } catch (Exception e) {
+ logger.warn("Could not create lock file: {}, reason: {}", file.getPath(), e.getMessage());
+ return Mono.just(!file.exists());
+ }
+ }
+
+ @Override
+ public Mono<String> copyFileTo(Path from, String to) {
+ try {
+ Path toPath = path(to);
+ Files.createDirectories(toPath);
+ Files.copy(from, path(to), StandardCopyOption.REPLACE_EXISTING);
+ return Mono.just(to);
+ } catch (Exception e) {
+ return Mono.error(e);
+ }
+ }
+
+ @Override
+ public Mono<Boolean> deleteLock(String name) {
+ return deleteObject(Bucket.LOCKS, name);
+ }
+
+ @Override
+ public Mono<Boolean> deleteObject(Bucket bucket, String name) {
+ try {
+ Files.delete(path(name));
+ return Mono.just(true);
+ } catch (Exception e) {
+ return Mono.just(false);
+ }
+ }
+
+ @Override
+ public Mono<String> create(Bucket bucket) {
+ return Mono.just("OK");
+ }
+
+ private Path path(String name) {
+ return Path.of(applicationConfig.collectedFilesPath, name);
+ }
+
+ public Mono<Boolean> fileExists(Bucket bucket, String key) {
+ return Mono.just(path(key).toFile().exists());
+ }
+
+ @Override
+ public Mono<String> deleteBucket(Bucket bucket) {
+ try {
+ FileSystemUtils.deleteRecursively(Path.of(applicationConfig.collectedFilesPath));
+ } catch (IOException e) {
+ logger.debug("Could not delete directory: {}, reason; {}", applicationConfig.collectedFilesPath,
+ e.getMessage());
+ }
+ return Mono.just("OK");
+ }
+
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/S3ObjectStore.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/S3ObjectStore.java
new file mode 100644
index 0000000..f93bbaf
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/S3ObjectStore.java
@@ -0,0 +1,313 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.datastore;
+
+import java.net.URI;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.BytesWrapper;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
+import software.amazon.awssdk.services.s3.model.Delete;
+import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteBucketResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+/**
+ * {@link DataStore} implementation backed by an S3 compatible object store. All instances share
+ * one asynchronous client, built from the first configuration that has S3 enabled.
+ */
+public class S3ObjectStore implements DataStore {
+    private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class);
+    private final AppConfig applicationConfig;
+
+    private static S3AsyncClient s3AsynchClient;
+
+    public S3ObjectStore(AppConfig applicationConfig) {
+        this.applicationConfig = applicationConfig;
+
+        // Makes sure the shared client exists before any operation is invoked.
+        getS3AsynchClient(applicationConfig);
+    }
+
+    /** Lazily creates the shared client; it stays <code>null</code> while S3 is not enabled. */
+    private static synchronized S3AsyncClient getS3AsynchClient(AppConfig applicationConfig) {
+        if (applicationConfig.isS3Enabled() && s3AsynchClient == null) {
+            s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build();
+        }
+        return s3AsynchClient;
+    }
+
+    /** Configures endpoint and static credentials from the application configuration. */
+    private static S3AsyncClientBuilder getS3AsyncClientBuilder(AppConfig applicationConfig) {
+        URI uri = URI.create(applicationConfig.getS3EndpointOverride());
+        return S3AsyncClient.builder() //
+            .region(Region.US_EAST_1) //
+            .endpointOverride(uri) //
+            .credentialsProvider(StaticCredentialsProvider.create( //
+                AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), //
+                    applicationConfig.getS3SecretAccessKey())));
+    }
+
+    @Override
+    public Flux<String> listObjects(Bucket bucket, String prefix) {
+        return listObjectsInBucket(bucket(bucket), prefix).map(S3Object::key);
+    }
+
+    /**
+     * Creates the lock object unless it already exists.
+     *
+     * @return <code>true</code> if the lock object was stored, <code>false</code> if it already
+     *         existed or could not be stored.
+     */
+    @Override
+    public Mono<Boolean> createLock(String name) {
+        // A failing HEAD request is taken to mean that the lock object does not exist yet.
+        return getHeadObject(bucket(Bucket.LOCKS), name).flatMap(head -> createLock(name, head)) //
+            .onErrorResume(t -> createLock(name, null));
+    }
+
+    /** Stores an empty lock object when <code>head</code> is <code>null</code>. */
+    private Mono<Boolean> createLock(String name, HeadObjectResponse head) {
+        if (head == null) {
+
+            return this.putObject(Bucket.LOCKS, name, "") //
+                .flatMap(resp -> Mono.just(true)) //
+                .doOnError(t -> logger.warn("Failed to create lock {}, reason: {}", name, t.getMessage())) //
+                .onErrorResume(t -> Mono.just(false));
+        } else {
+            return Mono.just(false);
+        }
+    }
+
+    @Override
+    public Mono<Boolean> deleteLock(String name) {
+        return deleteObject(Bucket.LOCKS, name);
+    }
+
+    /** Deletes the object; emits <code>true</code> on success and an error otherwise. */
+    @Override
+    public Mono<Boolean> deleteObject(Bucket bucket, String name) {
+
+        DeleteObjectRequest request = DeleteObjectRequest.builder() //
+            .bucket(bucket(bucket)) //
+            .key(name) //
+            .build();
+
+        CompletableFuture<DeleteObjectResponse> future = s3AsynchClient.deleteObject(request);
+
+        return Mono.fromFuture(future).map(resp -> true);
+    }
+
+    @Override
+    public Mono<byte[]> readObject(Bucket bucket, String fileName) {
+        return getDataFromS3Object(bucket(bucket), fileName);
+    }
+
+    /**
+     * Stores a string as an object.
+     *
+     * @return <code>fileName</code>, or an error (which is also logged) if the upload fails.
+     */
+    public Mono<String> putObject(Bucket bucket, String fileName, String bodyString) {
+        PutObjectRequest request = PutObjectRequest.builder() //
+            .bucket(bucket(bucket)) //
+            .key(fileName) //
+            .build();
+
+        AsyncRequestBody body = AsyncRequestBody.fromString(bodyString);
+
+        CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
+
+        return Mono.fromFuture(future) //
+            .map(putObjectResponse -> fileName) //
+            .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
+    }
+
+    @Override
+    public Mono<String> copyFileTo(Path fromFile, String toFile) {
+        return copyFileToS3Bucket(bucket(Bucket.FILES), fromFile, toFile);
+    }
+
+    /** Emits <code>true</code> if a HEAD request for the key succeeds, otherwise <code>false</code>. */
+    @Override
+    public Mono<Boolean> fileExists(Bucket bucket, String key) {
+        return this.getHeadObject(bucket(bucket), key).map(obj -> true) //
+            .onErrorResume(t -> Mono.just(false));
+    }
+
+    @Override
+    public Mono<String> create(Bucket bucket) {
+        return createS3Bucket(bucket(bucket));
+    }
+
+    /** Creates the bucket; emits the bucket name even if creation fails. */
+    private Mono<String> createS3Bucket(String s3Bucket) {
+
+        CreateBucketRequest request = CreateBucketRequest.builder() //
+            .bucket(s3Bucket) //
+            .build();
+
+        CompletableFuture<CreateBucketResponse> future = s3AsynchClient.createBucket(request);
+
+        return Mono.fromFuture(future) //
+            .map(f -> s3Bucket) //
+            .doOnError(t -> logger.trace("Could not create S3 bucket: {}", t.getMessage()))
+            .onErrorResume(t -> Mono.just(s3Bucket));
+    }
+
+    /**
+     * Deletes all objects in the bucket and then the bucket itself.
+     *
+     * @return "OK" on success, "NOK" if the bucket could not be deleted.
+     */
+    @Override
+    public Mono<String> deleteBucket(Bucket bucket) {
+        return deleteAllFiles(bucket) //
+            .collectList() //
+            .flatMap(list -> deleteBucketFromS3Storage(bucket)) //
+            .map(resp -> "OK")
+            .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(bucket), t.getMessage()))
+            .onErrorResume(t -> Mono.just("NOK"));
+    }
+
+    /** Deletes every object in the bucket in batches of 500; a failure ends the Flux silently. */
+    private Flux<DeleteObjectsResponse> deleteAllFiles(Bucket bucket) {
+        return listObjectsInBucket(bucket(bucket), "") //
+            .buffer(500) //
+            .flatMap(list -> deleteObjectsFromS3Storage(bucket, list)) //
+            // This callback runs on failure, so the message must not claim success.
+            .doOnError(t -> logger.warn("Could not delete all files, reason: {}", t.getMessage())) //
+            .onErrorStop() //
+            .onErrorResume(t -> Flux.empty()); //
+
+    }
+
+    /** Deletes the given objects with a single request. */
+    private Mono<DeleteObjectsResponse> deleteObjectsFromS3Storage(Bucket bucket, Collection<S3Object> objects) {
+        Collection<ObjectIdentifier> oids = new ArrayList<>();
+        for (S3Object o : objects) {
+            ObjectIdentifier oid = ObjectIdentifier.builder() //
+                .key(o.key()) //
+                .build();
+            oids.add(oid);
+        }
+
+        Delete delete = Delete.builder() //
+            .objects(oids) //
+            .build();
+
+        DeleteObjectsRequest request = DeleteObjectsRequest.builder() //
+            .bucket(bucket(bucket)) //
+            .delete(delete) //
+            .build();
+
+        CompletableFuture<DeleteObjectsResponse> future = s3AsynchClient.deleteObjects(request);
+
+        return Mono.fromFuture(future);
+    }
+
+    /**
+     * Requests one page (at most 1000 keys) of a listing.
+     *
+     * @param prevResponse the previous page, or <code>null</code> to request the first page.
+     * @return the next page, or an empty Mono when the previous page was the last one.
+     */
+    private Mono<ListObjectsResponse> listObjectsRequest(String bucket, String prefix,
+        ListObjectsResponse prevResponse) {
+        ListObjectsRequest.Builder builder = ListObjectsRequest.builder() //
+            .bucket(bucket) //
+            .maxKeys(1000) //
+            .prefix(prefix);
+
+        if (prevResponse != null) {
+            if (!Boolean.TRUE.equals(prevResponse.isTruncated())) {
+                return Mono.empty();
+            }
+            String marker = continuationMarker(prevResponse);
+            if (marker == null) {
+                // Without a marker the same first page would be requested over and over again.
+                return Mono.empty();
+            }
+            builder.marker(marker);
+        }
+
+        ListObjectsRequest listObjectsRequest = builder.build();
+        CompletableFuture<ListObjectsResponse> future = s3AsynchClient.listObjects(listObjectsRequest);
+        return Mono.fromFuture(future);
+    }
+
+    /**
+     * Finds where the listing continues after a truncated page. The ListObjects API only
+     * returns NextMarker when the request specified a delimiter, which this class never does;
+     * in that case the last key of the page is the marker for the next request.
+     *
+     * @return the marker, or <code>null</code> if the page offers neither a marker nor a key.
+     */
+    private static String continuationMarker(ListObjectsResponse response) {
+        String marker = response.nextMarker();
+        if (marker != null && !marker.isEmpty()) {
+            return marker;
+        }
+        java.util.List<S3Object> contents = response.contents();
+        if (contents == null || contents.isEmpty()) {
+            return null;
+        }
+        return contents.get(contents.size() - 1).key();
+    }
+
+    /** Lists all objects with the given prefix, following the pagination of the listing. */
+    private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
+
+        return listObjectsRequest(bucket, prefix, null) //
+            .expand(response -> listObjectsRequest(bucket, prefix, response)) //
+            .map(ListObjectsResponse::contents) //
+            .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) //
+            .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) //
+            .flatMap(Flux::fromIterable) //
+            .doOnNext(obj -> logger.debug("Found object: {}", obj.key()));
+    }
+
+    private Mono<DeleteBucketResponse> deleteBucketFromS3Storage(Bucket bucket) {
+        DeleteBucketRequest request = DeleteBucketRequest.builder() //
+            .bucket(bucket(bucket)) //
+            .build();
+
+        CompletableFuture<DeleteBucketResponse> future = s3AsynchClient.deleteBucket(request);
+
+        return Mono.fromFuture(future);
+    }
+
+    /** Maps the logical bucket to the configured S3 bucket name. */
+    private String bucket(Bucket bucket) {
+        return bucket == Bucket.FILES ? applicationConfig.getS3Bucket() : applicationConfig.getS3LocksBucket();
+    }
+
+    /** Uploads a local file; emits the key, or an error (which is also logged) on failure. */
+    private Mono<String> copyFileToS3Bucket(String s3Bucket, Path fileName, String s3Key) {
+
+        PutObjectRequest request = PutObjectRequest.builder() //
+            .bucket(s3Bucket) //
+            .key(s3Key) //
+            .build();
+
+        AsyncRequestBody body = AsyncRequestBody.fromFile(fileName);
+
+        CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
+
+        return Mono.fromFuture(future) //
+            .map(f -> s3Key) //
+            .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
+
+    }
+
+    private Mono<HeadObjectResponse> getHeadObject(String bucket, String key) {
+        HeadObjectRequest request = HeadObjectRequest.builder().bucket(bucket).key(key).build();
+
+        CompletableFuture<HeadObjectResponse> future = s3AsynchClient.headObject(request);
+        return Mono.fromFuture(future);
+    }
+
+    /** Downloads an object into memory; a failure is logged and results in an empty Mono. */
+    private Mono<byte[]> getDataFromS3Object(String bucket, String key) {
+
+        GetObjectRequest request = GetObjectRequest.builder() //
+            .bucket(bucket) //
+            .key(key) //
+            .build();
+
+        CompletableFuture<ResponseBytes<GetObjectResponse>> future =
+            s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
+
+        return Mono.fromFuture(future) //
+            .map(BytesWrapper::asByteArray) //
+            .doOnError(
+                t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key, bucket, t.getMessage())) //
+            .doOnNext(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
+            .onErrorResume(t -> Mono.empty());
+    }
+
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
new file mode 100644
index 0000000..6aa7615
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
@@ -0,0 +1,33 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.exceptions;
+
+/**
+ * Checked exception carrying a message and, optionally, the exception that caused it.
+ */
+public class DatafileTaskException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates an exception that wraps an underlying cause.
+     *
+     * @param message description of the failure
+     * @param originalException the exception that caused the failure
+     */
+    public DatafileTaskException(final String message, final Exception originalException) {
+        super(message, originalException);
+    }
+
+    /**
+     * Creates an exception without a cause.
+     *
+     * @param message description of the failure
+     */
+    public DatafileTaskException(final String message) {
+        super(message);
+    }
+
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java
new file mode 100644
index 0000000..d49a051
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java
@@ -0,0 +1,33 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.exceptions;
+
+/**
+ * Signals a problem with the Consul environment.
+ *
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
+ */
+public class EnvironmentLoaderException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates the exception.
+     *
+     * @param message description of the problem
+     */
+    public EnvironmentLoaderException(final String message) {
+        super(message);
+    }
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java
new file mode 100644
index 0000000..5c2a0d2
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/NonRetryableDatafileTaskException.java
@@ -0,0 +1,33 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.exceptions;
+
+/**
+ * A {@link DatafileTaskException} for failures where repeating the attempt is not expected to help.
+ */
+public class NonRetryableDatafileTaskException extends DatafileTaskException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates an exception that wraps an underlying cause.
+     *
+     * @param message description of the failure
+     * @param originalException the exception that caused the failure
+     */
+    public NonRetryableDatafileTaskException(final String message, final Exception originalException) {
+        super(message, originalException);
+    }
+
+    /**
+     * Creates an exception without a cause.
+     *
+     * @param message description of the failure
+     */
+    public NonRetryableDatafileTaskException(final String message) {
+        super(message);
+    }
+
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClient.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClient.java
new file mode 100644
index 0000000..aef5033
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClient.java
@@ -0,0 +1,230 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2020-2021 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.net.ftp.FTP;
+import org.apache.commons.net.ftp.FTPReply;
+import org.apache.commons.net.ftp.FTPSClient;
+import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.commons.SecurityUtil;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.FileSystemResource;
+
+/**
+ * Gets file from PNF with FTPS protocol.
+ *
+ * @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a>
+ */
+public class FtpesClient implements FileCollectClient {
+ private static final Logger logger = LoggerFactory.getLogger(FtpesClient.class);
+
+ private static final int DEFAULT_PORT = 21;
+
+ FTPSClient realFtpsClient = new FTPSClient();
+ private final FileServerData fileServerData;
+ private static TrustManager theTrustManager = null;
+ private static KeyManager theKeyManager = null;
+
+ private final Path keyCertPath;
+ private final String keyCertPasswordPath;
+ private final Path trustedCaPath;
+ private final String trustedCaPasswordPath;
+
+ /**
+ * Constructor.
+ *
+ * @param fileServerData info needed to connect to the PNF.
+ * @param keyCertPath path to DFC's key cert.
+ * @param keyCertPasswordPath path of file containing password for DFC's key
+ * cert.
+ * @param trustedCaPath path to the PNF's trusted keystore.
+ * @param trustedCaPasswordPath path of file containing password for the PNF's
+ * trusted keystore.
+ */
+ public FtpesClient(FileServerData fileServerData, Path keyCertPath, String keyCertPasswordPath, Path trustedCaPath,
+ String trustedCaPasswordPath) {
+ this.fileServerData = fileServerData;
+ this.keyCertPath = keyCertPath;
+ this.keyCertPasswordPath = keyCertPasswordPath;
+ this.trustedCaPath = trustedCaPath;
+ this.trustedCaPasswordPath = trustedCaPasswordPath;
+ }
+
+ @Override
+ public void open() throws DatafileTaskException {
+ try {
+ realFtpsClient.setNeedClientAuth(trustedCaPath != null);
+ realFtpsClient.setKeyManager(getKeyManager(keyCertPath, keyCertPasswordPath));
+ realFtpsClient.setTrustManager(getTrustManager(trustedCaPath, trustedCaPasswordPath));
+ setUpConnection();
+ } catch (DatafileTaskException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new DatafileTaskException("Could not open connection: " + e, e);
+ }
+ }
+
+ @Override
+ public void close() {
+ logger.trace("starting to closeDownConnection");
+ if (realFtpsClient.isConnected()) {
+ try {
+ boolean logOut = realFtpsClient.logout();
+ logger.trace("logOut: {}", logOut);
+ } catch (Exception e) {
+ logger.trace("Unable to logout connection.", e);
+ }
+ try {
+ realFtpsClient.disconnect();
+ logger.trace("disconnected!");
+ } catch (Exception e) {
+ logger.trace("Unable to disconnect connection.", e);
+ }
+ }
+ }
+
+ @Override
+ public void collectFile(String remoteFileName, Path localFileName) throws DatafileTaskException {
+ logger.trace("collectFile called");
+
+ try (OutputStream output = createOutputStream(localFileName)) {
+ logger.trace("begin to retrieve from xNF.");
+ if (!realFtpsClient.retrieveFile(remoteFileName, output)) {
+ throw new NonRetryableDatafileTaskException(
+ "Could not retrieve file. No retry attempts will be done, file :" + remoteFileName);
+ }
+ } catch (IOException e) {
+ throw new DatafileTaskException("Could not fetch file: " + e, e);
+ }
+ logger.trace("collectFile fetched: {}", localFileName);
+ }
+
+ private static int getPort(Integer port) {
+ return port != null ? port : DEFAULT_PORT;
+ }
+
+ private void setUpConnection() throws DatafileTaskException, IOException {
+
+ realFtpsClient.connect(fileServerData.serverAddress, getPort(fileServerData.port));
+ logger.trace("after ftp connect");
+
+ if (!realFtpsClient.login(fileServerData.userId, fileServerData.password)) {
+ throw new DatafileTaskException("Unable to log in to xNF. " + fileServerData.serverAddress);
+ }
+
+ if (FTPReply.isPositiveCompletion(realFtpsClient.getReplyCode())) {
+ realFtpsClient.enterLocalPassiveMode();
+ realFtpsClient.setFileType(FTP.BINARY_FILE_TYPE);
+ // Set protection buffer size
+ realFtpsClient.execPBSZ(0);
+ // Set data channel protection to private
+ realFtpsClient.execPROT("P");
+ realFtpsClient.setBufferSize(1024 * 1024);
+ } else {
+ throw new DatafileTaskException("Unable to connect to xNF. " + fileServerData.serverAddress
+ + " xNF reply code: " + realFtpsClient.getReplyCode());
+ }
+
+ logger.trace("setUpConnection successfully!");
+ }
+
+ private TrustManager createTrustManager(Path trustedCaPath, String trustedCaPassword)
+ throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException {
+ logger.trace("Creating trust manager from file: {}", trustedCaPath);
+ try (InputStream fis = createInputStream(trustedCaPath)) {
+ KeyStore keyStore = KeyStore.getInstance("JKS");
+ keyStore.load(fis, trustedCaPassword.toCharArray());
+ TrustManagerFactory factory = TrustManagerFactory.getInstance("SunX509");
+ factory.init(keyStore);
+ return factory.getTrustManagers()[0];
+ }
+ }
+
+ protected InputStream createInputStream(Path localFileName) throws IOException {
+ FileSystemResource realResource = new FileSystemResource(localFileName);
+ return realResource.getInputStream();
+ }
+
+ protected OutputStream createOutputStream(Path localFileName) throws IOException, DatafileTaskException {
+ File localFile = localFileName.toFile();
+ if (!localFile.createNewFile()) {
+ logger.debug("Local file {} already created", localFileName);
+ throw new NonRetryableDatafileTaskException("Local file already created: " + localFileName);
+ }
+ OutputStream output = new FileOutputStream(localFile);
+ logger.trace("File {} opened xNF", localFileName);
+ return output;
+ }
+
+ protected TrustManager getTrustManager(Path trustedCaPath, String trustedCaPasswordPath)
+ throws KeyStoreException, NoSuchAlgorithmException, IOException, CertificateException {
+ synchronized (FtpesClient.class) {
+ if (theTrustManager == null && trustedCaPath != null) {
+ String trustedCaPassword = SecurityUtil.getTruststorePasswordFromFile(trustedCaPasswordPath);
+ theTrustManager = createTrustManager(trustedCaPath, trustedCaPassword);
+ }
+ return theTrustManager;
+ }
+ }
+
+ protected KeyManager getKeyManager(Path keyCertPath, String keyCertPasswordPath)
+ throws IOException, GeneralSecurityException {
+
+ synchronized (FtpesClient.class) {
+ if (theKeyManager == null) {
+ String keyCertPassword = SecurityUtil.getKeystorePasswordFromFile(keyCertPasswordPath);
+ theKeyManager = createKeyManager(keyCertPath, keyCertPassword);
+ }
+ return theKeyManager;
+ }
+ }
+
+ private KeyManager createKeyManager(Path keyCertPath, String keyCertPassword) throws IOException, KeyStoreException,
+ NoSuchAlgorithmException, CertificateException, UnrecoverableKeyException {
+ logger.trace("Creating key manager from file: {}", keyCertPath);
+ try (InputStream fis = createInputStream(keyCertPath)) {
+ KeyStore keyStore = KeyStore.getInstance("JKS");
+ keyStore.load(fis, keyCertPassword.toCharArray());
+ KeyManagerFactory factory = KeyManagerFactory.getInstance("SunX509");
+ factory.init(keyStore, keyCertPassword.toCharArray());
+ return factory.getKeyManagers()[0];
+ }
+ }
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
new file mode 100644
index 0000000..0c6db35
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -0,0 +1,151 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation, 2020 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import com.jcraft.jsch.Channel;
+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import com.jcraft.jsch.SftpException;
+
+import java.nio.file.Path;
+
+import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Gets file from xNF with SFTP protocol.
+ *
+ * @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a>
+ */
+public class SftpClient implements FileCollectClient {
+
+ private static final Logger logger = LoggerFactory.getLogger(SftpClient.class);
+
+ private static final int SFTP_DEFAULT_PORT = 22;
+ private static final String STRICT_HOST_KEY_CHECKING = "StrictHostKeyChecking";
+
+ private final FileServerData fileServerData;
+ protected Session session = null;
+ protected ChannelSftp sftpChannel = null;
+ private final SftpClientSettings settings;
+
+ public SftpClient(FileServerData fileServerData, SftpClientSettings sftpConfig) {
+ this.fileServerData = fileServerData;
+ this.settings = sftpConfig;
+ }
+
+ @Override
+ public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
+ logger.trace("collectFile {}", localFile);
+
+ try {
+ sftpChannel.get(remoteFile, localFile.toString());
+ logger.trace("File {} Download successful from xNF", localFile.getFileName());
+ } catch (SftpException e) {
+ boolean retry = e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE && e.id != ChannelSftp.SSH_FX_PERMISSION_DENIED
+ && e.id != ChannelSftp.SSH_FX_OP_UNSUPPORTED;
+ if (retry) {
+ throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e);
+ } else {
+ throw new NonRetryableDatafileTaskException(
+ "Unable to get file from xNF. No retry attempts will be done. Data: " + fileServerData, e);
+ }
+ }
+
+ logger.trace("collectFile OK");
+ }
+
+ @Override
+ public void close() {
+ logger.trace("closing sftp session");
+ if (sftpChannel != null) {
+ sftpChannel.exit();
+ sftpChannel = null;
+ }
+ if (session != null) {
+ session.disconnect();
+ session = null;
+ }
+ }
+
+ @Override
+ public void open() throws DatafileTaskException {
+ try {
+ if (session == null) {
+ session = setUpSession(fileServerData);
+ sftpChannel = getChannel(session);
+ }
+ } catch (JSchException e) {
+ boolean retry = !e.getMessage().contains("Auth fail");
+ if (retry) {
+ throw new DatafileTaskException("Could not open Sftp client. " + e);
+ } else {
+ throw new NonRetryableDatafileTaskException(
+ "Could not open Sftp client, no retry attempts will be done. " + e);
+ }
+ }
+ }
+
+ JSch createJsch() {
+ return new JSch();
+ }
+
+ private int getPort(Integer port) {
+ return port != null ? port : SFTP_DEFAULT_PORT;
+ }
+
+ private Session setUpSession(FileServerData fileServerData) throws JSchException {
+ boolean useStrictHostChecking = this.settings.shouldUseStrictHostChecking();
+ JSch jsch = createJschClient(useStrictHostChecking);
+ return createJshSession(jsch, fileServerData, useStrictHostChecking);
+ }
+
+ private JSch createJschClient(boolean useStrictHostChecking) throws JSchException {
+ JSch jsch = createJsch();
+ if (useStrictHostChecking) {
+ jsch.setKnownHosts(this.settings.getKnownHostsFilePath());
+ }
+ return jsch;
+ }
+
+ private Session createJshSession(JSch jsch, FileServerData fileServerData, boolean useStrictHostKeyChecking)
+ throws JSchException {
+ Session newSession =
+ jsch.getSession(fileServerData.userId, fileServerData.serverAddress, getPort(fileServerData.port));
+ newSession.setConfig(STRICT_HOST_KEY_CHECKING, toYesNo(useStrictHostKeyChecking));
+ newSession.setPassword(fileServerData.password);
+ newSession.connect();
+ return newSession;
+ }
+
+ private String toYesNo(boolean useStrictHostKeyChecking) {
+ return useStrictHostKeyChecking ? "yes" : "no";
+ }
+
+ private ChannelSftp getChannel(Session session) throws JSchException {
+ Channel channel = session.openChannel("sftp");
+ channel.connect();
+ return (ChannelSftp) channel;
+ }
+
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientSettings.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientSettings.java
new file mode 100644
index 0000000..23e254b
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientSettings.java
@@ -0,0 +1,60 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import java.io.File;
+
+import org.onap.dcaegen2.collectors.datafile.configuration.SftpConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Derives the effective SFTP client settings from an {@link SftpConfig}.
+ */
+public class SftpClientSettings {
+
+    private static final Logger logger = LoggerFactory.getLogger(SftpClientSettings.class);
+
+    private final SftpConfig sftpConfig;
+
+    public SftpClientSettings(SftpConfig sftpConfig) {
+        this.sftpConfig = sftpConfig;
+    }
+
+    /**
+     * Decides whether strict host key checking is to be used.
+     *
+     * <p>
+     * It is used only when it is enabled in the configuration and the configured known hosts path points to an
+     * existing regular file. The decision is logged.
+     *
+     * @return true if strict host key checking is to be used
+     */
+    public boolean shouldUseStrictHostChecking() {
+        if (!this.sftpConfig.strictHostKeyChecking) {
+            logger.info("StrictHostKeyChecking will be disabled.");
+            return false;
+        }
+
+        String knownHostsPath = getKnownHostsFilePath();
+        if (knownHostsPath == null) {
+            // new File(null) would throw a NullPointerException; an unset path means falling back instead.
+            logUsageOfStrictHostCheckingFlag(false, null);
+            return false;
+        }
+
+        File file = new File(knownHostsPath);
+        boolean strictHostKeyChecking = file.isFile();
+        logUsageOfStrictHostCheckingFlag(strictHostKeyChecking, file.getAbsolutePath());
+        return strictHostKeyChecking;
+    }
+
+    /**
+     * Returns the known hosts file path exactly as configured.
+     *
+     * @return the configured path
+     */
+    public String getKnownHostsFilePath() {
+        return this.sftpConfig.knownHostsFilePath;
+    }
+
+    /** Logs whether strict host key checking is used with the given file, or why it falls back to 'no'. */
+    private void logUsageOfStrictHostCheckingFlag(boolean strictHostKeyChecking, String filePath) {
+        if (strictHostKeyChecking) {
+            logger.info("StrictHostKeyChecking will be enabled with KNOWN_HOSTS_FILE_PATH [{}].", filePath);
+        } else {
+            logger.warn(
+                "StrictHostKeyChecking is enabled but environment variable KNOWN_HOSTS_FILE_PATH is not set or points to not existing file [{}]  -->  falling back to StrictHostKeyChecking='no'.",
+                filePath);
+        }
+    }
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java
new file mode 100644
index 0000000..eab0082
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java
@@ -0,0 +1,179 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2020-2021 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClient;
+import reactor.netty.http.client.HttpClientResponse;
+import reactor.netty.resources.ConnectionProvider;
+
+/**
+ * Gets file from PNF with HTTP protocol.
+ *
+ * @author <a href="mailto:krzysztof.gajewski@nokia.com">Krzysztof Gajewski</a>
+ */
+public class DfcHttpClient implements FileCollectClient {
+
+ // Be aware to be less than ScheduledTasks.NUMBER_OF_WORKER_THREADS
+ private static final int MAX_NUMBER_OF_CONNECTIONS = 200;
+ private static final Logger logger = LoggerFactory.getLogger(DfcHttpClient.class);
+ private static final ConnectionProvider pool = ConnectionProvider.create("default", MAX_NUMBER_OF_CONNECTIONS);
+
+ private final FileServerData fileServerData;
+ private Disposable disposableClient;
+
+ protected HttpClient client;
+
+ public DfcHttpClient(FileServerData fileServerData) {
+ this.fileServerData = fileServerData;
+ }
+
+ @Override
+ public void open() throws DatafileTaskException {
+ logger.trace("Setting httpClient for file download.");
+
+ String authorizationContent = getAuthorizationContent();
+ this.client =
+ HttpClient.create(pool).keepAlive(true).headers(h -> h.add("Authorization", authorizationContent));
+
+ logger.trace("httpClient, auth header was set.");
+ }
+
+ protected String getAuthorizationContent() throws DatafileTaskException {
+ String jwtToken = HttpUtils.getJWTToken(fileServerData);
+ if (!jwtToken.isEmpty()) {
+ return HttpUtils.jwtAuthContent(jwtToken);
+ }
+ if (!HttpUtils.isBasicAuthDataFilled(fileServerData)) {
+ throw new DatafileTaskException("Not sufficient basic auth data for file.");
+ }
+ return HttpUtils.basicAuthContent(this.fileServerData.userId, this.fileServerData.password);
+ }
+
+ @Override
+ public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
+ logger.trace("Prepare to collectFile {}", localFile);
+ CountDownLatch latch = new CountDownLatch(1);
+ final AtomicReference<Exception> errorMessage = new AtomicReference<>();
+
+ Consumer<Throwable> onError = processFailedConnectionWithServer(latch, errorMessage);
+ Consumer<InputStream> onSuccess = processDataFromServer(localFile, latch, errorMessage);
+
+ Flux<InputStream> responseContent = getServerResponse(remoteFile);
+ disposableClient = responseContent.subscribe(onSuccess, onError);
+
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new DatafileTaskException("Interrupted exception after datafile download - ", e);
+ }
+
+ if (isDownloadFailed(errorMessage)) {
+ if (errorMessage.get() instanceof NonRetryableDatafileTaskException) {
+ throw (NonRetryableDatafileTaskException) errorMessage.get();
+ }
+ throw (DatafileTaskException) errorMessage.get();
+ }
+
+ logger.trace("HTTP collectFile OK");
+ }
+
+ protected boolean isDownloadFailed(AtomicReference<Exception> errorMessage) {
+ return (errorMessage.get() != null);
+ }
+
+ protected Consumer<Throwable> processFailedConnectionWithServer(CountDownLatch latch,
+ AtomicReference<Exception> errorMessages) {
+ return (Throwable response) -> {
+ Exception e = new Exception("Error in connection has occurred during file download", response);
+ errorMessages.set(new DatafileTaskException(response.getMessage(), e));
+ if (response instanceof NonRetryableDatafileTaskException) {
+ errorMessages.set(new NonRetryableDatafileTaskException(response.getMessage(), e));
+ }
+ latch.countDown();
+ };
+ }
+
+ protected Consumer<InputStream> processDataFromServer(Path localFile, CountDownLatch latch,
+ AtomicReference<Exception> errorMessages) {
+ return (InputStream response) -> {
+ logger.trace("Starting to process response.");
+ try {
+ long numBytes = Files.copy(response, localFile);
+ logger.trace("Transmission was successful - {} bytes downloaded.", numBytes);
+ logger.trace("CollectFile fetched: {}", localFile);
+ response.close();
+ } catch (IOException e) {
+ errorMessages.set(new DatafileTaskException("Error fetching file with", e));
+ } finally {
+ latch.countDown();
+ }
+ };
+ }
+
+ protected Flux<InputStream> getServerResponse(String remoteFile) {
+ return client.get().uri(HttpUtils.prepareHttpUri(fileServerData, remoteFile))
+ .response((responseReceiver, byteBufFlux) -> {
+ logger.trace("HTTP response status - {}", responseReceiver.status());
+ if (isResponseOk(responseReceiver)) {
+ return byteBufFlux.aggregate().asInputStream();
+ }
+ if (isErrorInConnection(responseReceiver)) {
+ return Mono.error(new NonRetryableDatafileTaskException(
+ HttpUtils.nonRetryableResponse(getResponseCode(responseReceiver))));
+ }
+ return Mono
+ .error(new DatafileTaskException(HttpUtils.retryableResponse(getResponseCode(responseReceiver))));
+ });
+ }
+
+ protected boolean isResponseOk(HttpClientResponse httpClientResponse) {
+ return getResponseCode(httpClientResponse) == 200;
+ }
+
+ private int getResponseCode(HttpClientResponse responseReceiver) {
+ return responseReceiver.status().code();
+ }
+
+ protected boolean isErrorInConnection(HttpClientResponse httpClientResponse) {
+ return getResponseCode(httpClientResponse) >= 400;
+ }
+
+ @Override
+ public void close() {
+ logger.trace("Starting http client disposal.");
+ disposableClient.dispose();
+ logger.trace("Http client disposed.");
+ }
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpsClient.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpsClient.java
new file mode 100644
index 0000000..5c652cb
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpsClient.java
@@ -0,0 +1,182 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.UnknownHostException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+
+import javax.net.ssl.SSLHandshakeException;
+import javax.net.ssl.SSLPeerUnverifiedException;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.config.SocketConfig;
+import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.http.conn.HttpHostConnectException;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.util.EntityUtils;
+import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Gets file from PNF with HTTPS protocol.
+ *
+ * @author <a href="mailto:krzysztof.gajewski@nokia.com">Krzysztof Gajewski</a>
+ */
+public class DfcHttpsClient implements FileCollectClient {
+
+    protected CloseableHttpClient httpsClient;
+
+    private static final Logger logger = LoggerFactory.getLogger(DfcHttpsClient.class);
+    private static final int FIFTEEN_SECONDS = 15 * 1000;
+
+    private final FileServerData fileServerData;
+    private final PoolingHttpClientConnectionManager connectionManager;
+
+    /**
+     * Creates a client for one file server.
+     *
+     * @param fileServerData data about the server to download from, including the credentials used for
+     *        authorization
+     * @param connectionManager connection manager that the HTTP client built in {@link #open()} will use
+     */
+    public DfcHttpsClient(FileServerData fileServerData, PoolingHttpClientConnectionManager connectionManager) {
+        this.fileServerData = fileServerData;
+        this.connectionManager = connectionManager;
+    }
+
+    /**
+     * Builds the HTTP client on top of the supplied connection manager, with socket keep-alive enabled and a
+     * connect timeout of fifteen seconds. {@link #httpsClient} is unset until this has been called.
+     */
+    @Override
+    public void open() {
+        logger.trace("Setting httpsClient for file download.");
+        SocketConfig socketConfig = SocketConfig.custom().setSoKeepAlive(true).build();
+
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(FIFTEEN_SECONDS).build();
+
+        httpsClient = HttpClients.custom().setConnectionManager(connectionManager).setDefaultSocketConfig(socketConfig)
+            .setDefaultRequestConfig(requestConfig).build();
+
+        logger.trace("httpsClient prepared for connection.");
+    }
+
+    /**
+     * Downloads one file over HTTPS and stores it locally.
+     *
+     * @param remoteFile the file to fetch, used to build the request URI
+     * @param localFile where the downloaded content is written; an existing file is replaced
+     * @throws DatafileTaskException if the authorization data is incomplete, the server answers with a
+     *         non-200 status, or an I/O error occurs while calling the server or writing the file
+     */
+    @Override
+    public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
+        logger.trace("Prepare to collectFile {}", localFile);
+        HttpGet httpGet = new HttpGet(HttpUtils.prepareHttpsUri(fileServerData, remoteFile));
+
+        String authorizationContent = getAuthorizationContent();
+        if (!authorizationContent.isEmpty()) {
+            httpGet.addHeader("Authorization", authorizationContent);
+        }
+        try {
+            HttpResponse httpResponse = makeCall(httpGet);
+            processResponse(httpResponse, localFile);
+        } catch (IOException e) {
+            // Say what failed and for which file instead of logging a placeholder message.
+            logger.error("Error downloading file {} from server", remoteFile, e);
+            throw new DatafileTaskException("Error downloading file from server. ", e);
+        }
+        logger.trace("HTTPS collectFile OK");
+    }
+
+    /**
+     * Selects the value for the Authorization header: basic auth content when {@link #shouldUseBasicAuth}
+     * holds, otherwise whatever {@code HttpUtils.jwtAuthContent} produces for the JWT token.
+     */
+    private String getAuthorizationContent() throws DatafileTaskException {
+        String jwtToken = HttpUtils.getJWTToken(fileServerData);
+        if (shouldUseBasicAuth(jwtToken)) {
+            return HttpUtils.basicAuthContent(this.fileServerData.userId, this.fileServerData.password);
+        }
+        return HttpUtils.jwtAuthContent(jwtToken);
+    }
+
+    /**
+     * Basic auth is used only when valid basic auth data is present and no JWT token was found. The basic
+     * auth data is validated first, so incomplete data throws even when a JWT token exists.
+     */
+    private boolean shouldUseBasicAuth(String jwtToken) throws DatafileTaskException {
+        return basicAuthValidNotPresentOrThrow() && jwtToken.isEmpty();
+    }
+
+    /**
+     * Classifies the basic auth data of the file server.
+     *
+     * @return {@code false} when both user id and password are empty, {@code true} when
+     *         {@code HttpUtils.isBasicAuthDataFilled} accepts the data
+     * @throws DatafileTaskException when the data is neither empty nor accepted as filled
+     */
+    protected boolean basicAuthValidNotPresentOrThrow() throws DatafileTaskException {
+        if (isAuthDataEmpty()) {
+            return false;
+        }
+        if (HttpUtils.isBasicAuthDataFilled(fileServerData)) {
+            return true;
+        }
+        throw new DatafileTaskException("Not sufficient basic auth data for file.");
+    }
+
+    private boolean isAuthDataEmpty() {
+        return this.fileServerData.userId.isEmpty() && this.fileServerData.password.isEmpty();
+    }
+
+    /**
+     * Executes the request and returns the response only when its status is 200.
+     *
+     * @param httpGet the prepared request
+     * @return the response whose entity still has to be read by the caller
+     * @throws NonRetryableDatafileTaskException for a status of 400 or above, and for connect timeout,
+     *         unknown host, refused connection and TLS handshake/peer verification failures
+     * @throws DatafileTaskException for any other non-200 status
+     * @throws IOException for other I/O failures while executing the request or draining the entity
+     */
+    protected HttpResponse makeCall(HttpGet httpGet) throws IOException, DatafileTaskException {
+        try {
+            HttpResponse httpResponse = executeHttpClient(httpGet);
+            if (isResponseOk(httpResponse)) {
+                return httpResponse;
+            }
+
+            // Drain the error body before throwing.
+            EntityUtils.consume(httpResponse.getEntity());
+            if (isErrorInConnection(httpResponse)) {
+                // Log the numeric code under "code", not the whole status line.
+                logger.warn("Failed to download file, reason: {}, code: {}",
+                    httpResponse.getStatusLine().getReasonPhrase(), getResponseCode(httpResponse));
+                // NOTE(review): DfcHttpClient pairs NonRetryableDatafileTaskException with
+                // HttpUtils.nonRetryableResponse, the opposite of the pairing used here. Confirm against
+                // the HttpUtils message texts which of the two clients carries the intended message.
+                throw new NonRetryableDatafileTaskException(HttpUtils.retryableResponse(getResponseCode(httpResponse)));
+            }
+            throw new DatafileTaskException(HttpUtils.nonRetryableResponse(getResponseCode(httpResponse)));
+        } catch (ConnectTimeoutException | UnknownHostException | HttpHostConnectException | SSLHandshakeException
+            | SSLPeerUnverifiedException e) {
+            logger.warn("Unable to get file from xNF: {}", e.getMessage());
+            throw new NonRetryableDatafileTaskException("Unable to get file from xNF. No retry attempts will be done.",
+                e);
+        }
+    }
+
+    protected CloseableHttpResponse executeHttpClient(HttpGet httpGet) throws IOException {
+        return httpsClient.execute(httpGet);
+    }
+
+    /** Only status code 200 is accepted as a successful response. */
+    protected boolean isResponseOk(HttpResponse httpResponse) {
+        return getResponseCode(httpResponse) == 200;
+    }
+
+    private int getResponseCode(HttpResponse httpResponse) {
+        return httpResponse.getStatusLine().getStatusCode();
+    }
+
+    /** A status code of 400 or above is reported by {@link #makeCall} as a non-retryable failure. */
+    protected boolean isErrorInConnection(HttpResponse httpResponse) {
+        return getResponseCode(httpResponse) >= 400;
+    }
+
+    /**
+     * Writes the response body to the local file.
+     *
+     * @param response a successful response whose entity has not been read yet
+     * @param localFile destination of the downloaded content
+     * @throws IOException if reading the body or writing the file fails
+     */
+    protected void processResponse(HttpResponse response, Path localFile) throws IOException {
+        logger.trace("Starting to process response.");
+        HttpEntity entity = response.getEntity();
+        long numBytes;
+        // try-with-resources closes the content stream even when the copy fails, so the stream is not
+        // left open on the error path.
+        try (InputStream stream = entity.getContent()) {
+            numBytes = writeFile(localFile, stream);
+        }
+        EntityUtils.consume(entity);
+        logger.trace("Transmission was successful - {} bytes downloaded.", numBytes);
+    }
+
+    /** Copies the stream to {@code localFile}, replacing an existing file, and returns the byte count. */
+    protected long writeFile(Path localFile, InputStream stream) throws IOException {
+        return Files.copy(stream, localFile, StandardCopyOption.REPLACE_EXISTING);
+    }
+
+    /**
+     * Only traces the end of the download; {@link #httpsClient} is not closed here.
+     * NOTE(review): presumably deliberate because the connection manager is supplied from outside and may
+     * be shared between clients - confirm before adding a close of the HTTP client.
+     */
+    @Override
+    public void close() {
+        logger.trace("Https client has ended downloading process.");
+    }
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java
new file mode 100644
index 0000000..b003727
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java
@@ -0,0 +1,59 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+
+import org.apache.http.client.RedirectStrategy;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+
+public class HttpAsyncClientBuilderWrapper { // Fluent wrapper that forwards every call to an underlying HttpAsyncClientBuilder.
+ HttpAsyncClientBuilder builder = HttpAsyncClients.custom(); // Wrapped builder; package-private and not final.
+
+ public HttpAsyncClientBuilderWrapper setRedirectStrategy(RedirectStrategy redirectStrategy) { // Forwards to builder.setRedirectStrategy.
+ builder.setRedirectStrategy(redirectStrategy);
+ return this; // Returns the wrapper, not the wrapped builder, so calls can be chained on this type.
+ }
+
+ public HttpAsyncClientBuilderWrapper setSslContext(SSLContext sslcontext) { // Forwards to builder.setSSLContext.
+ builder.setSSLContext(sslcontext);
+ return this;
+ }
+
+ public HttpAsyncClientBuilderWrapper setSslHostnameVerifier(HostnameVerifier hostnameVerifier) { // Forwards to builder.setSSLHostnameVerifier.
+ builder.setSSLHostnameVerifier(hostnameVerifier);
+ return this;
+ }
+
+ public HttpAsyncClientBuilderWrapper setDefaultRequestConfig(RequestConfig config) { // Forwards to builder.setDefaultRequestConfig.
+ builder.setDefaultRequestConfig(config);
+ return this;
+ }
+
+ public CloseableHttpAsyncClient build() { // Builds the client from whatever has been configured so far.
+ return builder.build();
+ }
+
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpsClientConnectionManagerUtil.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpsClientConnectionManagerUtil.java
new file mode 100644
index 0000000..7769e53
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpsClientConnectionManagerUtil.java
@@ -0,0 +1,138 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Paths;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.ssl.DefaultHostnameVerifier;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
+import org.onap.dcaegen2.collectors.datafile.commons.SecurityUtil;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.FileSystemResource;
+
+/**
+ * Utility class supplying connection manager for HTTPS protocol.
+ *
+ * @author <a href="mailto:krzysztof.gajewski@nokia.com">Krzysztof Gajewski</a>
+ */
+public class HttpsClientConnectionManagerUtil {
+
+    private HttpsClientConnectionManagerUtil() {
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(HttpsClientConnectionManagerUtil.class);
+    // Be aware to be less than ScheduledTasks.NUMBER_OF_WORKER_THREADS
+    private static final int MAX_NUMBER_OF_CONNECTIONS = 200;
+    private static PoolingHttpClientConnectionManager connectionManager;
+
+    /**
+     * Returns the current connection manager.
+     *
+     * <p>The method takes the same class-level lock as {@link #setupOrUpdate}, so a caller never reads the
+     * field while an update is between closing the old manager and publishing the new one, and always sees
+     * the most recently published instance.
+     *
+     * @return the connection manager created by the last successful {@link #setupOrUpdate}
+     * @throws DatafileTaskException if no connection manager is available
+     */
+    public static synchronized PoolingHttpClientConnectionManager instance() throws DatafileTaskException {
+        if (connectionManager == null) {
+            throw new DatafileTaskException("ConnectionManager has to be set or update first");
+        }
+        return connectionManager;
+    }
+
+    /**
+     * Closes the current connection manager, if any, and creates a new one from the given key and trust
+     * material. If the creation fails no connection manager is left available.
+     *
+     * @param keyCertPath path to the PKCS12 key store holding the client key material
+     * @param keyCertPasswordPath path to the file containing the key store password
+     * @param trustedCaPath path to the trust store; when empty no trust material is loaded
+     * @param trustedCaPasswordPath path to the file containing the trust store password
+     * @param useHostnameVerifier {@code true} to verify host names with {@link DefaultHostnameVerifier},
+     *        {@code false} to skip host name verification
+     * @throws DatafileTaskException if the connection manager cannot be prepared
+     */
+    public static void setupOrUpdate(String keyCertPath, String keyCertPasswordPath, String trustedCaPath,
+        String trustedCaPasswordPath, boolean useHostnameVerifier) throws DatafileTaskException {
+        synchronized (HttpsClientConnectionManagerUtil.class) {
+            if (connectionManager != null) {
+                connectionManager.close();
+                connectionManager = null;
+            }
+            setup(keyCertPath, keyCertPasswordPath, trustedCaPath, trustedCaPasswordPath, useHostnameVerifier);
+        }
+        logger.trace("HttpsConnectionManager setup or updated");
+    }
+
+    /**
+     * Builds the SSL context and a pooling connection manager registered for the "https" scheme only,
+     * restricted to TLSv1.2 and capped at {@link #MAX_NUMBER_OF_CONNECTIONS} connections in total.
+     */
+    private static void setup(String keyCertPath, String keyCertPasswordPath, String trustedCaPath,
+        String trustedCaPasswordPath, boolean useHostnameVerifier) throws DatafileTaskException {
+        try {
+            SSLContextBuilder sslBuilder = SSLContexts.custom();
+            sslBuilder = supplyKeyInfo(keyCertPath, keyCertPasswordPath, sslBuilder);
+            if (!trustedCaPath.isEmpty()) {
+                sslBuilder = supplyTrustInfo(trustedCaPath, trustedCaPasswordPath, sslBuilder);
+            }
+
+            SSLContext sslContext = sslBuilder.build();
+
+            HostnameVerifier hostnameVerifier =
+                useHostnameVerifier ? new DefaultHostnameVerifier() : NoopHostnameVerifier.INSTANCE;
+
+            SSLConnectionSocketFactory sslConnectionSocketFactory =
+                new SSLConnectionSocketFactory(sslContext, new String[] {"TLSv1.2"}, null, hostnameVerifier);
+
+            Registry<ConnectionSocketFactory> socketFactoryRegistry =
+                RegistryBuilder.<ConnectionSocketFactory>create().register("https", sslConnectionSocketFactory).build();
+
+            connectionManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
+            connectionManager.setMaxTotal(MAX_NUMBER_OF_CONNECTIONS);
+
+        } catch (Exception e) {
+            // Every failure (I/O, key store, algorithm, ...) is reported to the caller as one exception type.
+            throw new DatafileTaskException("Unable to prepare HttpsConnectionManager : ", e);
+        }
+    }
+
+    /** Loads the client key material; the key store password is also used as the key password. */
+    private static SSLContextBuilder supplyKeyInfo(String keyCertPath, String keyCertPasswordPath,
+        SSLContextBuilder sslBuilder) throws IOException, KeyStoreException, NoSuchAlgorithmException,
+        CertificateException, UnrecoverableKeyException {
+        String keyPass = SecurityUtil.getKeystorePasswordFromFile(keyCertPasswordPath);
+        KeyStore keyFile = createKeyStore(keyCertPath, keyPass);
+        return sslBuilder.loadKeyMaterial(keyFile, keyPass.toCharArray());
+    }
+
+    /** Reads a PKCS12 key store from {@code path}; the input stream is closed before returning. */
+    private static KeyStore createKeyStore(String path, String storePassword)
+        throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException {
+        logger.trace("Creating manager from file: {}", path);
+        try (InputStream fis = createInputStream(path)) {
+            KeyStore keyStore = KeyStore.getInstance("PKCS12");
+            keyStore.load(fis, storePassword.toCharArray());
+            return keyStore;
+        }
+    }
+
+    private static InputStream createInputStream(String localFileName) throws IOException {
+        FileSystemResource realResource = new FileSystemResource(Paths.get(localFileName));
+        return realResource.getInputStream();
+    }
+
+    /** Loads the trust material from the trust store file, using the password read from its password file. */
+    private static SSLContextBuilder supplyTrustInfo(String trustedCaPath, String trustedCaPasswordPath,
+        SSLContextBuilder sslBuilder)
+        throws NoSuchAlgorithmException, KeyStoreException, CertificateException, IOException {
+        String trustPass = SecurityUtil.getTruststorePasswordFromFile(trustedCaPasswordPath);
+        File trustStoreFile = new File(trustedCaPath);
+        return sslBuilder.loadTrustMaterial(trustStoreFile, trustPass.toCharArray());
+    }
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java
new file mode 100644
index 0000000..2323d0e
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java
@@ -0,0 +1,145 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Various counters that can be shown via a REST API.
+ *
+ */
+public class Counters {
+
+    // All long/Instant fields below are guarded by this object's monitor: every increment, every getter
+    // and toString() is synchronized, so readers see up-to-date and untorn values.
+    private long noOfCollectedFiles = 0;
+    private long noOfFailedFtpAttempts = 0;
+    private long noOfFailedHttpAttempts = 0;
+    private long noOfFailedFtp = 0;
+    private long noOfFailedHttp = 0;
+    private long noOfFailedPublishAttempts = 0;
+    private long totalPublishedFiles = 0;
+    private long noOfFailedPublish = 0;
+    private Instant lastPublishedTime = Instant.MIN;
+    private long totalReceivedEvents = 0;
+    private Instant lastEventTime = Instant.MIN;
+
+    public final AtomicInteger threadPoolQueueSize = new AtomicInteger();
+
+    /** Counts one received event and records the current time as the time of the last event. */
+    public synchronized void incNoOfReceivedEvents() {
+        totalReceivedEvents++;
+        lastEventTime = Instant.now();
+    }
+
+    public synchronized void incNoOfCollectedFiles() {
+        noOfCollectedFiles++;
+    }
+
+    public synchronized void incNoOfFailedFtpAttempts() {
+        noOfFailedFtpAttempts++;
+    }
+
+    public synchronized void incNoOfFailedHttpAttempts() {
+        noOfFailedHttpAttempts++;
+    }
+
+    public synchronized void incNoOfFailedFtp() {
+        noOfFailedFtp++;
+    }
+
+    public synchronized void incNoOfFailedHttp() {
+        noOfFailedHttp++;
+    }
+
+    public synchronized void incNoOfFailedPublishAttempts() {
+        noOfFailedPublishAttempts++;
+    }
+
+    /** Counts one published file and records the current time as the time of the last publication. */
+    public synchronized void incTotalPublishedFiles() {
+        totalPublishedFiles++;
+        lastPublishedTime = Instant.now();
+    }
+
+    public synchronized void incNoOfFailedPublish() {
+        noOfFailedPublish++;
+    }
+
+    /**
+     * Renders the counters and the two timestamps as aligned "name: value" lines, taken as one consistent
+     * snapshot. {@link #threadPoolQueueSize} is not part of the output.
+     */
+    @Override
+    public synchronized String toString() {
+        StringBuilder str = new StringBuilder();
+        str.append(format("totalReceivedEvents", totalReceivedEvents));
+        str.append(format("lastEventTime", lastEventTime));
+        str.append("\n");
+        str.append(format("collectedFiles", noOfCollectedFiles));
+        str.append(format("failedFtpAttempts", noOfFailedFtpAttempts));
+        str.append(format("failedHttpAttempts", noOfFailedHttpAttempts));
+        str.append(format("failedFtp", noOfFailedFtp));
+        str.append(format("failedHttp", noOfFailedHttp));
+        str.append("\n");
+        str.append(format("totalPublishedFiles", totalPublishedFiles));
+        str.append(format("lastPublishedTime", lastPublishedTime));
+
+        str.append(format("failedPublishAttempts", noOfFailedPublishAttempts));
+        str.append(format("noOfFailedPublish", noOfFailedPublish));
+
+        return str.toString();
+    }
+
+    /** Formats one line: the name plus ':' left-aligned in 24 columns, the value in 22, then a line break. */
+    private static String format(String name, Object value) {
+        String header = name + ":";
+        return String.format("%-24s%-22s%n", header, value);
+    }
+
+    // The getters take the same lock as the increment methods; without it a reader thread has no
+    // guarantee of seeing the latest value, and a 64-bit long read is not guaranteed to be atomic.
+
+    public synchronized long getNoOfCollectedFiles() {
+        return noOfCollectedFiles;
+    }
+
+    public synchronized long getNoOfFailedFtpAttempts() {
+        return noOfFailedFtpAttempts;
+    }
+
+    public synchronized long getNoOfFailedHttpAttempts() {
+        return noOfFailedHttpAttempts;
+    }
+
+    public synchronized long getNoOfFailedFtp() {
+        return noOfFailedFtp;
+    }
+
+    public synchronized long getNoOfFailedHttp() {
+        return noOfFailedHttp;
+    }
+
+    public synchronized long getNoOfFailedPublishAttempts() {
+        return noOfFailedPublishAttempts;
+    }
+
+    public synchronized long getTotalPublishedFiles() {
+        return totalPublishedFiles;
+    }
+
+    public synchronized long getNoOfFailedPublish() {
+        return noOfFailedPublish;
+    }
+
+    public synchronized long getTotalReceivedEvents() {
+        return totalReceivedEvents;
+    }
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
new file mode 100644
index 0000000..3de5817
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
@@ -0,0 +1,162 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+import lombok.Builder;
+
+import org.apache.hc.core5.http.NameValuePair;
+import org.apache.hc.core5.net.URIBuilder;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData.FileServerDataBuilder;
+import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Contains data, from the fileReady event, about the file to collect from the
+ * xNF.
+ *
+ */
+@Builder
+public class FileData {
+
+    private static final Logger logger = LoggerFactory.getLogger(FileData.class);
+
+    public FileReadyMessage.ArrayOfNamedHashMap fileInfo;
+
+    public FileReadyMessage.MessageMetaData messageMetaData;
+
+    /**
+     * Creates one {@code FileData} per file entry in the message; all of them share the message's common
+     * event header as meta data.
+     *
+     * @param msg the fileReady message
+     * @return the file data objects, in the order of the entries in the message
+     */
+    public static Iterable<FileData> createFileData(FileReadyMessage msg) {
+        Collection<FileData> res = new ArrayList<>();
+        for (FileReadyMessage.ArrayOfNamedHashMap arr : msg.event.notificationFields.arrayOfNamedHashMap) {
+            FileData data = FileData.builder().fileInfo(arr).messageMetaData(msg.event.commonEventHeader).build();
+            res.add(data);
+        }
+        return res;
+    }
+
+    /**
+     * Get the name of the PNF, must be unique in the network.
+     *
+     * @return the name of the PNF, must be unique in the network
+     */
+    public String sourceName() {
+        return messageMetaData.sourceName;
+    }
+
+    /**
+     * Get the name of the file qualified by its source.
+     *
+     * @return the source name and the file name joined by "/"
+     */
+    public String name() {
+        return this.messageMetaData.sourceName + "/" + fileInfo.name;
+    }
+
+    /**
+     * Get the path to file to get from the PNF.
+     *
+     * @return the path to the file on the PNF.
+     */
+    public String remoteFilePath() {
+        return URI.create(fileInfo.hashMap.location).getPath();
+    }
+
+    /**
+     * Get the scheme of the file location.
+     *
+     * @return the scheme parsed from the location URI, or {@code Scheme.FTPES} when the scheme cannot be
+     *         mapped
+     */
+    public Scheme scheme() {
+        URI uri = URI.create(fileInfo.hashMap.location);
+        try {
+            return Scheme.getSchemeFromString(uri.getScheme());
+        } catch (Exception e) {
+            logger.warn("Could not get scheme :{}", e.getMessage());
+            return Scheme.FTPES;
+        }
+    }
+
+    /**
+     * Get the path to the locally stored file.
+     *
+     * @return the path to the locally stored file.
+     */
+    public Path getLocalFilePath(AppConfig config) {
+        return Paths.get(config.collectedFilesPath, this.messageMetaData.sourceName, fileInfo.name);
+    }
+
+    /**
+     * Get the data about the file server where the file should be collected from.
+     * Query data included as it can contain JWT token
+     *
+     * @return the data about the file server where the file should be collected
+     *         from.
+     */
+    public FileServerData fileServerData() {
+        URI uri = URI.create(fileInfo.hashMap.location);
+        Optional<String[]> userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo());
+
+        FileServerDataBuilder builder = FileServerData.builder() //
+            .serverAddress(uri.getHost()) //
+            .userId(userInfo.isPresent() ? userInfo.get()[0] : "") //
+            .password(userInfo.isPresent() ? userInfo.get()[1] : "");
+        // The port is only set when the URI carries one; otherwise the builder's own value is kept.
+        if (uri.getPort() > 0) {
+            builder.port(uri.getPort());
+        }
+        URIBuilder uriBuilder = new URIBuilder(uri);
+        List<NameValuePair> query = uriBuilder.getQueryParams();
+        if (query != null && !query.isEmpty()) {
+            builder.queryParameters(query);
+        }
+        String fragment = uri.getRawFragment();
+        if (fragment != null && fragment.length() > 0) {
+            builder.uriRawFragment(fragment);
+        }
+        return builder.build();
+    }
+
+    /**
+     * Extracts user name and password from the user info, if they are given in
+     * the URI.
+     *
+     * <p>Only the first ':' separates the user name from the password, so a password that itself
+     * contains ':' is kept intact. When no ':' is present the password is the empty string.
+     *
+     * @param userInfoString the user info string from the URI.
+     *
+     * @return An <code>Optional</code> containing a String array with the user name
+     *         and password if given, or an empty
+     *         <code>Optional</code> if not given.
+     */
+    private static Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) {
+        if (userInfoString == null) {
+            return Optional.empty();
+        }
+        // Limit 2: split at the first ':' only; the result always has one or two elements.
+        String[] userAndPassword = userInfoString.split(":", 2);
+        if (userAndPassword.length == 2) {
+            return Optional.of(userAndPassword);
+        }
+        // Just a user name was given: pair it with an empty password.
+        return Optional.of(new String[] {userAndPassword[0], ""});
+    }
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
new file mode 100644
index 0000000..52e6413
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
@@ -0,0 +1,52 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+@Builder
+@EqualsAndHashCode
+public class FilePublishInformation { // Data holder created through the Lombok builder; equals/hashCode are generated by Lombok. Fields are package-private.
+
+ String productName; // NOTE(review): presumably filled from FileReadyMessage.MessageMetaData.productName() - confirm at the call site.
+
+ String vendorName; // NOTE(review): presumably filled from FileReadyMessage.MessageMetaData.vendorName() - confirm at the call site.
+
+ long lastEpochMicrosec;
+
+ @Getter
+ String sourceName; // One of the two fields with a generated public getter.
+
+ long startEpochMicrosec;
+
+ String timeZoneOffset;
+
+ String compression;
+
+ String fileFormatType;
+
+ String fileFormatVersion;
+
+ @Getter
+ String name; // The other field with a generated public getter.
+
+ String changeIdentifier;
+
+ String objectStoreBucket;
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
new file mode 100644
index 0000000..9ee461e
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
@@ -0,0 +1,111 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import java.util.List;
+
+import lombok.Builder;
+
+@Builder
+public class FileReadyMessage {
+
+    /**
+     * Header data of a fileReady message.
+     */
+    @Builder
+    public static class MessageMetaData {
+
+        public String eventId;
+
+        public String priority;
+        public String version;
+        public String reportingEntityName;
+        public int sequence;
+        public String domain;
+
+        public String eventName;
+        public String vesEventListenerVersion;
+
+        public String sourceName;
+
+        public long lastEpochMicrosec;
+        public long startEpochMicrosec;
+
+        public String timeZoneOffset;
+
+        public String changeIdentifier;
+
+        /**
+         * Product name taken from the event name, which is defined as
+         * {DomainAbbreviation}_{productName}-{vendorName}_{Description},
+         * for example Noti_RnNode-Ericsson_FileReady.
+         *
+         * @return the second part of the event name, or the whole event name when it has fewer than two
+         *         parts
+         */
+        public String productName() {
+            return eventNamePart(1);
+        }
+
+        /**
+         * Vendor name taken from the event name, see {@link #productName()} for the format.
+         *
+         * @return the third part of the event name, or the whole event name when it has fewer than three
+         *         parts
+         */
+        public String vendorName() {
+            return eventNamePart(2);
+        }
+
+        /** Splits the event name on '_' and '-' and returns the part at {@code index}, else the full name. */
+        private String eventNamePart(int index) {
+            final String[] parts = eventName.split("_|-");
+            return index < parts.length ? parts[index] : eventName;
+        }
+    }
+
+    /** Description of one file: format type and version, location and compression. */
+    @Builder
+    public static class FileInfo {
+        public String fileFormatType;
+        public String location;
+        public String fileFormatVersion;
+        public String compression;
+    }
+
+    /** A file name together with the description of that file. */
+    @Builder
+    public static class ArrayOfNamedHashMap {
+        public String name;
+        public FileInfo hashMap;
+    }
+
+    /** Notification part of the event, carrying the list of file entries. */
+    @Builder
+    public static class NotificationFields {
+        public String notificationFieldsVersion;
+        public String changeType;
+        public String changeIdentifier;
+        public List<ArrayOfNamedHashMap> arrayOfNamedHashMap;
+    }
+
+    /** The event itself: common header plus notification fields. */
+    @Builder
+    public static class Event {
+        public MessageMetaData commonEventHeader;
+        public NotificationFields notificationFields;
+    }
+
+    public Event event;
+
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
new file mode 100644
index 0000000..208691f
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
@@ -0,0 +1,229 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * Modifications Copyright (C) 2020-2021 Nokia. All rights reserved
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import java.util.Base64;
+import java.util.List;
+
+import org.apache.hc.core5.http.NameValuePair;
+import org.apache.http.HttpStatus;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class HttpUtils implements HttpStatus {
+
+ private static final Logger logger = LoggerFactory.getLogger(HttpUtils.class);
+ public static final int HTTP_DEFAULT_PORT = 80; // used by prepareHttpUri when no port is given
+ public static final int HTTPS_DEFAULT_PORT = 443; // used by prepareHttpsUri when no port is given
+ public static final String JWT_TOKEN_NAME = "access_token"; // name of the query parameter carrying the JWT token
+ public static final String AUTH_JWT_WARN = "Both JWT token and Basic auth data present. Omitting basic auth info.";
+ public static final String AUTH_JWT_ERROR =
+ "More than one JWT token present in the queryParameters. Omitting JWT token.";
+
+ private HttpUtils() { // only static members, not to be instantiated
+ }
+
+ public static String nonRetryableResponse(int responseCode) { // builds the message text only
+     return "Unexpected response code - ".concat(Integer.toString(responseCode));
+ }
+
+ public static String retryableResponse(int responseCode) { // NOTE(review): text says "no retry" - confirm the name
+     return "Unexpected response code - ".concat(Integer.toString(responseCode)).concat(". No retry attempts will be done.");
+ }
+
+ public static boolean isSuccessfulResponseCodeWithDataRouter(Integer statusCode) { // true for 2xx only
+     return statusCode != null && statusCode >= 200 && statusCode < 300; // a missing (null) code is no success
+ }
+
+ public static boolean isBasicAuthDataFilled(final FileServerData fileServerData) { // needs both user and password
+     return !(fileServerData.userId.isEmpty() || fileServerData.password.isEmpty());
+ }
+
+ public static String basicAuthContent(String username, String password) { // UTF-8, not the platform default charset
+     return "Basic " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ }
+
+ public static String jwtAuthContent(String token) { // value for an Authorization header using the Bearer scheme
+     return new StringBuilder("Bearer ").append(token).toString();
+ }
+
+ /**
+ * Prepares the uri to retrieve a file from the xNF using a HTTP connection. A JWT
+ * token included in the queryParameters is removed, other entries are rewritten.
+ * Port 80 is used when fileServerData contains no port.
+ *
+ * @param fileServerData fileServerData including - server address, port,
+ * queryParameters and uriRawFragment
+ * @param remoteFile file which has to be downloaded
+ * @return uri String representing the xNF HTTP location
+ */
+ public static String prepareHttpUri(FileServerData fileServerData, String remoteFile) {
+ return prepareUri("http", fileServerData, remoteFile, HTTP_DEFAULT_PORT);
+ }
+
+ /**
+ * Prepares the uri to retrieve a file from the xNF using a HTTPS connection. A JWT
+ * token included in the queryParameters is removed, other entries are rewritten.
+ * Port 443 is used when fileServerData contains no port.
+ *
+ * @param fileServerData fileServerData including - server address, port,
+ * queryParameters and uriRawFragment
+ * @param remoteFile file which has to be downloaded
+ * @return uri String representing the xNF HTTPS location
+ */
+ public static String prepareHttpsUri(FileServerData fileServerData, String remoteFile) {
+ return prepareUri("https", fileServerData, remoteFile, HTTPS_DEFAULT_PORT);
+ }
+
+ /**
+  * Prepare uri to retrieve file from xNF. If JWT token was included
+  * in the queryParameters, it is removed. Other entries are rewritten.
+  * Missing (null) queryParameters or uriRawFragment are treated as empty.
+  *
+  * @param scheme scheme which is used during the connection
+  * @param fileServerData fileServerData including - server address, port, query
+  *        and fragment
+  * @param remoteFile file which has to be downloaded
+  * @param defaultPort port to use when fileServerData contains no port
+  * @return uri String representing the xNF location
+  */
+ public static String prepareUri(String scheme, FileServerData fileServerData, String remoteFile, int defaultPort) {
+     int port = fileServerData.port != null ? fileServerData.port : defaultPort;
+     String query = fileServerData.queryParameters == null ? "" : rewriteQueryWithoutToken(fileServerData.queryParameters);
+     String fragment = fileServerData.uriRawFragment == null ? "" : fileServerData.uriRawFragment;
+     if (!query.isEmpty()) {
+         query = "?" + query;
+     }
+     if (!fragment.isEmpty()) {
+         fragment = "#" + fragment;
+     }
+     return scheme + "://" + fileServerData.serverAddress + ":" + port + remoteFile + query + fragment;
+ }
+
+ /**
+  * Returns JWT token string (if single exist) from the queryParameters.
+  * A warning is logged when basic auth data is present together with the
+  * token.
+  *
+  * @param fileServerData file server data which contain queryParameters where
+  *        JWT token may exist
+  * @return JWT token value if single token entry exist or empty string
+  *         elsewhere. Missing (null) or empty queryParameters give an empty
+  *         string. If JWT token key has no value, empty string will be
+  *         returned.
+  */
+ public static String getJWTToken(FileServerData fileServerData) {
+     // isQueryWithSingleJWT also rejects null and empty queryParameters.
+     if (!HttpUtils.isQueryWithSingleJWT(fileServerData.queryParameters)) {
+         return "";
+     }
+
+     String token = HttpUtils.getJWTToken(fileServerData.queryParameters);
+     if (HttpUtils.isBasicAuthDataFilled(fileServerData)) {
+         logger.warn(HttpUtils.AUTH_JWT_WARN);
+     }
+     return token;
+ }
+
+ /**
+  * Checks if the queryParameters contains single JWT token entry. Valid
+  * queryParameters contains only one token entry. An error is logged when
+  * more than one token entry is found.
+  *
+  * @param query queryParameters, may be null
+  * @return true if queryParameters contains exactly one token entry, false
+  *         if it is null or contains none or several token entries
+  */
+ public static boolean isQueryWithSingleJWT(List<NameValuePair> query) {
+     if (query == null) {
+         return false;
+     }
+
+     final int tokenCount = getJWTTokenCount(query);
+     if (tokenCount > 1) {
+         // Several tokens are ambiguous, none of them is accepted.
+         logger.error(AUTH_JWT_ERROR);
+     }
+
+     return tokenCount == 1;
+ }
+
+ /**
+  * Returns the number of JWT token entries. Valid queryParameters contains only
+  * one token entry.
+  *
+  * @param queryElements elements of the queryParameters, must not be null
+  * @return the number of entries whose name is the JWT token name
+  *         (access_token), 0 if there are none
+  */
+ public static int getJWTTokenCount(List<NameValuePair> queryElements) {
+     // Only the entry names are compared, the values are not looked at.
+     final long tokenEntries = queryElements.stream() //
+         .filter(element -> element.getName().equals(JWT_TOKEN_NAME)) //
+         .count();
+
+     return (int) tokenEntries;
+ }
+
+ private static String getJWTToken(List<NameValuePair> query) {
+ for (NameValuePair element : query) {
+ if (!element.getName().equals(JWT_TOKEN_NAME)) {
+ continue;
+ }
+ if (element.getValue() != null) {
+ return element.getValue();
+ }
+ return "";
+ }
+ return "";
+ }
+
+ /**
+  * Rewrites HTTP queryParameters without JWT token
+  *
+  * @param query list of NameValuePair of elements sent in the queryParameters,
+  *        may be null
+  * @return String representation ("name=value" joined by "&") of the
+  *         queryParameters elements which were provided in the input.
+  *         Empty string is possible when queryParameters is null, empty or
+  *         contains only access_token key.
+  */
+ public static String rewriteQueryWithoutToken(List<NameValuePair> query) {
+     if (query == null || query.isEmpty()) {
+         return "";
+     }
+     StringBuilder sb = new StringBuilder();
+     for (NameValuePair nvp : query) {
+         if (nvp.getName().equals(JWT_TOKEN_NAME)) {
+             continue;
+         }
+         sb.append(nvp.getName());
+         if (nvp.getValue() != null) {
+             sb.append("=").append(nvp.getValue());
+         }
+         sb.append("&");
+     }
+     if (sb.length() > 0) {
+         sb.setLength(sb.length() - 1); // every appended entry ends with "&", drop the last one
+     }
+     return sb.toString();
+ }
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/CollectAndReportFiles.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/CollectAndReportFiles.java
new file mode 100644
index 0000000..c127948
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/CollectAndReportFiles.java
@@ -0,0 +1,301 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.CertificateConfig;
+import org.onap.dcaegen2.collectors.datafile.datastore.DataStore;
+import org.onap.dcaegen2.collectors.datafile.datastore.DataStore.Bucket;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.http.HttpsClientConnectionManagerUtil;
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
+import reactor.kafka.sender.KafkaSender;
+import reactor.kafka.sender.SenderOptions;
+import reactor.kafka.sender.SenderRecord;
+import reactor.kafka.sender.SenderResult;
+import reactor.util.retry.Retry;
+
+/**
+ * This implements the main flow of the data file collector. File ready events
+ * are fetched from a Kafka topic, new files are fetched from the PNF (and
+ * stored in S3 when enabled) and information about each collected file is
+ * published on a Kafka topic.
+ */
+@Component
+public class CollectAndReportFiles {
+
+ private static Gson gson = new GsonBuilder() //
+ .disableHtmlEscaping() //
+ .create(); // used for parsing received messages and for serializing published file information
+
+ private static final int NUMBER_OF_WORKER_THREADS = 200; // parallelism of the main task and size of its scheduler
+ private static final long FILE_TRANSFER_MAX_RETRIES = 2; // passed to FileCollector.collectFile
+ private static final Duration FILE_TRANSFER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2); // first retry backoff
+
+ private static final Logger logger = LoggerFactory.getLogger(CollectAndReportFiles.class);
+
+ private final AppConfig appConfig;
+
+ private Counters counters = new Counters(); // handed to every FileCollector made by createFileCollector()
+
+ private final KafkaSender<String, String> kafkaSender; // publishes information about collected files
+
+ private final DataStore dataStore; // file existence checks, locks and storing of files
+
+ /**
+ * Creates the Kafka sender and the data store, sets up the HTTPS client certificates and starts the flow.
+ *
+ * @param applicationConfiguration - application configuration
+ */
+ @Autowired
+ public CollectAndReportFiles(AppConfig applicationConfiguration) {
+ this.appConfig = applicationConfiguration;
+ this.kafkaSender = KafkaSender.create(kafkaSenderOptions());
+ initCerts();
+
+ this.dataStore = DataStore.create(applicationConfiguration);
+
+ start(); // NOTE(review): the flow is started from within the constructor
+ }
+
+ private void initCerts() {
+ try {
+ CertificateConfig certificateConfig = appConfig.getCertificateConfiguration();
+ HttpsClientConnectionManagerUtil.setupOrUpdate(certificateConfig.keyCert, certificateConfig.keyPasswordPath,
+ certificateConfig.trustedCa, certificateConfig.trustedCaPasswordPath, true);
+ } catch (DatafileTaskException e) {
+ logger.error("Could not setup HttpsClient certs, reason: {}", e.getMessage());
+ }
+ }
+
+ /**
+ * Starts the file collection workflow without an initial delay.
+ */
+ public void start() {
+ start(0);
+ }
+
+ private void start(int delayMillis) {
+ try {
+ logger.trace("Starting");
+ if (appConfig.isS3Enabled()) {
+ this.dataStore.create(Bucket.FILES).subscribe();
+ this.dataStore.create(Bucket.LOCKS).subscribe();
+ }
+ Thread.sleep(delayMillis);
+ createMainTask().subscribe(null, s -> start(2000), null);
+ } catch (Exception e) {
+ logger.error("Unexpected exception: {}", e.toString(), e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ Flux<FilePublishInformation> createMainTask() {
+ Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS); // NOTE(review): created per call and not disposed here; start(int) calls this again after an error - confirm
+ return fetchFromKafka() //
+ .doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.incrementAndGet()) // counted up before the hand-over to a worker
+ .doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) //
+ .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
+ .runOn(scheduler) //
+ .doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.decrementAndGet()) // counted down on the worker
+ .flatMap(fileReadyMessage -> Flux.fromIterable(FileData.createFileData(fileReadyMessage)), true, 1) // one element per file in the message
+ .flatMap(this::filterNotFetched, false, 1, 1) // drops files that are already stored or present locally
+ .flatMap(this::fetchFile, false, 1, 1) // collects the file, empty when not locked or failed
+ .flatMap(data -> reportFetchedFile(data, this.appConfig.collectedFileTopic), false, 1) // publishes the file information
+ .sequential() //
+ .doOnError(t -> logger.error("Received error: {}", t.toString())); // only logs, the error is still propagated
+ }
+
+ private Mono<FileData> deleteLock(FileData info) { // deletes the lock named after the file
+ return dataStore.deleteLock(lockName(info.name())).map(b -> info); // emits info for an emitted delete result
+ }
+
+ private Mono<FilePublishInformation> moveFileToS3Bucket(FilePublishInformation info) {
+ if (this.appConfig.isS3Enabled()) {
+ return dataStore.copyFileTo(locaFilePath(info), info.getName())
+ .doOnError(t -> logger.warn("Failed to store file '{}' in S3 {}", info.getName(), t.getMessage())) //
+ .retryWhen(Retry.fixedDelay(10, Duration.ofMillis(1000))) //
+ .map(f -> info) //
+ .doOnError(t -> logger.error("Failed to store file '{}' in S3 after retries {}", info.getName(),
+ t.getMessage())) //
+ .doOnNext(n -> logger.debug("Stored file in S3: {}", info.getName())) //
+ .doOnNext(sig -> deleteLocalFile(info));
+ } else {
+ return Mono.just(info);
+ }
+ }
+
+ private Mono<FileData> filterNotFetched(FileData fileData) {
+ Path localPath = fileData.getLocalFilePath(this.appConfig);
+
+ return dataStore.fileExists(Bucket.FILES, fileData.name()) //
+ .filter(exists -> !exists) //
+ .filter(exists -> !localPath.toFile().exists()) //
+ .map(f -> fileData); //
+
+ }
+
+ private String lockName(String fileName) { // name of the lock used while the file is being fetched
+ return fileName + ".lck";
+ }
+
+ private Path locaFilePath(FilePublishInformation info) { // local path: collectedFilesPath joined with the file name
+ return Paths.get(this.appConfig.collectedFilesPath, info.getName());
+ }
+
+ private void deleteLocalFile(FilePublishInformation info) {
+ Path path = locaFilePath(info);
+ try {
+ Files.delete(path);
+ } catch (Exception e) {
+ logger.warn("Could not delete local file: {}, reason:{}", path, e.getMessage());
+ }
+ }
+
+ private Flux<FilePublishInformation> reportFetchedFile(FilePublishInformation fileData, String topic) {
+ String json = gson.toJson(fileData);
+ return sendDataToStream(topic, fileData.getSourceName(), json) //
+ .map(result -> fileData);
+ }
+
+ public Flux<SenderResult<Integer>> sendDataToStream(String topic, String sourceName, String value) { // sends one record
+ return sendDataToKafkaStream(Flux.just(senderRecord(topic, sourceName, value))); // sourceName ends up in a header
+ }
+
+ private SenderRecord<String, String, Integer> senderRecord(String topic, String sourceName, String value) {
+ int correlationMetadata = 2;
+ String key = null;
+ var producerRecord = new ProducerRecord<>(topic, null, null, key, value, kafkaHeaders(sourceName));
+ return SenderRecord.create(producerRecord, correlationMetadata);
+ }
+
+ private Iterable<Header> kafkaHeaders(String sourceName) {
+ ArrayList<Header> result = new ArrayList<>();
+ Header h = new RecordHeader("SourceName", sourceName.getBytes());
+ result.add(h);
+ return result;
+ }
+
+ private Flux<SenderResult<Integer>> sendDataToKafkaStream(Flux<SenderRecord<String, String, Integer>> dataToSend) {
+
+ return kafkaSender.send(dataToSend) //
+ .doOnError(e -> logger.error("Send to kafka failed", e));
+ }
+
+ private SenderOptions<String, String> kafkaSenderOptions() {
+ String bootstrapServers = this.appConfig.getKafkaBootStrapServers();
+
+ Map<String, Object> props = new HashMap<>();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ProducerConfig.ACKS_CONFIG, "all");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ return SenderOptions.create(props);
+ }
+
+ public Counters getCounters() { // the counters updated by the flow of this instance
+ return this.counters;
+ }
+
+ protected FileCollector createFileCollector() { // a new collector per call, sharing config and counters
+ return new FileCollector(appConfig, counters);
+ }
+
+ private Mono<FilePublishInformation> fetchFile(FileData fileData) {
+ return this.dataStore.createLock(lockName(fileData.name())).filter(granted -> granted) // continues only when the lock was granted
+ .map(granted -> createFileCollector()) //
+ .flatMap(collector -> collector.collectFile(fileData, FILE_TRANSFER_MAX_RETRIES,
+ FILE_TRANSFER_INITIAL_RETRY_TIMEOUT)) //
+ .flatMap(this::moveFileToS3Bucket) //
+ .doOnNext(b -> deleteLock(fileData).subscribe()) // lock deletion is subscribed separately, its result is not awaited
+ .doOnError(b -> deleteLock(fileData).subscribe()) // the lock is deleted after a failure as well
+ .onErrorResume(exception -> handleFetchFileFailure(fileData, exception)); // a failure is replaced by an empty result
+ }
+
+ private Mono<FilePublishInformation> handleFetchFileFailure(FileData fileData, Throwable t) {
+ Path localFilePath = fileData.getLocalFilePath(this.appConfig);
+ logger.error("File fetching failed, path {}, reason: {}", fileData.remoteFilePath(), t.getMessage());
+ deleteFile(localFilePath);
+ if (Scheme.isFtpScheme(fileData.scheme())) {
+ counters.incNoOfFailedFtp();
+ } else {
+ counters.incNoOfFailedHttp();
+ }
+ return Mono.empty();
+ }
+
+ /**
+ * Fetch more messages from the message router. This is done in a
+ * polling/blocking fashion.
+ */
+ private Flux<FileReadyMessage> fetchFromKafka() {
+ KafkaTopicListener listener = new KafkaTopicListener(this.appConfig.getKafkaBootStrapServers(),
+ this.appConfig.kafkaClientId, this.appConfig.fileReadyEventTopic);
+ return listener.getFlux() //
+ .flatMap(this::parseReceivedFileReadyMessage, 1);
+
+ }
+
+ Mono<FileReadyMessage> parseReceivedFileReadyMessage(KafkaTopicListener.DataFromTopic data) { // JSON value -> message
+ try {
+ FileReadyMessage msg = gson.fromJson(data.value, FileReadyMessage.class);
+ logger.debug("Received: {}", msg);
+ return Mono.just(msg); // NOTE(review): presumably throws for a null msg, which then ends up in the catch below - confirm
+ } catch (Exception e) {
+ logger.warn("Could not parse received: {}, reason: {}", data.value, e.getMessage());
+ return Mono.empty(); // unparsable data is dropped
+ }
+ }
+
+ private static void deleteFile(Path localFile) {
+     logger.trace("Deleting file: {}", localFile);
+     try {
+         Files.delete(localFile);
+     } catch (Exception deleteFailure) { // a failed delete is traced and otherwise ignored
+         logger.trace("Could not delete file: {}, reason: {}", localFile, deleteFailure.getMessage());
+     }
+ }
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
new file mode 100644
index 0000000..b6c07e5
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -0,0 +1,187 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2020-2022 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Optional;
+
+import org.apache.commons.io.FileUtils;
+import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
+import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.CertificateConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.ftp.FtpesClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.SftpClientSettings;
+import org.onap.dcaegen2.collectors.datafile.http.DfcHttpClient;
+import org.onap.dcaegen2.collectors.datafile.http.DfcHttpsClient;
+import org.onap.dcaegen2.collectors.datafile.http.HttpsClientConnectionManagerUtil;
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+/**
+ * Collects a file from a PNF.
+ *
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public class FileCollector {
+
+ private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
+ private final AppConfig appConfig;
+ private final Counters counters;
+
+ /**
+ * Constructor.
+ * @param appConfig application configuration
+ * @param counters counters incremented for collected files and failed attempts
+ */
+ public FileCollector(AppConfig appConfig, Counters counters) {
+ this.appConfig = appConfig;
+ this.counters = counters;
+ }
+
+ /**
+  * Collects a file from the PNF and stores it in the local file system.
+  * A failed attempt is retried with backoff unless it is non-retryable.
+  *
+  * @param fileData data about the file to collect.
+  * @param numRetries the maximum number of retries if the collection fails
+  * @param firstBackoff the time to delay the first retry
+  * @return the data needed to publish the file, or an error if the file
+  *         could not be collected.
+  */
+ public Mono<FilePublishInformation> collectFile(FileData fileData, long numRetries, Duration firstBackoff) {
+     logger.trace("Entering collectFile with {}", fileData);
+     final Retry retryWithBackoff = Retry.backoff(numRetries, firstBackoff);
+     return Mono.just(fileData) //
+         .cache() //
+         .flatMap(this::tryCollectFile) //
+         .retryWhen(retryWithBackoff) //
+         .flatMap(FileCollector::checkCollectedFile);
+ }
+
+ /**
+  * Converts the outcome of the collect attempts to a value or an error.
+  */
+ private static Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) {
+     // An empty result means that the file is not retrievable.
+     return info.map(Mono::just) //
+         .orElseGet(() -> Mono.error(new DatafileTaskException("Non retryable file transfer failure")));
+ }
+
+ private Mono<Optional<FilePublishInformation>> tryCollectFile(FileData fileData) {
+ logger.trace("starting to collectFile {}", fileData.fileInfo.name);
+
+ final String remoteFile = fileData.remoteFilePath();
+ final Path localFile = fileData.getLocalFilePath(this.appConfig);
+
+ try (FileCollectClient currentClient = createClient(fileData)) {
+ currentClient.open();
+ FileUtils.forceMkdirParent(localFile.toFile());
+ currentClient.collectFile(remoteFile, localFile);
+ counters.incNoOfCollectedFiles();
+ return Mono.just(Optional.of(createFilePublishInformation(fileData)));
+ } catch (NonRetryableDatafileTaskException nre) {
+ logger.warn("Failed to download file, not retryable: {} {}, reason: {}", fileData.sourceName(),
+ fileData.fileInfo.name, nre.getMessage());
+ incFailedAttemptsCounter(fileData);
+ return Mono.just(Optional.empty()); // Give up
+ } catch (DatafileTaskException e) {
+ logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.fileInfo.name,
+ e.getMessage());
+ incFailedAttemptsCounter(fileData);
+ return Mono.error(e);
+ } catch (Exception throwable) {
+ logger.warn("Failed to close client: {} {}, reason: {}", fileData.sourceName(), fileData.fileInfo.name,
+ throwable.getMessage(), throwable);
+ return Mono.just(Optional.of(createFilePublishInformation(fileData)));
+ }
+ }
+
+ private void incFailedAttemptsCounter(FileData fileData) {
+     if (!Scheme.isFtpScheme(fileData.scheme())) { // every scheme that is not an FTP scheme is counted as HTTP
+         counters.incNoOfFailedHttpAttempts();
+     } else {
+         counters.incNoOfFailedFtpAttempts();
+     }
+ }
+
+ private FileCollectClient createClient(FileData fileData) throws DatafileTaskException {
+     switch (fileData.scheme()) { // the scheme selects the client implementation
+         case HTTPS:
+             return createHttpsClient(fileData);
+         case HTTP:
+             return createHttpClient(fileData);
+         case FTPES:
+             return createFtpesClient(fileData);
+         case SFTP:
+             return createSftpClient(fileData);
+         default:
+             throw new DatafileTaskException("Unhandled protocol: " + fileData.scheme());
+     }
+ }
+
+ public FilePublishInformation createFilePublishInformation(FileData fileData) {
+     final FileReadyMessage.MessageMetaData header = fileData.messageMetaData;
+     return FilePublishInformation.builder() //
+         .name(header.sourceName + "/" + fileData.fileInfo.name) // published name: source name + "/" + file name
+         .sourceName(header.sourceName) //
+         .productName(header.productName()) //
+         .vendorName(header.vendorName()) //
+         .startEpochMicrosec(header.startEpochMicrosec) //
+         .lastEpochMicrosec(header.lastEpochMicrosec) //
+         .timeZoneOffset(header.timeZoneOffset) //
+         .changeIdentifier(header.changeIdentifier) //
+         .fileFormatType(fileData.fileInfo.hashMap.fileFormatType) //
+         .fileFormatVersion(fileData.fileInfo.hashMap.fileFormatVersion) //
+         .compression(fileData.fileInfo.hashMap.compression) //
+         .objectStoreBucket(this.appConfig.isS3Enabled() ? this.appConfig.getS3Bucket() : null) // null without S3
+         .build();
+ }
+
+ protected SftpClient createSftpClient(FileData fileData) { // uses the SFTP settings of the application config
+ return new SftpClient(fileData.fileServerData(), new SftpClientSettings(appConfig.getSftpConfiguration()));
+ }
+
+ protected FtpesClient createFtpesClient(FileData fileData) throws DatafileTaskException {
+ CertificateConfig config = appConfig.getCertificateConfiguration();
+ Path trustedCa = config.trustedCa.isEmpty() ? null : Paths.get(config.trustedCa);
+
+ return new FtpesClient(fileData.fileServerData(), Paths.get(config.keyCert), config.keyPasswordPath, trustedCa,
+ config.trustedCaPasswordPath);
+ }
+
+ protected FileCollectClient createHttpClient(FileData fileData) { // client for the HTTP scheme
+ return new DfcHttpClient(fileData.fileServerData());
+ }
+
+ protected FileCollectClient createHttpsClient(FileData fileData) throws DatafileTaskException { // HTTPS scheme
+ return new DfcHttpsClient(fileData.fileServerData(), HttpsClientConnectionManagerUtil.instance());
+ }
+}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/KafkaTopicListener.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/KafkaTopicListener.java
new file mode 100644
index 0000000..969e5fa
--- /dev/null
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/KafkaTopicListener.java
@@ -0,0 +1,106 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import lombok.ToString;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.kafka.receiver.KafkaReceiver;
+import reactor.kafka.receiver.ReceiverOptions;
+
+/**
+ * The class streams incoming requests from a Kafka topic and sends them further
+ * to a multi cast sink, which several other streams can connect to.
+ */
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+public class KafkaTopicListener {
+
+ @ToString
+ public static class DataFromTopic { // key and value of one record received from the topic
+ public final String key;
+ public final String value;
+
+ public DataFromTopic(String key, String value) {
+ this.key = key;
+ this.value = value;
+ }
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
+
+ private final String inputTopic;
+ private final String kafkaBoostrapServers;
+ private final String kafkaClientId;
+ private Flux<DataFromTopic> dataFromTopic; // created by the first getFlux() call, reset to null when the receiver stops
+
+ public KafkaTopicListener(String kafkaBoostrapServers, String clientId, String topic) { // only stores the settings
+ this.kafkaClientId = clientId;
+ this.kafkaBoostrapServers = kafkaBoostrapServers;
+ this.inputTopic = topic;
+ }
+
+ public Flux<DataFromTopic> getFlux() { // returns the flux of received data, creating it when there is none
+ if (this.dataFromTopic == null) {
+ this.dataFromTopic = startReceiveFromTopic();
+ }
+ return this.dataFromTopic;
+ }
+
+ private Flux<DataFromTopic> startReceiveFromTopic() { // records without any key or value content are skipped
+     logger.debug("Listening to kafka topic: {}, client id: {}", this.inputTopic, this.kafkaClientId);
+     return KafkaReceiver.create(kafkaInputProperties()) //
+         .receive() //
+         .doOnNext(input -> logger.debug("Received from kafka topic: {} :{}", this.inputTopic, input.value())) //
+         .doOnError(t -> logger.error("KafkaTopicReceiver error: {}", t.getMessage())) //
+         .doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) //
+         .doFinally(sig -> this.dataFromTopic = null) // makes the next getFlux() call start a new receiver
+         .filter(t -> (t.value() != null && !t.value().isEmpty()) || (t.key() != null && !t.key().isEmpty())) //
+         .map(input -> new DataFromTopic(input.key(), input.value())) // key or value may be null here
+         .publish() //
+         .autoConnect();
+ }
+
+ private ReceiverOptions<String, String> kafkaInputProperties() {
+ Map<String, Object> consumerProps = new HashMap<>();
+ if (this.kafkaBoostrapServers.isEmpty()) {
+ logger.error("No kafka boostrap server is setup");
+ }
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaBoostrapServers);
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adapter-" + inputTopic);
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+ consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, this.kafkaClientId);
+
+ return ReceiverOptions.<String, String>create(consumerProps)
+ .subscription(Collections.singleton(this.inputTopic));
+ }
+
+}
diff --git a/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/MockDatafile.java b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/MockDatafile.java
new file mode 100644
index 0000000..16a36c0
--- /dev/null
+++ b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/MockDatafile.java
@@ -0,0 +1,341 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2020 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.datastore.DataStore;
+import org.onap.dcaegen2.collectors.datafile.datastore.DataStore.Bucket;
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage.MessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.tasks.CollectAndReportFiles;
+import org.onap.dcaegen2.collectors.datafile.tasks.FileCollector;
+import org.onap.dcaegen2.collectors.datafile.tasks.KafkaTopicListener;
+import org.onap.dcaegen2.collectors.datafile.tasks.KafkaTopicListener.DataFromTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
+import org.springframework.boot.test.web.server.LocalServerPort;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.test.context.TestPropertySource;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
+@TestPropertySource(properties = { //
+ "app.ssl.key-store-password-file=./config/ftps_keystore.pass", //
+ "app.ssl.key-store=./config/ftps_keystore.p12", //
+ "app.ssl.trust-store-password-file=./config/truststore.pass", //
+ "app.ssl.trust-store=", // No trust validation
+ "app.collected-files-path=/tmp/osc_datafile/", //
+ "logging.file.name=/tmp/datafile.log", //
+ "spring.main.allow-bean-definition-overriding=true", //
+ "app.s3.endpointOverride=http://localhost:9000", //
+ "app.s3.accessKeyId=minio", //
+ "app.s3.secretAccessKey=miniostorage", //
+ "app.s3.bucket=ropfiles", //
+ "app.s3.locksBucket=locks" })
+@SuppressWarnings("squid:S3577") // Not containing any tests since it is a mock.
+class MockDatafile {
+
+ private static final int LAST_EPOCH_MICROSEC = 151983;
+ private static final String SOURCE_NAME = "5GRAN_DU";
+ private static final int START_EPOCH_MICROSEC = 15198378;
+ private static final String TIME_ZONE_OFFSET = "UTC+05:00";
+ private static final String PM_FILE_NAME = "PM_FILE_NAME";
+
+ // This can be any downloadable file on the net
+ private static final String FTPES_LOCATION = "ftpes:// onap:pano@ftp-ftpes-6:2021/A20000626.2315+0200-2330+0200_GNODEB-15-4.xml.gz";
+ private static final String LOCATION = "https://launchpad.net/ubuntu/+source/perf-tools-unstable/1.0+git7ffb3fd-1ubuntu1/+build/13630748/+files/perf-tools-unstable_1.0+git7ffb3fd-1ubuntu1_all.deb";
+ private static final String GZIP_COMPRESSION = "gzip";
+ private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
+ private static final String FILE_FORMAT_VERSION = "V10";
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
+ private static final String CHANGE_TYPE = "FileReady";
+
+ private static final Logger logger = LoggerFactory.getLogger(MockDatafile.class);
+ private static Gson gson = new GsonBuilder() //
+ .disableHtmlEscaping() //
+ .create(); //
+
+ @LocalServerPort
+ private int port;
+
+ @Autowired
+ AppConfig appConfig;
+
+ @Autowired
+ CollectAndReportFiles scheduledTask;
+
+ private static KafkaReceiver kafkaReceiver;
+
+    /**
+     * Test helper that subscribes to a Kafka topic and remembers the most recently received record
+     * together with the number of records received since the last {@link #reset()}.
+     * <p>
+     * The state is updated from the subscription callback and read by the test methods. All accessors
+     * are therefore synchronized, and {@code count}, which tests read directly, is volatile.
+     */
+    private static class KafkaReceiver {
+        public final String topic;
+        // Starts out as an empty record so that lastKey()/lastValue() are safe before the first reset().
+        private DataFromTopic receivedKafkaOutput = new DataFromTopic("", "");
+        private final Logger logger = LoggerFactory.getLogger(MockDatafile.class);
+
+        // volatile: polled without synchronization by the tests while they wait for output.
+        volatile int count = 0;
+
+        /**
+         * Starts listening to the given topic.
+         *
+         * @param applicationConfig supplies the Kafka bootstrap servers
+         * @param outputTopic the topic to listen to
+         */
+        public KafkaReceiver(AppConfig applicationConfig, String outputTopic) {
+            this.topic = outputTopic;
+
+            // Create a listener to the output topic. The KafkaTopicListener happens to be
+            // suitable for that,
+
+            KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig.getKafkaBootStrapServers(),
+                "MockDatafile", outputTopic);
+
+            topicListener.getFlux() //
+                .doOnNext(this::set) //
+                .doFinally(sig -> logger.info("Finally {}", sig)) //
+                .subscribe();
+        }
+
+        /** Records a received item and increases the counter. */
+        private synchronized void set(DataFromTopic receivedKafkaOutput) {
+            this.receivedKafkaOutput = receivedKafkaOutput;
+            this.count++;
+            logger.debug("*** received {}, {}", topic, receivedKafkaOutput);
+        }
+
+        /** @return the key of the most recently received record */
+        public synchronized String lastKey() {
+            return this.receivedKafkaOutput.key;
+        }
+
+        /** @return the value of the most recently received record */
+        public synchronized String lastValue() {
+            return this.receivedKafkaOutput.value;
+        }
+
+        /** Clears the counter and replaces the remembered record with an empty one. */
+        public synchronized void reset() {
+            count = 0;
+            this.receivedKafkaOutput = new DataFromTopic("", "");
+        }
+    }
+
+    /**
+     * File collector that never contacts a file server: {@code collectFile} is overridden so that a
+     * local file is copied into the collected-files directory instead.
+     */
+    static class FileCollectorMock extends FileCollector {
+        final AppConfig appConfig;
+
+        public FileCollectorMock(AppConfig appConfig) {
+            super(appConfig, new Counters());
+            this.appConfig = appConfig;
+        }
+
+        /**
+         * Produces the publish information for {@code fileData} and places a copy of
+         * {@code config/application.yaml} where the collected file would have been stored.
+         * A failed copy is logged only; the publish information is returned regardless.
+         * The retry parameters are not used.
+         */
+        @Override
+        public Mono<FilePublishInformation> collectFile(FileData fileData, long numRetries, Duration firstBackoff) {
+            final FilePublishInformation publishInfo =
+                new FileCollector(this.appConfig, new Counters()).createFilePublishInformation(fileData);
+
+            try {
+                final File target = new File(this.appConfig.collectedFilesPath + "/" + fileData.name());
+                FileUtils.forceMkdirParent(target);
+                com.google.common.io.Files.copy(new File("config/application.yaml"), target);
+            } catch (Exception e) {
+                logger.error("Could not copy file {}", e.getMessage());
+            }
+
+            return Mono.just(publishInfo);
+        }
+    }
+
+ // Variant of CollectAndReportFiles that hands out FileCollectorMock instances as file collectors.
+ static class CollectAndReportFilesMock extends CollectAndReportFiles {
+ final AppConfig appConfig;
+
+ public CollectAndReportFilesMock(AppConfig appConfig) {
+ super(appConfig);
+ this.appConfig = appConfig;
+ }
+
+ @Override // supply the mock collector, which does not fetch files from a server
+ protected FileCollector createFileCollector() {
+ return new FileCollectorMock(appConfig);
+ }
+ }
+
+ // Test configuration that contributes a CollectAndReportFiles bean implemented by
+ // CollectAndReportFilesMock. Presumably this replaces the production bean, given that bean
+ // definition overriding is enabled in the test properties - TODO confirm.
+ @TestConfiguration
+ static class TestBeanFactory {
+
+ @Bean
+ CollectAndReportFiles collectAndReportFiles(@Autowired AppConfig appConfig) {
+ return new CollectAndReportFilesMock(appConfig);
+ }
+ }
+
+ // Runs before every test: creates the Kafka receiver for the collected-file topic on first use,
+ // then clears its recorded state and removes the collected files directory.
+ @BeforeEach
+ void init() {
+ if (kafkaReceiver == null) {
+ // The receiver is held in a static field, so it is created once and reused by later tests.
+ kafkaReceiver = new KafkaReceiver(this.appConfig, this.appConfig.collectedFileTopic);
+ }
+ kafkaReceiver.reset();
+ deleteAllFiles();
+ }
+
+ // Runs after every test: deletes the FILES and LOCKS buckets of the data store, blocking until
+ // each deletion has completed, and removes the collected files directory.
+ @AfterEach
+ void afterEach() {
+ DataStore store = DataStore.create(this.appConfig);
+ store.deleteBucket(Bucket.FILES).block();
+ store.deleteBucket(Bucket.LOCKS).block();
+ deleteAllFiles();
+
+ }
+
+ private void deleteAllFiles() {
+
+ try {
+ FileUtils.deleteDirectory(new File(this.appConfig.collectedFilesPath));
+ } catch (IOException e) {
+ }
+ }
+
+ // Has no body of its own: running it executes only the @BeforeEach and @AfterEach methods of this
+ // class, i.e. the cleanup of collected files and data store buckets.
+ @Test
+ void clear() {
+
+ }
+
+ // Publishes one unparsable payload followed by one valid file-ready message to the file-ready
+ // topic and checks the record that appears on the collected-file topic.
+ @Test
+ void testKafka() throws InterruptedException {
+ waitForKafkaListener();
+
+ // Sent first; with exactly one output record expected below, this payload is presumably
+ // meant to be discarded by the component under test.
+ this.scheduledTask.sendDataToStream(this.appConfig.fileReadyEventTopic, "key", "junk").blockLast();
+
+ String fileReadyMessage = gson.toJson(fileReadyMessage());
+ this.scheduledTask.sendDataToStream(this.appConfig.fileReadyEventTopic, "key", fileReadyMessage).blockLast();
+
+ // Polls until exactly one record has been received on the output topic.
+ await().untilAsserted(() -> assertThat(kafkaReceiver.count).isEqualTo(1));
+ String rec = kafkaReceiver.lastValue();
+
+ // "Ericsson" is part of the event name set in event().
+ assertThat(rec).contains("Ericsson");
+
+ FilePublishInformation recObj = gson.fromJson(rec, FilePublishInformation.class);
+
+ assertThat(recObj.getName()).isEqualTo(SOURCE_NAME + "/" + PM_FILE_NAME);
+ }
+
+    /**
+     * Publishes a batch of file-ready messages concurrently, waits until one output record per
+     * message has been received and logs the achieved throughput.
+     * <p>
+     * The throughput is computed from a millisecond duration with a lower bound of one millisecond,
+     * so a run that completes within the same second cannot cause a division by zero.
+     */
+    @Test
+    void testS3Concurrency() throws Exception {
+        waitForKafkaListener();
+
+        final int NO_OF_OBJECTS = 10;
+
+        Instant startTime = Instant.now();
+
+        Flux.range(1, NO_OF_OBJECTS) //
+            .map(i -> gson.toJson(fileReadyMessage("testS3Concurrency_" + i))) //
+            .flatMap(fileReadyMessage -> scheduledTask.sendDataToStream(appConfig.fileReadyEventTopic, "key",
+                fileReadyMessage)) //
+            .blockLast(); //
+
+        // Poll once per second until all expected records have arrived on the output topic.
+        while (kafkaReceiver.count < NO_OF_OBJECTS) {
+            logger.info("sleeping {}", kafkaReceiver.count);
+            Thread.sleep(1000 * 1);
+        }
+
+        String rec = kafkaReceiver.lastValue();
+        assertThat(rec).contains("Ericsson");
+
+        final long durationMillis = Math.max(1L, Duration.between(startTime, Instant.now()).toMillis());
+        final double objectsPerSecond = NO_OF_OBJECTS * 1000.0 / durationMillis;
+        logger.info("*** Duration : {} ms, objects/second: {}", durationMillis, objectsPerSecond);
+    }
+
+ // Pauses the calling thread for four seconds. Presumably this gives the Kafka listener time to
+ // become ready before a test starts publishing - TODO confirm.
+ @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
+ private static void waitForKafkaListener() throws InterruptedException {
+ Thread.sleep(4000);
+ }
+
+ // Not a test in the usual sense: logs the server port and then parks the test thread in wait().
+ // No matching notify() is visible in this class, so the application stays up until the process
+ // is stopped from outside.
+ @Test
+ @SuppressWarnings("squid:S2699")
+ void runMock() throws Exception {
+ logger.warn("**************** Keeping server alive! " + this.port);
+ synchronized (this) {
+ this.wait();
+ }
+ }
+
+ FileReadyMessage.Event event(String fileName) {
+ MessageMetaData messageMetaData = MessageMetaData.builder() //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .eventName("Noti_RnNode-Ericsson_FileReady").build();
+
+ FileReadyMessage.FileInfo fileInfo = FileReadyMessage.FileInfo //
+ .builder() //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .location(LOCATION) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .compression(GZIP_COMPRESSION) //
+ .build();
+
+ FileReadyMessage.ArrayOfNamedHashMap arrayOfNamedHashMap = FileReadyMessage.ArrayOfNamedHashMap //
+ .builder() //
+ .name(fileName) //
+ .hashMap(fileInfo) //
+ .build();
+
+ List<FileReadyMessage.ArrayOfNamedHashMap> arrayOfNamedHashMapList = new ArrayList<>();
+ arrayOfNamedHashMapList.add(arrayOfNamedHashMap);
+
+ FileReadyMessage.NotificationFields notificationFields = FileReadyMessage.NotificationFields //
+ .builder().notificationFieldsVersion("notificationFieldsVersion") //
+ .changeType(CHANGE_TYPE).changeIdentifier(CHANGE_IDENTIFIER) //
+ .arrayOfNamedHashMap(arrayOfNamedHashMapList) //
+ .build();
+
+ return FileReadyMessage.Event.builder() //
+ .commonEventHeader(messageMetaData) //
+ .notificationFields(notificationFields).build();
+ }
+
+    /** Wraps the event for the given file name in a file-ready message. */
+    private FileReadyMessage fileReadyMessage(String fileName) {
+        return FileReadyMessage.builder().event(event(fileName)).build();
+    }
+
+    /** Builds a file-ready message for the default file name {@code PM_FILE_NAME}. */
+    private FileReadyMessage fileReadyMessage() {
+        return fileReadyMessage(PM_FILE_NAME);
+    }
+
+}
diff --git a/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusControllerTest.java b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusControllerTest.java
new file mode 100644
index 0000000..8bc6330
--- /dev/null
+++ b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusControllerTest.java
@@ -0,0 +1,73 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.controllers;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doReturn;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
+import org.onap.dcaegen2.collectors.datafile.tasks.CollectAndReportFiles;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.ResponseEntity;
+import reactor.core.publisher.Mono;
+
+/**
+ * Unit tests for {@link StatusController}, run against a mocked {@link CollectAndReportFiles}.
+ */
+@ExtendWith(MockitoExtension.class)
+public class StatusControllerTest {
+    @Mock
+    CollectAndReportFiles scheduledTasksMock;
+
+    StatusController controllerUnderTest;
+
+    @BeforeEach
+    public void setup() {
+        controllerUnderTest = new StatusController(scheduledTasksMock);
+    }
+
+    /** The heartbeat response body is expected to start with the liveness text. */
+    @Test
+    public void heartbeat_success() {
+        HttpHeaders httpHeaders = new HttpHeaders();
+
+        Mono<ResponseEntity<String>> result = controllerUnderTest.heartbeat(httpHeaders);
+
+        String body = result.block().getBody();
+        assertTrue(body.startsWith("I'm living!"));
+    }
+
+    /** The status request, served from the mocked counters, is expected to return a body. */
+    @Test
+    public void status() {
+        Counters counters = new Counters();
+        doReturn(counters).when(scheduledTasksMock).getCounters();
+
+        HttpHeaders httpHeaders = new HttpHeaders();
+
+        Mono<ResponseEntity<String>> result = controllerUnderTest.status(httpHeaders);
+
+        // Assert instead of printing, so that the test can actually fail.
+        String body = result.block().getBody();
+        assertTrue(body != null, "status response should carry a body");
+    }
+
+}
diff --git a/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClientTest.java b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClientTest.java
new file mode 100644
index 0000000..3423826
--- /dev/null
+++ b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpesClientTest.java
@@ -0,0 +1,238 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.TrustManager;
+
+import org.apache.commons.net.ftp.FTP;
+import org.apache.commons.net.ftp.FTPSClient;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatchers;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+import org.springframework.http.HttpStatus;
+
+public class FtpesClientTest {
+
+ private static final String REMOTE_FILE_PATH = "/dir/sample.txt";
+ private static final Path LOCAL_FILE_PATH = Paths.get("target/sample.txt");
+ private static final String XNF_ADDRESS = "127.0.0.1";
+ private static final int PORT = 8021;
+ private static final String FTP_KEY_PATH = "ftpKeyPath";
+ private static final String FTP_KEY_PASSWORD = "ftpKeyPassword";
+ private static final Path TRUSTED_CA_PATH = Paths.get("trustedCaPath");
+ private static final String TRUSTED_CA_PASSWORD = "trustedCaPassword";
+
+ private static final String USERNAME = "bob";
+ private static final String PASSWORD = "123";
+
+ private FTPSClient ftpsClientMock = mock(FTPSClient.class);
+ private KeyManager keyManagerMock = mock(KeyManager.class);
+ private TrustManager trustManagerMock = mock(TrustManager.class);
+ private InputStream inputStreamMock = mock(InputStream.class);
+ private OutputStream outputStreamMock = mock(OutputStream.class);
+
+ FtpesClient clientUnderTestSpy;
+
+ // Builds the server data (address, user id, password and port) passed to the client under test.
+ private FileServerData createFileServerData() {
+ return FileServerData.builder() //
+ .serverAddress(XNF_ADDRESS) //
+ .userId(USERNAME).password(PASSWORD) //
+ .port(PORT) //
+ .build();
+ }
+
+ // Wraps a real FtpesClient in a spy, so that single methods can be stubbed per test, and replaces
+ // its FTPS client with the mock, so that the calls made to it can be stubbed and verified.
+ @BeforeEach
+ protected void setUp() throws Exception {
+ clientUnderTestSpy = spy(new FtpesClient(createFileServerData(), Paths.get(FTP_KEY_PATH), FTP_KEY_PASSWORD,
+ TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD));
+ clientUnderTestSpy.realFtpsClient = ftpsClientMock;
+ }
+
+    /**
+     * Verifies the calls that a successful {@code open()} is expected to have made on the FTPS client
+     * mock: client authentication and key/trust manager setup, connect and login, reply code check,
+     * passive mode, protection settings, binary file type and buffer size.
+     * <p>
+     * This helper only verifies. Any stubbing needed by the code under test has to be set up by the
+     * calling test before the code under test runs.
+     */
+    private void verifyFtpsClientMock_openOk() throws Exception {
+        verify(ftpsClientMock).setNeedClientAuth(true);
+        verify(ftpsClientMock).setKeyManager(keyManagerMock);
+        verify(ftpsClientMock).setTrustManager(trustManagerMock);
+        verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
+        verify(ftpsClientMock).login(USERNAME, PASSWORD);
+        verify(ftpsClientMock).getReplyCode();
+        verify(ftpsClientMock, times(1)).enterLocalPassiveMode();
+        verify(ftpsClientMock).execPBSZ(0);
+        verify(ftpsClientMock).execPROT("P");
+        verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE);
+        verify(ftpsClientMock).setBufferSize(1024 * 1024);
+    }
+
+ // Happy path: open the connection, collect one file and close again, then verify the complete
+ // set of calls made on the FTPS client mock.
+ @Test
+ public void collectFile_allOk() throws Exception {
+
+ // Key and trust managers are stubbed on the spy, so the real getKeyManager/getTrustManager
+ // implementations are not run.
+ doReturn(keyManagerMock).when(clientUnderTestSpy).getKeyManager(Paths.get(FTP_KEY_PATH), FTP_KEY_PASSWORD);
+ doReturn(trustManagerMock).when(clientUnderTestSpy).getTrustManager(TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD);
+ doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH);
+ doReturn(true).when(ftpsClientMock).login(USERNAME, PASSWORD);
+ // The HTTP constant supplies the numeric value 200 as the reply code.
+ doReturn(HttpStatus.OK.value()).when(ftpsClientMock).getReplyCode();
+
+ clientUnderTestSpy.open();
+
+ doReturn(true).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, outputStreamMock);
+ clientUnderTestSpy.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH);
+
+ doReturn(true).when(ftpsClientMock).isConnected();
+ clientUnderTestSpy.close();
+
+ verifyFtpsClientMock_openOk();
+ // close() on a connected client is expected to log out and disconnect exactly once.
+ verify(ftpsClientMock, times(1)).isConnected();
+ verify(ftpsClientMock, times(1)).logout();
+ verify(ftpsClientMock, times(1)).disconnect();
+ verify(ftpsClientMock, times(1)).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH), any());
+ verifyNoMoreInteractions(ftpsClientMock);
+ }
+
+ // open() must fail when the client's own key cannot be loaded. The key manager is not stubbed in
+ // this test, so the real getKeyManager runs against FTP_KEY_PATH; presumably no such file
+ // exists, which matches the expected FileNotFoundException.
+ @Test
+ public void collectFileFaultyOwnKey_shouldFail() throws Exception {
+
+ doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH);
+ assertThatThrownBy(() -> clientUnderTestSpy.open())
+ .hasMessageContaining("Could not open connection: java.io.FileNotFoundException:");
+
+ verify(ftpsClientMock).setNeedClientAuth(true);
+
+ // With the client reported as not connected, close() is expected to make no further calls.
+ doReturn(false).when(ftpsClientMock).isConnected();
+ clientUnderTestSpy.close();
+ verify(ftpsClientMock).isConnected();
+ verifyNoMoreInteractions(ftpsClientMock);
+ }
+
+ // open() must fail, with the cause included in the message, when the trusted CA file cannot be
+ // opened (simulated by an IOException from createInputStream).
+ @Test
+ public void collectFileFaultTrustedCA_shouldFail_no_trustedCA_file() throws Exception {
+
+ doReturn(keyManagerMock).when(clientUnderTestSpy).getKeyManager(Paths.get(FTP_KEY_PATH), FTP_KEY_PASSWORD);
+ doThrow(new IOException("problem")).when(clientUnderTestSpy).createInputStream(TRUSTED_CA_PATH);
+
+ assertThatThrownBy(() -> clientUnderTestSpy.open())
+ .hasMessage("Could not open connection: java.io.IOException: problem");
+ }
+
+ // open() must fail with an EOFException when the trusted CA stream yields no usable content.
+ // The stream is an unstubbed mock, so its read calls return Mockito's default values.
+ @Test
+ public void collectFileFaultTrustedCA_shouldFail_empty_trustedCA_file() throws Exception {
+
+ doReturn(keyManagerMock).when(clientUnderTestSpy).getKeyManager(Paths.get(FTP_KEY_PATH), FTP_KEY_PASSWORD);
+ doReturn(inputStreamMock).when(clientUnderTestSpy).createInputStream(TRUSTED_CA_PATH);
+
+ assertThatThrownBy(() -> clientUnderTestSpy.open())
+ .hasMessage("Could not open connection: java.io.EOFException");
+ }
+
+ // open() must fail with a login error when the FTPS client rejects the credentials.
+ @Test
+ public void collectFileFaultyLogin_shouldFail() throws Exception {
+
+ doReturn(keyManagerMock).when(clientUnderTestSpy).getKeyManager(Paths.get(FTP_KEY_PATH), FTP_KEY_PASSWORD);
+ doReturn(trustManagerMock).when(clientUnderTestSpy).getTrustManager(TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD);
+ doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH);
+ // A login returning false simulates rejected credentials.
+ doReturn(false).when(ftpsClientMock).login(USERNAME, PASSWORD);
+
+ assertThatThrownBy(() -> clientUnderTestSpy.open()).hasMessage("Unable to log in to xNF. 127.0.0.1");
+
+ // Calls expected up to and including the failed login.
+ verify(ftpsClientMock).setNeedClientAuth(true);
+ verify(ftpsClientMock).setKeyManager(keyManagerMock);
+ verify(ftpsClientMock).setTrustManager(trustManagerMock);
+ verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
+ verify(ftpsClientMock).login(USERNAME, PASSWORD);
+ }
+
+ // open() must fail, reporting the reply code, when the server answers with reply code 503 after
+ // a successful login.
+ @Test
+ public void collectFileBadRequestResponse_shouldFail() throws Exception {
+ doReturn(keyManagerMock).when(clientUnderTestSpy).getKeyManager(Paths.get(FTP_KEY_PATH), FTP_KEY_PASSWORD);
+ doReturn(trustManagerMock).when(clientUnderTestSpy).getTrustManager(TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD);
+ doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH);
+ doReturn(true).when(ftpsClientMock).login(USERNAME, PASSWORD);
+ doReturn(503).when(ftpsClientMock).getReplyCode();
+
+ assertThatThrownBy(() -> clientUnderTestSpy.open())
+ .hasMessage("Unable to connect to xNF. 127.0.0.1 xNF reply code: 503");
+
+ verify(ftpsClientMock).setNeedClientAuth(true);
+ verify(ftpsClientMock).setKeyManager(keyManagerMock);
+ verify(ftpsClientMock).setTrustManager(trustManagerMock);
+ verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
+ verify(ftpsClientMock).login(USERNAME, PASSWORD);
+ // Two reads of the reply code are expected; presumably one for the check and one for the
+ // error message - TODO confirm against FtpesClient.
+ verify(ftpsClientMock, times(2)).getReplyCode();
+ verifyNoMoreInteractions(ftpsClientMock);
+ }
+
+ // collectFile() must fail with a "No retry" message naming the remote file when the FTPS client
+ // reports that the file could not be retrieved.
+ @Test
+ public void collectFile_shouldFail() throws Exception {
+ doReturn(keyManagerMock).when(clientUnderTestSpy).getKeyManager(Paths.get(FTP_KEY_PATH), FTP_KEY_PASSWORD);
+ doReturn(trustManagerMock).when(clientUnderTestSpy).getTrustManager(TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD);
+ doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH);
+ doReturn(true).when(ftpsClientMock).login(USERNAME, PASSWORD);
+ doReturn(HttpStatus.OK.value()).when(ftpsClientMock).getReplyCode();
+ clientUnderTestSpy.open();
+
+ // retrieveFile returning false simulates a failed transfer.
+ doReturn(false).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, outputStreamMock);
+
+ assertThatThrownBy(() -> clientUnderTestSpy.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+ .hasMessageContaining(REMOTE_FILE_PATH).hasMessageContaining("No retry");
+
+ verifyFtpsClientMock_openOk();
+ verify(ftpsClientMock, times(1)).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH), any());
+ verifyNoMoreInteractions(ftpsClientMock);
+ }
+
+ // collectFile() must fail, with the cause included in the message, when the FTPS client throws
+ // an IOException while retrieving the file.
+ @Test
+ public void collectFile_shouldFail_ioexception() throws Exception {
+ doReturn(keyManagerMock).when(clientUnderTestSpy).getKeyManager(Paths.get(FTP_KEY_PATH), FTP_KEY_PASSWORD);
+ doReturn(trustManagerMock).when(clientUnderTestSpy).getTrustManager(TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD);
+ doReturn(outputStreamMock).when(clientUnderTestSpy).createOutputStream(LOCAL_FILE_PATH);
+ doReturn(true).when(ftpsClientMock).login(USERNAME, PASSWORD);
+ doReturn(HttpStatus.OK.value()).when(ftpsClientMock).getReplyCode();
+ clientUnderTestSpy.open();
+ when(ftpsClientMock.isConnected()).thenReturn(false);
+
+ doThrow(new IOException("problem")).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, outputStreamMock);
+
+ assertThatThrownBy(() -> clientUnderTestSpy.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+ .hasMessage("Could not fetch file: java.io.IOException: problem");
+
+ verifyFtpsClientMock_openOk();
+ verify(ftpsClientMock, times(1)).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH), any());
+ verifyNoMoreInteractions(ftpsClientMock);
+ }
+}
diff --git a/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientSettingsTest.java b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientSettingsTest.java
new file mode 100644
index 0000000..5ee379b
--- /dev/null
+++ b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientSettingsTest.java
@@ -0,0 +1,67 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.nio.file.Path;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.onap.dcaegen2.collectors.datafile.configuration.SftpConfig;
+
+/**
+ * Tests of how {@link SftpClientSettings} decides whether strict host key checking is to be used,
+ * depending on the configured flag and on the configured known hosts file.
+ */
+public class SftpClientSettingsTest {
+
+    /** Strict checking is expected when it is enabled and the known hosts file exists. */
+    @Test
+    public void shouldUseFtpStrictHostChecking(@TempDir Path tempDir) throws Exception {
+        File knownHostsFile = new File(tempDir.toFile(), "known_hosts");
+        // Fail right here if the precondition, an existing known hosts file, cannot be established.
+        assertThat(knownHostsFile.createNewFile()).isTrue();
+
+        SftpConfig config = createSampleSftpConfigWithStrictHostChecking(knownHostsFile.getAbsolutePath());
+        SftpClientSettings sftpClient = new SftpClientSettings(config);
+
+        assertThat(sftpClient.shouldUseStrictHostChecking()).isTrue();
+    }
+
+    /** Strict checking is expected to be off when the configured known hosts file is not found. */
+    @Test
+    public void shouldNotUseFtpStrictHostChecking_whenFileDoesNotExist() {
+        SftpConfig config = createSampleSftpConfigWithStrictHostChecking("unknown_file");
+        SftpClientSettings sftpClient = new SftpClientSettings(config);
+
+        // NOTE(review): the result of this first call is discarded; presumably redundant - confirm.
+        sftpClient.shouldUseStrictHostChecking();
+        assertThat(sftpClient.shouldUseStrictHostChecking()).isFalse();
+    }
+
+    /** Strict checking is expected to be off when it is disabled in the configuration. */
+    @Test
+    public void shouldNotUseFtpStrictHostChecking_whenExplicitlySwitchedOff() {
+        SftpClientSettings sftpClient = new SftpClientSettings(createSampleSftpConfigNoStrictHostChecking());
+        // NOTE(review): the result of this first call is discarded; presumably redundant - confirm.
+        sftpClient.shouldUseStrictHostChecking();
+        assertThat(sftpClient.shouldUseStrictHostChecking()).isFalse();
+    }
+
+    private SftpConfig createSampleSftpConfigNoStrictHostChecking() {
+        return SftpConfig.builder() //
+            .strictHostKeyChecking(false).knownHostsFilePath("N/A").build();
+    }
+
+    private SftpConfig createSampleSftpConfigWithStrictHostChecking(String pathToKnownHostsFile) {
+        return SftpConfig.builder() //
+            .strictHostKeyChecking(true).knownHostsFilePath(pathToKnownHostsFile).build();
+    }
+}
diff --git a/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
new file mode 100644
index 0000000..596bec8
--- /dev/null
+++ b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
@@ -0,0 +1,237 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import com.jcraft.jsch.SftpException;
+
+import java.nio.file.Paths;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.configuration.SftpConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
+
+@ExtendWith(MockitoExtension.class)
+public class SftpClientTest {
+
+ private static final String HOST = "127.0.0.1";
+ private static final int SFTP_PORT = 1021;
+ private static final String USERNAME = "bob";
+ private static final String PASSWORD = "123";
+
+ @Mock
+ private JSch jschMock;
+
+ @Mock
+ private Session sessionMock;
+
+ @Mock
+ private ChannelSftp channelMock;
+
+ @Test
+ public void openWithPort_success() throws Exception {
+ FileServerData expectedFileServerData = FileServerData.builder() //
+ .serverAddress(HOST) //
+ .userId(USERNAME) //
+ .password(PASSWORD) //
+ .port(SFTP_PORT) //
+ .build();
+
+ SftpClient sftpClientSpy = spy(new SftpClient(expectedFileServerData, createSampleSftpClientSettings()));
+
+ doReturn(jschMock).when(sftpClientSpy).createJsch();
+ when(jschMock.getSession(anyString(), anyString(), anyInt())).thenReturn(sessionMock);
+ when(sessionMock.openChannel(anyString())).thenReturn(channelMock);
+
+ sftpClientSpy.open();
+
+ verify(jschMock).getSession(USERNAME, HOST, SFTP_PORT);
+ verify(sessionMock).setConfig("StrictHostKeyChecking", "no");
+ verify(sessionMock).setPassword(PASSWORD);
+ verify(sessionMock).connect();
+ verify(sessionMock).openChannel("sftp");
+ verifyNoMoreInteractions(sessionMock);
+
+ verify(channelMock).connect();
+ verifyNoMoreInteractions(channelMock);
+ }
+
+ @Test
+ public void openWithoutPort_success() throws Exception {
+ FileServerData expectedFileServerData = FileServerData.builder() //
+ .serverAddress(HOST) //
+ .userId(USERNAME) //
+ .password(PASSWORD) //
+ .port(null) //
+ .build();
+
+ SftpClient sftpClientSpy = spy(new SftpClient(expectedFileServerData, createSampleSftpClientSettings()));
+
+ doReturn(jschMock).when(sftpClientSpy).createJsch();
+ when(jschMock.getSession(anyString(), anyString(), anyInt())).thenReturn(sessionMock);
+ when(sessionMock.openChannel(anyString())).thenReturn(channelMock);
+
+ sftpClientSpy.open();
+
+ verify(jschMock).getSession(USERNAME, HOST, 22);
+ }
+
+ @Test
+ public void open_throwsExceptionWithRetry() throws Exception {
+ FileServerData expectedFileServerData = FileServerData.builder() //
+ .serverAddress(HOST) //
+ .userId(USERNAME) //
+ .password(PASSWORD) //
+ .port(SFTP_PORT) //
+ .build();
+
+ SftpClient sftpClientSpy = spy(new SftpClient(expectedFileServerData, createSampleSftpClientSettings()));
+
+ doReturn(jschMock).when(sftpClientSpy).createJsch();
+ when(jschMock.getSession(anyString(), anyString(), anyInt())).thenThrow(new JSchException("Failed"));
+
+ DatafileTaskException exception = assertThrows(DatafileTaskException.class, () -> sftpClientSpy.open());
+ assertEquals("Could not open Sftp client. com.jcraft.jsch.JSchException: Failed", exception.getMessage());
+ }
+
+ @Test
+ public void openAuthFail_throwsExceptionWithoutRetry() throws Exception {
+ FileServerData expectedFileServerData = FileServerData.builder() //
+ .serverAddress(HOST) //
+ .userId(USERNAME) //
+ .password(PASSWORD) //
+ .port(SFTP_PORT) //
+ .build();
+
+ SftpClient sftpClientSpy = spy(new SftpClient(expectedFileServerData, createSampleSftpClientSettings()));
+
+ doReturn(jschMock).when(sftpClientSpy).createJsch();
+ when(jschMock.getSession(anyString(), anyString(), anyInt())).thenThrow(new JSchException("Auth fail"));
+
+ NonRetryableDatafileTaskException exception =
+ assertThrows(NonRetryableDatafileTaskException.class, () -> sftpClientSpy.open());
+ assertEquals(
+ "Could not open Sftp client, no retry attempts will be done. com.jcraft.jsch.JSchException: Auth fail",
+ exception.getMessage());
+ }
+
+ @SuppressWarnings("resource")
+ @Test
+ public void collectFile_success() throws DatafileTaskException, SftpException {
+ FileServerData expectedFileServerData = FileServerData.builder() //
+ .serverAddress(HOST) //
+ .userId(USERNAME) //
+ .password(PASSWORD) //
+ .port(SFTP_PORT) //
+ .build();
+ SftpClient sftpClient = new SftpClient(expectedFileServerData, createSampleSftpClientSettings());
+
+ sftpClient.sftpChannel = channelMock;
+
+ sftpClient.collectFile("remote.xml", Paths.get("local.xml"));
+
+ verify(channelMock).get("remote.xml", "local.xml");
+ verifyNoMoreInteractions(channelMock);
+ }
+
+ @Test
+ public void collectFile_throwsExceptionWithRetry() throws SftpException {
+ FileServerData expectedFileServerData = FileServerData.builder() //
+ .serverAddress(HOST) //
+ .userId(USERNAME) //
+ .password(PASSWORD) //
+ .port(SFTP_PORT) //
+ .build();
+
+ try (SftpClient sftpClient = new SftpClient(expectedFileServerData, createSampleSftpClientSettings())) {
+ sftpClient.sftpChannel = channelMock;
+ doThrow(new SftpException(ChannelSftp.SSH_FX_BAD_MESSAGE, "Failed")).when(channelMock).get(anyString(),
+ anyString());
+
+ assertThatThrownBy(() -> sftpClient.collectFile("remoteFile", Paths.get("localFile")))
+ .isInstanceOf(DatafileTaskException.class).hasMessageStartingWith("Unable to get file from xNF. ")
+ .hasMessageContaining(HOST);
+ }
+ }
+
+ @Test
+ public void collectFileFileMissing_throwsExceptionWithoutRetry() throws SftpException {
+ FileServerData expectedFileServerData = FileServerData.builder() //
+ .serverAddress(HOST) //
+ .userId(USERNAME) //
+ .password(PASSWORD) //
+ .port(SFTP_PORT) //
+ .build();
+
+ try (SftpClient sftpClient = new SftpClient(expectedFileServerData, createSampleSftpClientSettings())) {
+ sftpClient.sftpChannel = channelMock;
+ doThrow(new SftpException(ChannelSftp.SSH_FX_NO_SUCH_FILE, "Failed")).when(channelMock).get(anyString(),
+ anyString());
+
+ assertThatThrownBy(() -> sftpClient.collectFile("remoteFile", Paths.get("localFile")))
+ .isInstanceOf(NonRetryableDatafileTaskException.class)
+ .hasMessageStartingWith("Unable to get file from xNF. No retry attempts will be done")
+ .hasMessageContaining("" + SFTP_PORT);
+ }
+ }
+
+ @Test
+ public void close_success() {
+ SftpClient sftpClient = new SftpClient(null, createSampleSftpClientSettings());
+
+ sftpClient.session = sessionMock;
+ sftpClient.sftpChannel = channelMock;
+
+ sftpClient.close();
+
+ verify(sessionMock).disconnect();
+ verifyNoMoreInteractions(sessionMock);
+
+ verify(channelMock).exit();;
+ verifyNoMoreInteractions(channelMock);
+ }
+
+ private SftpClientSettings createSampleSftpClientSettings() {
+ return new SftpClientSettings(createSampleSftpConfigNoStrictHostChecking());
+ }
+
+ private SftpConfig createSampleSftpConfigNoStrictHostChecking() {
+ return SftpConfig.builder() //
+ .strictHostKeyChecking(false).knownHostsFilePath("N/A").build();
+ }
+
+}
diff --git a/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClientTest.java b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClientTest.java
new file mode 100644
index 0000000..8550644
--- /dev/null
+++ b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClientTest.java
@@ -0,0 +1,159 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2020-2021 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+
+import org.apache.hc.core5.net.URIBuilder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+
+import reactor.core.publisher.Flux;
+import reactor.netty.http.client.HttpClientConfig;
+
+@ExtendWith(MockitoExtension.class)
+class DfcHttpClientTest {
+    // Fixed connection data and credentials used to build the FileServerData in these tests.
+    private static final String USERNAME = "bob";
+    private static final String PASSWORD = "123";
+    private static final String XNF_ADDRESS = "127.0.0.1";
+    private static final int PORT = 80;
+    private static final String JWT_PASSWORD = "thisIsThePassword";
+    private static final String ACCESS_TOKEN = "access_token";
+
+    @Mock
+    private Path pathMock;
+
+    DfcHttpClient dfcHttpClientSpy;
+
+    @BeforeEach
+    public void setup() {
+        dfcHttpClientSpy = spy(new DfcHttpClient(createFileServerData()));
+    }
+
+    @Test
+    void openConnection_successBasicAuthSetup() throws DatafileTaskException {
+        dfcHttpClientSpy.open();
+        HttpClientConfig config = dfcHttpClientSpy.client.configuration();
+        assertEquals(HttpUtils.basicAuthContent(USERNAME, PASSWORD), config.headers().get("Authorization"));
+    }
+
+    @Test
+    void openConnection_failedBasicAuthSetupThrowException() {
+        FileServerData serverData =
+            FileServerData.builder().serverAddress(XNF_ADDRESS).userId(USERNAME).password("").port(PORT).build();
+        // Use a dedicated local client (user id but empty password) instead of shadowing the field.
+        DfcHttpClient clientWithoutPassword = spy(new DfcHttpClient(serverData));
+
+        assertThatThrownBy(() -> clientWithoutPassword.open())
+            .hasMessageContaining("Not sufficient basic auth data for file.");
+    }
+
+    @Test
+    void collectFile_AllOk() throws Exception {
+        String remoteFile = "any";
+        Flux<InputStream> fis = Flux.just(new ByteArrayInputStream("ReturnedString".getBytes()));
+
+        dfcHttpClientSpy.open();
+        // NOTE(review): when(...) on a spy runs the real getServerResponse once; doReturn would avoid that.
+        when(dfcHttpClientSpy.getServerResponse(any())).thenReturn(fis);
+        doReturn(false).when(dfcHttpClientSpy).isDownloadFailed(any());
+
+        dfcHttpClientSpy.collectFile(remoteFile, pathMock);
+        dfcHttpClientSpy.close();
+
+        verify(dfcHttpClientSpy, times(1)).getServerResponse(remoteFile);
+        verify(dfcHttpClientSpy, times(1)).processDataFromServer(any(), any(), any());
+        verify(dfcHttpClientSpy, times(1)).isDownloadFailed(any());
+    }
+
+    @Test
+    void collectFile_AllOkWithJWTToken() throws Exception {
+        dfcHttpClientSpy = spy(new DfcHttpClient(fileServerDataWithJWTToken()));
+        String remoteFile = "any";
+        Flux<InputStream> fis = Flux.just(new ByteArrayInputStream("ReturnedString".getBytes()));
+
+        dfcHttpClientSpy.open();
+        HttpClientConfig config = dfcHttpClientSpy.client.configuration();
+        assertEquals(HttpUtils.jwtAuthContent(JWT_PASSWORD), config.headers().get("Authorization"));
+
+        when(dfcHttpClientSpy.getServerResponse(any())).thenReturn(fis);
+        doReturn(false).when(dfcHttpClientSpy).isDownloadFailed(any());
+
+        dfcHttpClientSpy.collectFile(remoteFile, pathMock);
+        dfcHttpClientSpy.close();
+
+        verify(dfcHttpClientSpy, times(1)).getServerResponse(ArgumentMatchers.eq(remoteFile));
+        verify(dfcHttpClientSpy, times(1)).processDataFromServer(any(), any(), any());
+        verify(dfcHttpClientSpy, times(1)).isDownloadFailed(any());
+    }
+
+    @Test
+    void collectFile_No200ResponseWriteToErrorMessage() throws DatafileTaskException {
+        String errorResponse = "This is unexpected message";
+        String remoteFile = "any";
+        Flux<Throwable> fis = Flux.error(new Throwable(errorResponse));
+
+        dfcHttpClientSpy.open();
+
+        doReturn(fis).when(dfcHttpClientSpy).getServerResponse(any());
+        // The error carried by the Flux must show up in the exception thrown by collectFile.
+        assertThatThrownBy(() -> dfcHttpClientSpy.collectFile(remoteFile, pathMock))
+            .hasMessageContaining(errorResponse);
+        verify(dfcHttpClientSpy, times(1)).getServerResponse(remoteFile);
+        verify(dfcHttpClientSpy, times(1)).processFailedConnectionWithServer(any(), any());
+        dfcHttpClientSpy.close();
+    }
+
+    @Test
+    void isResponseOk_validateResponse() {
+        assertTrue(dfcHttpClientSpy.isResponseOk(HttpClientResponseHelper.NETTY_RESPONSE_OK));
+        assertFalse(dfcHttpClientSpy.isResponseOk(HttpClientResponseHelper.RESPONSE_ANY_NO_OK));
+    }
+
+    private FileServerData createFileServerData() {
+        return FileServerData.builder().serverAddress(XNF_ADDRESS).userId(USERNAME).password(PASSWORD).port(PORT)
+            .build();
+    }
+
+    private FileServerData fileServerDataWithJWTToken() throws URISyntaxException {
+        String query = "?" + ACCESS_TOKEN + "=" + JWT_PASSWORD;
+        // No user id or password here; the token travels as a query parameter instead.
+        return FileServerData.builder().serverAddress(XNF_ADDRESS).userId("").password("").port(PORT)
+            .queryParameters(new URIBuilder(query).getQueryParams()).build();
+    }
+}
diff --git a/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpsClientTest.java b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpsClientTest.java
new file mode 100644
index 0000000..4295fe8
--- /dev/null
+++ b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpsClientTest.java
@@ -0,0 +1,178 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+
+import org.apache.hc.core5.net.URIBuilder;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
+
+@ExtendWith(MockitoExtension.class)
+class DfcHttpsClientTest {
+    // Fixed connection data and credentials used to build the FileServerData in these tests.
+    private static final String USERNAME = "bob";
+    private static final String PASSWORD = "123";
+    private static final String XNF_ADDRESS = "127.0.0.1";
+    private static final int PORT = 443;
+    private static final String JWT_PASSWORD = "thisIsThePassword";
+    private static final String ACCESS_TOKEN = "access_token";
+    private static final String REMOTE_FILE = "remoteFile";
+
+    @Mock
+    private PoolingHttpClientConnectionManager connectionManager;
+    @Mock
+    private Path localFile;
+
+    DfcHttpsClient dfcHttpsClientSpy;
+
+    @BeforeEach
+    public void setup() {
+        dfcHttpsClientSpy = spy(new DfcHttpsClient(createFileServerData(), connectionManager));
+    }
+
+    @Test
+    void fileServerData_properLocationBasicAuth() throws Exception {
+        boolean result = dfcHttpsClientSpy.basicAuthValidNotPresentOrThrow();
+        assertEquals(true, result);
+    }
+
+    @Test
+    void fileServerData_properLocationNoBasicAuth() throws Exception {
+        dfcHttpsClientSpy = spy(new DfcHttpsClient(emptyUserInFileServerData(), connectionManager));
+
+        boolean result = dfcHttpsClientSpy.basicAuthValidNotPresentOrThrow();
+        assertEquals(false, result);
+    }
+
+    @Test
+    void fileServerData_improperAuthDataExceptionOccurred() throws Exception {
+        dfcHttpsClientSpy = spy(new DfcHttpsClient(invalidUserInFileServerData(), connectionManager));
+
+        assertThrows(DatafileTaskException.class, () -> dfcHttpsClientSpy.basicAuthValidNotPresentOrThrow());
+    }
+
+    @Test
+    void dfcHttpsClient_flow_successfulCallAndResponseProcessing() throws Exception {
+        doReturn(HttpClientResponseHelper.APACHE_RESPONSE_OK).when(dfcHttpsClientSpy)
+            .executeHttpClient(any(HttpGet.class));
+        doReturn(3L).when(dfcHttpsClientSpy).writeFile(eq(localFile), any(InputStream.class));
+
+        dfcHttpsClientSpy.open();
+        dfcHttpsClientSpy.collectFile(REMOTE_FILE, localFile);
+        dfcHttpsClientSpy.close();
+
+        verify(dfcHttpsClientSpy, times(1)).makeCall(any(HttpGet.class));
+        verify(dfcHttpsClientSpy, times(1)).executeHttpClient(any(HttpGet.class));
+        verify(dfcHttpsClientSpy, times(1)).processResponse(HttpClientResponseHelper.APACHE_RESPONSE_OK, localFile);
+        verify(dfcHttpsClientSpy, times(1)).writeFile(eq(localFile), any(InputStream.class));
+    }
+
+    @Test
+    void dfcHttpsClient_flow_successfulCallWithJWTAndResponseProcessing() throws Exception {
+        FileServerData serverData = jWTTokenInFileServerData();
+        dfcHttpsClientSpy = spy(new DfcHttpsClient(serverData, connectionManager));
+
+        doReturn(HttpClientResponseHelper.APACHE_RESPONSE_OK).when(dfcHttpsClientSpy)
+            .executeHttpClient(any(HttpGet.class));
+        doReturn(3L).when(dfcHttpsClientSpy).writeFile(eq(localFile), any(InputStream.class));
+
+        dfcHttpsClientSpy.open();
+        dfcHttpsClientSpy.collectFile(REMOTE_FILE, localFile);
+        dfcHttpsClientSpy.close();
+
+        verify(dfcHttpsClientSpy, times(1)).makeCall(any(HttpGet.class));
+        verify(dfcHttpsClientSpy, times(1)).executeHttpClient(any(HttpGet.class));
+        verify(dfcHttpsClientSpy, times(1)).processResponse(HttpClientResponseHelper.APACHE_RESPONSE_OK, localFile);
+        verify(dfcHttpsClientSpy, times(1)).writeFile(eq(localFile), any(InputStream.class));
+        // The token must not leak through FileServerData.toString().
+        assertFalse(serverData.toString().contains(JWT_PASSWORD));
+    }
+
+    @Test
+    void dfcHttpsClient_flow_failedCallUnexpectedResponseCode() throws Exception {
+        doReturn(HttpClientResponseHelper.APACHE_RESPONSE_OK).when(dfcHttpsClientSpy)
+            .executeHttpClient(any(HttpGet.class));
+        doReturn(false).when(dfcHttpsClientSpy).isResponseOk(any(HttpResponse.class));
+
+        dfcHttpsClientSpy.open();
+
+        assertThrows(DatafileTaskException.class, () -> dfcHttpsClientSpy.collectFile(REMOTE_FILE, localFile));
+    }
+
+    @Test
+    void dfcHttpsClient_flow_failedCallConnectionTimeout() throws Exception {
+        doThrow(ConnectTimeoutException.class).when(dfcHttpsClientSpy).executeHttpClient(any(HttpGet.class));
+
+        dfcHttpsClientSpy.open();
+        // A connect timeout is expected to be reported as a non-retryable failure.
+        assertThrows(NonRetryableDatafileTaskException.class,
+            () -> dfcHttpsClientSpy.collectFile(REMOTE_FILE, localFile));
+    }
+
+    @Test
+    void dfcHttpsClient_flow_failedCallIOExceptionForExecuteHttpClient() throws Exception {
+        doThrow(IOException.class).when(dfcHttpsClientSpy).executeHttpClient(any(HttpGet.class));
+
+        dfcHttpsClientSpy.open();
+
+        assertThrows(DatafileTaskException.class, () -> dfcHttpsClientSpy.collectFile(REMOTE_FILE, localFile));
+    }
+
+    private FileServerData createFileServerData() {
+        return FileServerData.builder().serverAddress(XNF_ADDRESS).userId(USERNAME).password(PASSWORD).port(PORT)
+            .build();
+    }
+
+    private FileServerData emptyUserInFileServerData() {
+        return FileServerData.builder().serverAddress(XNF_ADDRESS).userId("").password("").port(PORT).build();
+    }
+
+    private FileServerData invalidUserInFileServerData() {
+        return FileServerData.builder().serverAddress(XNF_ADDRESS).userId(USERNAME).password("").port(PORT).build();
+    }
+
+    private FileServerData jWTTokenInFileServerData() throws URISyntaxException {
+        String query = "?" + ACCESS_TOKEN + "=" + JWT_PASSWORD;
+        // No user id or password here; the token travels as a query parameter instead.
+        return FileServerData.builder().serverAddress(XNF_ADDRESS).userId("").password("").port(PORT)
+            .queryParameters(new URIBuilder(query).getQueryParams()).build();
+    }
+}
diff --git a/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpClientResponseHelper.java b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpClientResponseHelper.java
new file mode 100644
index 0000000..3df2cad
--- /dev/null
+++ b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpClientResponseHelper.java
@@ -0,0 +1,419 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2020-2021 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.cookie.Cookie;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.http.Header;
+import org.apache.http.HeaderIterator;
+import org.apache.http.HttpEntity;
+import org.apache.http.ProtocolVersion;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.params.HttpParams;
+
+import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClientResponse;
+import reactor.util.context.Context;
+import reactor.util.context.ContextView;
+
+public class HttpClientResponseHelper {
+    // Hand-written response stubs with fixed status codes for the HTTP client tests in this package.
+    public static final HttpClientResponse NETTY_RESPONSE_OK = new HttpClientResponse() {
+        // Only status() carries information (200 OK); the remaining accessors return empty defaults.
+        @Override
+        public Map<CharSequence, Set<Cookie>> cookies() {
+            return null;
+        }
+
+        @Override
+        public boolean isKeepAlive() {
+            return false;
+        }
+
+        @Override
+        public boolean isWebsocket() {
+            return false;
+        }
+
+        @Override
+        public HttpMethod method() {
+            return null;
+        }
+
+        @Override
+        public String path() {
+            return null;
+        }
+
+        @Override
+        public String fullPath() {
+            return null;
+        }
+
+        @Override
+        public String requestId() {
+            return null;
+        }
+
+        @Override
+        public String uri() {
+            return null;
+        }
+
+        @Override
+        public HttpVersion version() {
+            return null;
+        }
+
+        @Override
+        public Context currentContext() {
+            return null;
+        }
+
+        @Override
+        public ContextView currentContextView() {
+            return null;
+        }
+
+        @Override
+        public String[] redirectedFrom() {
+            return new String[] {};
+        }
+
+        @Override
+        public HttpHeaders requestHeaders() {
+            return null;
+        }
+
+        @Override
+        public String resourceUrl() {
+            return null;
+        }
+
+        @Override
+        public HttpHeaders responseHeaders() {
+            return null;
+        }
+
+        @Override
+        public HttpResponseStatus status() {
+            return HttpResponseStatus.OK;
+        }
+
+        @Override
+        public Mono<HttpHeaders> trailerHeaders() {
+            return null;
+        }
+    };
+    // Netty response stub whose status() is 501 Not Implemented, i.e. a response that is not OK.
+    public static final HttpClientResponse RESPONSE_ANY_NO_OK = new HttpClientResponse() {
+        // Only status() carries information; the remaining accessors return empty defaults.
+        @Override
+        public Map<CharSequence, Set<Cookie>> cookies() {
+            return null;
+        }
+
+        @Override
+        public boolean isKeepAlive() {
+            return false;
+        }
+
+        @Override
+        public boolean isWebsocket() {
+            return false;
+        }
+
+        @Override
+        public HttpMethod method() {
+            return null;
+        }
+
+        @Override
+        public String fullPath() {
+            return null;
+        }
+
+        @Override
+        public String requestId() {
+            return null;
+        }
+
+        @Override
+        public String uri() {
+            return null;
+        }
+
+        @Override
+        public HttpVersion version() {
+            return null;
+        }
+
+        @Override
+        public Context currentContext() {
+            return null;
+        }
+
+        @Override
+        public ContextView currentContextView() {
+            return null;
+        }
+
+        @Override
+        public String[] redirectedFrom() {
+            return new String[] {};
+        }
+
+        @Override
+        public HttpHeaders requestHeaders() {
+            return null;
+        }
+
+        @Override
+        public String resourceUrl() {
+            return null;
+        }
+
+        @Override
+        public HttpHeaders responseHeaders() {
+            return null;
+        }
+
+        @Override
+        public HttpResponseStatus status() {
+            return HttpResponseStatus.NOT_IMPLEMENTED;
+        }
+
+        @Override
+        public Mono<HttpHeaders> trailerHeaders() {
+            return null;
+        }
+    };
+    // Apache HttpClient response stub: status code 200 and an entity whose content is "abc".
+    public static final CloseableHttpResponse APACHE_RESPONSE_OK = new CloseableHttpResponse() {
+        @Override
+        public void close() throws IOException {
+            getEntity().getContent().close(); // getContent() builds a fresh stream on every call
+        }
+
+        @Override
+        public StatusLine getStatusLine() {
+            return new StatusLine() {
+                @Override
+                public ProtocolVersion getProtocolVersion() {
+                    return null;
+                }
+
+                @Override
+                public int getStatusCode() {
+                    return 200;
+                }
+
+                @Override
+                public String getReasonPhrase() {
+                    return null;
+                }
+            };
+        }
+
+        @Override
+        public void setStatusLine(StatusLine line) {
+            // no-op
+        }
+
+        @Override
+        public void setStatusLine(ProtocolVersion version, int code) {
+            // no-op
+        }
+
+        @Override
+        public void setStatusLine(ProtocolVersion version, int code, String reason) {
+            // no-op
+        }
+
+        @Override
+        public void setStatusCode(int code) throws IllegalStateException {
+            // no-op
+        }
+
+        @Override
+        public void setReasonPhrase(String reason) throws IllegalStateException {
+            // no-op
+        }
+
+        @Override
+        public HttpEntity getEntity() {
+            return new HttpEntity() {
+                @Override
+                public boolean isRepeatable() {
+                    return false;
+                }
+
+                @Override
+                public boolean isChunked() {
+                    return false;
+                }
+
+                @Override
+                public long getContentLength() {
+                    return 0;
+                }
+
+                @Override
+                public Header getContentType() {
+                    return null;
+                }
+
+                @Override
+                public Header getContentEncoding() {
+                    return null;
+                }
+
+                @Override
+                public InputStream getContent() throws IOException, UnsupportedOperationException {
+                    return new ByteArrayInputStream("abc".getBytes());
+                }
+
+                @Override
+                public void writeTo(OutputStream out) throws IOException {
+                    // no-op
+                }
+
+                @Override
+                public boolean isStreaming() {
+                    return false;
+                }
+
+                @Override
+                public void consumeContent() throws IOException {
+                    // no-op
+                }
+            };
+        }
+
+        @Override
+        public void setEntity(HttpEntity entity) {
+            // no-op
+        }
+
+        @Override
+        public Locale getLocale() {
+            return null;
+        }
+
+        @Override
+        public void setLocale(Locale newLocale) {
+            // no-op
+        }
+
+        @Override
+        public ProtocolVersion getProtocolVersion() {
+            return null;
+        }
+
+        @Override
+        public boolean containsHeader(String name) {
+            return false;
+        }
+
+        @Override
+        public Header[] getHeaders(String name) {
+            return new Header[] {};
+        }
+
+        @Override
+        public Header getFirstHeader(String name) {
+            return null;
+        }
+
+        @Override
+        public Header getLastHeader(String name) {
+            return null;
+        }
+
+        @Override
+        public Header[] getAllHeaders() {
+            return new Header[] {};
+        }
+
+        @Override
+        public void addHeader(Header newHeader) {
+            // no-op
+        }
+
+        @Override
+        public void addHeader(String name, String value) {
+            // no-op
+        }
+
+        @Override
+        public void setHeader(Header newHeader) {
+            // no-op
+        }
+
+        @Override
+        public void setHeader(String name, String value) {
+            // no-op
+        }
+
+        @Override
+        public void setHeaders(Header[] newHeaders) {
+            // no-op
+        }
+
+        @Override
+        public void removeHeader(Header oldHeader) {
+            // no-op
+        }
+
+        @Override
+        public void removeHeaders(String name) {
+            // no-op
+        }
+
+        @Override
+        public HeaderIterator headerIterator() {
+            return null;
+        }
+
+        @Override
+        public HeaderIterator headerIterator(String name) {
+            return null;
+        }
+
+        @Override
+        public HttpParams getParams() {
+            return null;
+        }
+
+        @Override
+        public void setParams(HttpParams newParams) {
+        }
+
+    };
+}
diff --git a/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpsClientConnectionManagerUtilTest.java b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpsClientConnectionManagerUtilTest.java
new file mode 100644
index 0000000..bb1a93f
--- /dev/null
+++ b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpsClientConnectionManagerUtilTest.java
@@ -0,0 +1,54 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.junit.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+
+@ExtendWith(MockitoExtension.class)
+public class HttpsClientConnectionManagerUtilTest {
+ // All constants below are file paths under src/test/resources; the *_PASSWORD ones presumably point to password files.
+ private static final String KEY_PATH = "src/test/resources/keystore.p12";
+ private static final String KEY_PASSWORD = "src/test/resources/keystore.pass";
+ private static final String KEY_IMPROPER_PASSWORD = "src/test/resources/dfc.jks.pass";
+ private static final String TRUSTED_CA_PATH = "src/test/resources/trust.jks";
+ private static final String TRUSTED_CA_PASSWORD = "src/test/resources/trust.pass";
+ // NOTE(review): @Test below is org.junit.Test (JUnit 4) while @ExtendWith is a Jupiter annotation - confirm the engine.
+ @Test
+ public void emptyManager_shouldThrowException() {
+ assertThrows(DatafileTaskException.class, () -> HttpsClientConnectionManagerUtil.instance());
+ }
+ // After setupOrUpdate with KEY_PASSWORD, instance() is expected to return a non-null manager.
+ @Test
+ public void creatingManager_successfulCase() throws Exception {
+ HttpsClientConnectionManagerUtil.setupOrUpdate(KEY_PATH, KEY_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD, //
+ true);
+ assertNotNull(HttpsClientConnectionManagerUtil.instance());
+ }
+ // setupOrUpdate with KEY_IMPROPER_PASSWORD is expected to throw, and instance() is expected to throw afterwards too.
+ @Test
+ public void creatingManager_improperSecretShouldThrowException() {
+ assertThrows(DatafileTaskException.class, () -> HttpsClientConnectionManagerUtil.setupOrUpdate(KEY_PATH, //
+ KEY_IMPROPER_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD, true));
+ assertThrows(DatafileTaskException.class, () -> HttpsClientConnectionManagerUtil.instance());
+ }
+ // NOTE(review): the util is only accessed statically, so these tests presumably share state - confirm ordering.
+}
diff --git a/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/scheme/SchemeTest.java b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/scheme/SchemeTest.java
new file mode 100644
index 0000000..413cd13
--- /dev/null
+++ b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/scheme/SchemeTest.java
@@ -0,0 +1,48 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.scheme;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+
+public class SchemeTest {
+ // Checks Scheme.getSchemeFromString for the supported protocol names and for names that must be rejected.
+ @Test
+ public void shouldReturnSchemeForSupportedProtocol() throws DatafileTaskException {
+ assertEquals(Scheme.FTPES, Scheme.getSchemeFromString("FTPES"));
+ assertEquals(Scheme.SFTP, Scheme.getSchemeFromString("SFTP"));
+ assertEquals(Scheme.HTTP, Scheme.getSchemeFromString("HTTP"));
+ assertEquals(Scheme.HTTPS, Scheme.getSchemeFromString("HTTPS"));
+ }
+ // "FTPS" is expected to be rejected with a DatafileTaskException.
+ @Test
+ public void shouldThrowExceptionForUnsupportedProtocol() {
+ assertThrows(DatafileTaskException.class, () -> Scheme.getSchemeFromString("FTPS"));
+ }
+ // The unknown name "invalid" is expected to be rejected with a DatafileTaskException as well.
+ @Test
+ public void shouldThrowExceptionForInvalidProtocol() {
+ assertThrows(DatafileTaskException.class, () -> Scheme.getSchemeFromString("invalid"));
+ }
+}
diff --git a/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java
new file mode 100644
index 0000000..0ee9f72
--- /dev/null
+++ b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java
@@ -0,0 +1,159 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * Modifications Copyright (C) 2021 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.hc.core5.http.NameValuePair;
+import org.apache.hc.core5.net.URIBuilder;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+
+class HttpUtilsTest {
+ // Tests the static helpers of HttpUtils: response-code check, JWT query-parameter handling and URI building.
+ private static final String XNF_ADDRESS = "127.0.0.1";
+ private static final int PORT = 443;
+ private static final String JWT_PASSWORD = "thisIsThePassword";
+ private static final String ACCESS_TOKEN = "access_token";
+ private static final String ANOTHER_TOKEN = "another_token";
+ private static final String ANOTHER_DATA = "another_data";
+ private static final String FRAGMENT = "thisIsTheFragment";
+ private static final String USERNAME = "bob";
+ private static final String PASSWORD = "123";
+ // 200 is expected to be reported as a successful response code.
+ @Test
+ void shouldReturnSuccessfulResponse() {
+ assertTrue(HttpUtils.isSuccessfulResponseCodeWithDataRouter(200));
+ }
+ // 404 is expected to be reported as not successful.
+ @Test
+ void shouldReturnBadResponse() {
+ assertFalse(HttpUtils.isSuccessfulResponseCodeWithDataRouter(404));
+ }
+ // Exactly one access_token parameter, alone or next to another parameter, is expected to be accepted.
+ @Test
+ void isSingleQueryWithJWT_validToken() throws URISyntaxException {
+ assertTrue(HttpUtils.isQueryWithSingleJWT(validTokenSingleQueryData()));
+ assertTrue(HttpUtils.isQueryWithSingleJWT(validTokenDoubleQueryData()));
+ }
+ // No access_token, a duplicated access_token and a null list are all expected to be rejected.
+ @Test
+ void isSingleQueryWithJWT_invalidToken() throws URISyntaxException {
+ assertFalse(HttpUtils.isQueryWithSingleJWT(validQueryNoToken()));
+ assertFalse(HttpUtils.isQueryWithSingleJWT(queryDataDoubleToken()));
+ assertFalse(HttpUtils.isQueryWithSingleJWT(null));
+ }
+ // The access_token value is expected back, also when other parameters and a fragment are present.
+ @Test
+ void getJWTToken_jWTTokenPresent() throws URISyntaxException {
+ assertEquals(JWT_PASSWORD, HttpUtils.getJWTToken(fileServerDataWithJWTToken()));
+ assertEquals(JWT_PASSWORD, HttpUtils.getJWTToken(fileServerDataWithJWTTokenLongQueryAndFragment()));
+ }
+ // Without an access_token parameter an empty string is expected.
+ @Test
+ void getJWTToken_JWTTokenNotPresent() throws URISyntaxException {
+ assertEquals("", HttpUtils.getJWTToken(fileServerDataQueryWithoutToken()));
+ }
+ // serverData is built without a port; the 80 passed to prepareUri is expected to show up in the resulting URI.
+ @Test
+ void prepareUri_UriWithoutPort() {
+ FileServerData serverData =
+ FileServerData.builder().serverAddress(XNF_ADDRESS).userId(USERNAME).password(PASSWORD).build();
+ String REMOTE_FILE = "any";
+
+ String retrievedUri = HttpUtils.prepareUri("http", serverData, REMOTE_FILE, 80);
+ assertTrue(retrievedUri.startsWith("http://" + XNF_ADDRESS + ":80"));
+ }
+ // The expected URI keeps the three another_token parameters and the fragment but not the access_token parameter.
+ @Test
+ void prepareUri_verifyUriWithTokenAndFragment() throws URISyntaxException {
+ String file = "/file";
+ String expected = "http://" + XNF_ADDRESS + ":" + PORT + file + "?" + ANOTHER_TOKEN + "=" + ANOTHER_DATA + "&"
+ + ANOTHER_TOKEN + "=" + ANOTHER_DATA + "&" + ANOTHER_TOKEN + "=" + ANOTHER_DATA + "#" + FRAGMENT;
+ assertEquals(expected,
+ HttpUtils.prepareUri("http", fileServerDataWithJWTTokenLongQueryAndFragment(), file, 443));
+ }
+ // Empty query and empty fragment: the expected URI consists of scheme, address, port and file path only.
+ @Test
+ void prepareUri_verifyUriWithoutTokenAndWithoutFragment() throws URISyntaxException {
+ String file = "/file";
+ String expected = "http://" + XNF_ADDRESS + ":" + PORT + file;
+ assertEquals(expected, HttpUtils.prepareUri("http", fileServerDataNoTokenNoFragment(), file, 443));
+ }
+ // Fixture helpers: query-parameter lists parsed by URIBuilder, and FileServerData objects built from them.
+ private List<NameValuePair> validTokenSingleQueryData() throws URISyntaxException {
+ String query = "?" + ACCESS_TOKEN + "=" + JWT_PASSWORD;
+ return new URIBuilder(query).getQueryParams();
+ }
+
+ private List<NameValuePair> validTokenDoubleQueryData() throws URISyntaxException {
+ StringBuilder doubleQuery = new StringBuilder();
+ doubleQuery.append("?" + ANOTHER_TOKEN + "=" + ANOTHER_DATA + "&");
+ doubleQuery.append(ACCESS_TOKEN + "=" + JWT_PASSWORD);
+ return new URIBuilder(doubleQuery.toString()).getQueryParams();
+ }
+
+ private List<NameValuePair> validQueryNoToken() throws URISyntaxException {
+ String query = "?" + ANOTHER_TOKEN + "=" + JWT_PASSWORD;
+ return new URIBuilder(query).getQueryParams();
+ }
+
+ private List<NameValuePair> queryDataDoubleToken() throws URISyntaxException {
+ StringBuilder doubleToken = new StringBuilder();
+ doubleToken.append("?" + ACCESS_TOKEN + "=" + JWT_PASSWORD + "&");
+ doubleToken.append(ACCESS_TOKEN + "=" + JWT_PASSWORD + "&");
+ doubleToken.append(ANOTHER_TOKEN + "=" + ANOTHER_DATA);
+ return new URIBuilder(doubleToken.toString()).getQueryParams();
+ }
+
+ private FileServerData fileServerDataWithJWTToken() throws URISyntaxException {
+ String query = "?" + ACCESS_TOKEN + "=" + JWT_PASSWORD;
+
+ return FileServerData.builder().serverAddress(XNF_ADDRESS).userId("").password("").port(PORT)
+ .queryParameters(new URIBuilder(query).getQueryParams()).build();
+ }
+
+ private FileServerData fileServerDataWithJWTTokenLongQueryAndFragment() throws URISyntaxException {
+ StringBuilder query = new StringBuilder();
+ query.append("?" + ANOTHER_TOKEN + "=" + ANOTHER_DATA + "&");
+ query.append(ANOTHER_TOKEN + "=" + ANOTHER_DATA + "&");
+ query.append(ACCESS_TOKEN + "=" + JWT_PASSWORD + "&");
+ query.append(ANOTHER_TOKEN + "=" + ANOTHER_DATA);
+
+ return FileServerData.builder().serverAddress(XNF_ADDRESS).userId("").password("").port(PORT)
+ .queryParameters(new URIBuilder(query.toString()).getQueryParams()).uriRawFragment(FRAGMENT).build();
+ }
+
+ private FileServerData fileServerDataQueryWithoutToken() throws URISyntaxException {
+ StringBuilder query = new StringBuilder();
+ query.append("?" + ANOTHER_TOKEN + "=" + ANOTHER_DATA);
+
+ return FileServerData.builder().serverAddress(XNF_ADDRESS).userId("").password("").port(PORT)
+ .queryParameters(new URIBuilder(query.toString()).getQueryParams()).build();
+ }
+
+ private FileServerData fileServerDataNoTokenNoFragment() throws URISyntaxException {
+ return FileServerData.builder().serverAddress(XNF_ADDRESS).userId("").password("").port(PORT)
+ .queryParameters(new URIBuilder("").getQueryParams()).uriRawFragment("").build();
+ }
+}
diff --git a/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
new file mode 100644
index 0000000..6d437ae
--- /dev/null
+++ b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
@@ -0,0 +1,368 @@
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2020-2022 Nokia. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.CertificateConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.ftp.FtpesClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
+import org.onap.dcaegen2.collectors.datafile.http.DfcHttpClient;
+import org.onap.dcaegen2.collectors.datafile.http.DfcHttpsClient;
+import org.onap.dcaegen2.collectors.datafile.model.Counters;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import reactor.test.StepVerifier;
+
+public class FileCollectorTest {
+ // Drives FileCollector.collectFile with mocked FTPES/SFTP/HTTP/HTTPS clients and checks client calls and counters.
+ static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/";
+ private static final String PRODUCT_NAME = "NrRadio";
+ private static final String VENDOR_NAME = "Ericsson";
+ private static final int LAST_EPOCH_MICROSEC = 87457457;
+ private static final String SOURCE_NAME = "oteNB5309";
+ private static final int START_EPOCH_MICROSEC = 874575764;
+ private static final String TIME_ZONE_OFFSET = "UTC+05:00";
+ private static final String FTPES_SCHEME = "ftpes://";
+ private static final String SFTP_SCHEME = "sftp://";
+ private static final String HTTP_SCHEME = "http://";
+ private static final String HTTPS_SCHEME = "https://";
+ private static final String SERVER_ADDRESS = "192.168.0.101";
+ private static final int PORT_22 = 22;
+ private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+ private static final Path LOCAL_FILE_LOCATION = Paths.get(DATAFILE_TMPDIR, SOURCE_NAME, PM_FILE_NAME);
+ private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
+ private static final String USER = "usr";
+ private static final String PWD = "pwd";
+ private static final String FTPES_LOCATION =
+ FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+
+ private static final String FTPES_LOCATION_NO_PORT =
+ FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + REMOTE_FILE_LOCATION;
+ private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+ private static final String SFTP_LOCATION_NO_PORT = SFTP_SCHEME + SERVER_ADDRESS + REMOTE_FILE_LOCATION;
+
+ private static final String HTTP_LOCATION =
+ HTTP_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+ private static final String HTTP_LOCATION_NO_PORT =
+ HTTP_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + REMOTE_FILE_LOCATION;
+ private static final String HTTPS_LOCATION =
+ HTTPS_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+ private static final String HTTPS_LOCATION_NO_PORT =
+ HTTPS_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + REMOTE_FILE_LOCATION;
+
+ private static final String GZIP_COMPRESSION = "gzip";
+ private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
+ private static final String FILE_FORMAT_VERSION = "V10";
+ private static final String CERTIFICATE_KEY_PASSWORD_PATH = "certificateKeyPassword";
+ private static final String TRUSTED_CA_PATH = "trustedCAPath";
+ private static final String TRUSTED_CA_PASSWORD_PATH = "trustedCAPassword";
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
+ private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
+ private static final String CHANGE_TYPE = "FileReady";
+
+ private static AppConfig appConfigMock = mock(AppConfig.class);
+ private static CertificateConfig certificateConfigMock = mock(CertificateConfig.class);
+
+ private FtpesClient ftpesClientMock = mock(FtpesClient.class);
+
+ private SftpClient sftpClientMock = mock(SftpClient.class);
+
+ private DfcHttpClient dfcHttpClientMock = mock(DfcHttpClient.class);
+ private DfcHttpsClient dfcHttpsClientMock = mock(DfcHttpsClient.class);
+
+ private Counters counters;
+ // Builds a FileReady event that announces a single file (PM_FILE_NAME) at the given location.
+ FileReadyMessage.Event event(String location) {
+ FileReadyMessage.MessageMetaData messageMetaData = FileReadyMessage.MessageMetaData.builder() //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .eventName("Noti_NrRadio-Ericsson_FileReady").build();
+
+ FileReadyMessage.FileInfo fileInfo = FileReadyMessage.FileInfo //
+ .builder() //
+ .fileFormatType(FILE_FORMAT_TYPE) //
+ .location(location) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .compression(GZIP_COMPRESSION) //
+ .build();
+
+ FileReadyMessage.ArrayOfNamedHashMap arrayOfNamedHashMap = FileReadyMessage.ArrayOfNamedHashMap //
+ .builder().name(PM_FILE_NAME) //
+ .hashMap(fileInfo).build();
+
+ List<FileReadyMessage.ArrayOfNamedHashMap> arrayOfNamedHashMapList = new ArrayList<>();
+ arrayOfNamedHashMapList.add(arrayOfNamedHashMap);
+
+ FileReadyMessage.NotificationFields notificationFields = FileReadyMessage.NotificationFields //
+ .builder().notificationFieldsVersion("notificationFieldsVersion") //
+ .changeType(CHANGE_TYPE).changeIdentifier(CHANGE_IDENTIFIER) //
+ .arrayOfNamedHashMap(arrayOfNamedHashMapList) //
+ .build();
+
+ return FileReadyMessage.Event.builder() //
+ .commonEventHeader(messageMetaData) //
+ .notificationFields(notificationFields).build();
+ }
+ // Wraps event(location) in a FileReadyMessage.
+ private FileReadyMessage fileReadyMessage(String location) {
+ FileReadyMessage message = FileReadyMessage.builder() //
+ .event(event(location)) //
+ .build();
+ return message;
+ }
+ // Returns the first FileData that FileData.createFileData produces for a message with the given location.
+ private FileData createFileData(String location) {
+ return FileData.createFileData(fileReadyMessage(location)).iterator().next();
+ }
+ // Expected result of a successful collection; NOTE: the location argument is not used by the body.
+ private FilePublishInformation createExpectedFilePublishInformation(String location) {
+ return FilePublishInformation.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .name(SOURCE_NAME + "/" + PM_FILE_NAME) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
+ .build();
+ }
+ // Wires the shared AppConfig/CertificateConfig mocks once for all tests.
+ @BeforeAll
+ static void setUpConfiguration() {
+ when(appConfigMock.getCertificateConfiguration()).thenReturn(certificateConfigMock);
+ appConfigMock.collectedFilesPath = DATAFILE_TMPDIR;
+ certificateConfigMock.keyPasswordPath = CERTIFICATE_KEY_PASSWORD_PATH;
+ certificateConfigMock.trustedCa = TRUSTED_CA_PATH;
+ certificateConfigMock.trustedCaPasswordPath = TRUSTED_CA_PASSWORD_PATH;
+ }
+ // Every test gets its own Counters instance.
+ @BeforeEach
+ void setUpTest() {
+ counters = new Counters();
+ }
+
+ @Test
+ public void whenFtpesFile_returnCorrectResponse() throws Exception {
+ FileCollector collectorUnderTest = spy(new FileCollector(appConfigMock, counters));
+ doReturn(ftpesClientMock).when(collectorUnderTest).createFtpesClient(any());
+
+ FileData fileData = createFileData(FTPES_LOCATION_NO_PORT);
+
+ FilePublishInformation expectedfilePublishInformation =
+ createExpectedFilePublishInformation(FTPES_LOCATION_NO_PORT);
+
+ StepVerifier.create(collectorUnderTest.collectFile(fileData, 3, Duration.ofSeconds(0)))
+ .expectNext(expectedfilePublishInformation) //
+ .verifyComplete();
+
+ verify(ftpesClientMock, times(1)).open();
+ verify(ftpesClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+ verify(ftpesClientMock, times(1)).close();
+ verifyNoMoreInteractions(ftpesClientMock);
+
+ assertEquals(1, counters.getNoOfCollectedFiles(), "collectedFiles should have been 1");
+ assertEquals(0, counters.getNoOfFailedFtpAttempts(), "failedFtpAttempts should have been 0");
+ assertEquals(0, counters.getNoOfFailedHttpAttempts(), "failedHttpAttempts should have been 0");
+ }
+
+ @Test
+ public void whenSftpFile_returnCorrectResponse() throws Exception {
+ FileCollector collectorUnderTest = spy(new FileCollector(appConfigMock, counters));
+ doReturn(sftpClientMock).when(collectorUnderTest).createSftpClient(any());
+
+ FileData fileData = createFileData(SFTP_LOCATION_NO_PORT);
+ FilePublishInformation expectedfilePublishInformation =
+ createExpectedFilePublishInformation(SFTP_LOCATION_NO_PORT);
+
+ StepVerifier.create(collectorUnderTest.collectFile(fileData, 3, Duration.ofSeconds(0)))
+ .expectNext(expectedfilePublishInformation) //
+ .verifyComplete();
+
+ // The same again, but with port
+ fileData = createFileData(SFTP_LOCATION);
+ expectedfilePublishInformation = createExpectedFilePublishInformation(SFTP_LOCATION);
+
+ StepVerifier.create(collectorUnderTest.collectFile(fileData, 3, Duration.ofSeconds(0)))
+ .expectNext(expectedfilePublishInformation) //
+ .verifyComplete();
+
+ verify(sftpClientMock, times(2)).open();
+ verify(sftpClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+ verify(sftpClientMock, times(2)).close();
+ verifyNoMoreInteractions(sftpClientMock);
+
+ assertEquals(2, counters.getNoOfCollectedFiles(), "collectedFiles should have been 2");
+ }
+
+ @Test
+ public void whenHttpFile_returnCorrectResponse() throws Exception {
+ FileCollector collectorUnderTest = spy(new FileCollector(appConfigMock, counters));
+ doReturn(dfcHttpClientMock).when(collectorUnderTest).createHttpClient(any());
+
+ FileData fileData = createFileData(HTTP_LOCATION_NO_PORT);
+
+ FilePublishInformation expectedfilePublishInformation =
+ createExpectedFilePublishInformation(HTTP_LOCATION_NO_PORT);
+
+ StepVerifier.create(collectorUnderTest.collectFile(fileData, 3, Duration.ofSeconds(0)))
+ .expectNext(expectedfilePublishInformation) //
+ .verifyComplete();
+
+ // The same again, but with port
+ fileData = createFileData(HTTP_LOCATION);
+ expectedfilePublishInformation = createExpectedFilePublishInformation(HTTP_LOCATION);
+
+ StepVerifier.create(collectorUnderTest.collectFile(fileData, 3, Duration.ofSeconds(0)))
+ .expectNext(expectedfilePublishInformation) //
+ .verifyComplete();
+
+ verify(dfcHttpClientMock, times(2)).open();
+ verify(dfcHttpClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+ verify(dfcHttpClientMock, times(2)).close();
+ verifyNoMoreInteractions(dfcHttpClientMock);
+ // Two files were collected above (without and with port), so the expected count is 2.
+ assertEquals(2, counters.getNoOfCollectedFiles(), "collectedFiles should have been 2");
+ assertEquals(0, counters.getNoOfFailedFtpAttempts(), "failedFtpAttempts should have been 0");
+ assertEquals(0, counters.getNoOfFailedHttpAttempts(), "failedHttpAttempts should have been 0");
+ }
+
+ @Test
+ public void whenHttpsFile_returnCorrectResponse() throws Exception {
+ FileCollector collectorUnderTest = spy(new FileCollector(appConfigMock, counters));
+ doReturn(dfcHttpsClientMock).when(collectorUnderTest).createHttpsClient(any());
+
+ FileData fileData = createFileData(HTTPS_LOCATION_NO_PORT);
+
+ FilePublishInformation expectedfilePublishInformation =
+ createExpectedFilePublishInformation(HTTPS_LOCATION_NO_PORT);
+
+ StepVerifier.create(collectorUnderTest.collectFile(fileData, 3, Duration.ofSeconds(0)))
+ .expectNext(expectedfilePublishInformation) //
+ .verifyComplete();
+
+ // The same again, but with port
+ fileData = createFileData(HTTPS_LOCATION);
+ expectedfilePublishInformation = createExpectedFilePublishInformation(HTTPS_LOCATION);
+
+ StepVerifier.create(collectorUnderTest.collectFile(fileData, 3, Duration.ofSeconds(0)))
+ .expectNext(expectedfilePublishInformation) //
+ .verifyComplete();
+
+ verify(dfcHttpsClientMock, times(2)).open();
+ verify(dfcHttpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+ verify(dfcHttpsClientMock, times(2)).close();
+ verifyNoMoreInteractions(dfcHttpsClientMock);
+ // Two files were collected above (without and with port), so the expected count is 2.
+ assertEquals(2, counters.getNoOfCollectedFiles(), "collectedFiles should have been 2");
+ assertEquals(0, counters.getNoOfFailedFtpAttempts(), "failedFtpAttempts should have been 0");
+ assertEquals(0, counters.getNoOfFailedHttpAttempts(), "failedHttpAttempts should have been 0");
+ }
+ // The mocked collectFile always throws a DatafileTaskException; the collector is expected to end with an error.
+ @Test
+ public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception {
+ FileCollector collectorUnderTest = spy(new FileCollector(appConfigMock, counters));
+ doReturn(ftpesClientMock).when(collectorUnderTest).createFtpesClient(any());
+
+ FileData fileData = createFileData(FTPES_LOCATION);
+ doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpesClientMock)
+ .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+
+ StepVerifier.create(collectorUnderTest.collectFile(fileData, 3, Duration.ofSeconds(0)))
+ .expectErrorMessage("Retries exhausted: 3/3") //
+ .verify();
+ // 4 calls: presumably the first attempt plus the 3 retries requested above.
+ verify(ftpesClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+
+ assertEquals(0, counters.getNoOfCollectedFiles(), "collectedFiles should have been 0");
+ assertEquals(4, counters.getNoOfFailedFtpAttempts(), "failedFtpAttempts should have been 4");
+ assertEquals(0, counters.getNoOfFailedHttpAttempts(), "failedHttpAttempts should have been 0");
+ }
+ // A NonRetryableDatafileTaskException is expected to end the collection after a single attempt.
+ @Test
+ public void whenFtpesFileAlwaysFail_failWithoutRetry() throws Exception {
+ FileCollector collectorUnderTest = spy(new FileCollector(appConfigMock, counters));
+ doReturn(ftpesClientMock).when(collectorUnderTest).createFtpesClient(any());
+
+ FileData fileData = createFileData(FTPES_LOCATION);
+ doThrow(new NonRetryableDatafileTaskException("Unable to collect file.")).when(ftpesClientMock)
+ .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+
+ StepVerifier.create(collectorUnderTest.collectFile(fileData, 3, Duration.ofSeconds(0)))
+ .expectErrorMessage("Non retryable file transfer failure") //
+ .verify();
+
+ verify(ftpesClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+
+ assertEquals(0, counters.getNoOfCollectedFiles(), "collectedFiles should have been 0");
+ assertEquals(1, counters.getNoOfFailedFtpAttempts(), "failedFtpAttempts should have been 1");
+ assertEquals(0, counters.getNoOfFailedHttpAttempts(), "failedHttpAttempts should have been 0");
+ }
+ // The first collectFile call on the mock throws, the second does nothing (doThrow(...).doNothing()).
+ @Test
+ public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() throws Exception {
+ FileCollector collectorUnderTest = spy(new FileCollector(appConfigMock, counters));
+ doReturn(ftpesClientMock).when(collectorUnderTest).createFtpesClient(any());
+ doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpesClientMock)
+ .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+
+ FilePublishInformation expectedfilePublishInformation =
+ createExpectedFilePublishInformation(FTPES_LOCATION_NO_PORT);
+
+ FileData fileData = createFileData(FTPES_LOCATION_NO_PORT);
+
+ StepVerifier.create(collectorUnderTest.collectFile(fileData, 3, Duration.ofSeconds(0)))
+ .expectNext(expectedfilePublishInformation) //
+ .verifyComplete();
+
+ verify(ftpesClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+
+ assertEquals(1, counters.getNoOfCollectedFiles(), "collectedFiles should have been 1");
+ assertEquals(1, counters.getNoOfFailedFtpAttempts(), "failedFtpAttempts should have been 1");
+ assertEquals(0, counters.getNoOfFailedHttpAttempts(), "failedHttpAttempts should have been 0");
+ }
+}
diff --git a/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java
new file mode 100644
index 0000000..7c2706d
--- /dev/null
+++ b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java
@@ -0,0 +1,262 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * 2018-2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.utils;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Utility class to produce correctly formatted fileReady event Json messages.
+ *
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> on 7/25/18
+ *
+ */
+public class JsonMessage {
+    private final String eventName;
+    private final String changeIdentifier;
+    private final String changeType;
+    private final String notificationFieldsVersion;
+    private final List<AdditionalField> arrayOfAdditionalFields;
+
+    /** Returns the additional fields held by this message. */
+    public List<AdditionalField> getAdditionalFields() {
+        return arrayOfAdditionalFields;
+    }
+
+    /** Returns the parsed message enclosed in square brackets. */
+    @Override
+    public String toString() {
+        return "[" + getParsed() + "]";
+    }
+
+    /**
+     * Gets the message in parsed format. The common event header is fixed apart from the event name;
+     * a notification field is written only when it has been set.
+     *
+     * @return the message in parsed format.
+     */
+    public String getParsed() {
+        final boolean hasFields = !arrayOfAdditionalFields.isEmpty();
+        // The buffer is local to this method, so the unsynchronized StringBuilder is enough.
+        StringBuilder additionalFieldsString = new StringBuilder();
+        if (hasFields) {
+            additionalFieldsString.append("\"arrayOfNamedHashMap\":[");
+            for (Iterator<AdditionalField> iterator = arrayOfAdditionalFields.iterator(); iterator.hasNext();) {
+                additionalFieldsString.append(iterator.next().toString());
+                if (iterator.hasNext()) {
+                    additionalFieldsString.append(",");
+                }
+            }
+            additionalFieldsString.append("]");
+        }
+        // A field is followed by a comma only if at least one later field will be written.
+        final boolean versionOrLaterSet = notificationFieldsVersion != null || hasFields;
+        final boolean typeOrLaterSet = changeType != null || versionOrLaterSet;
+        return "{\"event\":{\"commonEventHeader\":{" //
+            + "\"domain\":\"notification\"," //
+            + "\"eventId\":\"<<SerialNumber>>-reg\"," //
+            + "\"eventName\":\"" + eventName + "\"," //
+            + "\"eventType\":\"fileReady\"," //
+            + "\"internalHeaderFields\":{}," //
+            + "\"lastEpochMicrosec\":1519837825682," //
+            + "\"nfNamingCode\":\"5GRAN\"," //
+            + "\"nfcNamingCode\":\"5DU\"," //
+            + "\"priority\":\"Normal\"," //
+            + "\"reportingEntityName\":\"5GRAN_DU\"," //
+            + "\"sequence\":0," //
+            + "\"sourceId\":\"<<SerialNumber>>\"," //
+            + "\"sourceName\":\"5GRAN_DU\"," //
+            + "\"timeZoneOffset\":\"UTC+05:00\"," //
+            + "\"startEpochMicrosec\":\"1519837825682\"," //
+            + "\"version\":3" //
+            + "},\"notificationFields\":{" //
+            + getAsStringIfParameterIsSet("changeIdentifier", changeIdentifier, typeOrLaterSet) //
+            + getAsStringIfParameterIsSet("changeType", changeType, versionOrLaterSet) //
+            + getAsStringIfParameterIsSet("notificationFieldsVersion", notificationFieldsVersion, hasFields) //
+            + additionalFieldsString.toString() //
+            + "}}}";
+    }
+
+    private JsonMessage(final JsonMessageBuilder builder) {
+        this.eventName = builder.eventName;
+        this.changeIdentifier = builder.changeIdentifier;
+        this.changeType = builder.changeType;
+        this.notificationFieldsVersion = builder.notificationFieldsVersion;
+        // Copy the list so that fields added to the builder afterwards do not leak into this message.
+        this.arrayOfAdditionalFields = new ArrayList<>(builder.arrayOfAdditionalFields);
+    }
+
+    /** An entry of the "arrayOfNamedHashMap" array, rendered as a name plus a "hashMap" object. */
+    public static class AdditionalField {
+        private final String name;
+        private final String location;
+        private final String compression;
+        private final String fileFormatType;
+        private final String fileFormatVersion;
+
+        /** Renders this field as a Json object; attributes that are {@code null} are left out. */
+        @Override
+        public String toString() {
+            // A comma is appended only if at least one later attribute will be written.
+            final boolean versionSet = fileFormatVersion != null;
+            final boolean typeOrLaterSet = fileFormatType != null || versionSet;
+            final boolean compressionOrLaterSet = compression != null || typeOrLaterSet;
+            return "{" //
+                + getAsStringIfParameterIsSet("name", name, true) //
+                + "\"hashMap\":{" //
+                + getAsStringIfParameterIsSet("location", location, compressionOrLaterSet) //
+                + getAsStringIfParameterIsSet("compression", compression, typeOrLaterSet) //
+                + getAsStringIfParameterIsSet("fileFormatType", fileFormatType, versionSet) //
+                + getAsStringIfParameterIsSet("fileFormatVersion", fileFormatVersion, false) //
+                + "}}";
+        }
+
+        private AdditionalField(AdditionalFieldBuilder builder) {
+            this.name = builder.name;
+            this.location = builder.location;
+            this.compression = builder.compression;
+            this.fileFormatType = builder.fileFormatType;
+            this.fileFormatVersion = builder.fileFormatVersion;
+        }
+    }
+
+    /** Fluent builder for {@link AdditionalField}; attributes that are never set stay {@code null}. */
+    public static class AdditionalFieldBuilder {
+        private String name;
+        private String location;
+        private String compression;
+        private String fileFormatType;
+        private String fileFormatVersion;
+
+        public AdditionalFieldBuilder name(String name) {
+            this.name = name;
+            return this;
+        }
+
+        public AdditionalFieldBuilder location(String location) {
+            this.location = location;
+            return this;
+        }
+
+        public AdditionalFieldBuilder compression(String compression) {
+            this.compression = compression;
+            return this;
+        }
+
+        public AdditionalFieldBuilder fileFormatType(String fileFormatType) {
+            this.fileFormatType = fileFormatType;
+            return this;
+        }
+
+        public AdditionalFieldBuilder fileFormatVersion(String fileFormatVersion) {
+            this.fileFormatVersion = fileFormatVersion;
+            return this;
+        }
+
+        public AdditionalField build() {
+            return new AdditionalField(this);
+        }
+    }
+
+    /** Fluent builder for {@link JsonMessage}; starts out with an empty list of additional fields. */
+    public static class JsonMessageBuilder {
+        private String eventName;
+        private String changeIdentifier;
+        private String changeType;
+        private String notificationFieldsVersion;
+        private final List<AdditionalField> arrayOfAdditionalFields = new ArrayList<>();
+
+        public JsonMessageBuilder eventName(String eventName) {
+            this.eventName = eventName;
+            return this;
+        }
+
+        public JsonMessageBuilder changeIdentifier(String changeIdentifier) {
+            this.changeIdentifier = changeIdentifier;
+            return this;
+        }
+
+        public JsonMessageBuilder changeType(String changeType) {
+            this.changeType = changeType;
+            return this;
+        }
+
+        public JsonMessageBuilder notificationFieldsVersion(String notificationFieldsVersion) {
+            this.notificationFieldsVersion = notificationFieldsVersion;
+            return this;
+        }
+
+        public JsonMessageBuilder addAdditionalField(AdditionalField additionalField) {
+            this.arrayOfAdditionalFields.add(additionalField);
+            return this;
+        }
+
+        public JsonMessage build() {
+            return new JsonMessage(this);
+        }
+    }
+
+    /**
+     * Renders {@code "parameterName":"parameterValue"}, followed by a comma if {@code withSeparator}
+     * is true. Returns an empty string when {@code parameterValue} is {@code null}.
+     */
+    private static String getAsStringIfParameterIsSet(String parameterName, String parameterValue,
+        boolean withSeparator) {
+        if (parameterValue == null) {
+            return "";
+        }
+        String result = "\"" + parameterName + "\":\"" + parameterValue + "\"";
+        return withSeparator ? result + "," : result;
+    }
+
+    /**
+     * Can be used to produce a correct test Json message. Tip! Check the formatting with
+     * <a href="https://jsonformatter.org/">Json formatter</a>
+     *
+     * @param args Not used
+     */
+    public static void main(String[] args) {
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+            .name("A20161224.1030-1045.bin.gz") //
+            .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz") //
+            .compression("gzip") //
+            .fileFormatType("org.3GPP.32.435#measCollec") //
+            .fileFormatVersion("V10") //
+            .build();
+        AdditionalField secondAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
+            .name("A20161224.1030-1045.bin.gz") //
+            .location("sftp://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz") //
+            .compression("gzip") //
+            .fileFormatType("org.3GPP.32.435#measCollec") //
+            .fileFormatVersion("V10") //
+            .build();
+        JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+            .eventName("Noti_NrRadio-Ericsson_FileReady") //
+            .changeIdentifier("PM_MEAS_FILES") //
+            .changeType("FileReady") //
+            .notificationFieldsVersion("2.0") //
+            .addAdditionalField(additionalField) //
+            .addAdditionalField(secondAdditionalField) //
+            .build();
+        System.out.println(message.toString());
+    }
+}
diff --git a/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/LoggingUtils.java b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/LoggingUtils.java
new file mode 100644
index 0000000..cfcb7bf
--- /dev/null
+++ b/datafilecollector/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/LoggingUtils.java
@@ -0,0 +1,56 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.utils;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+
+import org.slf4j.LoggerFactory;
+
+public class LoggingUtils {
+
+    /**
+     * Same as {@link #getLogListAppender(Class, boolean)} with {@code allLevels} set to false.
+     */
+    public static ListAppender<ILoggingEvent> getLogListAppender(Class<?> logClass) {
+        return getLogListAppender(logClass, false);
+    }
+
+    /**
+     * Attaches a new, started ListAppender to the logger of the given class and returns it. Call this
+     * method at the very beginning of the test.
+     *
+     * @param logClass class whose logger gets the appender.
+     * @param allLevels true if the logger's level should be set to {@code Level.ALL}.
+     */
+    public static ListAppender<ILoggingEvent> getLogListAppender(Class<?> logClass, boolean allLevels) {
+        final ListAppender<ILoggingEvent> appender = new ListAppender<>();
+        appender.start();
+        final Logger classLogger = (Logger) LoggerFactory.getLogger(logClass);
+        if (allLevels) {
+            classLogger.setLevel(Level.ALL);
+        }
+        classLogger.addAppender(appender);
+        return appender;
+    }
+}
diff --git a/datafilecollector/src/test/resources/cert.jks b/datafilecollector/src/test/resources/cert.jks
new file mode 100755
index 0000000..ff0e95c
--- /dev/null
+++ b/datafilecollector/src/test/resources/cert.jks
Binary files differ
diff --git a/datafilecollector/src/test/resources/dfc.jks b/datafilecollector/src/test/resources/dfc.jks
new file mode 100644
index 0000000..cdd1191
--- /dev/null
+++ b/datafilecollector/src/test/resources/dfc.jks
Binary files differ
diff --git a/datafilecollector/src/test/resources/dfc.jks.pass b/datafilecollector/src/test/resources/dfc.jks.pass
new file mode 100644
index 0000000..d97c5ea
--- /dev/null
+++ b/datafilecollector/src/test/resources/dfc.jks.pass
@@ -0,0 +1 @@
+secret
diff --git a/datafilecollector/src/test/resources/ftp.jks.pass b/datafilecollector/src/test/resources/ftp.jks.pass
new file mode 100644
index 0000000..d97c5ea
--- /dev/null
+++ b/datafilecollector/src/test/resources/ftp.jks.pass
@@ -0,0 +1 @@
+secret
diff --git a/datafilecollector/src/test/resources/jks.pass b/datafilecollector/src/test/resources/jks.pass
new file mode 100755
index 0000000..b2c3df4
--- /dev/null
+++ b/datafilecollector/src/test/resources/jks.pass
@@ -0,0 +1 @@
+hD:!w:CxF]lGvM6Mz9l^j[7U
\ No newline at end of file
diff --git a/datafilecollector/src/test/resources/keystore.p12 b/datafilecollector/src/test/resources/keystore.p12
new file mode 100644
index 0000000..b847707
--- /dev/null
+++ b/datafilecollector/src/test/resources/keystore.p12
Binary files differ
diff --git a/datafilecollector/src/test/resources/keystore.pass b/datafilecollector/src/test/resources/keystore.pass
new file mode 100644
index 0000000..1e7befc
--- /dev/null
+++ b/datafilecollector/src/test/resources/keystore.pass
@@ -0,0 +1 @@
+HVpAf0kHGl4P#fdpblJLka6b
\ No newline at end of file
diff --git a/datafilecollector/src/test/resources/trust.jks b/datafilecollector/src/test/resources/trust.jks
new file mode 100755
index 0000000..fc62ad2
--- /dev/null
+++ b/datafilecollector/src/test/resources/trust.jks
Binary files differ
diff --git a/datafilecollector/src/test/resources/trust.pass b/datafilecollector/src/test/resources/trust.pass
new file mode 100755
index 0000000..047a411
--- /dev/null
+++ b/datafilecollector/src/test/resources/trust.pass
@@ -0,0 +1 @@
+jeQ2l]iyB62D{WbSHL]dN*8R
\ No newline at end of file