GNU Radio v3.6.2-209-ga47c5485 C++ API
gr_basic_block.h
Go to the documentation of this file.
00001 /* -*- c++ -*- */
00002 /*
00003  * Copyright 2006,2008,2009,2011 Free Software Foundation, Inc.
00004  *
00005  * This file is part of GNU Radio
00006  *
00007  * GNU Radio is free software; you can redistribute it and/or modify
00008  * it under the terms of the GNU General Public License as published by
00009  * the Free Software Foundation; either version 3, or (at your option)
00010  * any later version.
00011  *
00012  * GNU Radio is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  *
00017  * You should have received a copy of the GNU General Public License
00018  * along with GNU Radio; see the file COPYING.  If not, write to
00019  * the Free Software Foundation, Inc., 51 Franklin Street,
00020  * Boston, MA 02110-1301, USA.
00021  */
00022 
00023 #ifndef INCLUDED_GR_BASIC_BLOCK_H
00024 #define INCLUDED_GR_BASIC_BLOCK_H
00025 
00026 #include <gr_core_api.h>
00027 #include <gr_runtime_types.h>
00028 #include <gr_sptr_magic.h>
00029 #include <boost/enable_shared_from_this.hpp>
00030 #include <boost/function.hpp>
00031 #include <gr_msg_accepter.h>
00032 #include <string>
00033 #include <deque>
00034 #include <map>
00035 #include <gr_io_signature.h>
00036 #include <gruel/thread.h>
00037 #include <boost/foreach.hpp>
00038 #include <boost/thread/condition_variable.hpp>
00039 
00040 /*!
00041  * \brief The abstract base class for all signal processing blocks.
00042  * \ingroup internal
00043  *
00044  * Basic blocks are the bare abstraction of an entity that has a name,
00045  * a set of inputs and outputs, and a message queue.  These are never instantiated
00046  * directly; rather, this is the abstract parent class of both gr_hier_block2,
00047  * which is a recursive container, and gr_block, which implements actual
00048  * signal processing functions.
00049  */
00050 
00051 class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_shared_from_this<gr_basic_block>
00052 {
00053     typedef boost::function<void(pmt::pmt_t)> msg_handler_t;
00054 
00055 private:
00056     /*
00057      * This function is called by the runtime system to dispatch messages.
00058      *
00059      * The thread-safety guarantees mentioned in set_msg_handler are implemented
00060      * by the callers of this method.
00061      */
00062     void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
00063     {
00064         // AA Update this
00065       if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler?
00066         d_msg_handlers[which_port](msg); // Yes, invoke it.
00067     };
00068 
00069     //msg_handler_t      d_msg_handler;
00070     typedef std::map<pmt::pmt_t , msg_handler_t, pmt::pmt_comperator> d_msg_handlers_t;
00071     d_msg_handlers_t d_msg_handlers;
00072    
00073     typedef std::deque<pmt::pmt_t>    msg_queue_t;
00074     typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator>    msg_queue_map_t;
00075     msg_queue_map_t msg_queue;
00076 //    boost::condition_variable msg_queue_ready;
00077     std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::pmt_comperator> msg_queue_ready;
00078 
00079     gruel::mutex          mutex;          //< protects all vars
00080 
00081 
00082 protected:
00083     friend class gr_flowgraph;
00084     friend class gr_flat_flowgraph; // TODO: will be redundant
00085     friend class gr_tpb_thread_body;
00086 
00087     enum vcolor { WHITE, GREY, BLACK };
00088 
00089     std::string          d_name;
00090     gr_io_signature_sptr d_input_signature;
00091     gr_io_signature_sptr d_output_signature;
00092     long                 d_unique_id;
00093     long                 d_symbolic_id;
00094     std::string          d_symbol_name;
00095     std::string          d_symbol_alias;
00096     vcolor               d_color;
00097 
00098     gr_basic_block(void){} //allows pure virtual interface sub-classes
00099 
00100     //! Protected constructor prevents instantiation by non-derived classes
00101     gr_basic_block(const std::string &name,
00102                    gr_io_signature_sptr input_signature,
00103                    gr_io_signature_sptr output_signature);
00104 
00105     //! may only be called during constructor
00106     void set_input_signature(gr_io_signature_sptr iosig) {
00107         d_input_signature = iosig;
00108     }
00109 
00110     //! may only be called during constructor
00111     void set_output_signature(gr_io_signature_sptr iosig) {
00112         d_output_signature = iosig;
00113     }
00114 
00115     /*!
00116      * \brief Allow the flowgraph to set for sorting and partitioning
00117      */
00118     void set_color(vcolor color) { d_color = color; }
00119     vcolor color() const { return d_color; }
00120 
00121     // Message passing interface
00122     pmt::pmt_t message_subscribers;
00123 
00124 public:
00125     virtual ~gr_basic_block();
00126     long unique_id() const { return d_unique_id; }
00127     long symbolic_id() const { return d_symbolic_id; }
00128     std::string name() const { return d_name; }
00129     std::string symbol_name() const { return d_symbol_name; }
00130     gr_io_signature_sptr input_signature() const  { return d_input_signature; }
00131     gr_io_signature_sptr output_signature() const { return d_output_signature; }
00132     gr_basic_block_sptr to_basic_block(); // Needed for Python type coercion
00133     bool alias_set() { return !d_symbol_alias.empty(); }
00134     std::string alias(){ return alias_set()?d_symbol_alias:symbol_name(); }
00135     pmt::pmt_t alias_pmt(){ return pmt::pmt_intern(alias()); }
00136     void set_block_alias(std::string name);
00137 
00138     // ** Message passing interface **
00139     void message_port_register_in(pmt::pmt_t port_id);
00140     void message_port_register_out(pmt::pmt_t port_id);
00141     void message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg);
00142     void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target);
00143     void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target);
00144 
00145     /*!
00146      * Accept msg, place in queue, arrange for thread to be awakened if it's not already.
00147      */
00148     void _post(pmt::pmt_t which_port, pmt::pmt_t msg);
00149 
00150     //! is the queue empty?
00151     //bool empty_p(const pmt::pmt_t &which_port) const { return msg_queue[which_port].empty(); }
00152     bool empty_p(pmt::pmt_t which_port) { 
00153         if(msg_queue.find(which_port) == msg_queue.end())
00154             throw std::runtime_error("port does not exist!");
00155         return msg_queue[which_port].empty(); 
00156         }
00157     bool empty_p() { 
00158         bool rv = true;
00159         BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue){ rv &= msg_queue[i.first].empty(); }
00160         return rv;
00161         }
00162 
00163     //| Acquires and release the mutex
00164     void insert_tail( pmt::pmt_t which_port, pmt::pmt_t msg);
00165     /*!
00166      * \returns returns pmt at head of queue or pmt_t() if empty.
00167      */
00168     pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port);
00169 
00170     /*!
00171      * \returns returns pmt at head of queue or pmt_t() if empty.
00172      */
00173     pmt::pmt_t delete_head_blocking( pmt::pmt_t which_port);
00174 
00175     msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){
00176         return msg_queue[which_port].begin();
00177         }
00178     void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it){
00179         msg_queue[which_port].erase(it);
00180         }
00181 
00182 
00183     /*!
00184      * \brief Confirm that ninputs and noutputs is an acceptable combination.
00185      *
00186      * \param ninputs   number of input streams connected
00187      * \param noutputs  number of output streams connected
00188      *
00189      * \returns true if this is a valid configuration for this block.
00190      *
00191      * This function is called by the runtime system whenever the
00192      * topology changes.  Most classes do not need to override this.
00193      * This check is in addition to the constraints specified by the input
00194      * and output gr_io_signatures.
00195      */
00196     virtual bool check_topology(int ninputs, int noutputs) { return true; }
00197 
00198     /*!
00199      * \brief Set the callback that is fired when messages are available.
00200      *
00201      * \p msg_handler can be any kind of function pointer or function object
00202      * that has the signature:
00203      * <pre>
00204      *    void msg_handler(pmt::pmt msg);
00205      * </pre>
00206      *
00207      * (You may want to use boost::bind to massage your callable into the
00208      * correct form.  See gr_nop.{h,cc} for an example that sets up a class
00209      * method as the callback.)
00210      *
00211      * Blocks that desire to handle messages must call this method in their
00212      * constructors to register the handler that will be invoked when messages
00213      * are available.
00214      *
00215      * If the block inherits from gr_block, the runtime system will ensure that
00216      * msg_handler is called in a thread-safe manner, such that work and
00217      * msg_handler will never be called concurrently.  This allows msg_handler
00218      * to update state variables without having to worry about thread-safety
00219      * issues with work, general_work or another invocation of msg_handler.
00220      *
00221      * If the block inherits from gr_hier_block2, the runtime system will
00222      * ensure that no reentrant calls are made to msg_handler.
00223      */
00224     //template <typename T> void set_msg_handler(T msg_handler){
00225     //  d_msg_handler = msg_handler_t(msg_handler);
00226     //}
00227     template <typename T> void set_msg_handler(pmt::pmt_t which_port, T msg_handler){
00228       if(msg_queue.find(which_port) == msg_queue.end()){ 
00229             throw std::runtime_error("attempt to set_msg_handler() on bad input message port!"); }
00230       d_msg_handlers[which_port] = msg_handler_t(msg_handler);
00231     }
00232 };
00233 
00234 inline bool operator<(gr_basic_block_sptr lhs, gr_basic_block_sptr rhs)
00235 {
00236   return lhs->unique_id() < rhs->unique_id();
00237 }
00238 
00239 typedef std::vector<gr_basic_block_sptr> gr_basic_block_vector_t;
00240 typedef std::vector<gr_basic_block_sptr>::iterator gr_basic_block_viter_t;
00241 
00242 GR_CORE_API long gr_basic_block_ncurrently_allocated();
00243 
00244 inline std::ostream &operator << (std::ostream &os, gr_basic_block_sptr basic_block)
00245 {
00246     os << basic_block->name() << "(" << basic_block->unique_id() << ")";
00247     return os;
00248 }
00249 
00250 #endif /* INCLUDED_GR_BASIC_BLOCK_H */