Implement support for new ProducerOptions

Following options will be supported: version minor, version major.
This allows to create Wire Transfer Protocol with versions
provided by sdk library client.

Change-Id: I604bb6e47486910a87f7d148a9d721653f69a8a5
Issue-ID: DCAEGEN2-1171
Signed-off-by: kjaniak <kornel.janiak@nokia.com>
diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/ProducerOptions.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/ProducerOptions.java
index 921db52..98c77c5 100644
--- a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/ProducerOptions.java
+++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/ProducerOptions.java
@@ -51,10 +51,25 @@
     @Nullable
     SecurityKeys securityKeys();
 
+    /**
+     * Version of Wire Transfer Protocol interface frame
+     *
+     * @return Version of interface frame
+     * @since 1.1.1
+     */
+    @NotNull
+    @Value.Default
+    default WireFrameVersion wireFrameVersion() {
+        return ImmutableWireFrameVersion.builder().build();
+    }
+
+
     @Value.Check
     default void validate() {
         if (collectorAddresses().isEmpty()) {
             throw new IllegalArgumentException("address list cannot be empty");
         }
+
     }
+
 }
diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/WireFrameVersion.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/WireFrameVersion.java
new file mode 100644
index 0000000..98a0386
--- /dev/null
+++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/WireFrameVersion.java
@@ -0,0 +1,58 @@
+/*
+ * ============LICENSE_START=======================================================
+ * DCAEGEN2-SERVICES-SDK
+ * ================================================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options;
+
+import org.immutables.value.Value;
+
+@Value.Immutable
+public interface WireFrameVersion {
+    short SUPPORTED_VERSION_MAJOR = 0x01;
+    short SUPPORTED_VERSION_MINOR = 0x00;
+    /***
+     * Major version of Wire Transfer Protocol interface frame
+     * @return major version of interface frame
+     * @since 1.1.1
+     */
+    @Value.Default
+    @Value.Parameter
+    default short major() {
+        return SUPPORTED_VERSION_MAJOR;
+    }
+
+    /***
+     * Minor version of Wire Transfer Protocol interface frame
+     * @return minor version of interface frame
+     * @since 1.1.1
+     */
+    @Value.Default
+    @Value.Parameter
+    default short minor() {
+        return SUPPORTED_VERSION_MINOR;
+    }
+
+    @Value.Check
+    default void validate() {
+        if (!(major() > 0 && minor() >=0)) {
+            throw new IllegalArgumentException("Invalid version");
+        }
+    }
+
+}
+
diff --git a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java
index ddc87bc..121b940 100644
--- a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java
+++ b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java
@@ -22,16 +22,20 @@
 import io.netty.buffer.ByteBuf;
 import io.vavr.collection.HashSet;
 import io.vavr.control.Try;
+
 import java.net.InetSocketAddress;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
+
 import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys;
 import org.onap.dcaegen2.services.sdk.security.ssl.Passwords;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions.Builder;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableWireFrameVersion;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion;
 import org.onap.ves.VesEventOuterClass.VesEvent;
 import reactor.core.publisher.Flux;
 
@@ -69,8 +73,10 @@
 
     public void start(ImmutableProducerOptions.Builder optionsBuilder) {
         InetSocketAddress collectorAddress = collector.start();
+        WireFrameVersion WTPVersion = ImmutableWireFrameVersion.builder().build();
         cut = HvVesProducerFactory.create(
-                optionsBuilder.collectorAddresses(HashSet.of(collectorAddress)).build());
+                optionsBuilder.collectorAddresses(HashSet.of(collectorAddress))
+                        .wireFrameVersion(WTPVersion).build());
     }
 
     public void stop() {
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java
index ab10088..e65f246 100644
--- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java
+++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java
@@ -41,8 +41,8 @@
     @Override
     protected @NotNull HvVesProducer createProducer(ProducerOptions options) {
         TcpClient tcpClient = TcpClient.create()
-            .addressSupplier(() -> options.collectorAddresses().head());
-        ProducerCore producerCore = new ProducerCore(new EncodersFactory());
+                .addressSupplier(() -> options.collectorAddresses().head());
+        ProducerCore producerCore = new ProducerCore(new EncodersFactory(), options.wireFrameVersion());
 
         if (options.securityKeys() == null) {
             LOGGER.warn("Using insecure connection");
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java
index 3000e3d..49d54fe 100644
--- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java
+++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java
@@ -25,6 +25,7 @@
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.EncodersFactory;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.ProtobufEncoder;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.WireFrameEncoder;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion;
 import org.onap.ves.VesEventOuterClass.VesEvent;
 import org.reactivestreams.Publisher;
 import reactor.core.publisher.Flux;
@@ -36,13 +37,15 @@
 public class ProducerCore {
 
     private final EncodersFactory encodersFactory;
+    private final WireFrameVersion wireFrameVersion;
 
-    public ProducerCore(EncodersFactory encodersFactory) {
+    public ProducerCore(EncodersFactory encodersFactory, WireFrameVersion wireFrameVersion) {
         this.encodersFactory = encodersFactory;
+        this.wireFrameVersion = wireFrameVersion;
     }
 
     public Flux<ByteBuf> encode(Publisher<VesEvent> messages, ByteBufAllocator allocator) {
-        final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(allocator);
+        final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(allocator, wireFrameVersion);
         final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder();
         return Flux.from(messages)
                 .map(protobufEncoder::encode)
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactory.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactory.java
index fbe8ea3..24da3b1 100644
--- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactory.java
+++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactory.java
@@ -20,6 +20,7 @@
 package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders;
 
 import io.netty.buffer.ByteBufAllocator;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion;
 
 public class EncodersFactory {
 
@@ -27,7 +28,8 @@
         return new ProtobufEncoder();
     }
 
-    public WireFrameEncoder createWireFrameEncoder(ByteBufAllocator allocator) {
-        return new WireFrameEncoder(allocator);
+    public WireFrameEncoder createWireFrameEncoder(ByteBufAllocator allocator,
+                                                   WireFrameVersion wireFrameVersion) {
+        return new WireFrameEncoder(allocator, wireFrameVersion);
     }
 }
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java
index a946cea..29e3347 100644
--- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java
+++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java
@@ -23,6 +23,7 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.vavr.control.Try;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,8 +35,6 @@
 public class WireFrameEncoder {
     private static final Logger LOGGER = LoggerFactory.getLogger(WireFrameEncoder.class);
     private static final short MARKER_BYTE = 0xAA;
-    private static final short SUPPORTED_VERSION_MAJOR = 0x01;
-    private static final short SUPPORTED_VERSION_MINOR = 0x00;
     private static final int RESERVED_BYTES_COUNT = 3;
     private static final int HEADER_SIZE = 1 * Byte.BYTES +         // marker
             2 * Byte.BYTES +                                        // single byte fields (versions)
@@ -44,9 +43,11 @@
             1 * Integer.BYTES;                                      // payload length
 
     private final ByteBufAllocator allocator;
+    private final WireFrameVersion wireFrameVersion;
 
-    public WireFrameEncoder(ByteBufAllocator allocator) {
+    public WireFrameEncoder(ByteBufAllocator allocator, WireFrameVersion wireFrameVersion) {
         this.allocator = allocator;
+        this.wireFrameVersion = wireFrameVersion;
     }
 
     public Try<ByteBuf> encode(ByteBuffer payload) {
@@ -72,8 +73,8 @@
 
     private void writeBasicWTPFrameHeaderBeginning(ByteBuf encodedMessage) {
         encodedMessage.writeByte(MARKER_BYTE);
-        encodedMessage.writeByte(SUPPORTED_VERSION_MAJOR);
-        encodedMessage.writeByte(SUPPORTED_VERSION_MINOR);
+        encodedMessage.writeByte(wireFrameVersion.major());
+        encodedMessage.writeByte(wireFrameVersion.minor());
         encodedMessage.writeZero(RESERVED_BYTES_COUNT);
     }
 
diff --git a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java
index 02cc6e5..b79b0cf 100644
--- a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java
+++ b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java
@@ -36,6 +36,7 @@
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.EncodersFactory;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.ProtobufEncoder;
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.WireFrameEncoder;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion;
 import org.onap.ves.VesEventOuterClass.VesEvent;
 import reactor.core.publisher.Flux;
 
@@ -48,11 +49,13 @@
 
     private ProducerCore producerCore;
     private EncodersFactory encodersFactoryMock;
+    private WireFrameVersion wireFrameVersion;
 
     @BeforeEach
     public void setUp() {
         encodersFactoryMock = mock(EncodersFactory.class);
-        producerCore = new ProducerCore(encodersFactoryMock);
+        wireFrameVersion = mock(WireFrameVersion.class);
+        producerCore = new ProducerCore(encodersFactoryMock, wireFrameVersion);
     }
 
     @Test
@@ -65,7 +68,8 @@
         when(protobufEncoder.encode(any(VesEvent.class))).thenReturn(protoBuffer);
         when(wireFrameEncoder.encode(protoBuffer)).thenReturn(wireFrameBuffer);
         when(encodersFactoryMock.createProtobufEncoder()).thenReturn(protobufEncoder);
-        when(encodersFactoryMock.createWireFrameEncoder(ByteBufAllocator.DEFAULT)).thenReturn(wireFrameEncoder);
+        when(encodersFactoryMock.createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion)).
+                thenReturn(wireFrameEncoder);
 
         // given
         final int messageStreamSize = 2;
@@ -76,7 +80,7 @@
 
         // then
         verify(encodersFactoryMock).createProtobufEncoder();
-        verify(encodersFactoryMock).createWireFrameEncoder(ByteBufAllocator.DEFAULT);
+        verify(encodersFactoryMock).createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion);
         verify(protobufEncoder, times(messageStreamSize)).encode(any(VesEvent.class));
         verify(wireFrameEncoder, times(messageStreamSize)).encode(protoBuffer);
 
diff --git a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactoryTest.java b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactoryTest.java
index 3065db2..81b5cdd 100644
--- a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactoryTest.java
+++ b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactoryTest.java
@@ -24,6 +24,8 @@
 
 import io.netty.buffer.ByteBufAllocator;
 import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableWireFrameVersion;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion;
 
 /**
  * @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a>
@@ -36,7 +38,9 @@
     public void factory_methods_should_create_non_null_encoders_objects() {
         // when
         final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder();
-        final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(ByteBufAllocator.DEFAULT);
+        final WireFrameVersion wireFrameVersion = ImmutableWireFrameVersion.builder().build();
+        final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(ByteBufAllocator.DEFAULT,
+                wireFrameVersion);
 
         // then
         assertThat(protobufEncoder).isNotNull();
diff --git a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java
index a0a67d9..d79d0dc 100644
--- a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java
+++ b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java
@@ -26,6 +26,8 @@
 import org.junit.jupiter.api.Test;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableWireFrameVersion;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion;
 
 import java.nio.ByteBuffer;
 
@@ -33,6 +35,8 @@
     private static final byte MARKER_BYTE = (byte) 0xAA;
     private static final byte SUPPORTED_VERSION_MAJOR = (byte) 0x01;
     private static final byte SUPPORTED_VERSION_MINOR = (byte) 0x00;
+    private static final byte SAMPLE_VERSION_MAJOR = (byte) 0x02;
+    private static final byte SAMPLE_VERSION_MINOR = (byte) 0x01;
     private static final int RESERVED_BYTES_COUNT = 3;
     private static final int HEADER_SIZE = 1 * Byte.BYTES +         // marker
             2 * Byte.BYTES +                                        // single byte fields (versions)
@@ -40,7 +44,8 @@
             1 * Short.BYTES +                                       // paylaod type
             1 * Integer.BYTES;                                      // payload length
 
-    private final WireFrameEncoder wireFrameEncoder = new WireFrameEncoder(ByteBufAllocator.DEFAULT);
+    private final WireFrameVersion wireFrameVersion = ImmutableWireFrameVersion.builder().build();
+    private final WireFrameEncoder wireFrameEncoder = new WireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion);
 
     @Test
     void encode_givenNullPayload_shouldThrowEncodingException() {
@@ -81,6 +86,23 @@
         assertAllBytesVerified(actualEncodedBuffer);
     }
 
+    @Test
+    void encode_givenSomePayloadBytes_shouldCreateValidGPBFrameWithSpecifiedWTPVersion() {
+        // given
+        WireFrameVersion wireFrameVersion = ImmutableWireFrameVersion.of(SAMPLE_VERSION_MAJOR, SAMPLE_VERSION_MINOR);
+        final WireFrameEncoder encoder = new WireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion);
+        final byte[] payloadBytes = new byte[]{0x1A, 0x2B, 0x3C};
+        final ByteBuffer buffer = ByteBuffer.wrap(payloadBytes);
+
+        // when
+        final Try<ByteBuf> encodedBuffer = encoder.encode(buffer);
+
+        // then
+        assertThat(encodedBuffer.isSuccess()).isTrue();
+        final ByteBuf versionBuffer = encodedBuffer.get();
+        assertValidHeaderBeggining(versionBuffer, SAMPLE_VERSION_MAJOR, SAMPLE_VERSION_MINOR);
+    }
+
     private void assertNextBytesAreInOrder(ByteBuf encodedBuffer, byte... bytes) {
         for (int i = 0; i < bytes.length; i++) {
             assertThat(encodedBuffer.readByte())
@@ -90,10 +112,14 @@
     }
 
     private void assertValidHeaderBeggining(ByteBuf encodedBuffer) {
+        assertValidHeaderBeggining(encodedBuffer, SUPPORTED_VERSION_MAJOR, SUPPORTED_VERSION_MINOR);
+    }
+
+    private void assertValidHeaderBeggining(ByteBuf encodedBuffer, byte majorWTPVersion, byte minorWTPVersion) {
         assertNextBytesAreInOrder(encodedBuffer,
                 MARKER_BYTE,
-                SUPPORTED_VERSION_MAJOR,
-                SUPPORTED_VERSION_MINOR);
+                majorWTPVersion,
+                minorWTPVersion);
     }
 
     private void assertBufferSizeIs(ByteBuf encodedBuffer, int headerSize) {