blob: d4033c32116a7307671713e8b2d433efabea4c21 [file] [log] [blame]
// vi: ts=4 sw=4 noet:
/*
==================================================================================
Copyright (c) 2020 Nokia
Copyright (c) 2020 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==================================================================================
*/
/*
Mnemonic: messenger.cpp
Abstract: Message Router Messenger.
Date: 10 March 2020
Author: E. Scott Daniels
*/
#include <string.h>
#include <unistd.h>
#include <rmr/rmr.h>
#include <rmr/RIC_message_types.h>
#include <iostream>
#include <string>
#include <map>
#include <memory>
#include <mutex>
#include "callback.hpp"
#include "default_cb.hpp" // default callback prototypes
#include "message.hpp"
#include "messenger.hpp"
// --------------- private -----------------------------------------------------
// ---------------- C++ buggerd up way of maintining class constants ----------
const int Messenger::MAX_PAYLOAD = (1024*64);
const int Messenger::DEFAULT_CALLBACK = -1;
// --------------- builders -----------------------------------------------
/*
If wait4table is true, then the construction of the object does not
complete until the underlying transport has a new copy of the route
table.
If port is nil, then the default port is used (4560).
*/
Messenger::Messenger( char* port, bool wait4table ) {
if( port == NULL ) {
port = (char *) "4560";
}
gate = new std::mutex();
listen_port = strdup( port );
mrc = rmr_init( listen_port, Messenger::MAX_PAYLOAD, 0 );
if( wait4table ) {
this->Wait_for_cts( 0 );
}
Add_msg_cb( RIC_HEALTH_CHECK_REQ, Health_ck_cb, NULL ); // add our default call backs
ok_2_run = true;
}
/*
Destroyer.
*/
Messenger::~Messenger() {
if( mrc != NULL ) {
rmr_close( mrc );
}
free( listen_port );
}
/*
Allow user to register a callback function invoked when a specific type of
message is received. The user may pass an optional data pointer which
will be passed to the function when it is called. The function signature
must be:
void fun( Messenger* mr, rmr_mbuf_t* mbuf, void* data );
The user can also invoke this function to set the "default" callback by
passing Messenger::DEFAULT_CALLBACK as the mtype. If no other callback
is defined for a message type, the default callback function is invoked.
If a default is not provided, a non-matching message is silently dropped.
*/
void Messenger::Add_msg_cb( int mtype, user_callback fun_name, void* data ) {
Callback* cb;
cb = new Callback( fun_name, data );
cb_hash[mtype] = cb;
callbacks = true;
}
/*
Message allocation for user to send. User must destroy the message when
finished, but may keep the message for as long as is necessary
and reuse it over and over.
*/
//Message* Messenger::Alloc_msg( int payload_size ) {
std::unique_ptr<Message> Messenger::Alloc_msg( int payload_size ) {
return std::unique_ptr<Message>( new Message( mrc, payload_size ) );
}
void Messenger::Listen( ) {
int count = 0;
rmr_mbuf_t* mbuf = NULL;
std::map<int,Callback*>::iterator mi; // map iterator; silly indirect way to point at the value
Callback* dcb = NULL; // default callback so we don't search
Callback* sel_cb; // callback selected to invoke
std::unique_ptr<Message>m;
if( mrc == NULL ) {
return;
}
mi = cb_hash.find( DEFAULT_CALLBACK );
if( mi != cb_hash.end() ) {
dcb = mi->second; // oddly named second field is the address of the callback block
}
while( ok_2_run ) {
mbuf = rmr_torcv_msg( mrc, mbuf, 2000 ); // come up for air every 2 sec to check ok2run
if( mbuf != NULL ) {
if( mbuf->state == RMR_OK ) {
m = std::unique_ptr<Message>( new Message( mbuf, mrc ) ); // auto delteted when scope terminates
sel_cb = dcb; // start with default
if( callbacks && ((mi = cb_hash.find( mbuf->mtype )) != cb_hash.end()) ) {
sel_cb = mi->second; // override with user callback
}
if( sel_cb != NULL ) {
sel_cb->Drive_cb( *this, *m ); // drive the selected one
mbuf = NULL; // not safe to use after given to cb
}
} else {
if( mbuf->state != RMR_ERR_TIMEOUT ) {
fprintf( stderr, "<LISTENER> got bad status: %d\n", mbuf->state );
}
}
}
}
}
/*
Wait for the next message, up to a max timout, and return the message received.
*/
std::unique_ptr<Message> Messenger::Receive( int timeout ) {
rmr_mbuf_t* mbuf = NULL;
//std::unique_ptr<Message> m;
if( mrc == NULL ) {
return NULL;
}
mbuf = rmr_torcv_msg( mrc, mbuf, timeout ); // future: do we want to reuse the mbuf here?
if( mbuf != NULL ) {
return std::unique_ptr<Message>( new Message( mbuf, mrc ) );
}
return NULL;
}
/*
Called to gracefully stop all listeners.
*/
void Messenger::Stop( ) {
ok_2_run = false;
}
/*
RMR messages must be released by RMR as there might be transport
buffers that have to be dealt with. Every callback is expected to
call this function when finished with the message.
*/
void Messenger::Release_mbuf( void* vmbuf ) {
rmr_free_msg( (rmr_mbuf_t *) vmbuf );
}
/*
Wait for clear to send.
Until RMR loads a route table, all sends will fail with a
"no endpoint" state. This function allows the user application
to block until RMR has a viable route table. It does not guarentee
that every message that the user app will try to send has an entry.
The use of this function by the user application allows for the
parallel initialisation of the application while waiting for the
route table service to generate a table for the application. The
initialisation function may be callsed with "no wait" and this
function invoked when the application has completed initialisation
and is ready to start sending messages.
The max wait parameter is the maximum number of seconds to block.
If RMR never reports ready false is returned. A true return
incidcates all is ready. If max_wait is 0, then this will only
return when RMR is ready to send.
*/
bool Messenger::Wait_for_cts( int max_wait ) {
bool block_4ever;
block_4ever = max_wait == 0;
while( block_4ever || max_wait > 0 ) {
if( rmr_ready( mrc ) ) {
return true;
}
sleep( 1 );
max_wait--;
}
return false;
}