| // vi: ts=4 sw=4 noet: |
| /* |
| ================================================================================== |
| Copyright (c) 2020 Nokia |
| Copyright (c) 2020 AT&T Intellectual Property. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| ================================================================================== |
| */ |
| |
| /* |
| Mnemonic: messenger.cpp |
| Abstract: Message Router Messenger. |
| |
| Date: 10 March 2020 |
| Author: E. Scott Daniels |
| */ |
| |
| #include <string.h> |
| #include <unistd.h> |
| |
| #include <rmr/rmr.h> |
| #include <rmr/RIC_message_types.h> |
| |
| |
| #include <iostream> |
| #include <string> |
| #include <map> |
| #include <memory> |
| #include <mutex> |
| |
| #include "callback.hpp" |
| #include "default_cb.hpp" // default callback prototypes |
| #include "message.hpp" |
| #include "messenger.hpp" |
| #include "alarm.hpp" |
| #include "metrics.hpp" |
| |
| namespace xapp { |
| |
| // --------------- private ----------------------------------------------------- |
| |
| |
| // ---------------- C++ buggerd up way of maintining class constants ---------- |
| const int xapp::Messenger::MAX_PAYLOAD = (1024*64); |
| const int xapp::Messenger::DEFAULT_CALLBACK = -1; |
| |
| // --------------- builders ----------------------------------------------- |
| /* |
| If wait4table is true, then the construction of the object does not |
| complete until the underlying transport has a new copy of the route |
| table. |
| |
| If port is nil, then the default port is used (4560). |
| */ |
| xapp::Messenger::Messenger( const char* uport, bool wait4table ) { |
| |
| if( uport == NULL ) { |
| listen_port = strdup( "4560" ); |
| } else { |
| listen_port = strdup( uport ); |
| } |
| |
| gate = new std::mutex(); |
| mrc = rmr_init( listen_port, Messenger::MAX_PAYLOAD, 0 ); |
| |
| if( wait4table ) { |
| this->Wait_for_cts( 0 ); |
| } |
| |
| Add_msg_cb( RIC_HEALTH_CHECK_REQ, Health_ck_cb, NULL ); // add our default call backs |
| |
| ok_2_run = true; |
| } |
| |
| /* |
| Move support. We DO allow the instance to be moved as only one copy |
| remains following the move. |
| Given a source object instance (soi) we move the information to |
| the new object, and then DELETE what was moved so that when the |
| user frees the soi, it doesn't destroy what we snarfed. |
| */ |
| xapp::Messenger::Messenger( Messenger&& soi ) : |
| mrc( soi.mrc ), |
| listen_port( soi.listen_port ), |
| ok_2_run( soi.ok_2_run ), |
| gate( soi.gate ), |
| cb_hash( soi.cb_hash ) // this seems dodgy |
| { |
| soi.gate = NULL; |
| soi.listen_port = NULL; |
| soi.mrc = NULL; |
| } |
| |
| /* |
| Move operator. Given a source object instance, movee it's contents |
| to this insance. We must first clean up this instance. |
| */ |
| xapp::Messenger& Messenger::operator=( Messenger&& soi ) { |
| if( this != &soi ) { // cannot move onto ourself |
| if( mrc != NULL ) { |
| rmr_close( mrc ); |
| } |
| if( listen_port != NULL ) { |
| delete( listen_port ); |
| } |
| |
| mrc = soi.mrc; |
| listen_port = soi.listen_port; |
| ok_2_run = soi.ok_2_run; |
| gate = soi.gate; |
| cb_hash = soi.cb_hash; // this seems dodgy |
| |
| soi.gate = NULL; |
| soi.listen_port = NULL; |
| soi.mrc = NULL; |
| } |
| |
| return *this; |
| } |
| |
| /* |
| Destroyer. |
| */ |
| xapp::Messenger::~Messenger() { |
| if( mrc != NULL ) { |
| rmr_close( mrc ); |
| } |
| |
| if( listen_port != NULL ) { |
| delete( listen_port ); |
| } |
| } |
| |
| /* |
| Allow user to register a callback function invoked when a specific type of |
| message is received. The user may pass an optional data pointer which |
| will be passed to the function when it is called. The function signature |
| must be: |
| void fun( Messenger* mr, rmr_mbuf_t* mbuf, void* data ) |
| |
| The user can also invoke this function to set the "default" callback by |
| passing Messenger::DEFAULT_CALLBACK as the mtype. If no other callback |
| is defined for a message type, the default callback function is invoked. |
| If a default is not provided, a non-matching message is silently dropped. |
| */ |
| void xapp::Messenger::Add_msg_cb( int mtype, user_callback fun_name, void* data ) { |
| Callback* cb; |
| |
| cb = new Callback( fun_name, data ); |
| cb_hash[mtype] = cb; |
| |
| callbacks = true; |
| } |
| |
| /* |
| Message allocation for user to send. User must destroy the message when |
| finished, but may keep the message for as long as is necessary |
| and reuse it over and over. |
| */ |
| std::unique_ptr<Message> xapp::Messenger::Alloc_msg( int payload_size ) { |
| return std::unique_ptr<Message>( new Message( mrc, payload_size ) ); |
| } |
| |
| |
| // ----------------- alarm support ----------------------------------------------- |
| /* |
| Allocate an alarm object. |
| Alarms must be allocated via the framework becasue we need a wormhole |
| id and to get one of those we need the mrc. We can easily send with |
| just a message, but to avoid having the user pass the framework |
| object in, we'll just supply a "factory" function. |
| */ |
| std::unique_ptr<xapp::Alarm> xapp::Messenger::Alloc_alarm( int prob_id, const std::string& meid ) { |
| std::shared_ptr<Message> m; |
| Alarm* a; |
| |
| m = Alloc_msg( 4096 ); |
| a = new Alarm( m, prob_id, meid ); |
| a->Set_whid( Wormhole_open( a->Get_endpoint() ) ); |
| |
| return std::unique_ptr<Alarm>( a ); |
| } |
| |
| std::unique_ptr<xapp::Alarm> xapp::Messenger::Alloc_alarm( const std::string& meid ) { |
| return Alloc_alarm( -1, meid ); |
| } |
| |
| std::unique_ptr<xapp::Alarm> xapp::Messenger::Alloc_alarm( ) { |
| return Alloc_alarm( -1, "" ); |
| } |
| |
| |
| // ------------------ metrics support -------------------------------------------------- |
| std::unique_ptr<xapp::Metrics> xapp::Messenger::Alloc_metrics( ) { |
| std::shared_ptr<Message> m; |
| |
| m = Alloc_msg( 4096 ); |
| return std::unique_ptr<xapp::Metrics>( new xapp::Metrics( m ) ); |
| } |
| |
| std::unique_ptr<xapp::Metrics> xapp::Messenger::Alloc_metrics( const std::string& source ) { |
| std::shared_ptr<Message> m; |
| |
| m = Alloc_msg( 4096 ); |
| return std::unique_ptr<xapp::Metrics>( new xapp::Metrics( m, source ) ); |
| } |
| |
| std::unique_ptr<xapp::Metrics> xapp::Messenger::Alloc_metrics( const std::string& reporter, const std::string& source ) { |
| std::shared_ptr<Message> m; |
| |
| m = Alloc_msg( 4096 ); |
| return std::unique_ptr<xapp::Metrics>( new xapp::Metrics( m, reporter, source ) ); |
| } |
| |
| // ------------------- listening support ----------------------------------------------- |
| |
| /* |
| The Listen function waits for messages and drives the appropriate callback |
| function when one is received. This function will return to the caller |
| only when the ok to run flag in the object has been set to false (likely |
| never, or only at graceful termination). Callers should normally not |
| expect to have controll returned in the calling thread. |
| |
| Concurrently executing listeners are allowed. |
| */ |
| void xapp::Messenger::Listen( ) { |
| rmr_mbuf_t* mbuf = NULL; |
| std::map<int,Callback*>::iterator mi; // map iterator; silly indirect way to point at the value |
| Callback* dcb = NULL; // default callback so we don't search |
| Callback* sel_cb; // callback selected to invoke |
| std::unique_ptr<Message> m; |
| |
| if( mrc == NULL ) { |
| return; |
| } |
| |
| mi = cb_hash.find( DEFAULT_CALLBACK ); |
| if( mi != cb_hash.end() ) { |
| dcb = mi->second; // oddly named second field is the address of the callback block |
| } |
| |
| while( ok_2_run ) { |
| mbuf = rmr_torcv_msg( mrc, mbuf, 2000 ); // come up for air every 2 sec to check ok2run |
| if( mbuf != NULL ) { |
| if( mbuf->state == RMR_OK ) { |
| m = std::unique_ptr<Message>( new Message( mbuf, mrc ) ); // auto delteted when scope terminates |
| |
| sel_cb = dcb; // start with default |
| if( callbacks && ((mi = cb_hash.find( mbuf->mtype )) != cb_hash.end()) ) { |
| sel_cb = mi->second; // override with user callback |
| } |
| if( sel_cb != NULL ) { |
| sel_cb->Drive_cb( *m ); // drive the selected one |
| mbuf = NULL; // not safe to use after given to cb |
| } |
| } else { |
| if( mbuf->state != RMR_ERR_TIMEOUT ) { |
| fprintf( stderr, "<LISTENER> got bad status: %d\n", mbuf->state ); |
| } |
| } |
| } |
| } |
| } |
| |
| /* |
| Wait for the next message, up to a max timout, and return the message received. |
| This function allows the user xAPP to implement their own polling loop (no callbacks). |
| */ |
| std::unique_ptr<Message> xapp::Messenger::Receive( int timeout ) { |
| rmr_mbuf_t* mbuf = NULL; |
| std::unique_ptr<Message> m = NULL; |
| |
| if( mrc != NULL ) { |
| mbuf = rmr_torcv_msg( mrc, mbuf, timeout ); // future: do we want to reuse the mbuf here? |
| if( mbuf != NULL ) { |
| m = std::unique_ptr<Message>( new Message( mbuf, mrc ) ); |
| } |
| } |
| |
| return m; |
| } |
| |
| /* |
| Called to gracefully stop all listeners. |
| */ |
| void xapp::Messenger::Stop( ) { |
| ok_2_run = false; |
| } |
| |
| /* |
| RMR messages must be released by RMR as there might be transport |
| buffers that have to be dealt with. Every callback is expected to |
| call this function when finished with the message. |
| void xapp::Messenger::Release_mbuf( void* vmbuf ) { |
| rmr_free_msg( (rmr_mbuf_t *) vmbuf ); |
| } |
| */ |
| |
| /* |
| Wait for clear to send. |
| Until RMR loads a route table, all sends will fail with a |
| "no endpoint" state. This function allows the user application |
| to block until RMR has a viable route table. It does not guarentee |
| that every message that the user app will try to send has an entry. |
| |
| The use of this function by the user application allows for the |
| parallel initialisation of the application while waiting for the |
| route table service to generate a table for the application. The |
| initialisation function may be callsed with "no wait" and this |
| function invoked when the application has completed initialisation |
| and is ready to start sending messages. |
| |
| The max wait parameter is the maximum number of seconds to block. |
| If RMR never reports ready false is returned. A true return |
| incidcates all is ready. If max_wait is 0, then this will only |
| return when RMR is ready to send. |
| */ |
| bool xapp::Messenger::Wait_for_cts( int max_wait ) { |
| bool block_4ever; |
| bool state = false; |
| |
| block_4ever = max_wait == 0; |
| while( block_4ever || max_wait > 0 ) { |
| if( rmr_ready( mrc ) ) { |
| state = true; |
| break; |
| } |
| |
| sleep( 1 ); |
| max_wait--; |
| } |
| |
| return state; |
| } |
| |
| /* |
| Open a wormhole to the indicated endpoint and return the wormhole ID. |
| */ |
| int xapp::Messenger::Wormhole_open( const std::string& endpoint ) { |
| rmr_whid_t whid; |
| |
| whid = rmr_wh_open( mrc, endpoint.c_str() ); |
| |
| return (int) whid; |
| } |
| |
| |
| } // namespace |