Fawkes API  Fawkes Development Version
communicator.h
1 
2 /***************************************************************************
3  * communicator.h - protobuf network communication for CLIPS
4  *
5  * Created: Tue Apr 16 13:41:13 2013
6  * Copyright 2013-2014 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  *
13  * - Redistributions of source code must retain the above copyright
14  * notice, this list of conditions and the following disclaimer.
15  * - Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  * - Neither the name of the authors nor the names of its contributors
20  * may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
27  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
28  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
29  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
30  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
31  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
32  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
34  * OF THE POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #ifndef _PROTOBUF_CLIPS_COMMUNICATOR_H_
38 #define _PROTOBUF_CLIPS_COMMUNICATOR_H_
39 
40 #include <core/threading/mutex.h>
41 #include <protobuf_comm/server.h>
42 
43 #include <clipsmm.h>
44 #include <list>
45 #include <map>
46 
47 namespace protobuf_comm {
48 class ProtobufStreamClient;
49 class ProtobufBroadcastPeer;
50 } // namespace protobuf_comm
51 
52 namespace fawkes {
53 class Logger;
54 }
55 
56 namespace protobuf_clips {
57 
59 {
60 public:
61  ClipsProtobufCommunicator(CLIPS::Environment *env,
62  fawkes::Mutex & env_mutex,
63  fawkes::Logger * logger = NULL);
64  ClipsProtobufCommunicator(CLIPS::Environment * env,
65  fawkes::Mutex & env_mutex,
66  std::vector<std::string> &proto_path,
67  fawkes::Logger * logger = NULL);
69 
70  void enable_server(int port);
71  void disable_server();
72 
73  /** Get Protobuf server.
74  * @return protobuf server */
75  protobuf_comm::ProtobufStreamServer *
76  server() const
77  {
78  return server_;
79  }
80 
81  /** Get protobuf_comm peers.
82  * @return protobuf_comm peer */
83  const std::map<long int, protobuf_comm::ProtobufBroadcastPeer *> &
84  peers() const
85  {
86  return peers_;
87  }
88 
89  /** Get the communicator's message register.
90  * @return message register */
91  protobuf_comm::MessageRegister &
93  {
94  return *message_register_;
95  }
96 
97  /** Signal invoked for a message that has been sent to a server client.
98  * @return signal
99  */
100  boost::signals2::signal<void(protobuf_comm::ProtobufStreamServer::ClientID,
101  std::shared_ptr<google::protobuf::Message>)> &
103  {
104  return sig_server_sent_;
105  }
106 
107  /** Signal invoked for a message that has been sent to a client.
108  * @return signal
109  */
110  boost::signals2::signal<
111  void(std::string, unsigned short, std::shared_ptr<google::protobuf::Message>)> &
113  {
114  return sig_client_sent_;
115  }
116 
117  /** Signal invoked for a message that has been sent via broadcast.
118  * @return signal
119  */
120  boost::signals2::signal<void(long, std::shared_ptr<google::protobuf::Message>)> &
122  {
123  return sig_peer_sent_;
124  }
125 
126 private:
127  void setup_clips();
128 
129  CLIPS::Value clips_pb_register_type(std::string full_name);
130  CLIPS::Values clips_pb_field_names(void *msgptr);
131  CLIPS::Value clips_pb_has_field(void *msgptr, std::string field_name);
132  CLIPS::Value clips_pb_field_value(void *msgptr, std::string field_name);
133  CLIPS::Value clips_pb_field_type(void *msgptr, std::string field_name);
134  CLIPS::Value clips_pb_field_label(void *msgptr, std::string field_name);
135  CLIPS::Values clips_pb_field_list(void *msgptr, std::string field_name);
136  CLIPS::Value clips_pb_field_is_list(void *msgptr, std::string field_name);
137  CLIPS::Value clips_pb_create(std::string full_name);
138  CLIPS::Value clips_pb_ref(void *msgptr);
139  void clips_pb_destroy(void *msgptr);
140  void clips_pb_set_field(void *msgptr, std::string field_name, CLIPS::Value value);
141  void clips_pb_add_list(void *msgptr, std::string field_name, CLIPS::Value value);
142  void clips_pb_send(long int client_id, void *msgptr);
143  std::string clips_pb_tostring(void *msgptr);
144  long int clips_pb_client_connect(std::string host, int port);
145  void clips_pb_disconnect(long int client_id);
146  void clips_pb_broadcast(long int peer_id, void *msgptr);
147  void clips_pb_enable_server(int port);
148 
149  long int clips_pb_peer_create(std::string host, int port);
150  long int clips_pb_peer_create_local(std::string host, int send_port, int recv_port);
151  long int clips_pb_peer_create_crypto(std::string host,
152  int port,
153  std::string crypto_key = "",
154  std::string cipher = "");
155  long int clips_pb_peer_create_local_crypto(std::string host,
156  int send_port,
157  int recv_port,
158  std::string crypto_key = "",
159  std::string cipher = "");
160  void clips_pb_peer_destroy(long int peer_id);
161  void clips_pb_peer_setup_crypto(long int peer_id, std::string crypto_key, std::string cipher);
162 
163  typedef enum { CT_SERVER, CT_CLIENT, CT_PEER } ClientType;
164  void clips_assert_message(std::pair<std::string, unsigned short> & endpoint,
165  uint16_t comp_id,
166  uint16_t msg_type,
167  std::shared_ptr<google::protobuf::Message> &msg,
168  ClientType ct,
169  long int client_id = 0);
170  void handle_server_client_connected(protobuf_comm::ProtobufStreamServer::ClientID client,
171  boost::asio::ip::tcp::endpoint & endpoint);
172  void handle_server_client_disconnected(protobuf_comm::ProtobufStreamServer::ClientID client,
173  const boost::system::error_code & error);
174 
175  void handle_server_client_msg(protobuf_comm::ProtobufStreamServer::ClientID client,
176  uint16_t component_id,
177  uint16_t msg_type,
178  std::shared_ptr<google::protobuf::Message> msg);
179 
180  void handle_server_client_fail(protobuf_comm::ProtobufStreamServer::ClientID client,
181  uint16_t component_id,
182  uint16_t msg_type,
183  std::string msg);
184 
185  void handle_peer_msg(long int peer_id,
186  boost::asio::ip::udp::endpoint & endpoint,
187  uint16_t component_id,
188  uint16_t msg_type,
189  std::shared_ptr<google::protobuf::Message> msg);
190  void handle_peer_recv_error(long int peer_id,
191  boost::asio::ip::udp::endpoint &endpoint,
192  std::string msg);
193  void handle_peer_send_error(long int peer_id, std::string msg);
194 
195  void handle_client_connected(long int client_id);
196  void handle_client_disconnected(long int client_id, const boost::system::error_code &error);
197  void handle_client_msg(long int client_id,
198  uint16_t comp_id,
199  uint16_t msg_type,
200  std::shared_ptr<google::protobuf::Message> msg);
201  void handle_client_receive_fail(long int client_id,
202  uint16_t comp_id,
203  uint16_t msg_type,
204  std::string msg);
205 
206  static std::string to_string(const CLIPS::Value &v);
207 
208 private:
209  CLIPS::Environment *clips_;
210  fawkes::Mutex & clips_mutex_;
211 
212  fawkes::Logger *logger_;
213 
214  protobuf_comm::MessageRegister * message_register_;
215  protobuf_comm::ProtobufStreamServer *server_;
216 
217  boost::signals2::signal<void(protobuf_comm::ProtobufStreamServer::ClientID,
218  std::shared_ptr<google::protobuf::Message>)>
219  sig_server_sent_;
220  boost::signals2::signal<
221  void(std::string, unsigned short, std::shared_ptr<google::protobuf::Message>)>
222  sig_client_sent_;
223  boost::signals2::signal<void(long int, std::shared_ptr<google::protobuf::Message>)>
224  sig_peer_sent_;
225 
226  fawkes::Mutex map_mutex_;
227  long int next_client_id_;
228 
229  std::map<long int, protobuf_comm::ProtobufStreamServer::ClientID> server_clients_;
230  typedef std::map<protobuf_comm::ProtobufStreamServer::ClientID, long int> RevServerClientMap;
231  RevServerClientMap rev_server_clients_;
232  std::map<long int, protobuf_comm::ProtobufStreamClient *> clients_;
233  std::map<long int, protobuf_comm::ProtobufBroadcastPeer *> peers_;
234 
235  std::map<long int, std::pair<std::string, unsigned short>> client_endpoints_;
236 
237  std::list<std::string> functions_;
238  CLIPS::Fact::pointer avail_fact_;
239 };
240 
241 } // end namespace protobuf_clips
242 
243 #endif
Interface for logging.
Definition: logger.h:42
Mutex mutual exclusion lock.
Definition: mutex.h:33
CLIPS protobuf integration class.
Definition: communicator.h:59
boost::signals2::signal< void(protobuf_comm::ProtobufStreamServer::ClientID, std::shared_ptr< google::protobuf::Message >)> & signal_server_sent()
Signal invoked for a message that has been sent to a server client.
Definition: communicator.h:102
const std::map< long int, protobuf_comm::ProtobufBroadcastPeer * > & peers() const
Get protobuf_comm peers.
Definition: communicator.h:84
void enable_server(int port)
Enable protobuf stream server.
boost::signals2::signal< void(long, std::shared_ptr< google::protobuf::Message >)> & signal_peer_sent()
Signal invoked for a message that has been sent via broadcast.
Definition: communicator.h:121
ClipsProtobufCommunicator(CLIPS::Environment *env, fawkes::Mutex &env_mutex, fawkes::Logger *logger=NULL)
Constructor.
protobuf_comm::MessageRegister & message_register()
Get the communicator's message register.
Definition: communicator.h:92
protobuf_comm::ProtobufStreamServer * server() const
Get Protobuf server.
Definition: communicator.h:76
void disable_server()
Disable protobuf stream server.
boost::signals2::signal< void(std::string, unsigned short, std::shared_ptr< google::protobuf::Message >)> & signal_client_sent()
Signal invoked for a message that has been sent to a client.
Definition: communicator.h:112
Fawkes library namespace.