GNU Radio 3.7.2git-79-g931a7b07 C++ API
basic_block.h
Go to the documentation of this file.
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2006,2008,2009,2011,2013 Free Software Foundation, Inc.
4  *
5  * This file is part of GNU Radio
6  *
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  *
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with GNU Radio; see the file COPYING. If not, write to
19  * the Free Software Foundation, Inc., 51 Franklin Street,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 #ifndef INCLUDED_GR_BASIC_BLOCK_H
24 #define INCLUDED_GR_BASIC_BLOCK_H
25 
26 #include <gnuradio/api.h>
27 #include <gnuradio/sptr_magic.h>
28 #include <gnuradio/msg_accepter.h>
29 #include <gnuradio/runtime_types.h>
30 #include <gnuradio/io_signature.h>
31 #include <gnuradio/thread/thread.h>
32 #include <boost/enable_shared_from_this.hpp>
33 #include <boost/function.hpp>
34 #include <boost/foreach.hpp>
35 #include <boost/thread/condition_variable.hpp>
36 #include <iostream>
37 #include <string>
38 #include <deque>
39 #include <map>
40 
41 #ifdef GR_CTRLPORT
43 #endif
44 
45 namespace gr {
46 
47  /*!
48  * \brief The abstract base class for all signal processing blocks.
49  * \ingroup internal
50  *
51  * Basic blocks are the bare abstraction of an entity that has a
52  * name, a set of inputs and outputs, and a message queue. These
53  * are never instantiated directly; rather, this is the abstract
54  * parent class of both gr_hier_block, which is a recursive
55  * container, and block, which implements actual signal
56  * processing functions.
57  */
59  public boost::enable_shared_from_this<basic_block>
60  {
61  typedef boost::function<void(pmt::pmt_t)> msg_handler_t;
62 
63  private:
64  //msg_handler_t d_msg_handler;
65  typedef std::map<pmt::pmt_t , msg_handler_t, pmt::comperator> d_msg_handlers_t;
66  d_msg_handlers_t d_msg_handlers;
67 
68  typedef std::deque<pmt::pmt_t> msg_queue_t;
69  typedef std::map<pmt::pmt_t, msg_queue_t, pmt::comperator> msg_queue_map_t;
70  typedef std::map<pmt::pmt_t, msg_queue_t, pmt::comperator>::iterator msg_queue_map_itr;
71  std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::comperator> msg_queue_ready;
72 
73  gr::thread::mutex mutex; //< protects all vars
74 
75  protected:
76  friend class flowgraph;
77  friend class flat_flowgraph; // TODO: will be redundant
78  friend class tpb_thread_body;
79 
80  enum vcolor { WHITE, GREY, BLACK };
81 
82  std::string d_name;
87  std::string d_symbol_name;
88  std::string d_symbol_alias;
90  bool d_rpc_set;
91 
92  msg_queue_map_t msg_queue;
93  std::vector<boost::any> d_rpc_vars; // container for all RPC variables
94 
95  basic_block(void) {} // allows pure virtual interface sub-classes
96 
97  //! Protected constructor prevents instantiation by non-derived classes
98  basic_block(const std::string &name,
99  gr::io_signature::sptr input_signature,
100  gr::io_signature::sptr output_signature);
101 
102  //! may only be called during constructor
104  d_input_signature = iosig;
105  }
106 
107  //! may only be called during constructor
109  d_output_signature = iosig;
110  }
111 
112  /*!
113  * \brief Allow the flowgraph to set for sorting and partitioning
114  */
115  void set_color(vcolor color) { d_color = color; }
116  vcolor color() const { return d_color; }
117 
118  /*!
119  * \brief Tests if there is a handler attached to port \p which_port
120  */
121  virtual bool has_msg_handler(pmt::pmt_t which_port) {
122  return (d_msg_handlers.find(which_port) != d_msg_handlers.end());
123  }
124 
125  /*
126  * This function is called by the runtime system to dispatch messages.
127  *
128  * The thread-safety guarantees mentioned in set_msg_handler are
129  * implemented by the callers of this method.
130  */
131  virtual void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
132  {
133  // AA Update this
134  if(has_msg_handler(which_port)) { // Is there a handler?
135  d_msg_handlers[which_port](msg); // Yes, invoke it.
136  }
137  }
138 
139  // Message passing interface
141 
142  public:
143  virtual ~basic_block();
144  long unique_id() const { return d_unique_id; }
145  long symbolic_id() const { return d_symbolic_id; }
146  std::string name() const { return d_name; }
147  std::string symbol_name() const { return d_symbol_name; }
148  gr::io_signature::sptr input_signature() const { return d_input_signature; }
149  gr::io_signature::sptr output_signature() const { return d_output_signature; }
150  basic_block_sptr to_basic_block(); // Needed for Python type coercion
151  bool alias_set() { return !d_symbol_alias.empty(); }
152  std::string alias(){ return alias_set()?d_symbol_alias:symbol_name(); }
153  pmt::pmt_t alias_pmt(){ return pmt::intern(alias()); }
154  void set_block_alias(std::string name);
155 
156  // ** Message passing interface **
157  void message_port_register_in(pmt::pmt_t port_id);
158  void message_port_register_out(pmt::pmt_t port_id);
159  void message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg);
160  void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target);
161  void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target);
162 
163  virtual bool message_port_is_hier(pmt::pmt_t port_id) { (void) port_id; std::cout << "is_hier\n"; return false; }
164  virtual bool message_port_is_hier_in(pmt::pmt_t port_id) { (void) port_id; std::cout << "is_hier_in\n"; return false; }
165  virtual bool message_port_is_hier_out(pmt::pmt_t port_id) { (void) port_id; std::cout << "is_hier_out\n"; return false; }
166 
167  /*!
168  * \brief Get input message port names.
169  *
170  * Returns the available input message ports for a block. The
171  * return object is a PMT vector that is filled with PMT symbols.
172  */
173  pmt::pmt_t message_ports_in();
174 
175  /*!
176  * \brief Get output message port names.
177  *
178  * Returns the available output message ports for a block. The
179  * return object is a PMT vector that is filled with PMT symbols.
180  */
181  pmt::pmt_t message_ports_out();
182 
183  /*!
184  * Accept msg, place in queue, arrange for thread to be awakened if it's not already.
185  */
186  void _post(pmt::pmt_t which_port, pmt::pmt_t msg);
187 
188  //! is the queue empty?
189  bool empty_p(pmt::pmt_t which_port) {
190  if(msg_queue.find(which_port) == msg_queue.end())
191  throw std::runtime_error("port does not exist!");
192  return msg_queue[which_port].empty();
193  }
194  bool empty_p() {
195  bool rv = true;
196  BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue) {
197  rv &= msg_queue[i.first].empty();
198  }
199  return rv;
200  }
201 
202  //! are all msg ports with handlers empty?
203  bool empty_handled_p(pmt::pmt_t which_port){
204  return (empty_p(which_port) || !has_msg_handler(which_port));
205  }
206  bool empty_handled_p() {
207  bool rv = true;
208  BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue) {
209  rv &= empty_handled_p(i.first);
210  }
211  return rv;
212  }
213 
214  //! How many messages in the queue?
215  size_t nmsgs(pmt::pmt_t which_port) {
216  if(msg_queue.find(which_port) == msg_queue.end())
217  throw std::runtime_error("port does not exist!");
218  return msg_queue[which_port].size();
219  }
220 
221  //| Acquires and release the mutex
222  void insert_tail( pmt::pmt_t which_port, pmt::pmt_t msg);
223  /*!
224  * \returns returns pmt at head of queue or pmt::pmt_t() if empty.
225  */
226  pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port);
227 
228  /*!
229  * \returns returns pmt at head of queue or pmt::pmt_t() if empty.
230  */
231  pmt::pmt_t delete_head_blocking( pmt::pmt_t which_port);
232 
233  msg_queue_t::iterator get_iterator(pmt::pmt_t which_port) {
234  return msg_queue[which_port].begin();
235  }
236 
237  void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it) {
238  msg_queue[which_port].erase(it);
239  }
240 
241  virtual bool has_msg_port(pmt::pmt_t which_port) {
242  if(msg_queue.find(which_port) != msg_queue.end()) {
243  return true;
244  }
245  if(pmt::dict_has_key(message_subscribers, which_port)) {
246  return true;
247  }
248  return false;
249  }
250 
251 #ifdef GR_CTRLPORT
252  /*!
253  * \brief Add an RPC variable (get or set).
254  *
255  * Using controlport, we create new getters/setters and need to
256  * store them. Each block has a vector to do this, and these never
257  * need to be accessed again once they are registered with the RPC
258  * backend. This function takes a
259  * boost::shared_sptr<rpcbasic_base> so that when the block is
260  * deleted, all RPC registered variables are cleaned up.
261  *
262  * \param s an rpcbasic_sptr of the new RPC variable register to store.
263  */
264  void add_rpc_variable(rpcbasic_sptr s)
265  {
266  d_rpc_vars.push_back(s);
267  }
268 #endif /* GR_CTRLPORT */
269 
270  /*!
271  * \brief Set up the RPC registered variables.
272  *
273  * This must be overloaded by a block that wants to use
274  * controlport. This is where rpcbasic_register_{get,set} pointers
275  * are created, which then get wrapped as shared pointers
276  * (rpcbasic_sptr(...)) and stored using add_rpc_variable.
277  */
278  virtual void setup_rpc() {};
279 
280  /*!
281  * \brief Ask if this block has been registered to the RPC.
282  *
283  * We can only register a block once, so we use this to protect us
284  * from calling it multiple times.
285  */
286  bool is_rpc_set() { return d_rpc_set; }
287 
288  /*!
289  * \brief When the block is registered with the RPC, set this.
290  */
291  void rpc_set() { d_rpc_set = true; }
292 
293  /*!
294  * \brief Confirm that ninputs and noutputs is an acceptable combination.
295  *
296  * \param ninputs number of input streams connected
297  * \param noutputs number of output streams connected
298  *
299  * \returns true if this is a valid configuration for this block.
300  *
301  * This function is called by the runtime system whenever the
302  * topology changes. Most classes do not need to override this.
303  * This check is in addition to the constraints specified by the
304  * input and output gr::io_signatures.
305  */
306  virtual bool check_topology(int ninputs, int noutputs) {
307  (void)ninputs;
308  (void)noutputs;
309  return true;
310  }
311 
312  /*!
313  * \brief Set the callback that is fired when messages are available.
314  *
315  * \p msg_handler can be any kind of function pointer or function object
316  * that has the signature:
317  * <pre>
318  * void msg_handler(pmt::pmt msg);
319  * </pre>
320  *
321  * (You may want to use boost::bind to massage your callable into
322  * the correct form. See gr::blocks::nop for an example that sets
323  * up a class method as the callback.)
324  *
325  * Blocks that desire to handle messages must call this method in
326  * their constructors to register the handler that will be invoked
327  * when messages are available.
328  *
329  * If the block inherits from block, the runtime system will
330  * ensure that msg_handler is called in a thread-safe manner, such
331  * that work and msg_handler will never be called concurrently.
332  * This allows msg_handler to update state variables without
333  * having to worry about thread-safety issues with work,
334  * general_work or another invocation of msg_handler.
335  *
336  * If the block inherits from hier_block2, the runtime system
337  * will ensure that no reentrant calls are made to msg_handler.
338  */
339  template <typename T> void set_msg_handler(pmt::pmt_t which_port, T msg_handler) {
340  if(msg_queue.find(which_port) == msg_queue.end()) {
341  throw std::runtime_error("attempt to set_msg_handler() on bad input message port!");
342  }
343  d_msg_handlers[which_port] = msg_handler_t(msg_handler);
344  }
345 
346  virtual void set_processor_affinity(const std::vector<int> &mask)
347  { throw std::runtime_error("set_processor_affinity not overloaded in child class."); }
348 
349  virtual void unset_processor_affinity()
350  { throw std::runtime_error("unset_processor_affinity not overloaded in child class."); }
351 
352  virtual std::vector<int> processor_affinity()
353  { throw std::runtime_error("processor_affinity not overloaded in child class."); }
354  };
355 
357  {
358  return lhs->unique_id() < rhs->unique_id();
359  }
360 
361  typedef std::vector<basic_block_sptr> basic_block_vector_t;
362  typedef std::vector<basic_block_sptr>::iterator basic_block_viter_t;
363 
365 
366  inline std::ostream &operator << (std::ostream &os, basic_block_sptr basic_block)
367  {
368  os << basic_block->name() << "(" << basic_block->unique_id() << ")";
369  return os;
370  }
371 
372 } /* namespace gr */
373 
374 #endif /* INCLUDED_GR_BASIC_BLOCK_H */