Fawkes API  Fawkes Development Version
blackboard_manager.cpp
1 
2 /***************************************************************************
3  * Protoboard plugin template
4  * - blackboard manager implementation: Watch interfaces specified in
5  * instantiation and call to protobuf sender thread accordingly.
6  * Also specialize templates for peer handling since that is domain
7  * independent.
8  *
9  * Copyright 2019 Victor Mataré
10  ****************************************************************************/
11 
12 /* This program is free software; you can redistribute it and/or modify
13  * it under the terms of the GNU General Public License as published by
14  * the Free Software Foundation; either version 2 of the License, or
15  * (at your option) any later version.
16  *
17  * This program is distributed in the hope that it will be useful,
18  * but WITHOUT ANY WARRANTY; without even the implied warranty of
19  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20  * GNU Library General Public License for more details.
21  *
22  * Read the full text in the LICENSE.GPL file in the doc directory.
23  */
24 
25 #include "blackboard_manager.h"
26 
27 #include "protobuf_to_bb.h"
28 
29 namespace protoboard {
30 
31 using namespace fawkes;
32 
34 {
35 }
36 
38 {
39 }
40 
// Member-initializer list and body of a constructor.
// NOTE(review): the signature line (listing line 41) is missing from this extraction; the
// reference entries at the end of the listing suggest
// BlackboardManager(ProtobufThead *msg_handler) -- confirm against the real source.
// The thread is registered under the name "ProtoboardBlackboardManager" in
// wait-for-wakeup mode, so its loop presumably only runs when something wakes it.
42 : fawkes::Thread("ProtoboardBlackboardManager", Thread::OPMODE_WAITFORWAKEUP),
43  message_handler_(msg_handler),
// The map of receiving-side converters is built once, here, by make_receiving_interfaces_map().
44  bb_receiving_interfaces_(make_receiving_interfaces_map()),
// No waker yet: it is allocated later in this file with `new fawkes::BlackBoardOnMessageWaker`.
45  on_message_waker_(nullptr),
// Next free index in the peer interface's peers array; consumed and advanced by add_peer().
46  next_peer_idx_(0)
47 {
// NOTE(review): listing line 48, the only statement of this body, is missing from the extraction.
49 }
50 
// Installs the ProtoBuf sender object used by this thread.
// NOTE(review): the signature line (listing line 52) is missing from this extraction; the
// reference entries at the end of the listing suggest
// set_protobuf_sender(AbstractProtobufSender *sender) -- confirm against the real source.
51 void
53 {
// pb_sender_ now refers to `sender`. It looks like an owning smart pointer (it offers
// reset() and is used through operator-> elsewhere in this file) -- confirm in the header.
54  pb_sender_.reset(sender);
55 }
56 
// Set-up step: initializes the sender, opens the peer interface, installs the message
// waker and initializes all receiving converters.
// NOTE(review): the signature line (listing line 58) is missing from this extraction;
// presumably this is the thread's init() override -- confirm against the real source.
57 void
59 {
// The sender is initialized first, then the peer interface is opened for writing under
// the id "/protoboard/peers".
60  pb_sender_->init();
61  peer_iface_ = blackboard->open_for_writing<ProtobufPeerInterface>("/protoboard/peers");
62 
// The waker is handed the blackboard, the peer interface and this thread, and is released
// again with `delete on_message_waker_` elsewhere in this file.
63  on_message_waker_ = new fawkes::BlackBoardOnMessageWaker(blackboard, peer_iface_, this);
64 
// Every registered receiving converter gets the blackboard and the logger.
65  for (pb_conversion_map::value_type &c : bb_receiving_interfaces_)
66  c.second->init(blackboard, logger);
67 }
68 
// Tear-down step: releases the waker, drops the receiving converters, finalizes the
// sender and closes the peer interface, in that order.
// NOTE(review): the signature line (listing line 70) is missing from this extraction;
// presumably this is the thread's finalize() override -- confirm against the real source.
69 void
71 {
// The waker was allocated with `new` elsewhere in this file, so it is freed by hand here.
72  delete on_message_waker_;
73  bb_receiving_interfaces_.clear();
74  pb_sender_->finalize();
// peer_iface_ is the interface opened for writing as "/protoboard/peers".
75  blackboard->close(peer_iface_);
76 }
77 
// One processing pass with three stages: peer-creation messages on the peer interface,
// outgoing blackboard interfaces via the sender, then all queued incoming ProtoBuf messages.
// NOTE(review): the signature line (listing line 79) is missing from this extraction;
// presumably this is the thread's loop() override -- confirm against the real source.
78 void
80 {
81  // Handle CreatePeer* messages
82  on_interface<ProtobufPeerInterface>{peer_iface_, this}
// NOTE(review): listing lines 83-86, the continuation of the expression above, are
// missing from the extraction, so the statement is incomplete as shown.
87 
88  // Handle sending blackboard interfaces
89  pb_sender_->process_sending_interfaces();
90 
91  // Handle receiving blackboard interfaces
// Drain the incoming queue: one message is popped per iteration for as long as
// pb_queue_incoming() reports more.
92  while (message_handler_->pb_queue_incoming()) {
93  ProtobufThead::incoming_message inc = message_handler_->pb_queue_pop();
94  pb_conversion_map::iterator it;
95 
// Converters are looked up by the message's ProtoBuf type name. A message with no
// registered converter is reported and skipped.
96  if ((it = bb_receiving_interfaces_.find(inc.msg->GetTypeName()))
97  == bb_receiving_interfaces_.end()) {
// NOTE(review): listing line 98, the start of the logging call whose arguments follow,
// is missing from the extraction.
99  "Received message of unregistered type `%s'",
100  inc.msg->GetTypeName().c_str());
101  continue;
102  }
// A std::exception thrown by a converter is logged and the loop continues with the next
// message. Other exception types are not caught here.
103  try {
104  it->second->handle(inc.msg);
105  } catch (std::exception &e) {
106  logger->log_error(name(),
107  "Exception while handling %s: %s",
108  inc.msg->GetTypeName().c_str(),
109  e.what());
110  }
111  }
112 }
113 
// Accessor that hands out this thread's `blackboard` member.
// NOTE(review): the signature line (listing line 115) is missing from this extraction; the
// reference entries at the end of the listing suggest get_blackboard() -- confirm.
114 BlackBoard *
116 {
117  return blackboard;
118 }
119 
120 void
121 BlackboardManager::add_peer(ProtobufPeerInterface *iface, long peer_id)
122 {
123  if (next_peer_idx_ >= iface->maxlenof_peers()) {
124  logger->log_error(name(),
125  "Maximum number of peers reached. Can't create new peer with index %d!",
126  next_peer_idx_);
127  return;
128  }
129  iface->set_peers(next_peer_idx_++, peer_id);
130  iface->write();
131 }
132 
133 /**
134  * Local specialization for the CreatePeerMessage that establishes ProtoBuf communication
135  * @param iface The ProtobufPeerInterface
136  * @param msg The incoming CreatePeerMessage
137  */
138 template <>
139 void
// NOTE(review): listing lines 140-141, which carry the specialization's name and
// parameter list, are missing from this extraction.
142 {
// A peer is created from the message's address and port, and the value returned by
// peer_create() is passed to add_peer() as the peer id for `iface`.
143  add_peer(iface, message_handler_->peer_create(msg->address(), msg->port()));
144 }
145 
146 /**
147  * Local specialization for the CreatePeerLocalMessage that establishes ProtoBuf communication
148  * @param iface The ProtobufPeerInterface
149  * @param msg The incoming CreatePeerLocalMessage
150  */
151 template <>
152 void
// NOTE(review): listing lines 153-154, which carry the specialization's name and
// parameter list, are missing from this extraction.
155 {
// A local peer is created from the message's address, send-to port and receive-on port,
// and the value returned by peer_create_local() is passed to add_peer() as the peer id.
156  add_peer(iface,
157  message_handler_->peer_create_local(msg->address(),
158  msg->send_to_port(),
159  msg->recv_on_port()));
160 }
161 
162 /**
163  * Local specialization for the CreatePeerCryptoMessage that establishes ProtoBuf communication
164  * @param iface The ProtobufPeerInterface
165  * @param msg The incoming CreatePeerCryptoMessage
166  */
167 template <>
168 void
// NOTE(review): listing lines 169-170, which carry the specialization's name and
// parameter list, are missing from this extraction.
171 {
// Same as the plain peer case, but the message's crypto key and cipher are forwarded as
// well. The value returned by peer_create_crypto() is passed to add_peer() as the peer id.
172  add_peer(iface,
173  message_handler_->peer_create_crypto(
174  msg->address(), msg->port(), msg->crypto_key(), msg->cipher()));
175 }
176 
177 /**
178  * Local specialization for the CreatePeerLocalCryptoMessage that establishes ProtoBuf communication
179  * @param iface The ProtobufPeerInterface
180  * @param msg The incoming CreatePeerLocalCryptoMessage
181  */
182 template <>
183 void
// NOTE(review): listing lines 184-185, which carry the specialization's name and
// parameter list, are missing from this extraction.
186 {
// Local peer with separate send/receive ports plus the message's crypto key and cipher.
// The value returned by peer_create_local_crypto() is passed to add_peer() as the peer id.
187  add_peer(iface,
188  message_handler_->peer_create_local_crypto(msg->address(),
189  msg->send_to_port(),
190  msg->recv_on_port(),
191  msg->crypto_key(),
192  msg->cipher()));
193 }
194 
195 } // namespace protoboard
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
Wake threads on receiving a blackboard message.
The BlackBoard abstract class.
Definition: blackboard.h:46
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual void close(Interface *interface)=0
Close interface.
void write()
Write from local copy into BlackBoard memory.
Definition: interface.cpp:501
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
CreatePeerCryptoMessage Fawkes BlackBoard Interface Message.
CreatePeerLocalCryptoMessage Fawkes BlackBoard Interface Message.
CreatePeerLocalMessage Fawkes BlackBoard Interface Message.
CreatePeerMessage Fawkes BlackBoard Interface Message.
ProtobufPeerInterface Fawkes BlackBoard Interface.
size_t maxlenof_peers() const
Get maximum length of peers value.
void set_peers(unsigned int index, const int64_t new_peers)
Set peers value at given index.
Thread class encapsulation of pthreads.
Definition: thread.h:46
const char * name() const
Get name of thread.
Definition: thread.h:100
void set_coalesce_wakeups(bool coalesce=true)
Set wakeup coalescing.
Definition: thread.cpp:729
Abstract superclass for sending out ProtoBuf messages.
AbstractProtobufSender(BlackboardManager *bb_mgr)
Constructor.
virtual ~AbstractProtobufSender()
Destructor.
The main thread that is woken each time a message arrives on any of the interfaces watched by a bb_if...
virtual void finalize() override
Finalize the thread.
void set_protobuf_sender(AbstractProtobufSender *sender)
The ProtoBuf sender must be initialized after construction to break a dependency loop.
fawkes::BlackBoard * get_blackboard()
Helper for other classes to get access to the blackboard.
virtual void init() override
Initialize the thread.
virtual void loop() override
Code to execute in the thread.
void handle_message(InterfaceT *iface, MessageT *msg)
Act on a given message on a given blackboard interface.
BlackboardManager(ProtobufThead *msg_handler)
Main thread constructor.
Receive incoming ProtoBuf messages and pass them on to the BlackboardManager for publication to the a...
incoming_message pb_queue_pop()
long int peer_create_crypto(const std::string &host, int port, const std::string &crypto_key="", const std::string &cipher="")
Enable protobuf peer.
long int peer_create(const std::string &host, int port)
Enable protobuf peer.
long int peer_create_local(const std::string &host, int send_to_port, int recv_on_port)
Enable protobuf peer.
long int peer_create_local_crypto(const std::string &host, int send_to_port, int recv_on_port, const std::string &crypto_key="", const std::string &cipher="")
Enable protobuf peer.
Fawkes library namespace.
Wrapper for a ProtoBuf message and its metadata.
std::shared_ptr< google::protobuf::Message > msg
The message itself.