Implement support for new ProducerOptions
Following options will be supported: version minor, version major.
This allows to create Wire Transfer Protocol with versions
provided by sdk library client.
Change-Id: I604bb6e47486910a87f7d148a9d721653f69a8a5
Issue-ID: DCAEGEN2-1171
Signed-off-by: kjaniak <kornel.janiak@nokia.com>
diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/ProducerOptions.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/ProducerOptions.java
index 921db52..98c77c5 100644
--- a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/ProducerOptions.java
+++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/ProducerOptions.java
@@ -51,10 +51,25 @@
@Nullable
SecurityKeys securityKeys();
+ /**
+ * Version of Wire Transfer Protocol interface frame
+ *
+ * @return Version of interface frame
+ * @since 1.1.1
+ */
+ @NotNull
+ @Value.Default
+ default WireFrameVersion wireFrameVersion() {
+ return ImmutableWireFrameVersion.builder().build();
+ }
+
+
@Value.Check
default void validate() {
if (collectorAddresses().isEmpty()) {
throw new IllegalArgumentException("address list cannot be empty");
}
+
}
+
}
diff --git a/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/WireFrameVersion.java b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/WireFrameVersion.java
new file mode 100644
index 0000000..98a0386
--- /dev/null
+++ b/services/hv-ves-client/producer/api/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/api/options/WireFrameVersion.java
@@ -0,0 +1,58 @@
+/*
+ * ============LICENSE_START=======================================================
+ * DCAEGEN2-SERVICES-SDK
+ * ================================================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options;
+
+import org.immutables.value.Value;
+
+@Value.Immutable
+public interface WireFrameVersion {
+ short SUPPORTED_VERSION_MAJOR = 0x01;
+ short SUPPORTED_VERSION_MINOR = 0x00;
+ /***
+ * Major version of Wire Transfer Protocol interface frame
+ * @return major version of interface frame
+ * @since 1.1.1
+ */
+ @Value.Default
+ @Value.Parameter
+ default short major() {
+ return SUPPORTED_VERSION_MAJOR;
+ }
+
+ /***
+ * Minor version of Wire Transfer Protocol interface frame
+ * @return minor version of interface frame
+ * @since 1.1.1
+ */
+ @Value.Default
+ @Value.Parameter
+ default short minor() {
+ return SUPPORTED_VERSION_MINOR;
+ }
+
+ @Value.Check
+ default void validate() {
+ if (!(major() > 0 && minor() >=0)) {
+ throw new IllegalArgumentException("Invalid version");
+ }
+ }
+
+}
+
diff --git a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java
index ddc87bc..121b940 100644
--- a/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java
+++ b/services/hv-ves-client/producer/ct/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/ct/SystemUnderTestWrapper.java
@@ -22,16 +22,20 @@
import io.netty.buffer.ByteBuf;
import io.vavr.collection.HashSet;
import io.vavr.control.Try;
+
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
+
import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys;
import org.onap.dcaegen2.services.sdk.security.ssl.Passwords;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions.Builder;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableWireFrameVersion;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion;
import org.onap.ves.VesEventOuterClass.VesEvent;
import reactor.core.publisher.Flux;
@@ -69,8 +73,10 @@
public void start(ImmutableProducerOptions.Builder optionsBuilder) {
InetSocketAddress collectorAddress = collector.start();
+ WireFrameVersion WTPVersion = ImmutableWireFrameVersion.builder().build();
cut = HvVesProducerFactory.create(
- optionsBuilder.collectorAddresses(HashSet.of(collectorAddress)).build());
+ optionsBuilder.collectorAddresses(HashSet.of(collectorAddress))
+ .wireFrameVersion(WTPVersion).build());
}
public void stop() {
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java
index ab10088..e65f246 100644
--- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java
+++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/HvVesProducerFactoryImpl.java
@@ -41,8 +41,8 @@
@Override
protected @NotNull HvVesProducer createProducer(ProducerOptions options) {
TcpClient tcpClient = TcpClient.create()
- .addressSupplier(() -> options.collectorAddresses().head());
- ProducerCore producerCore = new ProducerCore(new EncodersFactory());
+ .addressSupplier(() -> options.collectorAddresses().head());
+ ProducerCore producerCore = new ProducerCore(new EncodersFactory(), options.wireFrameVersion());
if (options.securityKeys() == null) {
LOGGER.warn("Using insecure connection");
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java
index 3000e3d..49d54fe 100644
--- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java
+++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCore.java
@@ -25,6 +25,7 @@
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.EncodersFactory;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.ProtobufEncoder;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.WireFrameEncoder;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion;
import org.onap.ves.VesEventOuterClass.VesEvent;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
@@ -36,13 +37,15 @@
public class ProducerCore {
private final EncodersFactory encodersFactory;
+ private final WireFrameVersion wireFrameVersion;
- public ProducerCore(EncodersFactory encodersFactory) {
+ public ProducerCore(EncodersFactory encodersFactory, WireFrameVersion wireFrameVersion) {
this.encodersFactory = encodersFactory;
+ this.wireFrameVersion = wireFrameVersion;
}
public Flux<ByteBuf> encode(Publisher<VesEvent> messages, ByteBufAllocator allocator) {
- final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(allocator);
+ final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(allocator, wireFrameVersion);
final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder();
return Flux.from(messages)
.map(protobufEncoder::encode)
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactory.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactory.java
index fbe8ea3..24da3b1 100644
--- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactory.java
+++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactory.java
@@ -20,6 +20,7 @@
package org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders;
import io.netty.buffer.ByteBufAllocator;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion;
public class EncodersFactory {
@@ -27,7 +28,8 @@
return new ProtobufEncoder();
}
- public WireFrameEncoder createWireFrameEncoder(ByteBufAllocator allocator) {
- return new WireFrameEncoder(allocator);
+ public WireFrameEncoder createWireFrameEncoder(ByteBufAllocator allocator,
+ WireFrameVersion wireFrameVersion) {
+ return new WireFrameEncoder(allocator, wireFrameVersion);
}
}
diff --git a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java
index a946cea..29e3347 100644
--- a/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java
+++ b/services/hv-ves-client/producer/impl/src/main/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoder.java
@@ -23,6 +23,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.vavr.control.Try;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,8 +35,6 @@
public class WireFrameEncoder {
private static final Logger LOGGER = LoggerFactory.getLogger(WireFrameEncoder.class);
private static final short MARKER_BYTE = 0xAA;
- private static final short SUPPORTED_VERSION_MAJOR = 0x01;
- private static final short SUPPORTED_VERSION_MINOR = 0x00;
private static final int RESERVED_BYTES_COUNT = 3;
private static final int HEADER_SIZE = 1 * Byte.BYTES + // marker
2 * Byte.BYTES + // single byte fields (versions)
@@ -44,9 +43,11 @@
1 * Integer.BYTES; // payload length
private final ByteBufAllocator allocator;
+ private final WireFrameVersion wireFrameVersion;
- public WireFrameEncoder(ByteBufAllocator allocator) {
+ public WireFrameEncoder(ByteBufAllocator allocator, WireFrameVersion wireFrameVersion) {
this.allocator = allocator;
+ this.wireFrameVersion = wireFrameVersion;
}
public Try<ByteBuf> encode(ByteBuffer payload) {
@@ -72,8 +73,8 @@
private void writeBasicWTPFrameHeaderBeginning(ByteBuf encodedMessage) {
encodedMessage.writeByte(MARKER_BYTE);
- encodedMessage.writeByte(SUPPORTED_VERSION_MAJOR);
- encodedMessage.writeByte(SUPPORTED_VERSION_MINOR);
+ encodedMessage.writeByte(wireFrameVersion.major());
+ encodedMessage.writeByte(wireFrameVersion.minor());
encodedMessage.writeZero(RESERVED_BYTES_COUNT);
}
diff --git a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java
index 02cc6e5..b79b0cf 100644
--- a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java
+++ b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/ProducerCoreTest.java
@@ -36,6 +36,7 @@
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.EncodersFactory;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.ProtobufEncoder;
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.impl.encoders.WireFrameEncoder;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion;
import org.onap.ves.VesEventOuterClass.VesEvent;
import reactor.core.publisher.Flux;
@@ -48,11 +49,13 @@
private ProducerCore producerCore;
private EncodersFactory encodersFactoryMock;
+ private WireFrameVersion wireFrameVersion;
@BeforeEach
public void setUp() {
encodersFactoryMock = mock(EncodersFactory.class);
- producerCore = new ProducerCore(encodersFactoryMock);
+ wireFrameVersion = mock(WireFrameVersion.class);
+ producerCore = new ProducerCore(encodersFactoryMock, wireFrameVersion);
}
@Test
@@ -65,7 +68,8 @@
when(protobufEncoder.encode(any(VesEvent.class))).thenReturn(protoBuffer);
when(wireFrameEncoder.encode(protoBuffer)).thenReturn(wireFrameBuffer);
when(encodersFactoryMock.createProtobufEncoder()).thenReturn(protobufEncoder);
- when(encodersFactoryMock.createWireFrameEncoder(ByteBufAllocator.DEFAULT)).thenReturn(wireFrameEncoder);
+ when(encodersFactoryMock.createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion)).
+ thenReturn(wireFrameEncoder);
// given
final int messageStreamSize = 2;
@@ -76,7 +80,7 @@
// then
verify(encodersFactoryMock).createProtobufEncoder();
- verify(encodersFactoryMock).createWireFrameEncoder(ByteBufAllocator.DEFAULT);
+ verify(encodersFactoryMock).createWireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion);
verify(protobufEncoder, times(messageStreamSize)).encode(any(VesEvent.class));
verify(wireFrameEncoder, times(messageStreamSize)).encode(protoBuffer);
diff --git a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactoryTest.java b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactoryTest.java
index 3065db2..81b5cdd 100644
--- a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactoryTest.java
+++ b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/EncodersFactoryTest.java
@@ -24,6 +24,8 @@
import io.netty.buffer.ByteBufAllocator;
import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableWireFrameVersion;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion;
/**
* @author <a href="mailto:jakub.dudycz@nokia.com">Jakub Dudycz</a>
@@ -36,7 +38,9 @@
public void factory_methods_should_create_non_null_encoders_objects() {
// when
final ProtobufEncoder protobufEncoder = encodersFactory.createProtobufEncoder();
- final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(ByteBufAllocator.DEFAULT);
+ final WireFrameVersion wireFrameVersion = ImmutableWireFrameVersion.builder().build();
+ final WireFrameEncoder wireFrameEncoder = encodersFactory.createWireFrameEncoder(ByteBufAllocator.DEFAULT,
+ wireFrameVersion);
// then
assertThat(protobufEncoder).isNotNull();
diff --git a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java
index a0a67d9..d79d0dc 100644
--- a/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java
+++ b/services/hv-ves-client/producer/impl/src/test/java/org/onap/dcaegen2/services/sdk/services/hvves/client/producer/impl/encoders/WireFrameEncoderTest.java
@@ -26,6 +26,8 @@
import org.junit.jupiter.api.Test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableWireFrameVersion;
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion;
import java.nio.ByteBuffer;
@@ -33,6 +35,8 @@
private static final byte MARKER_BYTE = (byte) 0xAA;
private static final byte SUPPORTED_VERSION_MAJOR = (byte) 0x01;
private static final byte SUPPORTED_VERSION_MINOR = (byte) 0x00;
+ private static final byte SAMPLE_VERSION_MAJOR = (byte) 0x02;
+ private static final byte SAMPLE_VERSION_MINOR = (byte) 0x01;
private static final int RESERVED_BYTES_COUNT = 3;
private static final int HEADER_SIZE = 1 * Byte.BYTES + // marker
2 * Byte.BYTES + // single byte fields (versions)
@@ -40,7 +44,8 @@
1 * Short.BYTES + // paylaod type
1 * Integer.BYTES; // payload length
- private final WireFrameEncoder wireFrameEncoder = new WireFrameEncoder(ByteBufAllocator.DEFAULT);
+ private final WireFrameVersion wireFrameVersion = ImmutableWireFrameVersion.builder().build();
+ private final WireFrameEncoder wireFrameEncoder = new WireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion);
@Test
void encode_givenNullPayload_shouldThrowEncodingException() {
@@ -81,6 +86,23 @@
assertAllBytesVerified(actualEncodedBuffer);
}
+ @Test
+ void encode_givenSomePayloadBytes_shouldCreateValidGPBFrameWithSpecifiedWTPVersion() {
+ // given
+ WireFrameVersion wireFrameVersion = ImmutableWireFrameVersion.of(SAMPLE_VERSION_MAJOR, SAMPLE_VERSION_MINOR);
+ final WireFrameEncoder encoder = new WireFrameEncoder(ByteBufAllocator.DEFAULT, wireFrameVersion);
+ final byte[] payloadBytes = new byte[]{0x1A, 0x2B, 0x3C};
+ final ByteBuffer buffer = ByteBuffer.wrap(payloadBytes);
+
+ // when
+ final Try<ByteBuf> encodedBuffer = encoder.encode(buffer);
+
+ // then
+ assertThat(encodedBuffer.isSuccess()).isTrue();
+ final ByteBuf versionBuffer = encodedBuffer.get();
+ assertValidHeaderBeggining(versionBuffer, SAMPLE_VERSION_MAJOR, SAMPLE_VERSION_MINOR);
+ }
+
private void assertNextBytesAreInOrder(ByteBuf encodedBuffer, byte... bytes) {
for (int i = 0; i < bytes.length; i++) {
assertThat(encodedBuffer.readByte())
@@ -90,10 +112,14 @@
}
private void assertValidHeaderBeggining(ByteBuf encodedBuffer) {
+ assertValidHeaderBeggining(encodedBuffer, SUPPORTED_VERSION_MAJOR, SUPPORTED_VERSION_MINOR);
+ }
+
+ private void assertValidHeaderBeggining(ByteBuf encodedBuffer, byte majorWTPVersion, byte minorWTPVersion) {
assertNextBytesAreInOrder(encodedBuffer,
MARKER_BYTE,
- SUPPORTED_VERSION_MAJOR,
- SUPPORTED_VERSION_MINOR);
+ majorWTPVersion,
+ minorWTPVersion);
}
private void assertBufferSizeIs(ByteBuf encodedBuffer, int headerSize) {