Fawkes API  Fawkes Development Version
mongodb_instance_config.cpp
1 
2 /***************************************************************************
3  * mongodb_instance_config.cpp - MongoDB instance configuration
4  *
5  * Created: Wed Jul 12 14:33:02 2017
6  * Copyright 2006-2017 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "mongodb_instance_config.h"
23 
24 #include "utils.h"
25 
26 #include <config/config.h>
27 #include <core/exceptions/system.h>
28 #include <utils/sub_process/proc.h>
29 #include <utils/time/wait.h>
30 
31 #include <boost/filesystem.hpp>
32 #include <boost/filesystem/operations.hpp>
33 #include <boost/format.hpp>
34 #include <bsoncxx/builder/basic/document.hpp>
35 #include <bsoncxx/json.hpp>
36 #include <chrono>
37 #include <mongocxx/client.hpp>
38 #include <mongocxx/exception/exception.hpp>
39 #include <numeric>
40 #include <sstream>
41 #include <wordexp.h>
42 
43 using namespace fawkes;
44 using namespace std::chrono_literals;
45 
/** @class MongoDBInstanceConfig "mongodb_instance_config.h"
 * MongoDB Instance Configuration.
 * Configures a single mongod instance that can be run as a sub-process.
 *
 * @author Tim Niemueller
 */
52 
53 /** Constructor.
54  * This will read the given configuration.
55  * @param config configuration to query
56  * @param cfgname configuration name
57  * @param prefix configuration path prefix
58  */
60  std::string cfgname,
61  std::string prefix)
62 : Thread("MongoDBInstance", Thread::OPMODE_CONTINUOUS)
63 {
64  set_name("MongoDBInstance|%s", cfgname.c_str());
65  config_name_ = cfgname;
66 
67  running_ = false;
68 
69  enabled_ = false;
70  try {
71  enabled_ = config->get_bool(prefix + "enabled");
72  } catch (Exception &e) {
73  }
74 
75  if (enabled_) {
76  startup_grace_period_ = 10;
77  try {
78  startup_grace_period_ = config->get_uint(prefix + "startup-grace-period");
79  } catch (Exception &e) {
80  } // ignored, use default
81  loop_interval_ = 5.0;
82  try {
83  loop_interval_ = config->get_float(prefix + "loop-interval");
84  } catch (Exception &e) {
85  } // ignored, use default
86  termination_grace_period_ = config->get_uint(prefix + "termination-grace-period");
87  port_ = config->get_uint(prefix + "port");
88  bind_ip_ = config->get_string_or_default(std::string(prefix + "bind_ip").c_str(), "0.0.0.0");
89  use_tmp_directory_ = config->get_bool_or_default((prefix + "use-tmp-directory").c_str(), false);
90  // By default, clear data if we use a tmp directory, otherwise don't.
91  clear_data_on_termination_ =
92  config->get_bool_or_default((prefix + "clear-data-on-termination").c_str(),
93  use_tmp_directory_);
94  if (use_tmp_directory_) {
95  // Add the port to the filename, escape all literal "%" as "%%".
96  const std::string filename =
97  boost::str(boost::format("mongodb-%1%-%%%%%%%%-%%%%%%%%-%%%%%%%%") % port_);
98  const boost::filesystem::path path =
99  boost::filesystem::unique_path(boost::filesystem::temp_directory_path() / filename);
100  data_path_ = path.string();
101  log_path_ = (path / "mongodb.log").string();
102  } else {
103  data_path_ = config->get_string(prefix + "data-path");
104  log_path_ = config->get_string(prefix + "log/path");
105  }
106  log_append_ = config->get_bool(prefix + "log/append");
107  try {
108  replica_set_ = config->get_string(prefix + "replica-set");
109  ;
110  } catch (Exception &e) {
111  } // ignored, no replica set
112  if (!replica_set_.empty()) {
113  oplog_size_ = 0;
114  try {
115  oplog_size_ = config->get_uint(prefix + "oplog-size");
116  } catch (Exception &e) {
117  } // ignored, use default
118  }
119  }
120 
121  argv_ = {
122  "mongod", "--bind_ip", bind_ip_, "--port", std::to_string(port_), "--dbpath", data_path_};
123 
124  if (!log_path_.empty()) {
125  if (log_append_) {
126  argv_.push_back("--logappend");
127  }
128  argv_.push_back("--logpath");
129  argv_.push_back(log_path_);
130  }
131 
132  if (!replica_set_.empty()) {
133  argv_.push_back("--replSet");
134  argv_.push_back(replica_set_);
135  if (oplog_size_ > 0) {
136  argv_.push_back("--oplogSize");
137  argv_.push_back(std::to_string(oplog_size_));
138  }
139  }
140 
141  if (enabled_) {
142  std::string extra_args = config->get_string_or_default((prefix + "args").c_str(), "");
143  if (!extra_args.empty()) {
144  wordexp_t p;
145  int wrv = wordexp(extra_args.c_str(), &p, WRDE_NOCMD | WRDE_UNDEF);
146  switch (wrv) {
147  case 0: break; // all good
148  case WRDE_BADCHAR: throw Exception("%s: invalid character in args", name());
149  case WRDE_BADVAL: throw Exception("%s: undefined variable referenced in args", name());
150  case WRDE_CMDSUB:
151  throw Exception("%s: running sub-commands has been disabled for args", name());
152  case WRDE_NOSPACE: throw OutOfMemoryException("Cannot parse args");
153  case WRDE_SYNTAX: throw Exception("%s: shell syntax error in args", name());
154  default: throw Exception("Unexpected wordexp error %d when parsing args", wrv);
155  }
156 
157  // These arguments may not be passed, they are either configured through
158  // config values and could interfere, or they mess with our rs handling.
159  std::vector<std::string> invalid_args = {"--port",
160  "--dbpath",
161  "--fork",
162  "--logappend",
163  "--logpath",
164  "--replSet",
165  "--oplogSize",
166  "--master",
167  "--slave",
168  "--source",
169  "--only"};
170 
171  // pass and verify arguments to be added to command line
172  for (size_t i = 0; i < p.we_wordc; ++i) {
173  for (size_t j = 0; j < invalid_args.size(); ++j) {
174  if (invalid_args[j] == p.we_wordv[i]) {
175  wordfree(&p);
176  throw Exception("%s: %s may not be passed in args", name(), invalid_args[j].c_str());
177  }
178  }
179  argv_.push_back(p.we_wordv[i]);
180  }
181  wordfree(&p);
182  }
183  }
184 
185  command_line_ = std::accumulate(std::next(argv_.begin()),
186  argv_.end(),
187  argv_.front(),
188  [](std::string &s, const std::string &a) { return s + " " + a; });
189 }
190 
191 void
193 {
194  if (enabled_) {
195  logger->log_debug(name(), "enabled: true");
196  logger->log_debug(name(), "TCP port: %u", port_);
197  logger->log_debug(name(), "Termination grace period: %u", termination_grace_period_);
198  logger->log_debug(name(),
199  "clear data on termination: %s",
200  clear_data_on_termination_ ? "yes" : "no");
201  logger->log_debug(name(), "data path: %s", data_path_.c_str());
202  logger->log_debug(name(), "log path: %s", log_path_.c_str());
203  logger->log_debug(name(), "log append: %s", log_append_ ? "yes" : "no");
204  logger->log_debug(name(),
205  "replica set: %s",
206  replica_set_.empty() ? "DISABLED" : replica_set_.c_str());
207  if (!replica_set_.empty()) {
208  logger->log_debug(name(), "Op Log Size: %u MB", oplog_size_);
209  }
210 
211  start_mongod();
212  } else {
213  throw Exception("Instance '%s' cannot be started while disabled", name());
214  }
215 
216  timewait_ = new TimeWait(clock, (int)(loop_interval_ * 1000000.));
217 }
218 
219 void
221 {
222  timewait_->mark_start();
223  if (!running_ || !check_alive()) {
224  logger->log_error(name(), "MongoDB dead, restarting");
225  // on a crash, clean to make sure
226  try {
227  kill_mongod(true);
228  start_mongod();
229  } catch (Exception &e) {
230  logger->log_error(name(), "Failed to start MongoDB: %s", e.what_no_backtrace());
231  }
232  }
233  timewait_->wait_systime();
234 }
235 
236 void
238 {
239  kill_mongod(clear_data_on_termination_);
240  delete timewait_;
241 }
242 
243 /** Get command line used to execute program.
244  * @return command line to run mongod
245  */
246 std::string
248 {
249  return command_line_;
250 }
251 
252 /** Get termination grace period.
253  * @return termination grace period
254  */
255 unsigned int
257 {
258  return termination_grace_period_;
259 }
260 
261 bool
262 MongoDBInstanceConfig::check_alive()
263 {
264  try {
265  mongocxx::client client{mongocxx::uri("mongodb://localhost:" + std::to_string(port_))};
266 
267  using namespace bsoncxx::builder;
268  auto cmd{basic::document{}};
269  cmd.append(basic::kvp("isMaster", 1));
270 
271  auto reply = client["admin"].run_command(cmd.view());
272  bool ok = check_mongodb_ok(reply.view());
273  if (!ok) {
274  logger->log_warn(name(), "Failed to connect: %s", bsoncxx::to_json(reply.view()).c_str());
275  }
276  return ok;
277  } catch (mongocxx::exception &e) {
278  logger->log_warn(name(), "Fail: %s", e.what());
279  return false;
280  }
281 }
282 
283 /** Start mongod. */
284 void
286 {
287  if (running_)
288  return;
289 
290  if (check_alive()) {
291  logger->log_warn(name(), "MongoDB already running, not starting");
292  running_ = true;
293  return;
294  }
295 
296  try {
297  boost::filesystem::create_directories(data_path_);
298  } catch (boost::filesystem::filesystem_error &e) {
299  throw Exception("Failed to create data path '%s' for mongod(%s): %s",
300  data_path_.c_str(),
301  config_name_.c_str(),
302  e.what());
303  }
304 
305  if (!log_path_.empty()) {
306  boost::filesystem::path p(log_path_);
307  try {
308  boost::filesystem::create_directories(p.parent_path());
309  } catch (boost::filesystem::filesystem_error &e) {
310  throw Exception("Failed to create log path '%s' for mongod(%s): %s",
311  p.parent_path().string().c_str(),
312  config_name_.c_str(),
313  e.what());
314  }
315  }
316 
317  std::string progname = "mongod(" + config_name_ + ")";
318  proc_ =
319  std::make_shared<SubProcess>(progname, "mongod", argv_, std::vector<std::string>{}, logger);
320 
321  for (unsigned i = 0; i < startup_grace_period_ * 4; ++i) {
322  if (check_alive()) {
323  running_ = true;
324  return;
325  }
326  std::this_thread::sleep_for(250ms);
327  }
328  if (!running_) {
329  proc_.reset();
330  throw Exception("%s: instance did not start in time", name());
331  }
332 }
333 
334 /** Stop mongod.
335  * This send a SIGINT and then wait for the configured grace period
336  * before sending the TERM signal.
337  * @param clear_data true to clear data, false otherwise
338  */
339 void
341 {
342  if (proc_) {
343  proc_->kill(SIGINT);
344  for (unsigned i = 0; i < termination_grace_period_; ++i) {
345  if (!proc_->alive())
346  break;
347  std::this_thread::sleep_for(1s);
348  }
349  // This will send the term signal
350  proc_.reset();
351  running_ = false;
352  if (clear_data) {
353  try {
354  boost::filesystem::remove_all(data_path_);
355  } catch (boost::filesystem::filesystem_error &e) {
356  throw Exception("Failed to create data path '%s' for mongod(%s): %s",
357  data_path_.c_str(),
358  config_name_.c_str(),
359  e.what());
360  }
361  }
362  }
363 }
unsigned int termination_grace_period() const
Get termination grace period.
virtual void loop()
Code to execute in the thread.
virtual void finalize()
Finalize the thread.
void start_mongod()
Start mongod.
void kill_mongod(bool clear_data)
Stop mongod.
MongoDBInstanceConfig(fawkes::Configuration *config, std::string cfgname, std::string prefix)
Constructor.
virtual void init()
Initialize the thread.
std::string command_line() const
Get command line used to execute program.
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
Interface for configuration handling.
Definition: config.h:68
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
virtual std::string get_string_or_default(const char *path, const std::string &default_val)
Get value from configuration which is of type string, or the given default if the path does not exist...
Definition: config.cpp:736
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual bool get_bool_or_default(const char *path, const bool &default_val)
Get value from configuration which is of type bool, or the given default if the path does not exist.
Definition: config.cpp:726
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual const char * what_no_backtrace() const noexcept
Get primary string (does not implicitly print the back trace).
Definition: exception.cpp:663
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
System ran out of memory and desired operation could not be fulfilled.
Definition: system.h:32
Thread class encapsulation of pthreads.
Definition: thread.h:46
const char * name() const
Get name of thread.
Definition: thread.h:100
void set_name(const char *format,...)
Set name of thread.
Definition: thread.cpp:748
Time wait utility.
Definition: wait.h:33
void mark_start()
Mark start of loop.
Definition: wait.cpp:68
void wait_systime()
Wait until minimum loop time has been reached in real time.
Definition: wait.cpp:96
Fawkes library namespace.