Initial add of APPC client libraries

Issue-ID: APPC-180
Change-Id: Ie0be2b518b90bb7f9996e9260c43fef75d1a5821
Signed-off-by: Skip Wonnell <kw5258@att.com>
diff --git a/appc-client/client-lib/.gitignore b/appc-client/client-lib/.gitignore
new file mode 100644
index 0000000..1b08ea4
--- /dev/null
+++ b/appc-client/client-lib/.gitignore
@@ -0,0 +1,2 @@
+/target/
+client-lib/logs/EELF/application.log
diff --git a/appc-client/client-lib/pom.xml b/appc-client/client-lib/pom.xml
new file mode 100644
index 0000000..b51118a
--- /dev/null
+++ b/appc-client/client-lib/pom.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>

+<!--

+  ============LICENSE_START=======================================================

+  ONAP : APPC

+  ================================================================================

+  Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.

+  ================================================================================

+  Copyright (C) 2017 Amdocs

+  =============================================================================

+  Licensed under the Apache License, Version 2.0 (the "License");

+  you may not use this file except in compliance with the License.

+  You may obtain a copy of the License at

+

+       http://www.apache.org/licenses/LICENSE-2.0

+

+  Unless required by applicable law or agreed to in writing, software

+  distributed under the License is distributed on an "AS IS" BASIS,

+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+  See the License for the specific language governing permissions and

+  limitations under the License.

+

+  ECOMP is a trademark and service mark of AT&T Intellectual Property.

+  ============LICENSE_END=========================================================

+  -->

+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

+    <modelVersion>4.0.0</modelVersion>

+    <parent>

+        <artifactId>appc-java-client</artifactId>

+        <groupId>org.openecomp.appc.client</groupId>

+        <version>1.1.0-SNAPSHOT</version>

+    </parent>

+

+    <artifactId>client-lib</artifactId>

+    <properties>

+            <licenseDir>${project.parent.parent.basedir}</licenseDir>

+    </properties>

+

+    <dependencies>

+        <dependency>

+            <groupId>com.fasterxml.jackson.core</groupId>

+            <artifactId>jackson-databind</artifactId>

+            <version>${jackson.version}</version>

+        </dependency>

+        <dependency>

+            <groupId>com.fasterxml.jackson.core</groupId>

+            <artifactId>jackson-core</artifactId>

+            <version>${jackson.version}</version>

+        </dependency>

+        <dependency>

+            <groupId>com.fasterxml.jackson.core</groupId>

+            <artifactId>jackson-annotations</artifactId>

+            <version>${jackson.version}</version>

+        </dependency>

+        <dependency>

+                <groupId>com.att.nsa</groupId>

+                <artifactId>cambriaClient</artifactId>

+        </dependency>

+        <dependency>

+            <groupId>com.att.eelf</groupId>

+            <artifactId>eelf-core</artifactId>

+        </dependency>

+    </dependencies>

+</project>

diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AbstractRequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AbstractRequestResponseHandler.java
new file mode 100644
index 0000000..82d6319
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AbstractRequestResponseHandler.java
@@ -0,0 +1,85 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+/** Abstract request response handler class, responsible for common functionality of
+ * {@link AsyncRequestResponseHandler} and {@link SyncRequestResponseHandler}
+ */
+abstract class AbstractRequestResponseHandler implements RequestResponseHandler {
+
+    private static final EELFLogger LOG = EELFManager.getInstance().getLogger(AbstractRequestResponseHandler.class);
+    ICoreResponseHandler businessCallback;
+    protected String corrID;
+    CoreManager coreManager;
+
+    AbstractRequestResponseHandler(String corrID, ICoreResponseHandler businessCallback, CoreManager coreManager) {
+        this.businessCallback = businessCallback;
+        this.corrID = corrID;
+        this.coreManager = coreManager;
+    }
+
+    /**
+     * Submits a task to the core manager, under the context's correlation ID, that passes the response to
+     * {@link #runTask(String, String)} provided a handler is still registered for this handler's corrID.
+     */
+    public synchronized void handleResponse(final MessageContext ctx, final String response) {
+        try {
+            coreManager.submitTask(ctx.getCorrelationID(), new Runnable() {
+                @Override
+                public void run() {
+                    LOG.info("handling response of corrID <" + corrID + ">" + "response " + response);
+                    if (coreManager.isExistHandler(corrID)) {
+                        runTask(response, ctx.getType());
+                    }
+                }
+            });
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag cleared by the exception so callers can still observe it.
+            Thread.currentThread().interrupt();
+            LOG.error("could not handle response <" + response + "> of corrID <" + corrID + ">", e);
+        }
+    }
+
+    /**
+     * Processes one response; called from the task submitted by handleResponse.
+     * @param response - Response
+     * @param type - Type of Response
+     */
+    abstract void runTask(String response, String type);
+
+    /** Registers this handler, sends the request and starts its timer; refused while shutdown is in progress. */
+    @Override
+    public void sendRequest(String request, String corrId, String rpcName) throws CoreException {
+        if (coreManager.isShutdownInProgress()) {
+            throw new CoreException("Shutdown is in progress. Request will not be handled");
+        }
+        coreManager.registerHandler(corrId, this);
+        coreManager.sendRequest(request, corrId, rpcName);
+        coreManager.startTimer(corrId);
+    }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AsyncRequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AsyncRequestResponseHandler.java
new file mode 100644
index 0000000..86756e6
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/AsyncRequestResponseHandler.java
@@ -0,0 +1,76 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Handles responses of asynchronous requests by forwarding them to an {@link ICoreAsyncResponseHandler}.
+ */
+class AsyncRequestResponseHandler extends AbstractRequestResponseHandler {
+
+    private static final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncRequestResponseHandler.class);
+
+    AsyncRequestResponseHandler(String corrID, ICoreResponseHandler businessCallback, CoreManager coreManager) {
+        super(corrID, businessCallback, coreManager);
+    }
+
+    /**
+     * Calls the API callback to deliver the response to the consumer's listener. When the callback
+     * reports the response as final, the timer is cancelled and this handler is unregistered.
+     * An exception thrown by the callback is logged and the response is treated as not final,
+     * so the handler stays registered.
+     *
+     * @param response - Response
+     * @param type - Type of Response
+     */
+    @Override
+    public void runTask(String response, String type) {
+        boolean finalTask = false;
+        try {
+            finalTask = ((ICoreAsyncResponseHandler) businessCallback).onResponse(response, type);
+        } catch (Exception e) {
+            // finalTask stays false: a failed callback does not complete the request.
+            LOG.error("Error on API layer, for request with correlation-id " + corrID, e);
+        }
+        if (finalTask) {
+            coreManager.cancelTimer(corrID);
+            coreManager.unregisterHandler(corrID);
+        }
+    }
+
+    /**
+     * Calls the API layer's exception callback with a {@link TimeoutException} for this correlation ID.
+     */
+    @Override
+    public void onTimeOut() {
+        final String message = "timeout for request with correlation-id " + corrID;
+        LOG.info(message);
+        ((ICoreAsyncResponseHandler) businessCallback).onException(new TimeoutException(message));
+    }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreException.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreException.java
new file mode 100644
index 0000000..009a1f4
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreException.java
@@ -0,0 +1,45 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+
+/** Checked exception of the client core package; CoreManager wraps protocol failures in it. */
+public class CoreException extends Exception {
+    public CoreException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public CoreException(String message) {
+        super(message);
+    }
+
+    public CoreException(Throwable cause) {
+        super(cause);
+    }
+
+    public CoreException() {
+        // no detail message and no cause
+    }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreManager.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreManager.java
new file mode 100644
index 0000000..f299dc7
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreManager.java
@@ -0,0 +1,314 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import org.openecomp.appc.client.impl.protocol.*;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Consolidates all services (Registry, Timeout and Task Queue) for handling of request/response events.
+ */
+class CoreManager {
+
+    private static final EELFLogger LOG = EELFManager.getInstance().getLogger(CoreManager.class);
+    private static final String DEFAULT_TIMEOUT = "300000";
+    private static final String RESPONSE_TIMEOUT = "client.response.timeout";
+    private static final String GRACEFUL_SHUTDOWN_TIMEOUT = "client.graceful.shutdown.timeout";
+    private final ProtocolFactory protocolFactory;
+    protected AsyncProtocol protocol;
+    private final CoreRegistry<RequestResponseHandler> registry;
+    private final ITimerService timerService;
+    private final TaskQueueManager queueManager;
+    // volatile: written during shutdown and read via isShutdownInProgress(), possibly from another thread
+    private volatile boolean isForceShutdown = false;
+    private final AtomicBoolean isGracefulShutdown = new AtomicBoolean(false);
+    private final long shutdownTimeout;
+
+    CoreManager(Properties prop) throws CoreException {
+        protocolFactory = ProtocolFactory.getInstance();
+        try {
+            initProtocol(prop);
+        } catch (ProtocolException e) {
+            throw new CoreException(e);
+        }
+        registry = new CoreRegistry<RequestResponseHandler>(new EmptyRegistryCallbackImpl());
+        String timeoutProp = prop.getProperty(RESPONSE_TIMEOUT, DEFAULT_TIMEOUT);
+        long responseTimeout = Long.parseLong(timeoutProp);
+        String gracefulTimeout = prop.getProperty(GRACEFUL_SHUTDOWN_TIMEOUT, DEFAULT_TIMEOUT);
+        shutdownTimeout = Long.parseLong(gracefulTimeout);
+        timerService = new TimerServiceImpl(responseTimeout);
+        queueManager = new TaskQueueManager(prop);
+        listenShutdown();
+    }
+
+    /**
+     * Initiates protocol layer services.
+     * @param prop - Properties
+     */
+    private void initProtocol(Properties prop) throws ProtocolException {
+        protocol = (AsyncProtocol) protocolFactory.getProtocolObject(ProtocolType.ASYNC);
+        protocol.init(prop, getProtocolCallback());
+    }
+
+    /**
+     * Creates protocol response callback
+     * @return - {@link ProtocolResponseCallbackImpl}
+     */
+    RetrieveMessageCallback getProtocolCallback() {
+        return new ProtocolResponseCallbackImpl();
+    }
+
+    /**
+     * Registers a new handler in registry
+     * @param corrID - Correlation ID
+     * @param requestResponseHandler handler to be called when response arrives
+     */
+    void registerHandler(String corrID, RequestResponseHandler requestResponseHandler) {
+        registry.register(corrID, requestResponseHandler);
+    }
+
+    /**
+     * Remove a handler from registry service by correlation ID.
+     * @param corrID - Correlation ID
+     * @return - the removed {@link RequestResponseHandler}, or null when none was registered
+     */
+    RequestResponseHandler unregisterHandler(String corrID) {
+        return (RequestResponseHandler) registry.unregister(corrID);
+    }
+
+    /**
+     * Checks in registry service if a handler is existing.
+     * @param corrID - Correlation ID
+     * @return - true when a handler is registered for the correlation ID
+     */
+    boolean isExistHandler(String corrID) {
+        return registry.isExist(corrID);
+    }
+
+    /**
+     * Starts timer for timeout event when a request was sent successfully.
+     * @param corrID - Correlation ID
+     */
+    void startTimer(String corrID) {
+        timerService.add(corrID, new TimeoutHandlerImpl(corrID));
+    }
+
+    /**
+     * Cancels timer for timeout event, in case when complete response was received
+     * @param corrID - Correlation ID
+     */
+    void cancelTimer(String corrID) {
+        timerService.cancel(corrID);
+    }
+
+    /**
+     * Submits a new task to Queue manager. It is used for both response and timeout tasks
+     * @param corrID - Correlation ID
+     * @param task - {@link Runnable} task.
+     * @throws InterruptedException propagated from the queue manager's submit call
+     */
+    void submitTask(String corrID, Runnable task) throws InterruptedException {
+        queueManager.submit(corrID, task);
+    }
+
+    /**
+     * Sends request to protocol.
+     * @param request - Request
+     * @param corrId - Correlation ID
+     * @param rpcName - RPC name
+     * @throws CoreException - wraps the {@link ProtocolException}; the handler is unregistered first
+     */
+    void sendRequest(String request, String corrId, String rpcName) throws CoreException {
+        MessageContext ctx = getMessageContext(corrId, rpcName);
+        try {
+            protocol.sendRequest(request, ctx);
+        } catch (ProtocolException e) {
+            unregisterHandler(corrId);
+            throw new CoreException(e);
+        }
+    }
+
+    /**
+     * Creates {@link MessageContext}
+     * @param correlationId - Correlation ID
+     * @param rpcName - RPC Name
+     * @return - {@link MessageContext}
+     */
+    private MessageContext getMessageContext(String correlationId, String rpcName) {
+        MessageContext msgCtx = new MessageContext();
+        msgCtx.setCorrelationID(correlationId);
+        msgCtx.setRpc(rpcName);
+        return msgCtx;
+    }
+
+    /**
+     * Implements response callback from protocol and filters responses by correlation ID.
+     * Only registered events (by correlation ID) will be handled.
+     */
+    private class ProtocolResponseCallbackImpl implements RetrieveMessageCallback {
+        @Override
+        public void onResponse(String response, MessageContext context) {
+            String corrID = context.getCorrelationID();
+            if (corrID != null) {
+                RequestResponseHandler messageHandler = (RequestResponseHandler) registry.get(corrID);
+                if (messageHandler != null) {
+                    LOG.info("On response callback corrID <" + corrID + "> handler " + messageHandler + " response " + response);
+                    messageHandler.handleResponse(context, response);
+                }
+            }
+        }
+    }
+
+    /** Registers a {@link Runtime} shutdown hook that runs the graceful shutdown. */
+    private void listenShutdown() {
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                gracefulShutdown();
+            }
+        });
+    }
+
+    /**
+     * Implements shutdown for client library.
+     * @param isForceShutdown - true force shutdown, false graceful shutdown
+     */
+    void shutdown(boolean isForceShutdown) {
+        if (isForceShutdown) {
+            forceShutdown();
+        } else {
+            gracefulShutdown();
+        }
+    }
+
+    /**
+     * Graceful shutdown. When all requests were already handled, goes straight to force shutdown. Otherwise waits
+     * until either the registry becomes empty (see wakeUpShutdown) or the graceful shutdown timeout elapses.
+     */
+    synchronized void gracefulShutdown() {
+        isGracefulShutdown.set(true);
+        if (registry.isEmpty()) {
+            forceShutdown();
+        } else {
+            try {
+                LOG.info("Core manager::graceful shutdown is starting... this <" + this + ">");
+                // NOTE(review): a spurious wakeup ends this wait before the registry is empty or the timeout is up.
+                wait(shutdownTimeout);
+                LOG.info("Core manager::graceful shutdown is continue... this <" + this + ">");
+                forceShutdown();
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to the caller instead of dumping the stack trace to stderr.
+                Thread.currentThread().interrupt();
+                LOG.warn("Core manager::graceful shutdown was interrupted, this <" + this + ">", e);
+            }
+        }
+    }
+
+    /**
+     * Closes Protocol, stops Queue Manager and shuts down Timer Service.
+     */
+    private void forceShutdown() {
+        isForceShutdown = true;
+        try {
+            LOG.info("Starting shutdown process.");
+            protocol.shutdown();
+            queueManager.stopQueueManager();
+            timerService.shutdown();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag that catching InterruptedException cleared.
+            Thread.currentThread().interrupt();
+            LOG.info("Client library shutdown in progress ", e);
+        }
+    }
+
+    /**
+     * Tells whether a forced or a graceful shutdown has been started.
+     * @return - true when shutdown is in process
+     */
+    boolean isShutdownInProgress() {
+        return isForceShutdown || isGracefulShutdown.get();
+    }
+
+    /**
+     * Timeout handler implementation.
+     * This handler is responsible to assign a task for handling of timeout events.
+     */
+    private class TimeoutHandlerImpl implements ITimeoutHandler {
+
+        private final String corrID;
+
+        TimeoutHandlerImpl(String corrID) {
+            this.corrID = corrID;
+        }
+
+        /**
+         * When a timeout event is occurring, the new Timeout task will be assigned into a queue,
+         * this queue is shared between both timeout and handlers which belong to same correlation ID.
+         */
+        @Override
+        public void onTimeout() {
+            try {
+                submitTask(corrID, new Runnable() {
+                    @Override
+                    public void run() {
+                        RequestResponseHandler requestResponseHandler = unregisterHandler(corrID);
+                        if (requestResponseHandler != null) {
+                            requestResponseHandler.onTimeOut();
+                        }
+                    }
+                });
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                LOG.warn("could not submit timeout task for correlation ID <" + corrID + "> ", e);
+            }
+        }
+    }
+
+    /**
+     * Registry callback that wakes up a waiting graceful shutdown once the registry is empty.
+     */
+    class EmptyRegistryCallbackImpl implements CoreRegistry.EmptyRegistryCallback {
+        @Override
+        public synchronized void emptyCallback() {
+            LOG.info("Registry is empty, wake up the shutdown!, isGraceful flag <" + isGracefulShutdown + ">");
+            if (isGracefulShutdown.get()) {
+                wakeUpShutdown();
+            }
+        }
+    }
+
+    /**
+     * Wakes up the thread waiting in gracefulShutdown.
+     */
+    private synchronized void wakeUpShutdown() {
+        notifyAll();
+    }
+
+}
+
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreRegistry.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreRegistry.java
new file mode 100644
index 0000000..8d5d0b7
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/CoreRegistry.java
@@ -0,0 +1,70 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Client library registry: a concurrent map of registered objects by key. An {@link EmptyRegistryCallback}
+ * is invoked whenever an unregister call leaves the registry empty.
+ */
+class CoreRegistry<T> {
+    private final Map<String, T> registry = new ConcurrentHashMap<String, T>();
+    private final EmptyRegistryCallback emptyRegistryCallback;
+
+    CoreRegistry(EmptyRegistryCallback emptyRegistryCallback) {
+        this.emptyRegistryCallback = emptyRegistryCallback;
+    }
+
+    void register(String key, T obj) {
+        registry.put(key, obj);
+    }
+
+    /** Removes the key's entry (returns null if absent) and fires the empty callback when no entries remain. */
+    T unregister(String key) {
+        T item = registry.remove(key);
+        if (registry.isEmpty()) {
+            emptyRegistryCallback.emptyCallback();
+        }
+        return item;
+    }
+
+    T get(String key) {
+        return registry.get(key);
+    }
+
+    boolean isExist(String key) {
+        return registry.containsKey(key);
+    }
+
+    boolean isEmpty() {
+        return registry.isEmpty();
+    }
+
+    public interface EmptyRegistryCallback {
+        void emptyCallback();
+    }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreAsyncResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreAsyncResponseHandler.java
new file mode 100644
index 0000000..23fce28
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreAsyncResponseHandler.java
@@ -0,0 +1,43 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+public interface ICoreAsyncResponseHandler extends  ICoreResponseHandler{
+
+    /**
+     * Handles a response message of an asynchronous request.
+     * @param message response accepted from protocol
+     * @param type type of response
+     * @return true if message is final, false otherwise
+     */
+    boolean onResponse(String message, String type);
+
+    /**
+     * Handles an exception raised for the request (for example a timeout).
+     * @param e the exception which has been thrown
+     */
+    void onException(Exception e);
+
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreResponseHandler.java
new file mode 100644
index 0000000..4f0cb8a
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreResponseHandler.java
@@ -0,0 +1,28 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+/** Marker supertype of the sync and async core response handler interfaces; declares no methods. */
+public interface ICoreResponseHandler {
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreSyncResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreSyncResponseHandler.java
new file mode 100644
index 0000000..3329867
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ICoreSyncResponseHandler.java
@@ -0,0 +1,36 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+public interface ICoreSyncResponseHandler extends ICoreResponseHandler{
+    /**
+     * Core response to incoming message, should return completed message only.
+     * @param message response accepted from protocol
+     * @param type type of response
+     * @return the response object for the message (SyncRequestResponseHandler keeps waiting on null)
+     * @throws CoreException declared by the signature; when it is raised is up to the implementation
+     */
+    <T> T onResponse(String message, String type) throws CoreException;
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/IInvocationManager.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/IInvocationManager.java
new file mode 100644
index 0000000..831263a
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/IInvocationManager.java
@@ -0,0 +1,68 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+
+/** Contract of the invocation layer: initialization, async and sync request flows, shutdown.
+ */
+public interface IInvocationManager {
+
+    /**
+     * Initializes the manager.
+     * @param prop properties to read from
+     * @throws CoreException thrown if mandatory fields are not set right
+     */
+    void init(Properties prop) throws CoreException;
+
+    /**
+     * Handles the flow of an async request.
+     * @param request the request body
+     * @param listener business response handler
+     * @param correlationId unique id of the request
+     * @param rpcName rpc call name
+     * @throws CoreException thrown if the request failed to be sent
+     */
+    void asyncRequest(String request, ICoreAsyncResponseHandler listener, String correlationId, String rpcName) throws CoreException;
+
+    /**
+     * Handles the flow of a sync request.
+     * @param request the request body
+     * @param callback business response handler
+     * @param correlationId unique id of the request
+     * @param rpcName rpc call name
+     * @return the output object to be returned
+     * @throws CoreException thrown if the request failed to be sent
+     * @throws TimeoutException thrown if the timeout has been exceeded
+     */
+    <T> T syncRequest(String request, ICoreSyncResponseHandler callback, String correlationId, String rpcName) throws CoreException, TimeoutException;
+
+    /**
+     * Shuts the invocation manager down.
+     * @param isForceShutdown if true, shutdown will be forced, otherwise it will be graceful
+     */
+    void shutdown(boolean isForceShutdown);
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimeoutHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimeoutHandler.java
new file mode 100644
index 0000000..e2ec1d6
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimeoutHandler.java
@@ -0,0 +1,33 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+/** Package-private callback; registered through ITimerService.add to be called on timeout. */
+interface ITimeoutHandler {
+    /**
+     * Handles the timeout event.
+     */
+    void onTimeout();
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimerService.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimerService.java
new file mode 100644
index 0000000..c5a4145
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/ITimerService.java
@@ -0,0 +1,47 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+/** Package-private timer service: adds and cancels timeout handlers keyed by correlation ID. */
+interface ITimerService {
+
+    /**
+     * Adds a new timeout handler to a request.
+     * @param correlationID the id of the request
+     * @param handler to be called once the "timeout" time has arrived
+     */
+    void add(String correlationID, ITimeoutHandler handler);
+
+    /**
+     * Cancels the timeout handler of a request.
+     * @param correlationID the id of the request
+     */
+    void cancel(String correlationID);
+
+    /**
+     * Shuts the timer service down immediately.
+     */
+    void shutdown();
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManager.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManager.java
new file mode 100644
index 0000000..0582244
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManager.java
@@ -0,0 +1,69 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Layer for passing requests from API to Core; every call is delegated to a CoreManager.
+ */
+class InvocationManager implements IInvocationManager{
+
+    /** Core manager created by init(Properties); stays null until init has succeeded. */
+    protected CoreManager coreManager = null;
+
+    InvocationManager(){
+    }
+
+    /** Creates the core manager from the given properties. */
+    @Override
+    public void init(Properties properties) throws CoreException {
+        coreManager = new CoreManager(properties);
+    }
+
+    /** Wraps businessCallback in an AsyncRequestResponseHandler and sends the request through it. */
+    @Override
+    public void asyncRequest(String request, ICoreAsyncResponseHandler businessCallback, String correlationId, String rpcName) throws CoreException {
+        AsyncRequestResponseHandler handler = new AsyncRequestResponseHandler(correlationId, businessCallback, coreManager);
+        handler.sendRequest(request, correlationId, rpcName);
+    }
+
+    /** Sends the request through a SyncRequestResponseHandler and returns what its getResponse() yields. */
+    @Override
+    public <T> T syncRequest(String request, ICoreSyncResponseHandler businessCallback, String correlationId, String rpcName ) throws CoreException, TimeoutException {
+        SyncRequestResponseHandler handler = new SyncRequestResponseHandler(correlationId, businessCallback, coreManager);
+        handler.sendRequest(request, correlationId, rpcName);
+        return (T) handler.getResponse();
+    }
+
+    /** Shuts the core manager down; does nothing when init never succeeded (coreManager is null). */
+    @Override
+    public void shutdown(boolean isForceShutdown) {
+        if (coreManager != null) {
+            coreManager.shutdown(isForceShutdown);
+        }
+    }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManagerFactory.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManagerFactory.java
new file mode 100644
index 0000000..32b3ff2
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/InvocationManagerFactory.java
@@ -0,0 +1,36 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+/** Hands out one shared, lazily created {@link IInvocationManager}. */
+public abstract class InvocationManagerFactory {
+    private static IInvocationManager invocationManager = null;
+
+    /** Returns the shared manager; an {@link InvocationManager} is created on the first call. */
+    public static synchronized IInvocationManager getInstance(){
+        invocationManager = (invocationManager != null) ? invocationManager : new InvocationManager();
+        return invocationManager;
+    }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/MessageContext.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/MessageContext.java
new file mode 100644
index 0000000..abd644c
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/MessageContext.java
@@ -0,0 +1,85 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+/**
+ * Helper class for wrapping request/response information: a mutable holder of four strings.
+ */
+public class MessageContext {
+
+    /** Valid values of type are response/error. */
+    private String type;
+
+    /** RPC name. */
+    private String rpc;
+
+    /** Correlation ID. */
+    private String correlationID;
+
+    /** Partitioner for message bus usage. */
+    private String partitioner;
+
+    /** @return the message type */
+    public String getType(){
+        return this.type;
+    }
+
+    /** @param type the message type to store */
+    public void setType(String type){
+        this.type = type;
+    }
+
+    /** @return the RPC name */
+    public String getRpc() {
+        return this.rpc;
+    }
+
+    /** @param rpc the RPC name to store */
+    public void setRpc(String rpc) {
+        this.rpc = rpc;
+    }
+
+    /** @return the correlation ID */
+    public String getCorrelationID() {
+        return this.correlationID;
+    }
+
+    /** @param correlationID the correlation ID to store */
+    public void setCorrelationID(String correlationID) {
+        this.correlationID = correlationID;
+    }
+
+    /**
+     * @return the partitioner (the accessor name is spelled "Partiton" in this API)
+     */
+    public String getPartiton() {
+        return this.partitioner;
+    }
+
+    /** @param partitioner the partitioner to store */
+    public void setPartiton(String partitioner) {
+        this.partitioner = partitioner;
+    }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/RequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/RequestResponseHandler.java
new file mode 100644
index 0000000..7264e35
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/RequestResponseHandler.java
@@ -0,0 +1,50 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+/** Package-private contract of a handler that sends a request and processes its response or timeout. */
+interface RequestResponseHandler {
+    /**
+     * Sends the request, registers the handler of the response and starts the timer.
+     * @param request - Request
+     * @param corrId - correlation ID
+     * @param rpcName - RPC name
+     * @throws CoreException - {@link CoreException}
+     */
+    void sendRequest(String request, String corrId, String rpcName) throws CoreException;
+
+    /**
+     * Submits a handler task to the task queue {@link TaskQueue}; this task will be performed only if this
+     * handler still exists in the core registry {@link CoreRegistry}, otherwise a timeout has occurred.
+     * @param ctx - Message Context {@link MessageContext}
+     * @param response - Response from backend
+     */
+    void handleResponse(MessageContext ctx, String response);
+
+    /**
+     * Handles the timeout event.
+     */
+    void onTimeOut();
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/SyncRequestResponseHandler.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/SyncRequestResponseHandler.java
new file mode 100644
index 0000000..e7a6576
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/SyncRequestResponseHandler.java
@@ -0,0 +1,107 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import java.util.concurrent.TimeoutException;
+
+/** Handles sync requests: the caller blocks in getResponse() until an outcome has been recorded.
+ */
+class SyncRequestResponseHandler<T> extends AbstractRequestResponseHandler {
+
+    private final EELFLogger LOG = EELFManager.getInstance().getLogger(SyncRequestResponseHandler.class);
+    /* Outcome slots; every method that reads or writes them is synchronized on this handler. */
+    private T responseObject = null;
+    private CoreException coreException = null;
+    private TimeoutException timeoutException = null;
+
+    SyncRequestResponseHandler(String corrID,
+                               ICoreResponseHandler callback,
+                               CoreManager coreManager){
+        super(corrID, callback, coreManager);
+    }
+
+    /**
+     *  Calls API callback for getting response object. In case of a complete response, or of a
+     *  CoreException thrown by the callback, notifies the consumer thread waiting for the response.
+     * @param response - Response
+     * @param type - Type of Response
+     */
+    synchronized void runTask(String response, String type) {
+        try {
+            responseObject = ((ICoreSyncResponseHandler) businessCallback).onResponse(response, type);
+        } catch (CoreException e) {
+            coreException = e;
+        }
+        if(responseObject != null || coreException != null) {
+            notify();
+        }
+    }
+
+    /**
+     * Returns the response. Sleeps until a complete response, a callback error or a timeout event
+     * has been recorded; does not sleep at all when one of them was recorded before this call.
+     * @return the response object produced by the business callback
+     * @throws CoreException if the callback threw one, or wrapping the interruption of the wait
+     * @throws TimeoutException if the timeout event was recorded
+     */
+    public synchronized  <T> T getResponse() throws CoreException, TimeoutException {
+        try{
+            // Wait in a loop on the full predicate: an outcome recorded before this call must not
+            // be missed (its notify is already gone) and a spurious wakeup must not end the wait.
+            while(!isCompleted()){
+                wait();
+            }
+            if (coreException != null) {
+                throw coreException;
+            }
+            if ( timeoutException != null) {
+                throw timeoutException;
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // keep the interruption visible to the caller
+            throw new CoreException(e);
+        } finally{
+            coreManager.unregisterHandler(corrID);
+            coreManager.cancelTimer(corrID);
+        }
+        return (T) responseObject;
+    }
+
+    /** Indicates if any outcome (response, callback error or timeout) has been recorded. */
+    private boolean isCompleted() {
+        return responseObject != null || coreException != null || timeoutException != null;
+    }
+
+    /** Records the timeout and wakes the thread waiting in getResponse(). */
+    @Override
+    public synchronized void onTimeOut() {
+        LOG.error("sync response handler on timeout correlation ID <" + corrID + ">.");
+        timeoutException = new TimeoutException("timeout for request with correlation-id " + corrID);
+        notify();
+    }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueue.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueue.java
new file mode 100644
index 0000000..e75d33f
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueue.java
@@ -0,0 +1,65 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/** Responsible to ensure synchronous handling of responses and timeouts.
+ */
+class TaskQueue implements Runnable{
+
+    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+    private final EELFLogger LOG = EELFManager.getInstance().getLogger(TaskQueue.class);
+    /** Set by stopQueue() and read by the loop in run(); volatile so the write is seen across threads. */
+    private volatile boolean isShutdown;
+    /** Appends a task to the queue; tasks are taken by run() in the order they were put. */
+    synchronized void addTask(Runnable task) throws InterruptedException {
+            queue.put(task);
+    }
+    /** Takes and runs tasks one at a time until the thread is interrupted or stopQueue() was called. */
+    public void run() {
+        while(!Thread.currentThread().isInterrupted() && !isShutdown){
+            try {
+                Runnable task = queue.take();
+                task.run();
+            } catch (InterruptedException e) {
+                LOG.error("could not take task from queue", e);
+                Thread.currentThread().interrupt(); // catching cleared the flag the loop condition tests
+            } catch (RuntimeException e) {
+                LOG.error("could not run task", e);
+            }
+            LOG.info("THR# <" + Thread.currentThread().getId() + "> shutdown indicator " + isShutdown);
+        }
+        LOG.info("THR# <" + Thread.currentThread().getId() + "> in shutdown process.");
+    }
+    /** Raises the shutdown indicator; run() checks it before taking the next task. */
+    void stopQueue(){
+        isShutdown = true;
+    }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueueManager.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueueManager.java
new file mode 100644
index 0000000..1d1fc15
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TaskQueueManager.java
@@ -0,0 +1,98 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/** Creates a task queue pool that reuses a fixed number of threads.
+ * Assigns one thread for each queue.
+ */
+class TaskQueueManager {
+    private final EELFLogger LOG = EELFManager.getInstance().getLogger(TaskQueueManager.class);
+    private ExecutorService executorService;
+    private final static String DEFAULT_POOL_SIZE = "10";
+    private final static String CLIENT_POOL_SIZE = "client.pool.size";
+    private TaskQueue[] queues;
+    private int poolInt;
+    /** Reads "client.pool.size" (default "10"), creates the thread pool and starts the queues. */
+    TaskQueueManager(Properties properties){
+        String size = properties.getProperty(CLIENT_POOL_SIZE, DEFAULT_POOL_SIZE);
+        poolInt = Integer.parseInt(size);
+        this.executorService = Executors.newFixedThreadPool(poolInt);
+        initTaskQueues();
+    }
+    /** Creates poolInt queues and submits each of them to the executor. */
+    private void initTaskQueues(){
+        queues = new TaskQueue[poolInt];
+        for(int i=0; i<poolInt; i++){
+            queues[i] = new TaskQueue();
+            this.executorService.submit(queues[i]);
+        }
+    }
+    /** Adds the task to the queue selected by the correlation ID. */
+    void submit(String corrID, Runnable task) throws InterruptedException {
+        TaskQueue queue = getTaskQueue(corrID);
+        queue.addTask(task);
+    }
+
+    /**
+     * ensures synchronous handling of all responses and timeouts that belong to the same correlation ID
+     * @param corrID correlation ID whose hash code selects the queue
+     * @return - {@link TaskQueue} at index |hash % poolInt|
+     */
+    private TaskQueue getTaskQueue(String corrID){
+        // abs() is applied after %, so a hash of Integer.MIN_VALUE cannot yield a negative index
+        int index = Math.abs(corrID.hashCode() % poolInt);
+        return queues[index];
+    }
+
+    /**
+     * goes over queues for stopping threads, then shuts the executor down
+     * @throws InterruptedException if interrupted while adding a wake-up task or awaiting termination
+     */
+    void stopQueueManager() throws InterruptedException {
+        for(int i=0; i<poolInt; i++){
+            queues[i].stopQueue();
+            queues[i].addTask(new Runnable() {
+                @Override
+                public void run() {
+                    // wake up the queue for stopping thread
+                }
+            });
+        }
+        List<Runnable> listTask = executorService.shutdownNow();
+        if (!executorService.awaitTermination(6, TimeUnit.SECONDS)) {
+            LOG.error("Pool did not terminate");
+        }
+        LOG.info("the amount of tasks that never commenced execution " + listTask.size());
+    }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TimerServiceImpl.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TimerServiceImpl.java
new file mode 100644
index 0000000..10f664c
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/core/TimerServiceImpl.java
@@ -0,0 +1,82 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import java.util.List;
+import java.util.concurrent.*;
+
+/** Schedules one timeout task per correlation ID on a single scheduler thread and allows cancelling it. */
+class TimerServiceImpl implements ITimerService {
+    private final EELFLogger LOG = EELFManager.getInstance().getLogger(TimerServiceImpl.class);
+    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+    private final ConcurrentHashMap<String, Future<?>> timeOutEvents = new ConcurrentHashMap<>(); // pending timeouts by correlation ID
+    private final long responseTimeout; // delay before a timeout task fires, in milliseconds
+
+    TimerServiceImpl(long responseTimeout) {
+        this.responseTimeout = responseTimeout;
+    }
+
+    @Override
+    public synchronized void cancel(String correlationID) {
+        Future<?> timeOutEvent = timeOutEvents.remove(correlationID);
+        if (timeOutEvent != null){
+            timeOutEvent.cancel(true);
+        }
+    }
+
+    @Override
+    public synchronized void add(String correlationID, ITimeoutHandler handler) {
+        Future<?> timeOutEvent = scheduler.schedule(new HandleTimeout(correlationID, handler), responseTimeout, TimeUnit.MILLISECONDS);
+        timeOutEvents.put(correlationID, timeOutEvent);
+    }
+
+    @Override
+    public void shutdown() {
+        List<Runnable> listTask = scheduler.shutdownNow();
+        LOG.info("the amount of tasks that never commenced execution " + listTask.size());
+    }
+
+    private class HandleTimeout implements Runnable {
+        private final String correlationID;
+        private final ITimeoutHandler handler;
+        HandleTimeout(String correlationID, ITimeoutHandler handler) {
+            this.correlationID = correlationID;
+            this.handler = handler;
+        }
+
+        @Override
+        public void run(){
+            LOG.info("Timeout event of request " + correlationID);
+            try {
+                handler.onTimeout();
+            } finally {
+                timeOutEvents.remove(correlationID); // runs even if the handler throws, so entries cannot leak
+            }
+        }
+    }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriter.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriter.java
new file mode 100644
index 0000000..1415514
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriter.java
@@ -0,0 +1,77 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import org.openecomp.appc.client.impl.core.MessageContext;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/** Jackson-based {@link MessageReader}/{@link MessageWriter} that converts between payloads and ProtocolMessage JSON. */
+class APPCMessageReaderWriter implements MessageReader, MessageWriter {
+
+    private final ObjectMapper mapper = new ObjectMapper();
+    private final EELFLogger LOG = EELFManager.getInstance().getLogger(APPCMessageReaderWriter.class);
+
+    /** Parses the message, copies its headers into {@code context} and returns the body's string form. */
+    @Override
+    public String read(String payload, MessageContext context) throws ProtocolException {
+        try {
+            ProtocolMessage protocolMessage = mapper.readValue(payload, ProtocolMessage.class);
+            if (protocolMessage == null || protocolMessage.getBody() == null) { // would otherwise surface as an NPE
+                throw new ProtocolException("Message has no body: " + payload);
+            }
+            context.setType(protocolMessage.getType());
+            context.setRpc(protocolMessage.getRpcName());
+            context.setCorrelationID(protocolMessage.getCorrelationID());
+            context.setPartiton(protocolMessage.getPartition());
+            String body = protocolMessage.getBody().toString();
+            LOG.debug("Received body : <" + body + ">");
+            return body;
+        } catch (IOException e) {
+            throw new ProtocolException(e);
+        }
+    }
+
+    /** Wraps the JSON {@code payload} in a version "2.0" message carrying the headers from {@code context}. */
+    @Override
+    public String write(String payload, MessageContext context) throws ProtocolException {
+        try {
+            ProtocolMessage protocolMessage = new ProtocolMessage();
+            protocolMessage.setVersion("2.0");
+            protocolMessage.setType(context.getType());
+            protocolMessage.setRpcName(context.getRpc());
+            protocolMessage.setCorrelationID(context.getCorrelationID());
+            protocolMessage.setPartition(context.getPartiton());
+            protocolMessage.setBody(mapper.readTree(payload));
+            return mapper.writeValueAsString(protocolMessage);
+        } catch (IOException e) {
+            throw new ProtocolException(e);
+        }
+    }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocol.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocol.java
new file mode 100644
index 0000000..6d42d7c
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocol.java
@@ -0,0 +1,40 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import org.openecomp.appc.client.impl.core.MessageContext;
+
+public interface AsyncProtocol extends Protocol {
+
+    /**
+     * Sends a string message to the underlying message bus/java API.
+     * @param payload - message body
+     * @param context - message headers
+     * @throws ProtocolException if the message cannot be built or sent
+     */
+    void sendRequest(String payload, MessageContext context) throws ProtocolException;
+
+    void shutdown(); // shuts the protocol layer down; what is released is up to the implementation
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocolImpl.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocolImpl.java
new file mode 100644
index 0000000..a20c2a0
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/AsyncProtocolImpl.java
@@ -0,0 +1,157 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import org.openecomp.appc.client.impl.core.MessageContext;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/** Asynchronous protocol layer: sends requests through the messaging service and polls it for responses on a listener thread. */
+class AsyncProtocolImpl implements AsyncProtocol {
+
+    /**
+     * message bus listener thread handler
+     */
+    private Future<?> listenerHandler;
+    /**
+     * called when messages are fetched - called for a single message
+     */
+    private RetrieveMessageCallback callback;
+    /**
+     * message bus client used to send/fetch
+     */
+    private MessagingService messageService;
+    /**
+     * Message reader used to extract body and context from response message
+     */
+    private MessageReader messageReader;
+    /**
+     * Message writer used to construct message from body and context
+     */
+    private MessageWriter messageWriter;
+
+    /**
+     * shutdown indicator; volatile because shutdown() writes it while the listener thread reads it
+     */
+    private volatile boolean isShutdown = false;
+
+    /**
+     * executor service for listener usage
+     */
+    private ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+    private final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncProtocolImpl.class);
+
+    AsyncProtocolImpl() {
+        messageService = new UEBMessagingService();
+        messageReader = new APPCMessageReaderWriter();
+        messageWriter = (MessageWriter) messageReader;
+    }
+
+    public void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException {
+
+        if (callback == null) {
+            throw new ProtocolException("Callback param should not be null!");
+        }
+        this.callback = callback;
+
+        try {
+            messageService.init(props);
+            //get message bus listener thread
+            //start the thread after initializing services
+            listenerHandler = executorService.submit(new Listener());
+        } catch (GeneralSecurityException | IllegalAccessException | NoSuchFieldException | IOException e) {
+            throw new ProtocolException(e);
+        }
+    }
+
+    @Override
+    public void sendRequest(String payload, MessageContext context) throws ProtocolException {
+        //get message to be sent to appc from payload and context
+        String message = messageWriter.write(payload, context);
+        try {
+            messageService.send(context.getPartiton(), message);
+            LOG.debug("Successfully send message: " + message);
+        } catch (IOException e) {
+            throw new ProtocolException(e);
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        isShutdown = true;
+        messageService.close();
+        LOG.warn("The protocol layer in shutdown stage.");
+        executorService.shutdownNow();
+    }
+
+    /** Polls the messaging service until shutdown and passes every readable message to the callback. */
+    public class Listener implements Runnable {
+        @Override
+        public void run() {
+            while (!isShutdown) {
+                List<String> messages = new ArrayList<>();
+                try {
+                    messages = messageService.fetch();
+                    LOG.debug("Successfully fetched " + messages.size() + " messages");
+                } catch (IOException e) {
+                    LOG.error("Fetching messages failed", e); // the list is still empty here, so no count is logged
+                }
+                for (String message : messages) {
+
+                    MessageContext context = new MessageContext();
+                    String payload = null;
+
+                    try {
+                        //get payload and context from message to be sent to core layer
+                        payload = messageReader.read(message, context);
+                        LOG.debug("Got body: " + payload);
+                        //call core layer response handler
+                        if(!isShutdown) {
+                            callback.onResponse(payload, context);
+                        }else{
+                            LOG.warn("Shutdown in progress, response will not receive. Correlation ID <" +
+                                    context.getCorrelationID() + "> response " + message);
+                        }
+                    } catch (ProtocolException e) {
+                        LOG.error("Failed to read message from UEB. message is: " + message, e);
+                    } catch (RuntimeException e) { // keep listening: one bad message must not end the loop
+                        LOG.error("Failed to handle message from UEB. message is: " + message, e);
+                    }
+                }
+            }
+        }
+    }
+
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Consumer.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Consumer.java
new file mode 100644
index 0000000..ec9606c
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Consumer.java
@@ -0,0 +1,56 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import java.io.IOException;
+import java.util.List;
+
+interface Consumer {
+
+        /**
+         * Gets a batch of messages from the topic using the implementation's default limit and wait time
+         * (originally documented as 1000 messages with a 15s wait for messages if empty).
+         * @return A list of strings representing the messages pulled from the topic.
+         * @throws IOException if the fetch fails
+         */
+        List<String> fetch() throws IOException;
+
+        /**
+         * Gets a batch of messages from the topic.
+         *
+         * @param limit The amount of messages to fetch
+         * @return A list of strings representing the messages pulled from the topic.
+         * @throws IOException if the fetch fails
+         */
+        List<String> fetch(int limit) throws IOException;
+
+        /**
+         * Sends a dummy fetch request to register the client so that it is able to fetch messages.
+         * @throws IOException if the dummy fetch fails
+         */
+        void registerForRead() throws IOException;
+
+        void close(); // closes the consumer; NOTE(review): whether it may be used afterwards is not visible here
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ConsumerImpl.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ConsumerImpl.java
new file mode 100644
index 0000000..99d884e
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ConsumerImpl.java
@@ -0,0 +1,125 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
+import com.att.nsa.cambria.client.CambriaConsumer;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.MalformedURLException;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/** {@link Consumer} implementation that delegates to a Cambria consumer built once in the constructor. */
+class ConsumerImpl implements Consumer {
+
+    private static final int DEFAULT_LIMIT = 1000;
+
+    private Collection<String> hosts;
+    private String topic;
+    private String group;
+    private String groupId;
+    private int timeout;
+
+    private String authKey;
+    private String authSecret;
+
+    private CambriaConsumer consumer = null;
+
+    /**
+     * Stores the connection settings and builds the underlying Cambria consumer.
+     * @param urls hosts of the message bus
+     * @param topicName topic to consume from
+     * @param consumerName consumer group this client is known as
+     * @param consumerId id of this client within the group
+     * @param timeout fetch timeout handed to Cambria (milliseconds presumably - confirm); must not be null, it is unboxed
+     * @param apiKey API key; credentials are applied only when both key and secret are non-null
+     * @param apiSecret API secret matching {@code apiKey}
+     */
+    public ConsumerImpl(Collection<String> urls, String topicName, String consumerName, String consumerId, Integer timeout, String apiKey, String apiSecret) throws MalformedURLException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException {
+        this.hosts = urls;
+        this.topic = topicName;
+        this.group = consumerName;
+        this.groupId = consumerId;
+        this.authKey = apiKey;
+        this.authSecret = apiSecret;
+        this.timeout = timeout;
+        consumer = getConsumer();
+    }
+
+    @Override
+    public List<String> fetch() throws IOException {
+        return fetch(DEFAULT_LIMIT);
+    }
+
+    @Override
+    public List<String> fetch(int limit) throws IOException {
+        List<String> out = new ArrayList<>();
+        // no try/catch needed: an IOException from the Cambria consumer propagates to the caller as-is
+        for (String msg : consumer.fetch(timeout, limit)) {
+            out.add(msg);
+        }
+        return out;
+    }
+
+    @Override
+    public void registerForRead() throws IOException {
+        int waitForRegisteration = 1; //return from fetch after 1ms, no need to read any messages
+        consumer.fetch(waitForRegisteration, 1);
+    }
+
+    /**
+     * Builds the Cambria consumer from the stored settings.
+     * The socket timeout is set 5000 above the server wait time, presumably so the server answers first - confirm.
+     * @return the configured CambriaConsumer
+     */
+    private CambriaConsumer getConsumer() throws MalformedURLException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException {
+
+        ConsumerBuilder builder = new ConsumerBuilder();
+
+        builder.usingHosts(hosts).onTopic(topic).knownAs(group, groupId);
+        builder.withSocketTimeout(timeout + 5000).waitAtServer(timeout);
+        builder.receivingAtMost(DEFAULT_LIMIT);
+
+        // Add credentials if provided
+        if (authKey != null && authSecret != null) {
+            // NOTE(review): the builder's private fApiKey field is forced to "" via reflection first; the reason is not visible here
+            Field apiKeyField = ConsumerBuilder.class.getDeclaredField("fApiKey");
+            apiKeyField.setAccessible(true);
+            apiKeyField.set(builder, "");
+            builder.authenticatedBy(authKey, authSecret);
+        }
+
+        return builder.build();
+    }
+
+    @Override
+    public void close() {
+        consumer.close();
+    }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageReader.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageReader.java
new file mode 100644
index 0000000..682220a
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageReader.java
@@ -0,0 +1,39 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import org.openecomp.appc.client.impl.core.MessageContext;
+
+public interface MessageReader {
+
+    /**
+     * Reads the payload, fills the context from the payload headers, and returns the body of the payload.
+     * @param payload incoming message
+     * @param context context to fill
+     * @return body of the payload
+     * @throws ProtocolException if the payload cannot be read
+     */
+    String read(String payload, MessageContext context) throws ProtocolException;
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageWriter.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageWriter.java
new file mode 100644
index 0000000..a19fd3f
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessageWriter.java
@@ -0,0 +1,40 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import org.openecomp.appc.client.impl.core.MessageContext;
+import com.fasterxml.jackson.databind.JsonNode;
+
+public interface MessageWriter {
+
+    /**
+     * Builds a message out of the context and the payload.
+     * @param payload body of the message
+     * @param context headers of the message
+     * @return the message to write/send
+     * @throws ProtocolException if the message cannot be built
+     */
+    String write(String payload, MessageContext context) throws ProtocolException;
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessagingService.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessagingService.java
new file mode 100644
index 0000000..791e122
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/MessagingService.java
@@ -0,0 +1,61 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.List;
+import java.util.Properties;
+
+interface MessagingService {
+
+    /**
+     * initialize consumer/publisher
+     * @param props configuration used to set up the consumer/publisher
+     */
+    void init(Properties props) throws IOException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException;
+
+    /**
+     * sends a string as is
+     * @param partition partition the message is sent to
+     * @param body message text, sent unmodified
+     */
+    void send(String partition, String body) throws IOException;
+
+    /**
+     * retrieve messages from bus - timeout extracted from props or see impl
+     * @return the messages that were fetched
+     */
+    List<String> fetch() throws IOException;
+
+    /**
+     * retrieve messages from bus - timeout extracted from props or see impl
+     * @param limit the amount of messages to fetch
+     * @return the messages that were fetched
+     */
+    List<String> fetch(int limit) throws IOException;
+
+    void close(); // closes the service; NOTE(review): exactly what is released depends on the implementation
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Producer.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Producer.java
new file mode 100644
index 0000000..2f2490a
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Producer.java
@@ -0,0 +1,38 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import java.io.IOException;
+
+interface Producer {
+
+    /**
+     * Sends a message to a partition via ueb; {@code Partition} names the target partition.
+     * @param data the message to send
+     */
+    void post(String Partition, String data) throws IOException;
+
+    void close(); // closes the producer; NOTE(review): behavior for unsent messages is not visible here
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProducerImpl.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProducerImpl.java
new file mode 100644
index 0000000..1455fd9
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProducerImpl.java
@@ -0,0 +1,82 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.security.GeneralSecurityException;
+import java.util.Collection;
+
+class ProducerImpl implements Producer {
+
+    private Collection<String> hosts;
+    private String topic;
+    private String authKey;
+    private String authSecret;
+    private CambriaBatchingPublisher producer;
+
+    /**
+     * Stores the connection settings and builds the Cambria publisher from them.
+     * @param urls hosts handed to the publisher builder
+     * @param topicName topic the publisher is built for
+     * @param apiKey API key; credentials are applied only when key and secret are both non-null
+     * @param apiSecret API secret paired with {@code apiKey}
+     */
+    public ProducerImpl(Collection<String> urls, String topicName, String apiKey, String apiSecret) throws MalformedURLException, GeneralSecurityException {
+        this.topic = topicName;
+        this.hosts = urls;
+        this.authKey = apiKey;
+        this.authSecret = apiSecret;
+        this.producer = getProducer();
+    }
+
+    /** Hands {@code data} to the Cambria publisher under the given partition. */
+    public void post(String partition, String data) throws IOException {
+        producer.send(partition, data);
+    }
+
+    /**
+     * Builds the Cambria publisher for the stored hosts and topic.
+     * @return the publisher produced by the builder
+     */
+    private CambriaBatchingPublisher getProducer() throws MalformedURLException, GeneralSecurityException {
+        PublisherBuilder publisherBuilder = new PublisherBuilder().usingHosts(hosts);
+        boolean credentialsGiven = authKey != null && authSecret != null;
+        if (credentialsGiven) {
+            // Authentication is optional: configure it only when both parts were provided.
+            publisherBuilder.authenticatedBy(authKey, authSecret);
+        }
+        return publisherBuilder.onTopic(topic).build();
+    }
+
+    /** Closes the underlying Cambria publisher. */
+    @Override
+    public void close() {
+        producer.close();
+    }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Protocol.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Protocol.java
new file mode 100644
index 0000000..19e7d60
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/Protocol.java
@@ -0,0 +1,38 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import java.util.Properties;
+
+public interface Protocol {
+
+    /**
+     * Initializes the protocol with its properties and a callback.
+     * @param props properties the protocol is initialized with
+     * @param callback callback handed to the protocol implementation
+     * @throws ProtocolException declared so implementations can report a failed initialization
+     */
+    void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException;
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolException.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolException.java
new file mode 100644
index 0000000..87e1318
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolException.java
@@ -0,0 +1,44 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+public class ProtocolException extends Exception {
+
+    // Exception is Serializable; an explicit id keeps the serialized form
+    // independent of the compiler-generated default.
+    private static final long serialVersionUID = 1L;
+
+    /** Creates an exception with neither a detail message nor a cause. */
+    public ProtocolException() { super(); }
+
+    /** Creates an exception with the given detail message. */
+    public ProtocolException(String message) { super(message); }
+
+    /** Creates an exception with the given detail message and underlying cause. */
+    public ProtocolException(String message, Throwable cause) { super(message, cause); }
+
+    /** Creates an exception that wraps the given cause. */
+    public ProtocolException(Throwable cause) { super(cause); }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolFactory.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolFactory.java
new file mode 100644
index 0000000..f9364e1
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolFactory.java
@@ -0,0 +1,79 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ProtocolFactory {
+
+    private static ProtocolFactory instance;
+    private Map<ProtocolType,Protocol> protocols;
+
+    /**
+     * Singleton factory: the only instance is the one handed out by getInstance().
+     */
+    private ProtocolFactory(){
+        protocols = new HashMap<ProtocolType, Protocol>();
+    }
+
+    /**
+     * Returns the factory instance, creating it on the first call.
+     * @return factory instance
+     */
+    public static synchronized ProtocolFactory getInstance(){
+        if (instance == null) {
+            instance = new ProtocolFactory();
+        }
+        return instance;
+    }
+
+    /**
+     * Returns the protocol object for the given type, creating and caching it on first request.
+     * @param type of protocol object
+     * @return protocol object
+     * @throws ProtocolException if the type is SYNC (not implemented) or not recognised
+     */
+    public Protocol getProtocolObject(ProtocolType type) throws ProtocolException {
+        // The cache lookup must share the lock with the insert: reading the HashMap outside it
+        // let two threads both see "absent" and build separate protocol objects.
+        synchronized (this) {
+            Protocol protocol = protocols.get(type);
+            if (protocol == null) {
+                switch (type) {
+                    case SYNC:
+                        throw new ProtocolException("Protocol SYNC is not implemented");
+                    case ASYNC:
+                        protocol = new AsyncProtocolImpl();
+                        protocols.put(type, protocol);
+                        break;
+                    default:
+                        throw new ProtocolException("Protocol type not found");
+                }
+            }
+            return protocol;
+        }
+    }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolMessage.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolMessage.java
new file mode 100644
index 0000000..161a590
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolMessage.java
@@ -0,0 +1,98 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+
+class ProtocolMessage {
+    // Message fields; the JSON property names come from the annotated accessors below.
+    private String version;
+    private String type;
+    private String rpcName;       // JSON property "rpc-name"
+    private String correlationID; // JSON property "correlation-id"
+    private String partition;     // JSON property "cambria.partition"
+    private JsonNode body;
+
+    @JsonProperty
+    String getVersion() {
+        return this.version;
+    }
+
+    @JsonProperty
+    void setVersion(String version) {
+        this.version = version;
+    }
+
+    @JsonProperty
+    String getType() {
+        return this.type;
+    }
+
+    @JsonProperty
+    void setType(String type) {
+        this.type = type;
+    }
+
+    @JsonProperty("rpc-name")
+    String getRpcName() {
+        return this.rpcName;
+    }
+
+    @JsonProperty("rpc-name")
+    void setRpcName(String rpcName) {
+        this.rpcName = rpcName;
+    }
+
+    @JsonProperty("correlation-id")
+    String getCorrelationID() {
+        return this.correlationID;
+    }
+
+    @JsonProperty("correlation-id")
+    void setCorrelationID(String correlationID) {
+        this.correlationID = correlationID;
+    }
+
+    @JsonProperty("cambria.partition")
+    String getPartition() {
+        return this.partition;
+    }
+
+    @JsonProperty("cambria.partition")
+    void setPartition(String partition) {
+        this.partition = partition;
+    }
+
+    @JsonProperty
+    JsonNode getBody() {
+        return this.body;
+    }
+
+    @JsonProperty
+    void setBody(JsonNode body) {
+        this.body = body;
+    }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolType.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolType.java
new file mode 100644
index 0000000..133fbf8
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/ProtocolType.java
@@ -0,0 +1,30 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+public enum ProtocolType {
+    // Protocol kinds a caller can ask ProtocolFactory for; the factory builds ASYNC and rejects SYNC as not implemented.
+    SYNC, ASYNC;
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/RetrieveMessageCallback.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/RetrieveMessageCallback.java
new file mode 100644
index 0000000..4610555
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/RetrieveMessageCallback.java
@@ -0,0 +1,38 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+
+import org.openecomp.appc.client.impl.core.MessageContext;
+
+public interface RetrieveMessageCallback {
+
+    /**
+     * Called when a response is received.
+     * @param payload response payload, passed as a string
+     * @param context message context passed along with the payload
+     */
+    void onResponse(String payload, MessageContext context);
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBMessagingService.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBMessagingService.java
new file mode 100644
index 0000000..22563f6
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBMessagingService.java
@@ -0,0 +1,102 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.*;
+
+class UEBMessagingService implements MessagingService {
+
+    private Consumer consumer;
+    private Producer producer;
+    private int readLimit;
+    private final String DEFAULT_READ_TIMEOUT_MS = "60000";
+    private final String DEFAULT_READ_LIMIT = "1000";
+    private final EELFLogger LOG = EELFManager.getInstance().getLogger(UEBMessagingService.class);
+
+    /**
+     * Reads the UEB settings from {@code props} and creates the consumer and the producer.
+     * Does nothing when {@code props} is null; a failed consumer registration is logged, not rethrown.
+     * @throws NumberFormatException if the read timeout or read limit property is not an integer
+     */
+    @SuppressWarnings("Since15")
+    public void init(Properties props) throws IOException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException {
+        if (props != null) {
+            String readTopic = props.getProperty(UEBPropertiesKeys.TOPIC_READ);
+            String writeTopic = props.getProperty(UEBPropertiesKeys.TOPIC_WRITE);
+            String apiKey = props.getProperty(UEBPropertiesKeys.AUTH_USER);
+            String apiSecret = props.getProperty(UEBPropertiesKeys.AUTH_SECRET);
+            Integer readTimeout = Integer.parseInt(props.getProperty(UEBPropertiesKeys.TOPIC_READ_TIMEOUT, DEFAULT_READ_TIMEOUT_MS));
+            readLimit = Integer.parseInt(props.getProperty(UEBPropertiesKeys.READ_LIMIT, DEFAULT_READ_LIMIT));
+            // hosts pool: comma-separated list; trim entries and skip blanks ("a, b" or a trailing comma)
+            Collection<String> pool = new HashSet<String>();
+            String hostNames = props.getProperty(UEBPropertiesKeys.HOSTS);
+            if (hostNames != null) {
+                for (String name : hostNames.split(",")) {
+                    if (!name.trim().isEmpty()) {
+                        pool.add(name.trim());
+                    }
+                }
+            }
+
+            //generate consumer id and group - same value for both
+            String consumerID = UUID.randomUUID().toString();
+            consumer = new ConsumerImpl(pool, readTopic, consumerID, consumerID, readTimeout, apiKey, apiSecret);
+            producer = new ProducerImpl(pool, writeTopic, apiKey, apiSecret);
+            // initial consumer registration is best effort: log the failure with its cause and carry on
+            try {
+                consumer.registerForRead();
+            } catch (Exception e) {
+                LOG.error("Message consumer failed to register client "+consumerID, e);
+            }
+        }
+    }
+
+    /** Posts {@code body} through the producer under the given partition. */
+    public void send(String partition, String body) throws IOException {
+        producer.post(partition, body);
+    }
+
+    /** Fetches from the consumer using the read limit configured in init. */
+    public List<String> fetch() throws IOException {
+        return consumer.fetch(readLimit);
+    }
+
+    /** Fetches from the consumer using the given limit. */
+    public List<String> fetch(int limit) throws IOException {
+        return consumer.fetch(limit);
+    }
+
+    /** Closes the consumer, then the producer. */
+    @Override
+    public void close() {
+        consumer.close();
+        producer.close();
+    }
+}
diff --git a/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBPropertiesKeys.java b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBPropertiesKeys.java
new file mode 100644
index 0000000..8bbcaf4
--- /dev/null
+++ b/appc-client/client-lib/src/main/java/org/openecomp/appc/client/impl/protocol/UEBPropertiesKeys.java
@@ -0,0 +1,36 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+class UEBPropertiesKeys {
+    // Property names that UEBMessagingService.init reads from the Properties object passed to it.
+    static final String TOPIC_READ = "topic.read"; // topic handed to the consumer
+    static final String TOPIC_READ_TIMEOUT = "topic.read.timeout"; // parsed as an int; "60000" is used when absent
+    static final String READ_LIMIT = "topic.read.limit"; // parsed as an int; "1000" is used when absent
+    static final String TOPIC_WRITE = "topic.write"; // topic handed to the producer
+    static final String AUTH_USER = "client.key"; // API key passed to both consumer and producer
+    static final String AUTH_SECRET = "client.secret"; // API secret passed to both consumer and producer
+    static final String HOSTS = "poolMembers"; // comma-separated host list
+}
diff --git a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/core/ResponseManagerTest.java b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/core/ResponseManagerTest.java
new file mode 100644
index 0000000..d1720f0
--- /dev/null
+++ b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/core/ResponseManagerTest.java
@@ -0,0 +1,163 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import org.openecomp.appc.client.impl.core.AsyncRequestResponseHandler;
+import org.openecomp.appc.client.impl.core.CoreException;
+import org.openecomp.appc.client.impl.core.CoreManager;
+import org.openecomp.appc.client.impl.core.ICoreAsyncResponseHandler;
+import org.openecomp.appc.client.impl.core.MessageContext;
+import org.openecomp.appc.client.impl.protocol.AsyncProtocol;
+import org.openecomp.appc.client.impl.protocol.ProtocolException;
+import org.openecomp.appc.client.impl.protocol.RetrieveMessageCallback;
+import org.junit.Before;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.mockito.Mockito.mock;
+
+public class ResponseManagerTest {
+    // NOTE(review): no method below carries @Test or @Before and nothing is asserted (results are only printed) - presumably JUnit runs none of this as written; confirm intent.
+    private final ExecutorService executorService = Executors.newFixedThreadPool(10); // NOTE(review): not referenced anywhere else in this class
+    ICoreAsyncResponseHandler listener1 = new ListenerImpl();
+    ICoreAsyncResponseHandler listener2 = new SleeepListenerImpl();
+    ICoreAsyncResponseHandler listener3 = new ListenerImpl();
+    CoreManager coreManager = null;
+    // Creates the CoreManagerTest stub with client.pool.size=10 and client.response.timeout=7000.
+    public void initialize() throws CoreException {
+        Properties prop = new Properties();
+        prop.setProperty("client.pool.size", "10");
+        prop.setProperty("client.response.timeout", "7000");
+        coreManager = new ResponseManagerTest.CoreManagerTest(prop);
+    }
+    // Wraps the callback in an AsyncRequestResponseHandler for correlationId and sends the request through it.
+    void asyncRequest(String request, ICoreAsyncResponseHandler businessCallback, String correlationId, String rpcName) throws CoreException {
+        AsyncRequestResponseHandler requestResponseHandler = new AsyncRequestResponseHandler(correlationId, businessCallback, coreManager);
+        requestResponseHandler.sendRequest(request, correlationId, rpcName);
+    }
+    // Sends one request for "vasia1", then delivers two responses that both carry correlation id "vasia1".
+    public void simpleResponseTest() throws Exception {
+        System.out.println("simpleResponseTest");
+        asyncRequest("request 1", listener1,"vasia1", "test");
+        MessageContext msgCtx = new MessageContext();
+        msgCtx.setCorrelationID("vasia1");
+        msgCtx.setType("response");
+        coreManager.getProtocolCallback().onResponse("vasia1 response",msgCtx);
+        coreManager.getProtocolCallback().onResponse("vasia2 response",msgCtx);
+        Thread.sleep(10);
+    }
+    // Sends "vasia2" (sleeping listener) and then "vasia1", delivering one response to each in turn.
+    public void twoResponseTest() throws Exception {
+        System.out.println("twoResponseTest");
+        asyncRequest("twoResponseTest request 1", listener2,"vasia2", "test");
+        MessageContext msgCtx = new MessageContext();
+        msgCtx.setCorrelationID("vasia2");
+        msgCtx.setType("response");
+        coreManager.getProtocolCallback().onResponse("second of vasia2",msgCtx);
+        Thread.sleep(100);
+        asyncRequest("twoResponseTest request 2", listener1,"vasia1", "test");
+        MessageContext msgCtx2 = new MessageContext();
+        msgCtx2.setCorrelationID("vasia1");
+        msgCtx2.setType("response");
+        coreManager.getProtocolCallback().onResponse("first of vasia1",msgCtx2);
+        Thread.sleep(150);
+    }
+    // Sends four requests and delivers responses for "vasia2", "vasia1" and "vasia3" (none for "vasia4"); ends with a 10 second sleep.
+    public void threeResponseTest() throws Exception {
+        System.out.println("treeResponseTest");
+        asyncRequest("threeResponseTest request 2", listener1,"vasia4", "test");
+        asyncRequest("threeResponseTest request 1", listener2,"vasia2", "test");
+        MessageContext msgCtx2 = new MessageContext();
+        msgCtx2.setCorrelationID("vasia2");
+        msgCtx2.setType("response");
+        coreManager.getProtocolCallback().onResponse("second of vasia2",msgCtx2);
+
+        asyncRequest("threeResponseTest request 2", listener1,"vasia1", "test");
+        MessageContext msgCtx1 = new MessageContext();
+        msgCtx1.setCorrelationID("vasia1");
+        msgCtx1.setType("response");
+        coreManager.getProtocolCallback().onResponse("vasia1",msgCtx1);
+
+        asyncRequest("threeResponseTest request 3", listener3,"vasia3", "test");
+        MessageContext msgCtx3 = new MessageContext();
+        msgCtx3.setCorrelationID("vasia3");
+        msgCtx3.setType("response");
+        coreManager.getProtocolCallback().onResponse("three1",msgCtx3);
+
+        coreManager.getProtocolCallback().onResponse("three2", msgCtx3);
+
+        coreManager.getProtocolCallback().onResponse("first1", msgCtx1);
+        Thread.sleep(250);
+
+        coreManager.getProtocolCallback().onResponse("first2", msgCtx1);
+        Thread.sleep(10000);
+    }
+    // Listener that prints the message and returns true for any non-null message.
+    private class ListenerImpl implements ICoreAsyncResponseHandler{
+
+        public boolean onResponse(String message, String type) {
+            System.out.println("callback " + message);
+            return message != null;
+        }
+
+        @Override
+        public void onException(Exception e) {
+            e.printStackTrace();
+        }
+    }
+    // Listener that sleeps 150 ms before printing the message; returns true for any non-null message.
+    private class SleeepListenerImpl implements ICoreAsyncResponseHandler{
+
+        public boolean onResponse(String message, String type) {
+            try {
+                Thread.sleep(150);
+                System.out.println("sleep callback " + message);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            return message != null;
+        }
+
+        @Override
+        public void onException(Exception e) {
+            e.printStackTrace();
+        }
+    }
+    // CoreManager subclass that sets protocol to a Mockito mock and declares sendRequest2Protocol/initProtocol with empty bodies.
+    class CoreManagerTest extends CoreManager{
+        CoreManagerTest(Properties properties) throws CoreException {
+            super(properties);
+            protocol = mock(AsyncProtocol.class);
+        }
+        protected void sendRequest2Protocol(String request, String corrId, String rpcName) throws CoreException {
+        }
+
+        protected void initProtocol(Properties properties, RetrieveMessageCallback protocolCallback) throws ProtocolException {
+
+        }
+    }
+}
diff --git a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/core/SyncFlowTest.java b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/core/SyncFlowTest.java
new file mode 100644
index 0000000..4b4bce5
--- /dev/null
+++ b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/core/SyncFlowTest.java
@@ -0,0 +1,151 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.core;
+
+import org.openecomp.appc.client.impl.core.CoreException;
+import org.openecomp.appc.client.impl.core.CoreManager;
+import org.openecomp.appc.client.impl.core.ICoreSyncResponseHandler;
+import org.openecomp.appc.client.impl.core.MessageContext;
+import org.openecomp.appc.client.impl.core.SyncRequestResponseHandler;
+import org.openecomp.appc.client.impl.protocol.AsyncProtocol;
+import org.openecomp.appc.client.impl.protocol.ProtocolException;
+import org.openecomp.appc.client.impl.protocol.RetrieveMessageCallback;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+
+import static org.mockito.Mockito.mock;
+
+public class SyncFlowTest {
+    CoreManager coreManager = null;
+
+    public void initialize() throws CoreException {
+        Properties prop = new Properties();
+        prop.setProperty("client.pool.size", "10");
+        prop.setProperty("client.response.timeout", "7000");
+        coreManager = new CoreManagerTest(prop);
+    }
+
+    <T> T syncRequest(String request, ICoreSyncResponseHandler businessCallback, String correlationId, String rpcName ) throws CoreException, TimeoutException {
+        SyncRequestResponseHandler requestResponseHandler = new SyncRequestResponseHandler(correlationId, businessCallback, coreManager);
+        requestResponseHandler.sendRequest(request, correlationId, rpcName);
+        T responseObject = (T) requestResponseHandler.getResponse();
+        return responseObject;
+    }
+
+    public void blockRequestTest(){
+        ICoreSyncResponseHandler handler = new ICoreSyncResponseHandlerImpl1();
+        try {
+            syncRequest("request 1", handler, "vasia1", "test");
+        }catch (Throwable e){
+            e.printStackTrace();
+            Assert.assertTrue(e != null);
+        }
+
+    }
+
+    public <T> void blockRequestSucceedTest() throws InterruptedException {
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        final ICoreSyncResponseHandler handler = new ICoreSyncResponseHandlerImpl1();
+        try {
+            executorService.submit(new Runnable() {
+                public void run() {
+                    System.out.println("Send request");
+                    T response;
+                    try {
+                        response = syncRequest("request 1", handler, "vasia1", "test");
+                        System.out.println("=======" + response.toString());
+                    } catch (CoreException e) {
+                        e.printStackTrace();
+                    } catch (TimeoutException e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+        }catch (Throwable e){
+            Assert.assertTrue((RuntimeException)e != null);
+        }
+        Thread.sleep(2000);
+        executorService.submit(new Runnable() {
+            public void run() {
+                MessageContext ctx = new MessageContext();
+                ctx.setCorrelationID("vasia1");
+                ctx.setType("response");
+                try {
+                    System.out.println("Send response 1");
+                    coreManager.getProtocolCallback().onResponse("response for request 1", ctx);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        Thread.sleep(2000);
+        executorService.submit(new Runnable() {
+            public void run() {
+                MessageContext ctx = new MessageContext();
+                ctx.setCorrelationID("vasia1");
+                ctx.setType("response");
+                try {
+                    System.out.println("Send response 2");
+                    coreManager.getProtocolCallback().onResponse("response for request 1 final", ctx);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        Thread.sleep(1000);
+
+    }
+
+    class ICoreSyncResponseHandlerImpl1 implements ICoreSyncResponseHandler{
+
+
+        public <T> T onResponse(String message, String type) {
+            System.out.println("Received message = " + message) ;
+            if(message.contains("final")){
+                return (T) new String(message);
+            }
+            return null;
+        }
+    }
+
+    class CoreManagerTest extends CoreManager{
+        CoreManagerTest(Properties properties) throws CoreException {
+            super(properties);
+            protocol = mock(AsyncProtocol.class);
+        }
+        protected void sendRequest2Protocol(String request, String corrId, String rpcName) throws CoreException {
+        }
+
+        protected void initProtocol(Properties properties, RetrieveMessageCallback protocolCallback) throws ProtocolException{
+
+        }
+    }
+}
diff --git a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriterTest.java b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriterTest.java
new file mode 100644
index 0000000..91f6157
--- /dev/null
+++ b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/APPCMessageReaderWriterTest.java
@@ -0,0 +1,104 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import org.openecomp.appc.client.impl.core.MessageContext;
+import org.openecomp.appc.client.impl.protocol.APPCMessageReaderWriter;
+import org.openecomp.appc.client.impl.protocol.ProtocolException;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+public class APPCMessageReaderWriterTest {
+
+    private APPCMessageReaderWriter messageReaderWriter;
+    private ObjectMapper mapper;
+
+    private static final String VERSION = "2.0";
+    private static final String TYPE = "typeTest";
+    private static final String CORRELATION_ID = "correlationIdTest";
+    private static final String PARTITION = "partitionTest";
+    private static final String RPC = "rpcTest";
+    private static final String PAYLOAD = "{\"key1\":\"val1\",\"key2\":\"val2\",\"key3\":{\"key3.1\":\"val3.1\"}}";
+
+    /** Creates a fresh reader/writer and ObjectMapper before every test. */
+    @Before
+    public void init() throws IOException {
+        messageReaderWriter = new APPCMessageReaderWriter();
+        mapper = new ObjectMapper();
+    }
+
+    /** Writes PAYLOAD with a fully populated context and checks every field of the produced JSON message. */
+    @Test
+    public void writeTest() throws IOException, ProtocolException {
+        final MessageContext ctx = new MessageContext();
+        ctx.setType(TYPE);
+        ctx.setCorrelationID(CORRELATION_ID);
+        ctx.setPartiton(PARTITION);
+        ctx.setRpc(RPC);
+
+        final JsonNode envelope = mapper.readTree(messageReaderWriter.write(PAYLOAD, ctx));
+        Assert.assertEquals(VERSION, envelope.get("version").asText());
+        Assert.assertEquals(ctx.getType(), envelope.get("type").asText());
+        Assert.assertEquals(ctx.getCorrelationID(), envelope.get("correlation-id").asText());
+        Assert.assertEquals(ctx.getPartiton(), envelope.get("cambria.partition").asText());
+        Assert.assertEquals(ctx.getRpc(), envelope.get("rpc-name").asText());
+        Assert.assertEquals(PAYLOAD, envelope.get("body").toString());
+    }
+
+    /** Builds a JSON message by hand, reads it back, and checks the recovered context fields and payload. */
+    @Test
+    public void readTest() throws IOException, ProtocolException {
+        final JsonNode body = mapper.valueToTree(PAYLOAD);
+        final ObjectNode envelope = mapper.createObjectNode();
+        envelope.put("version", VERSION);
+        envelope.put("type", TYPE);
+        envelope.put("correlation-id", CORRELATION_ID);
+        envelope.put("cambria.partition", PARTITION);
+        envelope.put("rpc-name", RPC);
+        envelope.set("body", body);
+
+        final MessageContext parsed = new MessageContext();
+        final String parsedPayload = messageReaderWriter.read(envelope.toString(), parsed);
+
+        Assert.assertEquals(TYPE, parsed.getType());
+        Assert.assertEquals(CORRELATION_ID, parsed.getCorrelationID());
+        Assert.assertEquals(PARTITION, parsed.getPartiton());
+        Assert.assertEquals(RPC, parsed.getRpc());
+        Assert.assertEquals(body.toString(), parsedPayload);
+    }
+
+}
diff --git a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImpl.java b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImpl.java
new file mode 100644
index 0000000..046f5a8
--- /dev/null
+++ b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImpl.java
@@ -0,0 +1,91 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import org.openecomp.appc.client.impl.core.MessageContext;
+import org.openecomp.appc.client.impl.protocol.AsyncProtocol;
+import org.openecomp.appc.client.impl.protocol.AsyncProtocolImpl;
+import org.openecomp.appc.client.impl.protocol.ProtocolException;
+import org.openecomp.appc.client.impl.protocol.RetrieveMessageCallback;
+import org.openecomp.appc.client.impl.protocol.UEBPropertiesKeys;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class TestAsyncProtocolImpl {
+
+    private static AsyncProtocol protocol;
+    private static AtomicBoolean gotResponse;
+    private static Properties props;
+
+    private static class TestCallback implements RetrieveMessageCallback{
+
+        public void onResponse(String payload, MessageContext context) {
+            Assert.assertNotEquals(null, payload);
+            Assert.assertNotEquals(null, context);
+            protocol = null;
+            gotResponse.set(true);
+        }
+    }
+
+    @BeforeClass
+    public static void setUp() throws IOException, ProtocolException {
+
+        gotResponse = new AtomicBoolean(false);
+
+        props = new Properties();
+        String propFileName = "ueb.properties";
+
+        InputStream input = TestAsyncProtocolImpl.class.getClassLoader().getResourceAsStream(propFileName);
+
+        props.load(input);
+
+        protocol = new AsyncProtocolImpl();
+        protocol.init(props, new TestCallback());
+    }
+
+    public void testSendRequest() throws ProtocolException {
+
+        MessageContext context = new MessageContext();
+        context.setType("Test");
+
+        protocol.sendRequest("{\"Test\":\"\"}", context);
+
+        try {
+            Long timeToSleep = Long.parseLong((String)props.get(UEBPropertiesKeys.TOPIC_READ_TIMEOUT))*2;
+            Thread.sleep(timeToSleep);
+        } catch (InterruptedException e) {
+            Assert.assertFalse(e.getMessage(), false);
+        }
+        if (gotResponse.get() == false) {
+            Assert.assertFalse("Message was not read !", true);
+        }
+    }
+}
diff --git a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImplMissingProps.java b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImplMissingProps.java
new file mode 100644
index 0000000..9611a79
--- /dev/null
+++ b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImplMissingProps.java
@@ -0,0 +1,75 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import org.openecomp.appc.client.impl.core.MessageContext;
+import org.openecomp.appc.client.impl.protocol.AsyncProtocol;
+import org.openecomp.appc.client.impl.protocol.AsyncProtocolImpl;
+import org.openecomp.appc.client.impl.protocol.RetrieveMessageCallback;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+public class TestAsyncProtocolImplMissingProps {
+
+    private static AsyncProtocol protocol;
+
+    /** Callback that fails the test if it is ever invoked. */
+    private static class TestCallback implements RetrieveMessageCallback {
+        public void onResponse(String payload, MessageContext context) {
+            Assert.fail("bad Callback !");
+        }
+    }
+
+    /** Initialising with ueb.missing.properties (which has no poolMembers entry) must throw IllegalArgumentException. */
+    @Test
+    public void testSetUpMissingProps() {
+        Properties props = new Properties();
+        String propFileName = "ueb.missing.properties";
+        InputStream input = TestAsyncProtocolImplMissingProps.class.getClassLoader().getResourceAsStream(propFileName);
+        Assert.assertNotNull(propFileName + " not found on the test classpath", input);
+        try {
+            try {
+                props.load(input);
+            } finally {
+                input.close(); // the stream used to be left open
+            }
+        } catch (IOException e) {
+            Assert.fail("could not read " + propFileName + ": " + e.getMessage());
+        }
+        protocol = new AsyncProtocolImpl();
+        try {
+            protocol.init(props, new TestCallback());
+            Assert.fail("init() did not reject the incomplete properties"); // AssertionError is not caught below
+        } catch (IllegalArgumentException e) {
+            // expected outcome, nothing further to check
+        } catch (Exception e) {
+            Assert.fail("expected IllegalArgumentException but got " + e);
+        }
+    }
+}
diff --git a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImplNullCallback.java b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImplNullCallback.java
new file mode 100644
index 0000000..e537037
--- /dev/null
+++ b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestAsyncProtocolImplNullCallback.java
@@ -0,0 +1,60 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.openecomp.appc.client.impl.protocol.AsyncProtocol;
+import org.openecomp.appc.client.impl.protocol.AsyncProtocolImpl;
+import org.openecomp.appc.client.impl.protocol.ProtocolException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+public class TestAsyncProtocolImplNullCallback {
+    // NOTE(review): testSetUpNoCallback has no @Test annotation, so JUnit will not run it as written - confirm this is deliberate.
+    private static AsyncProtocol protocol;
+
+    /** Initialising the protocol with a null callback must be rejected with a ProtocolException. */
+    public void testSetUpNoCallback() throws IOException {
+        Properties props = new Properties();
+        InputStream input = TestAsyncProtocolImplNullCallback.class.getClassLoader().getResourceAsStream("ueb.properties");
+        Assert.assertNotNull("ueb.properties not found on the test classpath", input);
+        try {
+            props.load(input);
+        } finally {
+            input.close(); // the stream used to be left open
+        }
+        protocol = new AsyncProtocolImpl();
+        try {
+            protocol.init(props, null);
+            Assert.fail("init() accepted a null callback"); // AssertionError is not caught below
+        } catch (ProtocolException expected) { // expected outcome, nothing further to check
+        } catch (Exception e){
+            Assert.fail("expected ProtocolException but got " + e);
+        }
+    }
+}
diff --git a/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestUEBMessagingService.java b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestUEBMessagingService.java
new file mode 100644
index 0000000..109065a
--- /dev/null
+++ b/appc-client/client-lib/src/test/java/org/openecomp/appc/client/impl/protocol/TestUEBMessagingService.java
@@ -0,0 +1,70 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.client.impl.protocol;
+
+import org.junit.*;
+import org.junit.runners.MethodSorters;
+import org.openecomp.appc.client.impl.protocol.MessagingService;
+import org.openecomp.appc.client.impl.protocol.UEBMessagingService;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.GeneralSecurityException;
+import java.util.List;
+import java.util.Properties;
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class TestUEBMessagingService {
+    // NOTE(review): setUp/test1Send/test2Fetch carry no JUnit annotations, so nothing here runs as written - confirm this is deliberate.
+    private static MessagingService ueb;
+    /** Loads ueb.properties from the classpath and initialises a UEBMessagingService with it. */
+    public static void setUp() throws IOException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException {
+        Properties props = new Properties();
+        String propFileName = "ueb.properties";
+        InputStream input = TestUEBMessagingService.class.getClassLoader().getResourceAsStream(propFileName);
+        if (input == null) {
+            throw new IOException(propFileName + " not found on the test classpath");
+        }
+        try {
+            props.load(input);
+        } finally {
+            input.close(); // the stream used to be left open
+        }
+        ueb = new UEBMessagingService();
+        ueb.init(props);
+    }
+    /** Sends one fixed message, passing null as the first argument of send(). */
+    public void test1Send() throws IOException {
+        System.out.println("Here");
+        String message = "Test Message Service";
+        ueb.send(null,message);
+    }
+    /** Calls fetch(1) and expects exactly one message in the returned list. */
+    public void test2Fetch() throws IOException {
+        System.out.println("Here2");
+        List<String> messages = ueb.fetch(1);
+        Assert.assertEquals(1,messages.size());
+    }
+}
diff --git a/appc-client/client-lib/src/test/resources/ueb.missing.properties b/appc-client/client-lib/src/test/resources/ueb.missing.properties
new file mode 100644
index 0000000..54ba582
--- /dev/null
+++ b/appc-client/client-lib/src/test/resources/ueb.missing.properties
@@ -0,0 +1,30 @@
+###
+# ============LICENSE_START=======================================================
+# ONAP : APPC
+# ================================================================================
+# Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Copyright (C) 2017 Amdocs
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# 
+#      http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# 
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+# ============LICENSE_END=========================================================
+###
+
+topic.read=APPC-UNIT-TEST
+topic.read.timeout=60000
+topic.write=APPC-UNIT-TEST
+client.name=APPC-CLIENT
+client.id=0
+
diff --git a/appc-client/client-lib/src/test/resources/ueb.properties b/appc-client/client-lib/src/test/resources/ueb.properties
new file mode 100644
index 0000000..026d5f4
--- /dev/null
+++ b/appc-client/client-lib/src/test/resources/ueb.properties
@@ -0,0 +1,29 @@
+###
+# ============LICENSE_START=======================================================
+# ONAP : APPC
+# ================================================================================
+# Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Copyright (C) 2017 Amdocs
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# 
+#      http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# 
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+# ============LICENSE_END=========================================================
+###
+
+poolMembers=10.147.29.67:3904
+topic.read=APPC-TEST1
+topic.read.timeout=2500
+topic.write=APPC-TEST1
+