TS pred logic
Issue-ID: RICAPP-19
Signed-off-by: Ron Shacham <rshacham@research.att.com>
Change-Id: Id64cd20bc14533f4c01ea0775dbfd53e4fca1ba0
diff --git a/Dockerfile b/Dockerfile
index 6201076..1f8f596 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -40,7 +40,7 @@
WORKDIR /playpen
# versions we snarf from package cloud
-ARG RMR_VER=4.0.2
+ARG RMR_VER=4.0.5
ARG SDL_VER=1.0.4
ARG XFCPP_VER=1.0.0
@@ -65,6 +65,16 @@
wget -nv --content-disposition ${PC_STG_URL}/sdl-dev_${SDL_VER}-1_amd64.deb/download.deb &&\
dpkg -i sdl-dev_${SDL_VER}-1_amd64.deb sdl_${SDL_VER}-1_amd64.deb
+RUN git clone https://github.com/Tencent/rapidjson && \
+ cd rapidjson && \
+ mkdir build && \
+ cd build && \
+ cmake -DCMAKE_INSTALL_PREFIX=/usr/local .. && \
+ make install && \
+ cd ${STAGE_DIR} && \
+ rm -rf rapidjson
+
+
#
# build and install the application(s)
diff --git a/container-tag.yaml b/container-tag.yaml
index 252b070..d0cf09e 100644
--- a/container-tag.yaml
+++ b/container-tag.yaml
@@ -1,3 +1,3 @@
# this is used by CI jobs to apply a tag when it builds the image
---
-tag: '1.0.1'
+tag: '1.0.2'
diff --git a/examples/Makefile b/examples/Makefile
deleted file mode 100644
index 08a350b..0000000
--- a/examples/Makefile
+++ /dev/null
@@ -1,33 +0,0 @@
-# vim: ts=4 sw=4 noet:
-
-#==================================================================================
-# Copyright (c) 2020 Nokia
-# Copyright (c) 2020 AT&T Intellectual Property.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#==================================================================================
-
-# simple makefile to build the examples. This assumes that the xapp framework
-# library has been installed or the LD_LIBRARY_PATH and C_INCLUDE_PATH environent
-# variables are set to reference the needed files.
-
-%.o:: %.cpp %.hpp
- g++ -g ${prereq%% *} -c
-
-% :: %.cpp
- g++ $< -g -o $@ -lricxfcpp -lrmr_si -lpthread -lm -lsdl
-
-all:: ts_xapp
-
-install::
- cp ts_xapp /usr/local/bin/
diff --git a/examples/ts_xapp.cpp b/examples/ts_xapp.cpp
deleted file mode 100644
index 50ed0ab..0000000
--- a/examples/ts_xapp.cpp
+++ /dev/null
@@ -1,346 +0,0 @@
-// vi: ts=4 sw=4 noet:
-/*
-==================================================================================
- Copyright (c) 2020 Nokia
- Copyright (c) 2020 AT&T Intellectual Property.
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-==================================================================================
-*/
-
-/*
- Mnemonic: ts_xapp.cpp
- Abstract: Traffic Steering xApp;
- 1. Receives A1 Policy
- 2. Queries SDL to decide which UE to attempt Traffic Steering for
- 3. Requests prediction for UE throughput on current and neighbor cells
- 4. Receives prediction
- 5. Optionally exercises Traffic Steering action over E2
-
- Date: 22 April 2020
- Author: Ron Shacham
-
-*/
-
-#include <stdio.h>
-#include <string.h>
-#include <unistd.h>
-
-#include <iostream>
-#include <memory>
-
-#include <sdl/syncstorage.hpp>
-#include <set>
-#include <map>
-#include <vector>
-#include <string>
-
-#include "ricxfcpp/xapp.hpp"
-
-using Namespace = std::string;
-using Key = std::string;
-using Data = std::vector<uint8_t>;
-using DataMap = std::map<Key, Data>;
-using Keys = std::set<Key>;
-
-
-// ----------------------------------------------------------
-
-std::unique_ptr<Xapp> xfw;
-
-
-void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
-
- long now;
- long total_count;
-
- int sz;
- int i;
-
- int response_to = 0; // max timeout wating for a response
-
- int send_mtype = 0;
- int rmtype; // received message type
- int delay = 1000000; // mu-sec delay; default 1s
-
- std::unique_ptr<Message> msg;
- Msg_component send_payload; // special type of unique pointer to the payload
-
- fprintf( stderr, "Policy Callback got a message, type=%d , length=%d\n" , mtype, len);
- fprintf(stderr, "payload is %s\n", payload.get());
-
- //fprintf( stderr, "callback 1 got a message type = %d len = %d\n", mtype, len );
- mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
- mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
-
- mtype = 0;
-
- fprintf(stderr, "cb 1\n");
-
- msg = xfw->Alloc_msg( 2048 );
-
- sz = msg->Get_available_size(); // we'll reuse a message if we received one back; ensure it's big enough
- if( sz < 2048 ) {
- fprintf( stderr, "<SNDR> fail: message returned did not have enough size: %d [%d]\n", sz, i );
- exit( 1 );
- }
-
- fprintf(stderr, "cb 2");
-
- send_payload = msg->Get_payload(); // direct access to payload
- snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\" : [\"222\", \"333\", \"444\"]}" );
-
- fprintf(stderr, "cb 3");
-
- // payload updated in place, nothing to copy from, so payload parm is nil
- if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() )+1, NULL )) {
- fprintf( stderr, "<SNDR> send failed: %d\n", i );
- }
-
- fprintf(stderr, "cb 4");
-
- /*
- msg = xfw->Receive( response_to );
- if( msg != NULL ) {
- rmtype = msg->Get_mtype();
- send_payload = msg->Get_payload();
- fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() );
- }
- */
-
-}
-
-
-void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
-
- long now;
- long total_count;
-
- int sz;
- int i;
-
- int response_to = 0; // max timeout wating for a response
-
- int send_mtype = 0;
- int rmtype; // received message type
- int delay = 1000000; // mu-sec delay; default 1s
-
- std::unique_ptr<Message> msg;
- Msg_component send_payload; // special type of unique pointer to the payload
-
- fprintf( stderr, "Prediction Callback got a message, type=%d , length=%d\n" , mtype, len);
- fprintf(stderr, "payload is %s\n", payload.get());
-
- mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
- mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
-
- mtype = 0;
-
- fprintf(stderr, "cb 1\n");
-
-}
-
-
-
-extern int main( int argc, char** argv ) {
-
- std::unique_ptr<Message> msg;
- Msg_component payload; // special type of unique pointer to the payload
-
- int nthreads = 1;
-
- int response_to = 0; // max timeout wating for a response
-
- int delay = 1000000; // mu-sec delay; default 1s
-
- char* port = (char *) "4560";
-
- int ai;
-
- ai = 1;
- while( ai < argc ) { // very simple flag processing (no bounds/error checking)
- if( argv[ai][0] != '-' ) {
- break;
- }
-
- switch( argv[ai][1] ) { // we only support -x so -xy must be -x -y
- case 'd': // delay between messages (mu-sec)
- delay = atoi( argv[ai+1] );
- ai++;
- break;
-
- case 'p':
- port = argv[ai+1];
- ai++;
- break;
-
- case 't': // timeout in seconds; we need to convert to ms for rmr calls
- response_to = atoi( argv[ai+1] ) * 1000;
- ai++;
- break;
- }
- ai++;
- }
-
- fprintf( stderr, "<XAPP> response timeout set to: %d\n", response_to );
- fprintf( stderr, "<XAPP> listening on port: %s\n", port );
-
- xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) ); // new xAPP thing; wait for a route table
-
- fprintf(stderr, "code1\n");
-
- xfw->Add_msg_cb( 20010, policy_callback, NULL );
- xfw->Add_msg_cb( 30002, prediction_callback, NULL );
-
- fprintf(stderr, "code2\n");
-
- std::string sdl_namespace_u = "TS-UE-metrics";
- std::string sdl_namespace_c = "TS-cell-metrics";
-
- fprintf(stderr, "code5\n");
-
- std::unique_ptr<shareddatalayer::SyncStorage> sdl(shareddatalayer::SyncStorage::create());
-
- Namespace nsu(sdl_namespace_u);
- Namespace nsc(sdl_namespace_c);
-
- /*
-
- fprintf(stderr, "before sdl set\n");
-
- try{
- //connecting to the Redis and generating a random key for namespace "hwxapp"
- fprintf(stderr, "IN SDL Set Data");
- // std::string data_string = "{\"rsrp\" : -110}";
-
-
- std::string data_string = "{\"CellID\": \"310-680-200-555001\", \"MeasTimestampPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodPDCPBytes\": 20, \"PDCPBytesDL\": 2000000, \"PDCPBytesUL\": 1200000, \"MeasTimestampAvailPRB\": \"2020-03-18 02:23:18.220\", \"MeasPeriodAvailPRB\": 20, \"AvailPRBDL\": 30, \"AvailPRBUL\": 50 }";
-
- DataMap dmap;
- // char key[4]="abc";
- char key[] = "310-680-200-555001";
- std::cout << "KEY: "<< key << std::endl;
- Key k = key;
- Data d;
- // uint8_t num = 101;
- d.assign(data_string.begin(), data_string.end());
- // d.push_back(num);
- dmap.insert({k,d});
-
- sdl->set(nsc, dmap);
-
- data_string = "{ \"CellID\": \"310-680-200-555002\", \"MeasTimestampPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodPDCPBytes\": 20, \"PDCPBytesDL\": 800000, \"PDCPBytesUL\": 400000, \"MeasTimestampAvailPRB\": \"2020-03-18 02:23:18.220\", \"MeasPeriodAvailPRB\": 20, \"AvailPRBDL\": 30, \"AvailPRBUL\": 45 }";
-
- Data d2;
- DataMap dmap2;
- char key2[] = "310-680-200-555002";
- std::cout << "KEY: "<< key2 << std::endl;
- Key k2 = key2;
- d2.assign(data_string.begin(), data_string.end());
- // d.push_back(num);
- dmap2.insert({k2,d});
-
- sdl->set(nsc, dmap2);
-
-
-
- std::string data_string = "{ \"CellID\": \"310-680-200-555003\", \"MeasTimestampPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodPDCPBytes\": 20, \"PDCPBytesDL\": 800000, \"PDCPBytesUL\": 400000, \"MeasTimestampAvailPRB\": \"2020-03-18 02:23:18.220\", \"MeasPeriodAvailPRB\": 20, \"AvailPRBDL\": 30, \"AvailPRBUL\": 45 }";
-
- Data d3;
- DataMap dmap3;
- char key3[] = "310-680-200-555003";
- std::cout << "KEY: "<< key3 << std::endl;
- Key k3 = key3;
- d3.assign(data_string.begin(), data_string.end());
- // d.push_back(num);
- dmap3.insert({k3,d3});
-
- sdl->set(nsc, dmap3);
-
-
-
- data_string = "{ \"UEID\": 12345, \"ServingCellID\": \"310-680-200-555002\", \"MeasTimestampUEPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodUEPDCPBytes\": 20,\"UEPDCPBytesDL\": 250000,\"UEPDCPBytesUL\": 100000, \"MeasTimestampUEPRBUsage\": \"2020-03-18 02:23:18.220\", \"MeasPeriodUEPRBUsage\": 20, \"UEPRBUsageDL\": 10, \"UEPRBUsageUL\": 30, \"MeasTimestampRF\": \"2020-03-18 02:23:18.210\",\"MeasPeriodRF\": 40, \"ServingCellRF\": [-115,-16,-5], \"NeighborCellRF\": [ {\"CID\": \"310-680-200-555001\",\"Cell-RF\": [-90,-13,-2.5 ] }, {\"CID\": \"310-680-200-555003\", \"Cell-RF\": [-140,-17,-6 ] } ] }";
-
- Data d4;
- DataMap dmap4;
- char key4[] = "12345";
- std::cout << "KEY: "<< key << std::endl;
- d4.assign(data_string.begin(), data_string.end());
- Key k4 = key4;
- // d.push_back(num);
- dmap4.insert({k4,d4});
-
- sdl->set(nsu, dmap4);
-
-
- }
- catch(...){
- fprintf(stderr,"SDL Error in Set Data for Namespace");
- return false;
- }
-
- fprintf(stderr, "after sdl set\n");
-
- */
-
- fprintf(stderr, "before sdl get\n");
-
-
- std::string prefix2="310";
- Keys K = sdl->findKeys(nsc, prefix2); // just the prefix
- DataMap Dk = sdl->get(nsc, K);
-
- std::cout << "K contains " << K.size() << " elements.\n";
-
- fprintf(stderr, "before forloop\n");
-
- for(auto si=K.begin();si!=K.end();++si){
- std::vector<uint8_t> val_v = Dk[(*si)]; // 4 lines to unpack a string
- char val[val_v.size()+1]; // from Data
- int i;
- for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
- val[i]='\0';
- fprintf(stderr, "KEYS and Values %s = %s\n",(*si).c_str(), val);
- }
-
-
- std::string prefix3="12";
- Keys K2 = sdl->findKeys(nsu, prefix3); // just the prefix
- DataMap Dk2 = sdl->get(nsu, K2);
-
- std::cout << "K contains " << K2.size() << " elements.\n";
-
- fprintf(stderr, "before forloop\n");
-
- for(auto si=K2.begin();si!=K2.end();++si){
- std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
- char val[val_v.size()+1]; // from Data
- int i;
- for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
- val[i]='\0';
- fprintf(stderr, "KEYS and Values %s = %s\n",(*si).c_str(), val);
- }
-
-
- fprintf(stderr, "after sdl get\n");
-
- xfw->Run( nthreads );
-
- fprintf(stderr, "code3\n");
-
- msg = xfw->Alloc_msg( 2048 );
-
- fprintf(stderr, "code4\n");
-
-
-}
diff --git a/rmr-version.yaml b/rmr-version.yaml
index c03848e..ceaf505 100644
--- a/rmr-version.yaml
+++ b/rmr-version.yaml
@@ -1,3 +1,3 @@
# Communicate to CI which version of RMR to install in the build/vet environment
---
-version: 4.0.2
+version: 4.0.5
diff --git a/routes.txt b/routes.txt
index f0242cf..5837eb1 100755
--- a/routes.txt
+++ b/routes.txt
@@ -1,6 +1,4 @@
newrt|start
-mse|20010|20008|4560
rte|20011|service-ricplt-a1mediator-rmr:10000
-rte|TS_QUE_PREDICTION|4560
-rte|TS_UE_LIST|service-ricxapp-qpd:4560
+rte|30000|service-ricxapp-qpdriver.ricxapp.svc.cluster.local:4562
newrt|end
diff --git a/run_xapp.sh b/run_xapp.sh
deleted file mode 100755
index f65a4b7..0000000
--- a/run_xapp.sh
+++ /dev/null
@@ -1,21 +0,0 @@
-#! /bin/bash
-
-export RMR_SEED_RT="routes.txt"
-export RMR_RTG_SVC="9999"
-export XAPP_NAME="HELLOWORLD_XAPP"
-export HW_PORTS="4560"
-export MSG_MAX_BUFFER="2048"
-export THREADS="1"
-export VERBOSE="0"
-export CONFIG_FILE="config/config-file.json"
-export GNODEB="NYC123"
-export XAPP_ID="3489-er492k-92389"
-export A1_SCHEMA_FILE="schemas/hwxapp-policy.json"
-export VES_SCHEMA_FILE="schemas/hwxapp-ves.json"
-export VES_COLLECTOR_URL="127.0.0.1:6350"
-export VES_MEASUREMENT_INTERVAL="10"
-export LOG_LEVEL="MDCLOG_ERR"
-export OPERATING_MODE="CONTROL"
-
-
-
diff --git a/src/ts_xapp/ts_xapp.cpp b/src/ts_xapp/ts_xapp.cpp
index f6e2c37..4cb3ebc 100644
--- a/src/ts_xapp/ts_xapp.cpp
+++ b/src/ts_xapp/ts_xapp.cpp
@@ -36,6 +36,7 @@
#include <string.h>
#include <unistd.h>
+#include <thread>
#include <iostream>
#include <memory>
@@ -44,9 +45,19 @@
#include <map>
#include <vector>
#include <string>
+#include <unordered_map>
+
+#include <rapidjson/document.h>
+#include <rapidjson/writer.h>
+#include <rapidjson/stringbuffer.h>
+#include <rapidjson/schema.h>
+#include <rapidjson/reader.h>
+
#include "ricxfcpp/xapp.hpp"
+using namespace rapidjson;
+using namespace std;
using Namespace = std::string;
using Key = std::string;
using Data = std::vector<uint8_t>;
@@ -58,23 +69,195 @@
std::unique_ptr<Xapp> xfw;
+std::string sdl_namespace_u = "TS-UE-metrics";
+std::string sdl_namespace_c = "TS-cell-metrics";
+
+int rsrp_threshold = 0;
+
+std::unique_ptr<shareddatalayer::SyncStorage> sdl;
+
+Namespace nsu;
+Namespace nsc;
+
+struct UEData {
+ string serving_cell;
+ int serving_cell_rsrp;
+
+};
+
+struct PredictionHandler : public BaseReaderHandler<UTF8<>, PredictionHandler> {
+ unordered_map<string, string> cell_pred;
+ std::string ue_id;
+ bool ue_id_found = false;
+ string curr_key = "";
+ string curr_value = "";
+ bool Null() { cout << "Null()" << endl; return true; }
+ bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
+ bool Int(int i) { cout << "Int(" << i << ")" << endl; return true; }
+ bool Uint(unsigned u) { cout << "Uint(" << u << ")" << endl; return true; }
+ bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
+ bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
+ bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
+ bool String(const char* str, SizeType length, bool copy) {
+ cout << "String(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
+ if (curr_key.compare("") != 0) {
+ cout << "Found throughput\n";
+ curr_value = str;
+ cell_pred[curr_key] = curr_value;
+ curr_key = "";
+ curr_value = "";
+ }
+
+ return true;
+ }
+ bool StartObject() { cout << "StartObject()" << endl; return true; }
+ bool Key(const char* str, SizeType length, bool copy) {
+ cout << "Key(" << str << ", " << length << ", " << boolalpha << copy << ")" << endl;
+ if (!ue_id_found) {
+ cout << "Found UE ID\n";
+ ue_id = str;
+ ue_id_found = true;
+ } else {
+ curr_key = str;
+ }
+ return true;
+ }
+ bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
+ bool StartArray() { cout << "StartArray()" << endl; return true; }
+ bool EndArray(SizeType elementCount) { cout << "EndArray(" << elementCount << ")" << endl; return true; }
+};
+
+
+struct UEDataHandler : public BaseReaderHandler<UTF8<>, UEDataHandler> {
+ unordered_map<string, string> cell_pred;
+ std::string serving_cell_id;
+ int serving_cell_rsrp;
+ int serving_cell_rsrq;
+ int serving_cell_sinr;
+ bool in_serving_array = false;
+ int rf_meas_index = 0;
+
+ string curr_key = "";
+ string curr_value = "";
+ bool Null() { cout << "Null()" << endl; return true; }
+ bool Bool(bool b) { cout << "Bool(" << boolalpha << b << ")" << endl; return true; }
+ bool Int(int i) {
+ fprintf(stderr, "Int(%d)\n", i);
+ if (in_serving_array) {
+ fprintf(stderr, "we are in serving array\n");
+ switch(rf_meas_index) {
+ case 0:
+ serving_cell_rsrp = i;
+ break;
+ case 1:
+ serving_cell_rsrq = i;
+ break;
+ case 2:
+ serving_cell_sinr = i;
+ break;
+ }
+ rf_meas_index++;
+ }
+ return true;
+ }
+ bool Uint(unsigned u) {
+ fprintf(stderr, "Int(%d)\n", u); return true; }
+ bool Int64(int64_t i) { cout << "Int64(" << i << ")" << endl; return true; }
+ bool Uint64(uint64_t u) { cout << "Uint64(" << u << ")" << endl; return true; }
+ bool Double(double d) { cout << "Double(" << d << ")" << endl; return true; }
+ bool String(const char* str, SizeType length, bool copy) {
+ fprintf(stderr,"String(%s)\n", str);
+ if (curr_key.compare("ServingCellID") == 0) {
+ serving_cell_id = str;
+ }
+
+ return true;
+ }
+ bool StartObject() { cout << "StartObject()" << endl; return true; }
+ bool Key(const char* str, SizeType length, bool copy) {
+ fprintf(stderr,"Key(%s)\n", str);
+ curr_key = str;
+ return true;
+ }
+ bool EndObject(SizeType memberCount) { cout << "EndObject(" << memberCount << ")" << endl; return true; }
+ bool StartArray() {
+ fprintf(stderr,"StartArray()");
+ if (curr_key.compare("ServingCellRF") == 0) {
+ in_serving_array = true;
+ }
+
+ return true;
+ }
+ bool EndArray(SizeType elementCount) {
+ fprintf(stderr, "EndArray()\n");
+ if (curr_key.compare("servingCellRF") == 0) {
+ in_serving_array = false;
+ rf_meas_index = 0;
+ }
+
+ return true; }
+};
+
+
+unordered_map<string, UEData> get_sdl_ue_data() {
+
+ fprintf(stderr, "In get_sdl_ue_data()\n");
+
+ unordered_map<string, string> ue_data;
+
+ unordered_map<string, UEData> return_ue_data_map;
+
+ std::string prefix3="12";
+ Keys K2 = sdl->findKeys(nsu, prefix3);
+ DataMap Dk2 = sdl->get(nsu, K2);
+
+ string ue_json;
+ string ue_id;
+
+ for(auto si=K2.begin();si!=K2.end();++si){
+ std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
+ char val[val_v.size()+1]; // from Data
+ int i;
+ fprintf(stderr, "val size %d\n", val_v.size());
+ for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
+ val[i]='\0';
+ ue_id.assign((std::string)*si);
+
+ ue_json.assign(val);
+ ue_data[ue_id] = ue_json;
+ }
+
+ fprintf(stderr, "after sdl get of ue data\n");
+
+ fprintf(stderr, "From UE data map\n");
+
+ for (auto map_iter = ue_data.begin(); map_iter != ue_data.end(); map_iter++) {
+ UEDataHandler handler;
+ Reader reader;
+ StringStream ss(map_iter->second.c_str());
+ reader.Parse(ss,handler);
+
+ string ueID = map_iter->first;
+ string serving_cell_id = handler.serving_cell_id;
+ int serv_rsrp = handler.serving_cell_rsrp;
+
+ fprintf(stderr,"UE data for %s\n", ueID.c_str());
+ fprintf(stderr,"Serving cell %s\n", serving_cell_id.c_str());
+ fprintf(stderr,"RSRP for UE %d\n", serv_rsrp);
+
+ return_ue_data_map[ueID] = {serving_cell_id, serv_rsrp};
+
+ }
+
+ fprintf(stderr, "\n");
+ return return_ue_data_map;
+}
void policy_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
- long now;
- long total_count;
-
- int sz;
- int i;
-
int response_to = 0; // max timeout wating for a response
+ int rmtype; // received message type
- int send_mtype = 0;
- int rmtype; // received message type
- int delay = 1000000; // mu-sec delay; default 1s
-
- std::unique_ptr<Message> msg;
- Msg_component send_payload; // special type of unique pointer to the payload
fprintf( stderr, "Policy Callback got a message, type=%d , length=%d\n" , mtype, len);
fprintf(stderr, "payload is %s\n", payload.get());
@@ -83,8 +266,23 @@
mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK1\n" ); // validate that we can use the same buffer for 2 rts calls
mbuf.Send_response( 101, -1, 5, (unsigned char *) "OK2\n" );
- mtype = 0;
+ //Set the threshold value
+
+}
+
+void send_prediction_request(vector<string> ues_to_predict) {
+
+ std::unique_ptr<Message> msg;
+ Msg_component payload; // special type of unique pointer to the payload
+
+ int nthreads = 1;
+ int response_to = 0; // max timeout wating for a response
+ int mtype = 30000;
+ int sz;
+ int i;
+ Msg_component send_payload;
+
fprintf(stderr, "cb 1\n");
msg = xfw->Alloc_msg( 2048 );
@@ -96,18 +294,39 @@
}
fprintf(stderr, "cb 2");
+
+ string ues_list = "[";
+
+ for (int i = 0; i < ues_to_predict.size(); i++) {
+ if (i == ues_to_predict.size() - 1) {
+ ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"";
+ } else {
+ ues_list = ues_list + " \"" + ues_to_predict.at(i) + "\"" + ",";
+ }
+ }
+
+ string message_body = "{\"UEPredictionSet\": " + ues_list + "}";
+
+ const char *body = message_body.c_str();
+
+ // char *body = "{\"UEPredictionSet\": [\"12345\"]}";
send_payload = msg->Get_payload(); // direct access to payload
- snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\" : [\"222\", \"333\", \"444\"]}" );
+ // snprintf( (char *) send_payload.get(), 2048, '{"UEPredictionSet" : ["12345"]}', 1 );
+ // snprintf( (char *) send_payload.get(), 2048, body);
+ snprintf( (char *) send_payload.get(), 2048, "{\"UEPredictionSet\": [\"12345\"]}");
- fprintf(stderr, "cb 3");
+ fprintf(stderr, "message body %s\n", send_payload.get());
+
+ fprintf(stderr, "cb 3");
+ fprintf(stderr, "payload length %d\n", strlen( (char *) send_payload.get() ));
// payload updated in place, nothing to copy from, so payload parm is nil
- if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() )+1, NULL )) {
+ if ( ! msg->Send_msg( mtype, Message::NO_SUBID, strlen( (char *) send_payload.get() ), NULL )) {
fprintf( stderr, "<SNDR> send failed: %d\n", msg->Get_state() );
}
- fprintf(stderr, "cb 4");
+ fprintf(stderr, "cb 4");
/*
msg = xfw->Receive( response_to );
@@ -116,10 +335,9 @@
send_payload = msg->Get_payload();
fprintf( stderr, "got: mtype=%d payload=(%s)\n", rmtype, (char *) send_payload.get() );
}
- */
-
-}
+ */
+}
void prediction_callback( Message& mbuf, int mtype, int subid, int len, Msg_component payload, void* data ) {
@@ -135,9 +353,6 @@
int rmtype; // received message type
int delay = 1000000; // mu-sec delay; default 1s
- std::unique_ptr<Message> msg;
- Msg_component send_payload; // special type of unique pointer to the payload
-
fprintf( stderr, "Prediction Callback got a message, type=%d , length=%d\n" , mtype, len);
fprintf(stderr, "payload is %s\n", payload.get());
@@ -147,200 +362,142 @@
mtype = 0;
fprintf(stderr, "cb 1\n");
+
+ char *incoming_msg = "{\"12345\": {\"222\": \"20000\", \"333\" : \"50000\"} }";
+
+ PredictionHandler handler;
+ Reader reader;
+ StringStream ss(incoming_msg);
+ reader.Parse(ss,handler);
+
+ std::string pred_ue_id = handler.ue_id;
+
+ cout << "Prediction for " << pred_ue_id << endl;
+
+ unordered_map<string, string> throughput_map = handler.cell_pred;
+
+
+ cout << endl;
+
+ unordered_map<string, UEData> sdl_data = get_sdl_ue_data();
+
+ //Decision about CONTROL message
+ //(1) Identify UE Id in Prediction message
+ //(2) Get UEData struct for this UE Id
+ //(3) Identify the UE's service cell ID
+ //(4) Iterate through Prediction message.
+ // If one of the cells, have a higher throughput prediction than serving cell, log a CONTROL request
+
+ UEData pred_ue_data = sdl_data[pred_ue_id];
+ std::string serving_cell_id = pred_ue_data.serving_cell;
+
+ int serving_cell_throughput;
+ int highest_throughput;
+ std::string highest_throughput_cell_id;
+ std::string::size_type str_size;
+
+ cout << "Going through throughtput map:" << endl;
+
+ for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
+ cout << map_iter->first << " : " << map_iter->second << endl;
+ std::string curr_cellid = map_iter->first;
+ cout << "Cell ID is " << curr_cellid;
+ int curr_throughput = stoi(map_iter->second, &str_size);
+ cout << "Throughput is " << curr_throughput << endl;
+
+ if (curr_cellid.compare(serving_cell_id) == 0) {
+ serving_cell_throughput = curr_throughput;
+ highest_throughput = serving_cell_throughput;
+ }
+
+ }
+
+ //Iterating again to identify the highest throughput prediction
+
+ for (auto map_iter = throughput_map.begin(); map_iter != throughput_map.end(); map_iter++) {
+ cout << map_iter->first << " : " << map_iter->second << endl;
+ std::string curr_cellid = map_iter->first;
+ cout << "Cell ID is " << curr_cellid;
+ int curr_throughput = stoi(map_iter->second, &str_size);
+ cout << "Throughput is " << curr_throughput << endl;
+
+ if (curr_throughput > serving_cell_throughput) {
+ highest_throughput = curr_throughput;
+ highest_throughput_cell_id = curr_cellid;
+ }
+ }
+
+ if (highest_throughput > serving_cell_throughput) {
+ cout << "WE WOULD SEND A CONTROL REQUEST NOW" << endl;
+ cout << "UE ID: " << pred_ue_id << endl;
+ cout << "Source cell " << serving_cell_id << endl;
+ cout << "Target cell " << highest_throughput_cell_id << endl;
+ }
+
+}
+
+
+//This function runs a loop that continuously checks SDL for any UE
+
+void run_loop() {
+
+ fprintf(stderr, "in run_loop()\n");
+
+ unordered_map<string, UEData> uemap;
+
+ vector<string> prediction_ues;
+
+ while (1) {
+
+ fprintf(stderr, "in while loop\n");
+
+ uemap = get_sdl_ue_data();
+
+ for (auto map_iter = uemap.begin(); map_iter != uemap.end(); map_iter++) {
+ string ueid = map_iter->first;
+ UEData data = map_iter->second;
+ if (data.serving_cell_rsrp < rsrp_threshold) {
+ prediction_ues.push_back(ueid);
+ }
+ }
+
+ if (prediction_ues.size() > 0) {
+ send_prediction_request(prediction_ues);
+ }
+
+ sleep(20);
+ }
}
extern int main( int argc, char** argv ) {
- std::unique_ptr<Message> msg;
- Msg_component payload; // special type of unique pointer to the payload
-
int nthreads = 1;
- int response_to = 0; // max timeout wating for a response
-
- int delay = 1000000; // mu-sec delay; default 1s
-
char* port = (char *) "4560";
- int ai;
-
- ai = 1;
- while( ai < argc ) { // very simple flag processing (no bounds/error checking)
- if( argv[ai][0] != '-' ) {
- break;
- }
-
- switch( argv[ai][1] ) { // we only support -x so -xy must be -x -y
- case 'd': // delay between messages (mu-sec)
- delay = atoi( argv[ai+1] );
- ai++;
- break;
-
- case 'p':
- port = argv[ai+1];
- ai++;
- break;
-
- case 't': // timeout in seconds; we need to convert to ms for rmr calls
- response_to = atoi( argv[ai+1] ) * 1000;
- ai++;
- break;
- }
- ai++;
- }
-
- fprintf( stderr, "<XAPP> response timeout set to: %d\n", response_to );
- fprintf( stderr, "<XAPP> listening on port: %s\n", port );
-
- xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) ); // new xAPP thing; wait for a route table
+ sdl = shareddatalayer::SyncStorage::create();
+ nsu = Namespace(sdl_namespace_u);
+ nsc = Namespace(sdl_namespace_c);
+
+
+ fprintf( stderr, "<XAPP> listening on port: %s\n", port );
+ xfw = std::unique_ptr<Xapp>( new Xapp( port, true ) ); // new xAPP thing; wait for a route table
fprintf(stderr, "code1\n");
+
xfw->Add_msg_cb( 20010, policy_callback, NULL );
xfw->Add_msg_cb( 30002, prediction_callback, NULL );
fprintf(stderr, "code2\n");
-
- std::string sdl_namespace_u = "TS-UE-metrics";
- std::string sdl_namespace_c = "TS-cell-metrics";
-
- fprintf(stderr, "code5\n");
- std::unique_ptr<shareddatalayer::SyncStorage> sdl(shareddatalayer::SyncStorage::create());
+ std::thread loop_thread;
- Namespace nsu(sdl_namespace_u);
- Namespace nsc(sdl_namespace_c);
-
- /*
-
- fprintf(stderr, "before sdl set\n");
-
- try{
- //connecting to the Redis and generating a random key for namespace "hwxapp"
- fprintf(stderr, "IN SDL Set Data");
- // std::string data_string = "{\"rsrp\" : -110}";
-
-
- std::string data_string = "{\"CellID\": \"310-680-200-555001\", \"MeasTimestampPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodPDCPBytes\": 20, \"PDCPBytesDL\": 2000000, \"PDCPBytesUL\": 1200000, \"MeasTimestampAvailPRB\": \"2020-03-18 02:23:18.220\", \"MeasPeriodAvailPRB\": 20, \"AvailPRBDL\": 30, \"AvailPRBUL\": 50 }";
-
- DataMap dmap;
- // char key[4]="abc";
- char key[] = "310-680-200-555001";
- std::cout << "KEY: "<< key << std::endl;
- Key k = key;
- Data d;
- // uint8_t num = 101;
- d.assign(data_string.begin(), data_string.end());
- // d.push_back(num);
- dmap.insert({k,d});
-
- sdl->set(nsc, dmap);
-
- data_string = "{ \"CellID\": \"310-680-200-555002\", \"MeasTimestampPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodPDCPBytes\": 20, \"PDCPBytesDL\": 800000, \"PDCPBytesUL\": 400000, \"MeasTimestampAvailPRB\": \"2020-03-18 02:23:18.220\", \"MeasPeriodAvailPRB\": 20, \"AvailPRBDL\": 30, \"AvailPRBUL\": 45 }";
-
- Data d2;
- DataMap dmap2;
- char key2[] = "310-680-200-555002";
- std::cout << "KEY: "<< key2 << std::endl;
- Key k2 = key2;
- d2.assign(data_string.begin(), data_string.end());
- // d.push_back(num);
- dmap2.insert({k2,d});
-
- sdl->set(nsc, dmap2);
-
-
-
- std::string data_string = "{ \"CellID\": \"310-680-200-555003\", \"MeasTimestampPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodPDCPBytes\": 20, \"PDCPBytesDL\": 800000, \"PDCPBytesUL\": 400000, \"MeasTimestampAvailPRB\": \"2020-03-18 02:23:18.220\", \"MeasPeriodAvailPRB\": 20, \"AvailPRBDL\": 30, \"AvailPRBUL\": 45 }";
-
- Data d3;
- DataMap dmap3;
- char key3[] = "310-680-200-555003";
- std::cout << "KEY: "<< key3 << std::endl;
- Key k3 = key3;
- d3.assign(data_string.begin(), data_string.end());
- // d.push_back(num);
- dmap3.insert({k3,d3});
-
- sdl->set(nsc, dmap3);
-
-
-
- data_string = "{ \"UEID\": 12345, \"ServingCellID\": \"310-680-200-555002\", \"MeasTimestampUEPDCPBytes\": \"2020-03-18 02:23:18.220\", \"MeasPeriodUEPDCPBytes\": 20,\"UEPDCPBytesDL\": 250000,\"UEPDCPBytesUL\": 100000, \"MeasTimestampUEPRBUsage\": \"2020-03-18 02:23:18.220\", \"MeasPeriodUEPRBUsage\": 20, \"UEPRBUsageDL\": 10, \"UEPRBUsageUL\": 30, \"MeasTimestampRF\": \"2020-03-18 02:23:18.210\",\"MeasPeriodRF\": 40, \"ServingCellRF\": [-115,-16,-5], \"NeighborCellRF\": [ {\"CID\": \"310-680-200-555001\",\"Cell-RF\": [-90,-13,-2.5 ] }, {\"CID\": \"310-680-200-555003\", \"Cell-RF\": [-140,-17,-6 ] } ] }";
-
- Data d4;
- DataMap dmap4;
- char key4[] = "12345";
- std::cout << "KEY: "<< key << std::endl;
- d4.assign(data_string.begin(), data_string.end());
- Key k4 = key4;
- // d.push_back(num);
- dmap4.insert({k4,d4});
-
- sdl->set(nsu, dmap4);
-
-
- }
- catch(...){
- fprintf(stderr,"SDL Error in Set Data for Namespace");
- return false;
- }
-
- fprintf(stderr, "after sdl set\n");
-
- */
-
- fprintf(stderr, "before sdl get\n");
-
-
- std::string prefix2="310";
- Keys K = sdl->findKeys(nsc, prefix2); // just the prefix
- DataMap Dk = sdl->get(nsc, K);
-
- std::cout << "K contains " << K.size() << " elements.\n";
-
- fprintf(stderr, "before forloop\n");
-
- for(auto si=K.begin();si!=K.end();++si){
- std::vector<uint8_t> val_v = Dk[(*si)]; // 4 lines to unpack a string
- char val[val_v.size()+1]; // from Data
- int i;
- for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
- val[i]='\0';
- fprintf(stderr, "KEYS and Values %s = %s\n",(*si).c_str(), val);
- }
-
-
- std::string prefix3="12";
- Keys K2 = sdl->findKeys(nsu, prefix3); // just the prefix
- DataMap Dk2 = sdl->get(nsu, K2);
-
- std::cout << "K contains " << K2.size() << " elements.\n";
-
- fprintf(stderr, "before forloop\n");
-
- for(auto si=K2.begin();si!=K2.end();++si){
- std::vector<uint8_t> val_v = Dk2[(*si)]; // 4 lines to unpack a string
- char val[val_v.size()+1]; // from Data
- int i;
- for(i=0;i<val_v.size();++i) val[i] = (char)(val_v[i]);
- val[i]='\0';
- fprintf(stderr, "KEYS and Values %s = %s\n",(*si).c_str(), val);
- }
-
-
- fprintf(stderr, "after sdl get\n");
+ loop_thread = std::thread(&run_loop);
xfw->Run( nthreads );
-
- fprintf(stderr, "code3\n");
-
- msg = xfw->Alloc_msg( 2048 );
-
- fprintf(stderr, "code4\n");
-
}