blob: 6d0c60fbbd98656db9a0b027c542a15af72e693a [file] [log] [blame]
#!/bin/bash
# ================================================================================
# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
set -e
CDAP_HOST='localhost'
CDAP_PORT='11015'
TCA_NAMESPACE='cdap_tca_hi_lo'
TCA_APPNAME='dcae-tca'
TCA_ARTIFACT='dcae-analytics-cdap-tca'
TCA_FILE_PATH='/opt/tca'
TCA_JAR="$(ls -1r ${TCA_FILE_PATH}/${TCA_ARTIFACT}*.jar | head -1)"
TCA_ARTIFACT_VERSION=$(echo "$TCA_JAR" |rev |cut -f 2-4 -d '.' |rev)
TCA_APP_CONF="${TCA_FILE_PATH}/tca_app_config.json"
TCA_CONF="${TCA_FILE_PATH}/tca_config.json"
TCA_PREF="${TCA_FILE_PATH}/tca_app_preferences.json"
TCA_CONF_TEMP='/tmp/tca_config.json'
TCA_APP_CONF_TEMP='/tmp/tca_app_config.json'
TCA_PREF_TEMP='/tmp/tca_preferences.json'
TCA_PATH_APP="${CDAP_HOST}:${CDAP_PORT}/v3/namespaces/${TCA_NAMESPACE}/apps/${TCA_APPNAME}"
TCA_PATH_ARTIFACT="${CDAP_HOST}:${CDAP_PORT}/v3/namespaces/${TCA_NAMESPACE}/artifacts"
MR_WATCHDOG_PATH="${TCA_FILE_PATH}/mr-watchdog.sh"
WORKER_COUNT='0'
CONSUL_HOST=${CONSUL_HOST:-consul}
CONSUL_PORT=${CONSUL_PORT:-8500}
CONFIG_BINDING_SERVICE=${CONFIG_BINDING_SERVICE:-config_binding_service}
CBS_SERVICE_NAME=${CONFIG_BINDING_SERVICE}
#Changing to HOSTNAME parameter for consistency with k8s deploy
MY_NAME=${HOSTNAME:-tca}
echo "Generting preference file"
DMAAPSUBGROUP=${DMAAPSUBGROUP:-OpenDCAEc12}
DMAAPSUBID=${DMAAPSUBID:=c12}
sed -i 's/{{DMAAPHOST}}/'"${DMAAPHOST}"'/g' ${TCA_PREF}
sed -i 's/{{DMAAPPORT}}/'"${DMAAPPORT}"'/g' ${TCA_PREF}
sed -i 's/{{DMAAPPUBTOPIC}}/'"${DMAAPPUBTOPIC}"'/g' ${TCA_PREF}
sed -i 's/{{DMAAPSUBTOPIC}}/'"${DMAAPSUBTOPIC}"'/g' ${TCA_PREF}
sed -i 's/{{DMAAPSUBGROUP}}/'"${DMAAPSUBGROUP}"'/g' ${TCA_PREF}
sed -i 's/{{DMAAPSUBID}}/'"${DMAAPSUBID}"'/g' ${TCA_PREF}
sed -i 's/{{AAIHOST}}/'"${AAIHOST}"'/g' ${TCA_PREF}
sed -i 's/{{AAIPORT}}/'"${AAIPORT}"'/g' ${TCA_PREF}
if [ -z "$REDISHOSTPORT" ]; then
sed -i 's/{{REDISHOSTPORT}}/NONE/g' ${TCA_PREF}
sed -i 's/{{REDISCACHING}}/false/g' ${TCA_PREF}
else
sed -i 's/{{REDISHOSTPORT}}/'"${REDISHOSTPORT}"'/g' ${TCA_PREF}
sed -i 's/{{REDISCACHING}}/true/g' ${TCA_PREF}
fi
function tca_stop {
# stop programs
echo
echo "Stopping TCADMaaPMRPublisherWorker, TCADMaaPMRSubscriberWorker, and TCAVESCollectorFlow ..."
echo
curl -s -X POST "http://${TCA_PATH_APP}/workers/TCADMaaPMRPublisherWorker/stop"
curl -s -X POST "http://${TCA_PATH_APP}/workers/TCADMaaPMRSubscriberWorker/stop"
curl -s -X POST "http://${TCA_PATH_APP}/flows/TCAVESCollectorFlow/stop"
echo "done"
echo
}
function tca_load_artifact {
echo
echo "Loading artifact ${TCA_JAR} to http://${TCA_PATH_ARTIFACT}/${TCA_ARTIFACT}..."
curl -s -X POST --data-binary @"${TCA_JAR}" "http://${TCA_PATH_ARTIFACT}/${TCA_ARTIFACT}"
echo
}
function tca_load_conf {
echo
echo "Loading configuration ${TCA_APP_CONF} to http://${TCA_PATH_APP}"
curl -s -X PUT -d @${TCA_APP_CONF} http://${TCA_PATH_APP}
echo
# load preferences
echo
echo "Loading preferences ${TCA_PREF} to http://${TCA_PATH_APP}/preferences"
curl -s -X PUT -d @${TCA_PREF} http://${TCA_PATH_APP}/preferences
echo
}
function tca_delete {
echo
echo "Deleting application dcae-tca http://${TCA_PATH_APP}"
curl -s -X DELETE http://${TCA_PATH_APP}
echo
# delete artifact
echo
echo "Deleting artifact http://${TCA_PATH_ARTIFACT}/${TCA_ARTIFACT}/versions/${TCA_ARTIFACT_VERSION} ..."
curl -s -X DELETE "http://${TCA_PATH_ARTIFACT}/${TCA_ARTIFACT}/versions/${TCA_ARTIFACT_VERSION}"
echo
}
function tca_start {
echo
echo "Starting TCADMaaPMRPublisherWorker, TCADMaaPMRSubscriberWorker, and TCAVESCollectorFlow ..."
curl -s -X POST "http://${TCA_PATH_APP}/workers/TCADMaaPMRPublisherWorker/start"
curl -s -X POST "http://${TCA_PATH_APP}/workers/TCADMaaPMRSubscriberWorker/start"
curl -s -X POST "http://${TCA_PATH_APP}/flows/TCAVESCollectorFlow/start"
echo
}
function tca_status {
WORKER_COUNT='0'
echo
STATUS=$(curl -s "http://${TCA_PATH_APP}/workers/TCADMaaPMRPublisherWorker/status")
echo "TCADMaaPMRPublisherWorker status: $STATUS"
INC=$(echo "$STATUS" | jq . |grep RUNNING |wc -l)
WORKER_COUNT=$((WORKER_COUNT+INC))
STATUS=$(curl -s "http://${TCA_PATH_APP}/workers/TCADMaaPMRSubscriberWorker/status")
echo "TCADMaaPMRSubscriberWorker status: $STATUS"
INC=$(echo "$STATUS" | jq . |grep RUNNING |wc -l)
WORKER_COUNT=$((WORKER_COUNT+INC))
STATUS=$(curl -s "http://${TCA_PATH_APP}/flows/TCAVESCollectorFlow/status")
echo "TCAVESCollectorFlow status: $STATUS"
INC=$(echo "$STATUS" | jq . |grep RUNNING |wc -l)
WORKER_COUNT=$((WORKER_COUNT+INC))
echo
}
function tca_restart {
MR_HOST=$(jq .subscriberHostName ${TCA_PREF} |sed -e 's/\"//g')
MR_PORT=$(jq .subscriberHostPort ${TCA_PREF} |sed -e 's/\"//g')
MR_TOPIC=$(jq .subscriberTopicName ${TCA_PREF} |sed -e 's/\"//g')
echo "Verifying DMaaP topic: ${MR_TOPIC}@${MR_HOST}:${MR_PORT} (will block until topic ready)"
"${MR_WATCHDOG_PATH}" "${MR_HOST}" "${MR_PORT}" "${MR_TOPIC}"
tca_stop
tca_delete
tca_load_artifact
tca_load_conf
tca_start
sleep 5
tca_status
}
function tca_poll_policy {
URL0="${CBS_HOST}:${CBS_PORT}/service_component_all/${MY_NAME}"
echo "tca_poll_policy: Retrieving all-in-one config at ${URL0}"
HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" "$URL0")
HTTP_BODY=$(echo "$HTTP_RESPONSE" | sed -e 's/HTTPSTATUS\:.*//g')
HTTP_STATUS=$(echo "$HTTP_RESPONSE" | tr -d '\n' | sed -e 's/.*HTTPSTATUS://')
if [ "$HTTP_STATUS" != "200" ]; then
echo "tca_poll_policy: Retrieving all-in-one config failed with status $HTTP_STATUS"
URL1="${CBS_HOST}:${CBS_PORT}/service_component/${MY_NAME}"
echo "tca_poll_policy: Retrieving app config only at ${URL1}"
HTTP_RESPONSE1=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" "$URL1")
HTTP_BODY1=$(echo "$HTTP_RESPONSE1" | sed -e 's/HTTPSTATUS\:.*//g')
HTTP_STATUS1=$(echo "$HTTP_RESPONSE1" | tr -d '\n' | sed -e 's/.*HTTPSTATUS://')
if [ "$HTTP_STATUS1" != "200" ]; then
echo "tca_poll_policy: Retrieving app config only failed with status $HTTP_STATUS1"
return
fi
URL2="$URL1:preferences"
echo "tca_poll_policy: Retrieving app preferences only at ${URL2}"
HTTP_RESPONSE2=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" "$URL2")
HTTP_BODY2=$(echo "$HTTP_RESPONSE2" | sed -e 's/HTTPSTATUS\:.*//g')
HTTP_STATUS2=$(echo "$HTTP_RESPONSE2" | tr -d '\n' | sed -e 's/.*HTTPSTATUS://')
if [ "$HTTP_STATUS2" != "200" ]; then
echo "tca_poll_policy: Retrieving app preferences only failed with status $HTTP_STATUS2"
return
fi
if [[ "$CONFIG" == "null" || "$PREF" == "null" ]]; then
echo "tca_poll_policy: either app config or app preferences being empty, config not applicable"
return
fi
echo "$HTTP_BODY1" | jq . --sort-keys > "${TCA_CONF_TEMP}"
echo "$HTTP_BODY2" | jq . --sort-keys > "${TCA_PREF_TEMP}"
else
CONFIG=$(echo "$HTTP_BODY" | jq .config.app_config)
PREF=$(echo "$HTTP_BODY" | jq .config.app_preferences)
POLICY=$(echo "$HTTP_BODY" | jq .policies.items[0].config.content.tca_policy)
if [[ "$CONFIG" == "null" || "$PREF" == "null" ]]; then
echo "tca_poll_policy: CONFIG received is parsed to be empty, trying to parse using R1 format"
CONFIG=$(echo "$HTTP_BODY" | jq .config)
NEWPREF=$(echo "$HTTP_BODY" | jq .preferences)
#echo "CONFIG is [$CONFIG]"
#echo "NEWPREF is [$NEWPREF]"
else
echo "tca_poll_policy: CONFIG is [${CONFIG}], PREF is [${PREF}], POLICY is [${POLICY}]"
## Check if policy content under tca_policy is returned null
## null indicates no active policy flow; hence use configuration loaded
## from blueprint
if [ "$POLICY" == "null" ]; then
# tca_policy through blueprint
NEWPREF=${PREF}
else
# tca_policy through active policy flow through PH
NEWPREF=$(echo "$PREF" | jq --arg tca_policy "$POLICY" '. + {$tca_policy}')
fi
NEWPREF=$(echo "$NEWPREF" | sed 's/\\n//g')
fi
if [[ "$CONFIG" == "null" || "$NEWPREF" == "null" ]]; then
echo "tca_poll_policy: either app config or app preferences being empty, config not applicable"
return
fi
echo "$CONFIG" | jq . --sort-keys > "${TCA_CONF_TEMP}"
echo "$NEWPREF" | jq . --sort-keys > "${TCA_PREF_TEMP}"
fi
if [ ! -e "${TCA_CONF_TEMP}" ] || [ "$(ls -sh ${TCA_CONF_TEMP} |cut -f1 -d' ' |sed -e 's/[^0-9]//g')" -lt "1" ]; then
echo "Fail to receive configuration"
return
fi
if [ ! -e "${TCA_PREF_TEMP}" ] || [ "$(ls -sh ${TCA_PREF_TEMP} |cut -f1 -d' ' |sed -e 's/[^0-9]//g')" -lt "1" ]; then
echo "Fail to receive preferences"
return
fi
CONF_CHANGED=""
# extract only the config section from APP CONF (which has both artifact and config sections)
jq .config --sort-keys ${TCA_APP_CONF} > ${TCA_CONF}
if ! diff ${TCA_CONF} ${TCA_CONF_TEMP} ; then
echo "TCA config changed"
# generating the new app conf using current app conf's artifact section and the new downloaded config
jq --argfile CONFVALUE ${TCA_CONF_TEMP} '.config = $CONFVALUE' <${TCA_APP_CONF} > ${TCA_APP_CONF_TEMP}
mv ${TCA_APP_CONF_TEMP} ${TCA_APP_CONF}
CONF_CHANGED=1
fi
PERF_CHANGED=""
# update the subscriber ConsumerID, if not already unique,
# so replicas appear as different consumers in the consumer group
HOSTID=$(head -1 /etc/hostname | rev |cut -f1-2 -d'-' |rev)
CONSUMERID=$(jq .subscriberConsumerId ${TCA_PREF_TEMP} |sed -e 's/\"//g')
if ! (echo "$CONSUMERID" |grep "$HOSTID"); then
CONSUMERID="${CONSUMERID}-${HOSTID}"
jq --arg CID "${CONSUMERID}" '.subscriberConsumerId = $CID' < "${TCA_PREF_TEMP}" > "${TCA_PREF_TEMP}2"
mv "${TCA_PREF_TEMP}2" "${TCA_PREF_TEMP}"
fi
if ! diff ${TCA_PREF} ${TCA_PREF_TEMP} ; then
echo "TCA preference updated"
mv ${TCA_PREF_TEMP} ${TCA_PREF}
PERF_CHANGED=1
fi
if [[ "$PERF_CHANGED" == "1" || "$CONF_CHANGED" == "1" ]]; then
echo "Newly received configuration/preference differ from the running instance's. reload confg"
tca_restart
else
echo "Newly received configuration/preference identical from the running instance's"
fi
}
export PATH=${PATH}:/opt/cdap/sdk/bin
echo "Starting TCA-CDAP in standalone mode"
# starting CDAP SDK in background
cdap sdk start
echo "CDAP Started, waiting CDAP ready on ${CDAP_HOST}:${CDAP_PORT} ..."
while ! nc -z ${CDAP_HOST} ${CDAP_PORT}; do
sleep 1 # wait for 1 second before check again
done
echo "Creating namespace cdap_tca_hi_lo ..."
curl -s -X PUT "http://${CDAP_HOST}:${CDAP_PORT}/v3/namespaces/cdap_tca_hi_lo"
# stop programs
tca_stop
# delete application
tca_delete
# load artifact
tca_load_artifact
tca_load_conf
# start programs
tca_start
# get status of programs
tca_status
echo "TCA-CDAP standalone mode initialization completed, with $WORKER_COUNT / 3 up"
#Changing to HOSTNAME parameter for consistency with k8s deploy
MY_NAME=${HOSTNAME:-tca}
unset CBS_HOST
unset CBS_PORT
echo "TCA environment: I am ${MY_NAME}, consul at ${CONSUL_HOST}:${CONSUL_PORT}, CBS service name ${CBS_SERVICE_NAME}"
while echo
do
echo "======================================================> $(date)"
tca_status
while [ "$WORKER_COUNT" != "3" ]; do
echo "Status checking: worker count is $WORKER_COUNT, needs a reset"
sleep 5
tca_restart
echo "TCA restarted"
done
if [[ -z "$CBS_HOST" || -z "$CBS_PORT" ]]; then
echo "Retrieving host and port for ${CBS_SERVICE_NAME} from ${CONSUL_HOST}:${CONSUL_PORT}"
sleep 2
CBS_HOST=$(curl -s "${CONSUL_HOST}:${CONSUL_PORT}/v1/catalog/service/${CBS_SERVICE_NAME}" |jq .[0].ServiceAddress |sed -e 's/\"//g')
CBS_PORT=$(curl -s "${CONSUL_HOST}:${CONSUL_PORT}/v1/catalog/service/${CBS_SERVICE_NAME}" |jq .[0].ServicePort |sed -e 's/\"//g')
echo "CBS discovered to be at ${CBS_HOST}:${CBS_PORT}"
fi
if [ ! -z "$CBS_HOST" ] && [ ! -z "$CBS_PORT" ]; then
tca_poll_policy
fi
sleep 30
done