bes  Updated for version 3.20.10
GlobalMetadataStore.cc
1 // -*- mode: c++; c-basic-offset:4 -*-
2 
3 // This file is part of Hyrax, a C++ implementation of the OPeNDAP Data
4 // Access Protocol.
5 
6 // Copyright (c) 2018 OPeNDAP, Inc.
7 // Author: James Gallagher <jgallagher@opendap.org>
8 //
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22 //
23 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
24 
25 #include "config.h"
26 
27 #include <fcntl.h> // for posix_advise
28 #include <unistd.h>
29 
30 #include <cerrno>
31 #include <cstring>
32 
33 #include <iostream>
34 #include <string>
35 #include <fstream>
36 #include <sstream>
37 #include <functional>
38 #include <libdap/DAS.h>
39 #include <memory>
40 #include <sys/stat.h>
41 
42 #include <libdap/DapObj.h>
43 #include <libdap/DDS.h>
44 #include <libdap/DMR.h>
45 #include <libdap/D4ParserSax2.h>
46 #include <libdap/XMLWriter.h>
47 #include <libdap/BaseTypeFactory.h>
48 #include <libdap/D4BaseTypeFactory.h>
49 
50 #include "PicoSHA2/picosha2.h"
51 
52 #include "TempFile.h"
53 #include "TheBESKeys.h"
54 #include "BESUtil.h"
55 #include "BESLog.h"
56 #include "BESContextManager.h"
57 #include "BESDebug.h"
58 #include "BESRequestHandler.h"
59 #include "BESRequestHandlerList.h"
60 #include "BESNotFoundError.h"
61 
62 #include "BESInternalError.h"
63 #include "BESInternalFatalError.h"
64 
65 #include "GlobalMetadataStore.h"
66 
// Debug context name passed to BESDEBUG throughout this file.
#define DEBUG_KEY "metadata_store"

// When non-zero, store_dap_response() updates the cache-size bookkeeping even
// if the store is unlimited.
#define MAINTAIN_STORE_SIZE_EVEN_WHEN_UNLIMITED 0

// AT_EXIT(f) registers f with atexit() when HAVE_ATEXIT is defined; otherwise
// it expands to nothing.
#if defined(HAVE_ATEXIT)
#define AT_EXIT(x) atexit((x))
#else
#define AT_EXIT(x)
#endif

// Left undefined: add_responses(DDS*) then stores only the DDS and DAS, and
// add_responses(DMR*) stores only the DMR.
#undef SYMETRIC_ADD_RESPONSES

// Log-message prefix that names the enclosing function,
// i.e. "GlobalMetadataStore::<function>() - ".
#define prolog std::string("GlobalMetadataStore::").append(__func__).append("() - ")
88 
89 using namespace std;
90 using namespace libdap;
91 using namespace bes;
92 
93 static const unsigned int default_cache_size = 20; // 20 GB
94 static const string default_cache_prefix = "mds";
95 static const string default_cache_dir = ""; // I'm making the default empty so that no key == no caching. jhrg 9.26.16
96 static const string default_ledger_name = "mds_ledger.txt";
97 
98 static const string PATH_KEY = "DAP.GlobalMetadataStore.path";
99 static const string PREFIX_KEY = "DAP.GlobalMetadataStore.prefix";
100 static const string SIZE_KEY = "DAP.GlobalMetadataStore.size";
101 static const string LEDGER_KEY = "DAP.GlobalMetadataStore.ledger";
102 static const string LOCAL_TIME_KEY = "BES.LogTimeLocal";
103 
104 GlobalMetadataStore *GlobalMetadataStore::d_instance = 0;
105 bool GlobalMetadataStore::d_enabled = true;
106 
121 void GlobalMetadataStore::transfer_bytes(int fd, ostream &os)
122 {
123  static const int BUFFER_SIZE = 16*1024;
124 
125 #if _POSIX_C_SOURCE >= 200112L
126  /* Advise the kernel of our access pattern. */
127  int status = posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
128  if (status != 0)
129  ERROR_LOG(prolog << "Error calling posix_advise() in the GlobalMetadataStore: " << strerror(status) << endl);
130 #endif
131 
132  char buf[BUFFER_SIZE + 1];
133 
134  while(int bytes_read = read(fd, buf, BUFFER_SIZE))
135  {
136  if(bytes_read == -1)
137  throw BESInternalError("Could not read dds from the metadata store.", __FILE__, __LINE__);
138  if (!bytes_read)
139  break;
140 
141  os.write(buf, bytes_read);
142  }
143 }
144 
157 void GlobalMetadataStore::insert_xml_base(int fd, ostream &os, const string &xml_base)
158 {
159  static const int BUFFER_SIZE = 1024;
160 
161 #if _POSIX_C_SOURCE >= 200112L
162  /* Advise the kernel of our access pattern. */
163  int status = posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
164  if (status != 0)
165  ERROR_LOG(prolog << "Error calling posix_advise() in the GlobalMetadataStore: " << strerror(status) << endl);
166 #endif
167 
168  char buf[BUFFER_SIZE + 1];
169  size_t bytes_read = read(fd, buf, BUFFER_SIZE);
170 
171  if(bytes_read == (size_t)-1)
172  throw BESInternalError("Could not read dds from the metadata store.", __FILE__, __LINE__);
173 
174  if (bytes_read == 0)
175  return;
176 
177  // Every valid DMR/++ response in the MDS starts with:
178  // <?xml version="1.0" encoding="ISO‌-8859-1"?>
179  //
180  // and has one of two kinds of <Dataset...> tags
181  // 1: <Dataset xmlns="..." xml:base="file:DMR_1.xml" ... >
182  // 2: <Dataset xmlns="..." ... >
183  //
184  // Assume it is well formed and always includes the prolog,
185  // but might not use <CR> <CRLF> chars
186 
187  // transfer the prolog (<?xml version="1.0" encoding="ISO‌-8859-1"?>)
188  size_t i = 0;
189  while (buf[i++] != '>')
190  ; // 'i' now points one char past the xml prolog
191  os.write(buf, i);
192 
193  // transfer <Dataset ...> with new value for xml:base
194  size_t s = i; // start of <Dataset ...>
195  size_t j = 0;
196  char xml_base_literal[] = "xml:base";
197  while (i < bytes_read) {
198  if (buf[i] == '>') { // Found end of Dataset; no xml:base was present
199  os.write(buf + s, i - s);
200  os << " xml:base=\"" << xml_base << "\"";
201  break;
202  }
203  else if (j == sizeof(xml_base_literal) - 1) { // found 'xml:base' literal
204  os.write(buf + s, i - s); // This will include all of <Dataset... including 'xml:base'
205  while (buf[i++] != '=')
206  ; // read/discard '="..."'
207  while (buf[i++] != '"')
208  ;
209  while (buf[i++] != '"')
210  ;
211  os << "=\"" << xml_base << "\""; // write the new xml:base value
212  break;
213  }
214  else if (buf[i] == xml_base_literal[j]) {
215  ++j;
216  }
217  else {
218  j = 0;
219  }
220 
221  ++i;
222  }
223 
224  // transfer the rest
225  os.write(buf + i, bytes_read - i);
226 
227  // Now, if the response is more than 1k, use faster code to finish the tx
228  transfer_bytes(fd, os);
229 }
230 
231 unsigned long GlobalMetadataStore::get_cache_size_from_config()
232 {
233  bool found;
234  string size;
235  unsigned long size_in_megabytes = default_cache_size;
236  TheBESKeys::TheKeys()->get_value(SIZE_KEY, size, found);
237  if (found) {
238  BESDEBUG(DEBUG_KEY,
239  "GlobalMetadataStore::getCacheSizeFromConfig(): Located BES key " << SIZE_KEY << "=" << size << endl);
240  istringstream iss(size);
241  iss >> size_in_megabytes;
242  }
243 
244  return size_in_megabytes;
245 }
246 
247 string GlobalMetadataStore::get_cache_prefix_from_config()
248 {
249  bool found;
250  string prefix = default_cache_prefix;
251  TheBESKeys::TheKeys()->get_value(PREFIX_KEY, prefix, found);
252  if (found) {
253  BESDEBUG(DEBUG_KEY,
254  "GlobalMetadataStore::getCachePrefixFromConfig(): Located BES key " << PREFIX_KEY << "=" << prefix << endl);
255  prefix = BESUtil::lowercase(prefix);
256  }
257 
258  return prefix;
259 }
260 
261 // If the cache prefix is the empty string, the cache is turned off.
262 string GlobalMetadataStore::get_cache_dir_from_config()
263 {
264  bool found;
265 
266  string cacheDir = default_cache_dir;
267  TheBESKeys::TheKeys()->get_value(PATH_KEY, cacheDir, found);
268  if (found) {
269  BESDEBUG(DEBUG_KEY,
270  "GlobalMetadataStore::getCacheDirFromConfig(): Located BES key " << PATH_KEY<< "=" << cacheDir << endl);
271  }
272 
273  return cacheDir;
274 }
275 
290 
308 GlobalMetadataStore::get_instance(const string &cache_dir, const string &prefix, unsigned long long size)
309 {
310  if (d_enabled && d_instance == 0) {
311  d_instance = new GlobalMetadataStore(cache_dir, prefix, size); // never returns null_ptr
312  d_enabled = d_instance->cache_enabled();
313  if (!d_enabled) {
314  delete d_instance;
315  d_instance = 0;
316 
317  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is DISABLED"<< endl);
318  }
319  else {
320  AT_EXIT(delete_instance);
321 
322  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is ENABLED"<< endl);
323  }
324  }
325 
326  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::get_instance(dir,prefix,size) - d_instance: " << d_instance << endl);
327 
328  return d_instance;
329 }
330 
338 GlobalMetadataStore::get_instance()
339 {
340  if (d_enabled && d_instance == 0) {
341  d_instance = new GlobalMetadataStore(get_cache_dir_from_config(), get_cache_prefix_from_config(),
342  get_cache_size_from_config());
343  d_enabled = d_instance->cache_enabled();
344  if (!d_enabled) {
345  delete d_instance;
346  d_instance = NULL;
347  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is DISABLED"<< endl);
348  }
349  else {
350  AT_EXIT(delete_instance);
351 
352  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is ENABLED"<< endl);
353  }
354  }
355 
356  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::get_instance() - d_instance: " << (void *) d_instance << endl);
357 
358  return d_instance;
359 }
361 
365 void
366 GlobalMetadataStore::initialize()
367 {
368  bool found;
369 
370  TheBESKeys::TheKeys()->get_value(LEDGER_KEY, d_ledger_name, found);
371  if (found) {
372  BESDEBUG(DEBUG_KEY, "Located BES key " << LEDGER_KEY << "=" << d_ledger_name << endl);
373  }
374  else {
375  d_ledger_name = default_ledger_name;
376  }
377 
378  ofstream of(d_ledger_name.c_str(), ios::app);
379 
380  // By default, use UTC in the logs.
381  string local_time = "no";
382  TheBESKeys::TheKeys()->get_value(LOCAL_TIME_KEY, local_time, found);
383  d_use_local_time = (local_time == "YES" || local_time == "Yes" || local_time == "yes");
384 }
385 
401 GlobalMetadataStore::GlobalMetadataStore()
402  : BESFileLockingCache(get_cache_dir_from_config(), get_cache_prefix_from_config(), get_cache_size_from_config())
403 {
404  initialize();
405 }
406 
407 GlobalMetadataStore::GlobalMetadataStore(const string &cache_dir, const string &prefix,
408  unsigned long long size) : BESFileLockingCache(cache_dir, prefix, size)
409 {
410  initialize();
411 }
413 
418 static void dump_time(ostream &os, bool use_local_time)
419 {
420  time_t now;
421  time(&now);
422  char buf[sizeof "YYYY-MM-DDTHH:MM:SSzone"];
423  int status = 0;
424 
425  // From StackOverflow:
426  // This will work too, if your compiler doesn't support %F or %T:
427  // strftime(buf, sizeof buf, "%Y-%m-%dT%H:%M:%S%Z", gmtime(&now));
428  //
429  // Apologies for the twisted logic - UTC is the default. Override to
430  // local time using BES.LogTimeLocal=yes in bes.conf. jhrg 11/15/17
431  struct tm result;
432  if (!use_local_time) {
433  gmtime_r(&now, &result);
434  status = strftime(buf, sizeof buf, "%FT%T%Z", &result);
435  }
436  else {
437  localtime_r(&now, &result);
438  status = strftime(buf, sizeof buf, "%FT%T%Z", &result);
439  }
440  if (!status)
441  ERROR_LOG(prolog << "Error getting time for Metadata Store ledger.");
442 
443  os << buf;
444 }
445 
449 void
451 {
452  // open just once, <- done SBL 11.7.19
453 
454  int fd; // value-result parameter;
455  if (get_exclusive_lock(d_ledger_name, fd)) {
456  BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Ledger " << d_ledger_name << " write locked." << endl);
457  if (of) {
458  try {
459  dump_time(of, d_use_local_time);
460  of << " " << d_ledger_entry << endl;
461  VERBOSE("MDS Ledger name: '" << d_ledger_name << "', entry: '" << d_ledger_entry + "'.");
462  unlock_and_close(d_ledger_name); // closes fd
463  }
464  catch (...) {
465  unlock_and_close(d_ledger_name);
466  throw;
467  }
468  }
469  else {
470  ERROR_LOG(prolog << "Warning: Metadata store could not write to its ledger file.");
471  unlock_and_close(d_ledger_name);
472  }
473  }
474  else {
475  throw BESInternalError("Could not write lock '" + d_ledger_name, __FILE__, __LINE__);
476  }
477 }
478 
485 string
486 GlobalMetadataStore::get_hash(const string &name)
487 {
488  if (name.empty())
489  throw BESInternalError("Empty name passed to the Metadata Store.", __FILE__, __LINE__);
490 
491  return picosha2::hash256_hex_string(name[0] == '/' ? name : "/" + name);
492 }
493 
511 {
512  if (d_dds) {
513  D4BaseTypeFactory factory;
514  DMR dmr(&factory, *d_dds);
515  XMLWriter xml;
516  dmr.print_dap4(xml);
517  os << xml.get_doc();
518  }
519  else if (d_dmr) {
520  XMLWriter xml;
521  d_dmr->print_dap4(xml);
522  os << xml.get_doc();
523  }
524  else {
525  throw BESInternalFatalError("Unknown DAP object type.", __FILE__, __LINE__);
526  }
527 }
528 
531  if (d_dds)
532  d_dds->print(os);
533  else if (d_dmr)
534  d_dmr->getDDS()->print(os);
535  else
536  throw BESInternalFatalError("Unknown DAP object type.", __FILE__, __LINE__);
537 }
538 
541  if (d_dds)
542  d_dds->print_das(os);
543  else if (d_dmr)
544  d_dmr->getDDS()->print_das(os);
545  else
546  throw BESInternalFatalError("Unknown DAP object type.", __FILE__, __LINE__);
547 }
548 
563 bool
564 GlobalMetadataStore::store_dap_response(StreamDAP &writer, const string &key, const string &name,
565  const string &response_name)
566 {
567  BESDEBUG(DEBUG_KEY, __FUNCTION__ << " BEGIN " << key << endl);
568 
569  string item_name = get_cache_file_name(key, false /*mangle*/);
570 
571  int fd;
572  if (create_and_lock(item_name, fd)) {
573  BESDEBUG(DEBUG_KEY,__FUNCTION__ << " Storing " << item_name << endl);
574 
575  // Get an output stream directed at the locked cache file
576  ofstream response(item_name.c_str(), ios::out|ios::app);
577  if (!response.is_open())
578  throw BESInternalError("Could not open '" + key + "' to write the response.", __FILE__, __LINE__);
579 
580  try {
581  // for the different writers, look at the StreamDAP struct in the class
582  // definition. jhrg 2.27.18
583  writer(response); // different writers can write the DDS, DAS or DMR
584 
585  // Compute/update/maintain the cache size? This is extra work
586  // that might never be used. It also locks the cache...
587  if (!is_unlimited() || MAINTAIN_STORE_SIZE_EVEN_WHEN_UNLIMITED) {
588  // This enables the call to update_cache_info() below.
590 
591  unsigned long long size = update_cache_info(item_name);
592  if (!is_unlimited() && cache_too_big(size)) update_and_purge(item_name);
593  }
594 
595  unlock_and_close(item_name);
596  }
597  catch (...) {
598  // Bummer. There was a problem doing The Stuff. Now we gotta clean up.
599  response.close();
600  this->purge_file(item_name);
601  unlock_and_close(item_name);
602  throw;
603  }
604 
605  VERBOSE("Metadata store: Wrote " << response_name << " response for '" << name << "'." << endl);
606  d_ledger_entry.append(" ").append(key);
607 
608  return true;
609  }
610  else if (get_read_lock(item_name, fd)) {
611  // We found the key; didn't add this because it was already here
612  BESDEBUG(DEBUG_KEY,__FUNCTION__ << " Found " << item_name << " in the store already." << endl);
613  unlock_and_close(item_name);
614 
615  ERROR_LOG(prolog << "Metadata store: unable to store the " << response_name << " response for '" << name << "'." << endl);
616 
617  return false;
618  }
619  else {
620  throw BESInternalError("Could neither create or open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
621  }
622 }
623 
638 
650 bool
651 GlobalMetadataStore::add_responses(DDS *dds, const string &name)
652 {
653  // Start the index entry
654  d_ledger_entry = string("add DDS ").append(name);
655 
656  // I'm appending the 'dds r' string to the name before hashing so that
657  // the different hashes for the file's DDS, DAS, ..., are all very different.
658  // This will be useful if we use S3 instead of EFS for the Metadata Store.
659  //
660  // The helper() also updates the ledger string.
661  StreamDDS write_the_dds_response(dds);
662  bool stored_dds = store_dap_response(write_the_dds_response, get_hash(name + "dds_r"), name, "DDS");
663 
664  StreamDAS write_the_das_response(dds);
665  bool stored_das = store_dap_response(write_the_das_response, get_hash(name + "das_r"), name, "DAS");
666 
667 #if SYMETRIC_ADD_RESPONSES
668  StreamDMR write_the_dmr_response(dds);
669  bool stored_dmr = store_dap_response(write_the_dmr_response, get_hash(name + "dmr_r"), name, "DMR");
670 #endif
671 
672  write_ledger(); // write the index line
673 
674 #if SYMETRIC_ADD_RESPONSES
675  return (stored_dds && stored_das && stored_dmr);
676 #else
677  return (stored_dds && stored_das);
678 #endif
679 }
680 
692 bool
693 GlobalMetadataStore::add_responses(DMR *dmr, const string &name)
694 {
695  // Start the index entry
696  d_ledger_entry = string("add DMR ").append(name);
697 
698  // I'm appending the 'dds r' string to the name before hashing so that
699  // the different hashes for the file's DDS, DAS, ..., are all very different.
700  // This will be useful if we use S3 instead of EFS for the Metadata Store.
701  //
702  // The helper() also updates the ledger string.
703 #if SYMETRIC_ADD_RESPONSES
704  StreamDDS write_the_dds_response(dmr);
705  bool stored_dds = store_dap_response(write_the_dds_response, get_hash(name + "dds_r"), name, "DDS");
706 
707  StreamDAS write_the_das_response(dmr);
708  bool stored_das = store_dap_response(write_the_das_response, get_hash(name + "das_r"), name, "DAS");
709 #endif
710 
711  StreamDMR write_the_dmr_response(dmr);
712  bool stored_dmr = store_dap_response(write_the_dmr_response, get_hash(name + "dmr_r"), name, "DMR");
713 
714  write_ledger(); // write the index line
715 
716 #if SYMETRIC_ADD_RESPONSES
717  return (stored_dds && stored_das && stored_dmr);
718 #else
719  return(stored_dmr /* && stored_dmrpp */);
720 #endif
721 }
723 
737 GlobalMetadataStore::get_read_lock_helper(const string &name, const string &suffix, const string &object_name)
738 {
739  BESDEBUG(DEBUG_KEY, __func__ << "() MDS hashing name '" << name << "', '" << suffix << "'"<< endl);
740 
741  if (name.empty())
742  throw BESInternalError("An empty name string was received by "
743  "GlobalMetadataStore::get_read_lock_helper(). That should never happen.", __FILE__, __LINE__);
744 
745  string item_name = get_cache_file_name(get_hash(name + suffix), false);
746  int fd;
747  MDSReadLock lock(item_name, get_read_lock(item_name, fd), this);
748  BESDEBUG(DEBUG_KEY, __func__ << "() MDS lock for " << item_name << ": " << lock() << endl);
749 
750  if (lock())
751  INFO_LOG(prolog << "MDS Cache hit for '" << name << "' and response " << object_name << endl);
752  else
753  INFO_LOG(prolog << "MDS Cache miss for '" << name << "' and response " << object_name << endl);
754 
755  return lock;
756  }
757 
778 {
779  return get_read_lock_helper(name,"dmr_r","DMR");
780 }//end is_dmr_available(string)
781 
784 {
785  //call get_read_lock_helper
786  MDSReadLock lock = get_read_lock_helper(container.get_relative_name(), "dmr_r", "DMR");
787  if (lock()){
788 
789  bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "dmr_r");
790 
791  if(reload){
792  lock.clearLock();
793  return lock;
794  }//end if
795  else{
796  return lock;
797  }//end else
798 
799  }//end if(is locked)
800  else{
801  return lock;
802  }//end else
803 
804 }//end is_dmr_available(BESContainer)
805 
807 GlobalMetadataStore::is_dmr_available(const std::string &realName, const std::string &relativeName, const std::string &fileType)
808 {
809  //call get_read_lock_helper
810  MDSReadLock lock = get_read_lock_helper(relativeName,"dmr_r","DMR");
811  if (lock()){
812 
813  bool reload = is_available_helper(realName, relativeName, fileType, "dmr_r");
814 
815  if(reload){
816  lock.clearLock();
817  return lock;
818  }//end if
819  else{
820  return lock;
821  }//end else
822 
823  }//end if(is locked)
824  else{
825  return lock;
826  }//end else
827 
828 }//end is_dmr_available(string, string, string)
829 
839 {
840  return get_read_lock_helper(name,"dds_r","DDS");
841 }//end is_dds_available(string)
842 
855 {
856  //call get_read_lock_helper
857  MDSReadLock lock = get_read_lock_helper(container.get_relative_name(),"dds_r","DDS");
858  if (lock()){
859 
860  bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "dds_r");
861 
862  if(reload){
863  lock.clearLock();
864  return lock;
865  }//end if
866  else{
867  return lock;
868  }//end else
869 
870  }//end if(is locked)
871  else{
872  return lock;
873  }//end else
874 
875 }//end is_dds_available(BESContainer)
876 
886 {
887  return get_read_lock_helper(name,"das_r","DAS");
888 }//end is_das_available(string)
889 
892 {
893  //return get_read_lock_helper(name, "das_r", "DAS");
894  //call get_read_lock_helper
895  MDSReadLock lock = get_read_lock_helper(container.get_relative_name(),"das_r","DAS");
896  if (lock()){
897 
898  bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "das_r");
899 
900  if(reload){
901  lock.clearLock();
902  return lock;
903  }//end if
904  else{
905  return lock;
906  }//end else
907 
908  }//end if(is locked)
909  else{
910  return lock;
911  }//end else
912 
913 }//end is_das_available(BESContainer)
914 
935 {
936  return get_read_lock_helper(name,"dmrpp_r","DMR++");
937 }//end is_dmrpp_available(string)
938 
941 {
942  //return get_read_lock_helper(name, "dmrpp_r", "DMR++");
943  //call get_read_lock_helper
944  MDSReadLock lock = get_read_lock_helper(container.get_relative_name(),"dmrpp_r","DMR++");
945  if (lock()){
946 
947  bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "dmrpp_r");
948 
949  if(reload){
950  lock.clearLock();
951  return lock;
952  }//end if
953  else{
954  return lock;
955  }//end else
956 
957  }//end if(is locked)
958  else{
959  return lock;
960  }//end else
961 
962 }//end is_dmrpp_available(BESContainer)
963 
974 bool
975 GlobalMetadataStore::is_available_helper(const string &realName, const string &relativeName, const string &fileType, const string &suffix)
976 {
977  //use type with find_handler() to get handler
978  BESRequestHandler *besRH = BESRequestHandlerList::TheList()->find_handler(fileType);
979 
980  //use handler.get_lmt()
981  time_t file_time = besRH->get_lmt(realName);
982 
983  //get the cache time of the handler
984  time_t cache_time = get_cache_lmt(relativeName, suffix);
985 
986  //compare file lmt and time of creation of cache
987  if (file_time > cache_time){
988  return true;
989  }//end if(file > cache)
990  else {
991  return false;
992  }//end else
993 }
994 
1002 time_t
1003 GlobalMetadataStore::get_cache_lmt(const string &fileName, const string &suffix)
1004 {
1005  string item_name = get_cache_file_name(get_hash(fileName + suffix), false);
1006  struct stat statbuf;
1007 
1008  if (stat(item_name.c_str(), &statbuf) == -1){
1009  throw BESNotFoundError(strerror(errno), __FILE__, __LINE__);
1010  }//end if(error)
1011 
1012  return statbuf.st_mtime;
1013 }//end get_cache_lmt()
1014 
1017 
1025 void
1026 GlobalMetadataStore::write_response_helper(const string &name, ostream &os, const string &suffix, const string &object_name)
1027 {
1028  string item_name = get_cache_file_name(get_hash(name + suffix), false);
1029  int fd; // value-result parameter;
1030  if (get_read_lock(item_name, fd)) {
1031  VERBOSE("Metadata store: Cache hit: read " << object_name << " response for '" << name << "'." << endl);
1032  BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Found " << item_name << " in the store." << endl);
1033  try {
1034  transfer_bytes(fd, os);
1035  unlock_and_close(item_name); // closes fd
1036  }
1037  catch (...) {
1038  unlock_and_close(item_name);
1039  throw;
1040  }
1041  }
1042  else {
1043  throw BESInternalError("Could not open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
1044  }
1045 }
1046 
1055 void
1056 GlobalMetadataStore::write_response_helper(const string &name, ostream &os, const string &suffix, const string &xml_base,
1057  const string &object_name)
1058 {
1059  string item_name = get_cache_file_name(get_hash(name + suffix), false);
1060  int fd; // value-result parameter;
1061  if (get_read_lock(item_name, fd)) {
1062  VERBOSE("Metadata store: Cache hit: read " << object_name << " response for '" << name << "'." << endl);
1063  BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Found " << item_name << " in the store." << endl);
1064  try {
1065  insert_xml_base(fd, os, xml_base);
1066 
1067  transfer_bytes(fd, os);
1068  unlock_and_close(item_name); // closes fd
1069  }
1070  catch (...) {
1071  unlock_and_close(item_name);
1072  throw;
1073  }
1074  }
1075  else {
1076  throw BESInternalError("Could not open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
1077  }
1078 }
1080 
1087 void
1088 GlobalMetadataStore::write_dds_response(const std::string &name, ostream &os)
1089 {
1090  write_response_helper(name, os, "dds_r", "DDS");
1091 }
1092 
1099 void
1100 GlobalMetadataStore::write_das_response(const std::string &name, ostream &os)
1101 {
1102  write_response_helper(name, os, "das_r", "DAS");
1103 }
1104 
1111 void
1112 GlobalMetadataStore::write_dmr_response(const std::string &name, ostream &os)
1113 {
1114  bool found = false;
1115  string xml_base = BESContextManager::TheManager()->get_context("xml:base", found);
1116  if (!found) {
1117 #if XML_BASE_MISSING_MEANS_OMIT_ATTRIBUTE
1118  write_response_helper(name, os, "dmr_r", "DMR");
1119 #else
1120  throw BESInternalError("Could not read the value of xml:base.", __FILE__, __LINE__);
1121 #endif
1122  }
1123  else {
1124  write_response_helper(name, os, "dmr_r", xml_base, "DMR");
1125  }
1126 }
1127 
1134 void
1135 GlobalMetadataStore::write_dmrpp_response(const std::string &name, ostream &os)
1136 {
1137  bool found = false;
1138  string xml_base = BESContextManager::TheManager()->get_context("xml:base", found);
1139  if (!found) {
1140 #if XML_BASE_MISSING_MEANS_OMIT_ATTRIBUTE
1141  write_response_helper(name, os, "dmrpp_r", "DMR++");
1142 #else
1143  throw BESInternalError("Could not read the value of xml:base.", __FILE__, __LINE__);
1144 #endif
1145  }
1146  else {
1147  write_response_helper(name, os, "dmrpp_r", xml_base, "DMR++");
1148  }
1149 }
1150 
1158 bool
1159 GlobalMetadataStore::remove_response_helper(const string& name, const string &suffix, const string &object_name)
1160 {
1161  string hash = get_hash(name + suffix);
1162  if (unlink(get_cache_file_name(hash, false).c_str()) == 0) {
1163  VERBOSE("Metadata store: Removed " << object_name << " response for '" << hash << "'." << endl);
1164  d_ledger_entry.append(" ").append(hash);
1165  return true;
1166  }
1167  else {
1168  ERROR_LOG(prolog << "Metadata store: unable to remove the " << object_name << " response for '" << name << "' (" << strerror(errno) << ")."<< endl);
1169  }
1170 
1171  return false;
1172 }
1173 
1180 bool
1182 {
1183  // Start the index entry
1184  d_ledger_entry = string("remove ").append(name);
1185 
1186  bool removed_dds = remove_response_helper(name, "dds_r", "DDS");
1187 
1188  bool removed_das = remove_response_helper(name, "das_r", "DAS");
1189 
1190  bool removed_dmr = remove_response_helper(name, "dmr_r", "DMR");
1191 
1192  bool removed_dmrpp = remove_response_helper(name, "dmrpp_r", "DMR++");
1193 
1194  write_ledger(); // write the index line
1195 
1196 #if SYMETRIC_ADD_RESPONSES
1197  return (removed_dds && removed_das && removed_dmr);
1198 #else
1199  return (removed_dds || removed_das || removed_dmr || removed_dmrpp);
1200 #endif
1201 }
1202 
1215 DMR *
1217 {
1218  stringstream oss;
1219  write_dmr_response(name, oss); // throws BESInternalError if not found
1220 
1221  D4BaseTypeFactory d4_btf;
1222  unique_ptr<DMR> dmr(new DMR(&d4_btf, "mds"));
1223 
1224  D4ParserSax2 parser;
1225  parser.intern(oss.str(), dmr.get());
1226 
1227  dmr->set_factory(0);
1228 
1229  return dmr.release();
1230 }
1231 
1253 DDS *
1255 {
1256  TempFile dds_tmp(get_cache_directory() + "/opendapXXXXXX");
1257 
1258  fstream dds_fs(dds_tmp.get_name().c_str(), std::fstream::out);
1259  try {
1260  write_dds_response(name, dds_fs); // throws BESInternalError if not found
1261  dds_fs.close();
1262  }
1263  catch (...) {
1264  dds_fs.close();
1265  throw;
1266  }
1267 
1268  BaseTypeFactory btf;
1269  unique_ptr<DDS> dds(new DDS(&btf));
1270  dds->parse(dds_tmp.get_name());
1271 
1272  TempFile das_tmp(get_cache_directory() + "/opendapXXXXXX");
1273  fstream das_fs(das_tmp.get_name().c_str(), std::fstream::out);
1274  try {
1275  write_das_response(name, das_fs); // throws BESInternalError if not found
1276  das_fs.close();
1277  }
1278  catch (...) {
1279  das_fs.close();
1280  throw;
1281  }
1282 
1283  unique_ptr<DAS> das(new DAS());
1284  das->parse(das_tmp.get_name());
1285 
1286  dds->transfer_attributes(das.get());
1287  dds->set_factory(nullptr);
1288 
1289  return dds.release();
1290 }
1291 
1292 
1293 void
1294 GlobalMetadataStore::parse_das_from_mds(libdap::DAS* das, const std::string &name) {
1295  string suffix = "das_r";
1296  string item_name = get_cache_file_name(get_hash(name + suffix), false);
1297  int fd; // value-result parameter;
1298  if (get_read_lock(item_name, fd)) {
1299  VERBOSE("Metadata store: Cache hit: read " << " response for '" << name << "'." << endl);
1300  BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Found " << item_name << " in the store." << endl);
1301  try {
1302  // Just generate the DAS by parsing from the file
1303  das->parse(item_name);
1304  unlock_and_close(item_name); // closes fd
1305  }
1306  catch (...) {
1307  unlock_and_close(item_name);
1308  throw;
1309  }
1310  }
1311  else {
1312  throw BESInternalError("Could not open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
1313  }
1314 
1315 }
1316 
A container is something that holds data. E.G., a netcdf file or a database entry.
Definition: BESContainer.h:65
std::string get_relative_name() const
Get the relative name of the object in this container.
Definition: BESContainer.h:186
std::string get_container_type() const
retrieve the type of data this container holds, such as cedar or netcdf.
Definition: BESContainer.h:232
std::string get_real_name() const
retrieve the real name for this container, such as a file name.
Definition: BESContainer.h:180
virtual std::string get_context(const std::string &name, bool &found)
retrieve the value of the specified context from the BES
Implementation of a caching mechanism for compressed data.
virtual void unlock_and_close(const std::string &target)
const std::string get_cache_directory()
bool is_unlimited() const
Is this cache allowed to store as much as it wants?
virtual unsigned long long update_cache_info(const std::string &target)
Update the cache info file to include 'target'.
virtual bool create_and_lock(const std::string &target, int &fd)
Create a file in the cache and lock it for write access.
virtual void exclusive_to_shared_lock(int fd)
Transfer from an exclusive lock to a shared lock.
virtual bool get_read_lock(const std::string &target, int &fd)
Get a read-only lock on the file if it exists.
virtual bool get_exclusive_lock(const std::string &target, int &fd)
virtual void purge_file(const std::string &file)
Purge a single file from the cache.
virtual bool cache_too_big(unsigned long long current_size) const
look at the cache size; is it too large? Look at the cache size and see if it is too big.
virtual void update_and_purge(const std::string &new_file)
Purge files from the cache.
virtual std::string get_cache_file_name(const std::string &src, bool mangle=true)
exception thrown if internal error encountered
exception thrown if an internal error is found and is fatal to the BES
error thrown if the resource requested cannot be found
virtual BESRequestHandler * find_handler(const std::string &handler_name)
find and return the specified request handler
Represents a specific data type request handler.
virtual time_t get_lmt(const std::string &name)
Get the Last modified time for.
static std::string lowercase(const std::string &s)
Definition: BESUtil.cc:206
void get_value(const std::string &s, std::string &val, bool &found)
Retrieve the value of a given key, if set.
Definition: TheBESKeys.cc:340
static TheBESKeys * TheKeys()
Definition: TheBESKeys.cc:71
Store the DAP metadata responses.
virtual libdap::DDS * get_dds_object(const std::string &name)
Build a DDS object from the cached Response.
virtual void write_dmr_response(const std::string &name, std::ostream &os)
Write the stored DMR response to a stream.
void write_response_helper(const std::string &name, std::ostream &os, const std::string &suffix, const std::string &object_name)
std::string get_hash(const std::string &name)
static void transfer_bytes(int fd, std::ostream &os)
virtual bool remove_responses(const std::string &name)
Remove all cached responses and objects for a granule.
virtual void write_dds_response(const std::string &name, std::ostream &os)
Write the stored DDS response to a stream.
void initialize()
Configure the ledger using LEDGER_KEY and LOCAL_TIME_KEY.
bool store_dap_response(StreamDAP &writer, const std::string &key, const std::string &name, const std::string &response_name)
static void insert_xml_base(int fd, std::ostream &os, const std::string &xml_base)
like transfer_bytes(), but adds the xml:base attribute to the DMR/++
virtual void write_das_response(const std::string &name, std::ostream &os)
Write the stored DAS response to a stream.
bool remove_response_helper(const std::string &name, const std::string &suffix, const std::string &object_name)
virtual MDSReadLock is_dmrpp_available(const std::string &name)
Is the DMR++ response for.
virtual MDSReadLock is_dds_available(const std::string &name)
Is the DDS response for.
virtual libdap::DMR * get_dmr_object(const std::string &name)
Build a DMR object from the cached Response.
MDSReadLock get_read_lock_helper(const std::string &name, const std::string &suffix, const std::string &object_name)
virtual time_t get_cache_lmt(const std::string &fileName, const std::string &suffix)
Get the last modified time for the cached object file.
virtual MDSReadLock is_dmr_available(const std::string &name)
Is the DMR response for.
virtual MDSReadLock is_das_available(const std::string &name)
Is the DAS response for.
virtual bool is_available_helper(const std::string &realName, const std::string &relativeName, const std::string &fileType, const std::string &suffix)
helper function that checks if last modified time is greater than cached file
virtual bool add_responses(libdap::DDS *dds, const std::string &name)
Add the DAP2 metadata responses using a DDS.
virtual void write_dmrpp_response(const std::string &name, std::ostream &os)
Write the stored DMR++ response to a stream.
Get a new temporary file.
Definition: TempFile.h:46
std::string get_name() const
Definition: TempFile.h:70
Unlock and close the MDS item when the ReadLock goes out of scope.
Instantiate with a DDS or DMR and use to write the DAS response.
virtual void operator()(std::ostream &os)
Instantiate with a DDS or DMR and use to write the DDS response.
virtual void operator()(std::ostream &os)
Instantiate with a DDS or DMR and use to write the DMR response.
virtual void operator()(std::ostream &os)
Use an object (DDS or DMR) to write data to the MDS.