34 #include <BESInternalError.h>
35 #include <BESSyntaxUserError.h>
36 #include <BESForbiddenError.h>
37 #include <BESContextManager.h>
40 #define PUGIXML_NO_XPATH
41 #define PUGIXML_HEADER_ONLY
42 #include <pugixml.hpp>
45 #include "CurlUtils.h"
46 #include "CurlHandlePool.h"
47 #include "EffectiveUrlCache.h"
48 #include "DmrppRequestHandler.h"
49 #include "DmrppNames.h"
54 #define prolog std::string("Chunk::").append(__func__).append("() - ")
56 #define FLETCHER32_CHECKSUM 4
57 #define ACTUALLY_USE_FLETCHER32_CHECKSUM 1
73 size_t chunk_header_callback(
char *buffer,
size_t ,
size_t nitems,
void *data) {
79 string header(buffer, buffer + nitems - 2);
82 if (header.find(
"Content-Type") != string::npos) {
84 auto c_ptr =
reinterpret_cast<Chunk *
>(data);
85 c_ptr->set_response_content_type(header.substr(header.find_last_of(
' ') + 1));
// Report an error document returned by the object store (e.g. S3).
//
// Parses xml_message with pugixml, requires the root element to be <Error>,
// reads its <Code> and <Message> children, and builds a diagnostic that names
// data_url. The diagnostic is logged with BESDEBUG and VERBOSE.
//
// @param data_url    URL that was being read when the error body arrived.
// @param xml_message Raw XML error body received from the object store.
// @throws BESInternalError if the XML cannot be parsed or its root element is
//         not <Error>.
// NOTE(review): the throw statements for the AccessDenied and generic branches
// are on lines not shown here. BESForbiddenError is included by this file and is
// presumably used for AccessDenied — confirm.
96 void process_s3_error_response(
const shared_ptr<http::url> &data_url,
const string &xml_message)
// NOTE(review): the xml2json / rapidjson lines below are separated from the
// pugixml code by lines not shown here. They may sit in a conditionally
// compiled region — confirm before relying on them.
99 string json_message = xml2json(xml_message.c_str());
101 d.Parse(json_message.c_str());
// Parse the error body; an unparsable document is an internal error.
107 pugi::xml_document error;
108 pugi::xml_parse_result result = error.load_string(xml_message.c_str());
110 throw BESInternalError(
"The underlying data store returned an unintelligible error message.", __FILE__, __LINE__);
// The error document must have an <Error> root element.
112 pugi::xml_node err_elmnt = error.document_element();
113 if (!err_elmnt || (strcmp(err_elmnt.name(),
"Error") != 0))
114 throw BESInternalError(
"The underlying data store returned a bogus error message.", __FILE__, __LINE__);
// child_value() yields "" when the child element is absent.
116 string code = err_elmnt.child_value(
"Code");
117 string message = err_elmnt.child_value(
"Message");
// AccessDenied gets its own, more specific diagnostic.
123 if (code ==
"AccessDenied") {
125 msg << prolog <<
"ACCESS DENIED - The underlying object store has refused access to: ";
126 msg << data_url->str() <<
" Object Store Message: " << message;
127 BESDEBUG(MODULE, msg.str() << endl);
128 VERBOSE(msg.str() << endl);
// Presumably the branch for every other error code (the 'else' is not shown).
// NOTE(review): the object store's error Code is not included in this message;
// consider adding it for easier diagnosis.
133 msg << prolog <<
"ERROR - The underlying object store returned an error. ";
134 msg <<
"(Tried: " << data_url->str() <<
") Object Store Message: " << message;
135 BESDEBUG(MODULE, msg.str() << endl);
136 VERBOSE(msg.str() << endl);
// libcurl write callback: append received bytes to the Chunk's read buffer.
//
// If the response Content-Type contains "application/xml", the body is treated
// as an object-store error document and passed to process_s3_error_response().
// Otherwise the bytes are copied into chunk->get_rbuf() at offset
// chunk->get_bytes_read(), after checking that they fit in get_rbuf_size().
//
// @param buffer Bytes delivered by libcurl (not NUL-terminated, per libcurl docs).
// @param size   Size of one item; the byte count is size * nmemb.
// @param nmemb  Number of items.
// @param data   The Chunk being read.
// NOTE(review): the return statement is not shown in this listing. libcurl
// requires the number of bytes handled (size * nmemb) to be returned.
154 size_t chunk_write_data(
void *buffer,
size_t size,
size_t nmemb,
void *data) {
155 BESDEBUG(MODULE, prolog <<
"BEGIN " << endl);
156 size_t nbytes = size * nmemb;
157 auto chunk =
reinterpret_cast<Chunk *
>(data);
// NOTE(review): get_data_url() consults EffectiveUrlCache and is called on every
// invocation of this callback. Consider hoisting it if it shows up in profiles.
160 auto data_url = chunk->get_data_url();
// NOTE(review): this streams the shared_ptr (an address), not the URL text;
// data_url->str() was probably intended.
161 BESDEBUG(MODULE, prolog <<
"chunk->get_data_url():" << data_url << endl);
164 BESDEBUG(MODULE, prolog <<
"chunk->get_response_content_type():" << chunk->get_response_content_type() << endl);
// An XML response body is an object-store error document, not chunk data.
165 if (chunk->get_response_content_type().find(
"application/xml") != string::npos) {
// NOTE(review): libcurl's buffer is not NUL-terminated, so constructing a string
// from the bare pointer can read past nbytes. Prefer
// string(static_cast<const char *>(buffer), nbytes).
168 string xml_message =
reinterpret_cast<const char *
>(buffer);
// Trim trailing whitespace.
// NOTE(review): the trim set also contains the digit '0' (not NUL), so trailing
// '0' characters are removed too — confirm intent.
169 xml_message.erase(xml_message.find_last_not_of(
"\t\n\v\f\r 0") + 1);
174 process_s3_error_response(data_url, xml_message);
// Report any std::exception with the URL that was being read. The rest of this
// handler is on lines not shown here.
181 catch (std::exception &e) {
183 msg << prolog <<
"Caught std::exception when accessing object store data.";
184 msg <<
" (Tried: " << data_url->str() <<
")" <<
" Message: " << e.what();
185 BESDEBUG(MODULE, msg.str() << endl);
// Refuse to write past the end of the chunk's read buffer.
194 unsigned long long bytes_read = chunk->get_bytes_read();
197 if (bytes_read + nbytes > chunk->get_rbuf_size()) {
199 msg << prolog <<
"ERROR! The number of bytes_read: " << bytes_read <<
" plus the number of bytes to read: "
200 << nbytes <<
" is larger than the target buffer size: " << chunk->get_rbuf_size();
201 BESDEBUG(MODULE, msg.str() << endl);
202 DmrppRequestHandler::curl_handle_pool->release_all_handles();
// Append this piece of the response and advance the bytes-read counter.
206 memcpy(chunk->get_rbuf() + bytes_read, buffer, nbytes);
207 chunk->set_bytes_read(bytes_read + nbytes);
209 BESDEBUG(MODULE, prolog <<
"END" << endl);
// Decompress a zlib (deflate) stream from src into the caller-supplied dest buffer.
//
// @param dest     Output buffer that receives the uncompressed bytes.
// @param dest_len Size of dest in bytes (asserted to be > 0).
// @param src      Compressed input bytes.
// @param src_len  Number of bytes in src.
// @throws BESError (BES_INTERNAL_ERROR) if inflateInit() fails, if inflate()
//         returns a status other than Z_OK/Z_STREAM_END, or if dest's space is
//         exhausted (avail_out == 0) before the stream ends.
// NOTE(review): zlib's avail_in/avail_out are uInt. Assigning unsigned long long
// lengths silently truncates values that do not fit (typically > 4 GiB).
224 void inflate(
char *dest,
unsigned long long dest_len,
char *src,
unsigned long long src_len) {
228 assert(dest_len > 0);
// Point zlib at the input and output buffers.
235 memset(&z_strm, 0,
sizeof(z_strm));
236 z_strm.next_in = (Bytef *) src;
237 z_strm.avail_in = src_len;
238 z_strm.next_out = (Bytef *) dest;
239 z_strm.avail_out = dest_len;
242 if (Z_OK != inflateInit(&z_strm))
243 throw BESError(
"Failed to initialize inflate software.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
// Inflate repeatedly until zlib reports the end of the stream.
249 status = inflate(&z_strm, Z_SYNC_FLUSH);
252 if (Z_STREAM_END == status)
break;
// Any other non-Z_OK status is fatal; append zlib's own message.
255 if (Z_OK != status) {
256 stringstream err_msg;
257 err_msg <<
"Failed to inflate data chunk.";
// NOTE(review): z_strm.msg can be null (e.g. for Z_BUF_ERROR on truncated
// input). Streaming a null char* is undefined — confirm a guard exists on the
// line not shown here.
258 char *err_msg_cstr = z_strm.msg;
260 err_msg <<
" zlib message: " << err_msg_cstr;
261 (void) inflateEnd(&z_strm);
262 throw BESError(err_msg.str(), BES_INTERNAL_ERROR, __FILE__, __LINE__);
// Output space ran out before the end of the stream.
// NOTE(review): confirm inflateEnd() runs before this throw (not visible here);
// otherwise zlib's internal state leaks.
269 if (0 == z_strm.avail_out) {
270 throw BESError(
"Data buffer is not big enough for uncompressed data.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
// NOTE(review): the lines below use HDF5-internal helpers (H5MM_realloc,
// HGOTO_ERROR) and names (outbuf, nalloc, new_outbuf) not declared in the visible
// code. They look like retained reference code from HDF5's deflate filter,
// presumably compiled out — confirm.
277 if (NULL == (new_outbuf = H5MM_realloc(outbuf, nalloc))) {
278 (void) inflateEnd(&z_strm);
279 HGOTO_ERROR(H5E_RESOURCE, H5E_NOSPACE, 0,
"memory allocation failed for inflate decompression")
284 z_strm.next_out = (
unsigned char*) outbuf + z_strm.total_out;
285 z_strm.avail_out = (uInt) (nalloc - z_strm.total_out);
// Release zlib's internal state after a successful decompression.
292 (void) inflateEnd(&z_strm);
// Undo the byte-shuffle filter.
//
// Consecutive bytes of src are scattered into dest with a stride of width, one
// pass per byte position within an element. This interleaves the per-byte-plane
// input back into whole elements.
//
// @param dest     Output buffer of at least src_size bytes.
// @param src      Shuffled input bytes.
// @param src_size Number of bytes in src.
// @param width    Element size in bytes.
// NOTE(review): width == 0 divides by zero on the first line of the body.
// The const_casts are unnecessary because memcpy takes const void *.
320 void unshuffle(
char *dest,
const char *src,
unsigned long long src_size,
unsigned long long width) {
321 unsigned long long elems = src_size / width;
// Single-byte elements, or fewer than two elements: shuffling was a no-op, so copy through.
324 if (!(width > 1 && elems > 1)) {
325 memcpy(dest,
const_cast<char *
>(src), src_size);
329 char *_src =
const_cast<char *
>(src);
// One pass per byte position within an element.
333 for (
unsigned int i = 0; i < width; i++) {
// Duff's device: copies elems bytes, with the loop body unrolled eight times.
345 size_t duffs_index = (elems + 7) / 8;
348 assert(0 &&
"This Should never be executed!");
// Copy one byte and advance the destination by one element.
353 #define DUFF_GUTS *_dest = *_src++; _dest += width;
370 }
while (--duffs_index > 0);
// Trailing bytes that do not form a whole element (src_size % width) are
// copied verbatim to the end of dest.
378 size_t leftover = src_size % width;
// Presumably moves _dest back from the last plane's final position to the first
// byte after the last whole element.
383 _dest -= (width - 1);
384 memcpy((
void *) _dest, (
void *) _src, leftover);
/**
 * @brief Parse a comma-separated list of unsigned decimal integers.
 *
 * Each comma-separated token of @a s is converted and appended, in order, to
 * @a res. Values already in @a res are kept. For example, "1,22,333" appends
 * 1, 22 and 333.
 *
 * Every failure is reported as std::invalid_argument, so a single catch clause
 * in the caller handles all of them. std::stoull alone would report an over-large
 * value as std::out_of_range, and would silently accept a token such as "2]3"
 * by stopping at the first non-digit.
 *
 * @param s   Text to parse, without enclosing brackets.
 * @param res Vector that receives the parsed values.
 * @throws std::invalid_argument if a token is empty, contains anything other
 *         than the digits 0-9, or does not fit in an unsigned long long.
 *         Values parsed before the offending token remain in @a res.
 */
static void split_by_comma(const std::string &s, std::vector<unsigned long long> &res)
{
    const char delimiter = ',';
    std::string::size_type start = 0;

    for (;;) {
        const std::string::size_type end = s.find(delimiter, start);
        const std::string token =
            (end == std::string::npos) ? s.substr(start) : s.substr(start, end - start);

        if (token.empty())
            throw std::invalid_argument("empty value in '" + s + "'");
        if (token.find_first_not_of("0123456789") != std::string::npos)
            throw std::invalid_argument("non-numeric value '" + token + "'");

        try {
            res.push_back(std::stoull(token));
        }
        catch (const std::out_of_range &) {
            // Re-report as invalid_argument so callers need only one handler.
            throw std::invalid_argument("value '" + token + "' is too large");
        }

        if (end == std::string::npos)
            break;
        start = end + 1;
    }
}
// Parse a DMR++ chunk position string such as "[0,10,20]" into cpia_vect.
//
// An empty pia leaves cpia_vect unchanged. Otherwise cpia_vect is cleared, the
// text is validated, and the characters between the first and last character
// (the brackets) are parsed by split_by_comma().
// @throws BESInternalError if pia lacks '[' or ']', is shorter than 3 characters,
//         contains characters other than brackets, digits and commas, or if
//         split_by_comma() throws std::invalid_argument.
// NOTE(review): the bracket check tests only that '[' and ']' appear somewhere,
// not that pia starts with '[' and ends with ']'.
409 void Chunk::parse_chunk_position_in_array_string(
const string &pia, vector<unsigned long long> &cpia_vect)
411 if (pia.empty())
return;
413 if (!cpia_vect.empty()) cpia_vect.clear();
417 if (pia.find(
'[') == string::npos || pia.find(
']') == string::npos || pia.length() < 3)
418 throw BESInternalError(
"while parsing a DMR++, chunk position string malformed", __FILE__, __LINE__);
420 if (pia.find_first_not_of(
"[]1234567890,") != string::npos)
421 throw BESInternalError(
"while parsing a DMR++, chunk position string illegal character(s)", __FILE__, __LINE__);
// NOTE(review): these istringstream-based lines look like an alternative parser,
// separated from the split_by_comma() path by lines not shown (possibly compiled
// out) — confirm.
425 istringstream iss(pia.substr(1, pia.length() - 2));
431 cpia_vect.push_back(i);
// Strip the enclosing brackets and parse the comma-separated values.
436 split_by_comma(pia.substr(1, pia.length() - 2), cpia_vect);
// NOTE(review): only std::invalid_argument is translated here. Confirm that
// split_by_comma() cannot raise other types (std::stoull can throw std::out_of_range).
438 catch(std::invalid_argument &e) {
439 throw BESInternalError(
string(
"while parsing a DMR++, chunk position string illegal character(s): ").append(e.what()), __FILE__, __LINE__);
// Set this chunk's position in the array from a DMR++ string such as "[0,10,20]".
//
// An empty pia leaves the current position unchanged. The actual parsing is
// delegated to parse_chunk_position_in_array_string() on the last line.
// @throws BESInternalError for malformed strings (see the checks below).
458 void Chunk::set_position_in_array(
const string &pia) {
460 if (pia.empty())
return;
462 if (d_chunk_position_in_array.size()) d_chunk_position_in_array.clear();
// NOTE(review): the validation and istringstream lines below duplicate
// parse_chunk_position_in_array_string() and are separated from the delegating
// call by lines not shown. They may be compiled out — confirm.
466 if (pia.find(
'[') == string::npos || pia.find(
']') == string::npos || pia.length() < 3)
467 throw BESInternalError(
"while parsing a DMR++, chunk position string malformed", __FILE__, __LINE__);
469 if (pia.find_first_not_of(
"[]1234567890,") != string::npos)
470 throw BESInternalError(
"while parsing a DMR++, chunk position string illegal character(s)", __FILE__, __LINE__);
473 istringstream iss(pia.substr(1, pia.length() - 2));
479 d_chunk_position_in_array.push_back(i);
// Delegate parsing to the shared helper.
483 parse_chunk_position_in_array_string(pia,d_chunk_position_in_array);
494 void Chunk::set_position_in_array(
const std::vector<unsigned long long> &pia) {
495 if (pia.empty())
return;
497 if (!d_chunk_position_in_array.empty()) d_chunk_position_in_array.clear();
499 d_chunk_position_in_array = pia;
509 string Chunk::get_curl_range_arg_string() {
510 return curl::get_range_arg_string(d_offset, d_size);
// Append the S3 tracking query parameter to d_query_marker when it applies.
//
// Exits early when d_data_url is null (the exit statement is not shown). Reads the
// S3_TRACKING_CONTEXT value from the BESContextManager. It then tests whether the
// whole of d_data_url matches either:
//   - the S3 virtual-hosted-style pattern, or
//   - the S3 path-style pattern.
// Both patterns accept the optional regions us-east-1/2 or us-west-1/2. A match
// sets add_tracking. When tracking applies (the condition is on lines not shown,
// presumably the context was found and add_tracking is true),
// "<S3_TRACKING_CONTEXT>=<value>" is appended to d_query_marker.
// NOTE(review): both BESRegex objects are rebuilt on every call; consider caching.
526 void Chunk::add_tracking_query_param() {
529 if(d_data_url ==
nullptr)
533 string cloudydap_context_value = BESContextManager::TheManager()->
get_context(S3_TRACKING_CONTEXT, found);
551 bool add_tracking =
false;
556 string s3_vh_regex_str = R
"(^https?:\/\/([a-z]|[0-9])(([a-z]|[0-9]|\.|-){1,61})([a-z]|[0-9])\.s3((\.|-)us-(east|west)-(1|2))?\.amazonaws\.com\/.*$)";
558 BESRegex s3_vh_regex(s3_vh_regex_str.c_str());
// match() returns a negative value when there is no match. A full match is
// required, so the matched length must equal the URL length.
559 int match_result = s3_vh_regex.
match(d_data_url->str().c_str(), d_data_url->str().length());
560 if(match_result>=0) {
561 auto match_length = (
unsigned int) match_result;
562 if (match_length == d_data_url->str().length()) {
564 prolog <<
"FULL MATCH. pattern: " << s3_vh_regex_str <<
" url: " << d_data_url->str() << endl);
// NOTE(review): stray second semicolon on the next statement.
565 add_tracking =
true;;
571 string s3_path_regex_str = R
"(^https?:\/\/s3((\.|-)us-(east|west)-(1|2))?\.amazonaws\.com\/([a-z]|[0-9])(([a-z]|[0-9]|\.|-){1,61})([a-z]|[0-9])\/.*$)";
572 BESRegex s3_path_regex(s3_path_regex_str.c_str());
573 match_result = s3_path_regex.match(d_data_url->str().c_str(), d_data_url->str().length());
574 if(match_result>=0) {
575 auto match_length = (
unsigned int) match_result;
// NOTE(review): the debug message in this branch logs s3_vh_regex_str, although
// the pattern that matched is s3_path_regex_str.
576 if (match_length == d_data_url->str().length()) {
578 prolog <<
"FULL MATCH. pattern: " << s3_vh_regex_str <<
" url: " << d_data_url->str() << endl);
579 add_tracking =
true;;
586 d_query_marker.append(S3_TRACKING_CONTEXT).append(
"=").append(cloudydap_context_value);
// Compute a Fletcher-32 checksum over _len bytes of _data.
//
// Bytes are consumed as big-endian 16-bit words (data[0] << 8 | data[1]).
// The inner loop is capped at 360 words before both running sums are folded
// back to 16 bits, presumably so the 32-bit sums cannot overflow. An odd trailing
// byte is treated as the high byte of a final word. The result is
// (sum2 << 16) | sum1.
// @param _data Bytes to checksum.
// @param _len  Number of bytes.
// NOTE(review): the return-type/linkage line for this function precedes the first
// visible line of this listing.
597 checksum_fletcher32(
const void *_data,
size_t _len)
599 const auto *data = (
const uint8_t *)_data;
600 size_t len = _len / 2;
601 uint32_t sum1 = 0, sum2 = 0;
610 size_t tlen = len > 360 ? 360 : len;
613 sum1 += (uint32_t)(((uint16_t)data[0]) << 8) | ((uint16_t)data[1]);
617 sum1 = (sum1 & 0xffff) + (sum1 >> 16);
618 sum2 = (sum2 & 0xffff) + (sum2 >> 16);
// Odd byte count: the last byte forms the high half of a final word.
623 sum1 += (uint32_t)(((uint16_t)*data) << 8);
625 sum1 = (sum1 & 0xffff) + (sum1 >> 16);
626 sum2 = (sum2 & 0xffff) + (sum2 >> 16);
// Final fold so that each sum fits in 16 bits.
630 sum1 = (sum1 & 0xffff) + (sum1 >> 16);
631 sum2 = (sum2 & 0xffff) + (sum2 >> 16);
633 return ((sum2 << 16) | sum1);
// Undo the filters indicated by the flags on this chunk's read buffer, then mark
// the chunk as inflated.
//
// @param deflate    Data is zlib-compressed; it is inflated into a new buffer of
//                   chunk_size * elem_width bytes.
// @param shuffle    Data was byte-shuffled; it is unshuffled into a new buffer.
// @param fletcher32 Data carries a trailing 4-byte Fletcher-32 checksum, which
//                   is verified and then dropped from d_read_buffer_size.
// @param chunk_size Presumably the chunk's element count; multiplied by elem_width
//                   to obtain a byte count.
// @param elem_width Element size in bytes.
// @throws BESInternalError on a checksum mismatch, or when the buffer is too small
//         to hold the checksum.
648 void Chunk::inflate_chunk(
bool deflate,
bool shuffle,
bool fletcher32,
unsigned long long chunk_size,
649 unsigned long long elem_width) {
664 chunk_size *= elem_width;
// Inflate into a newly allocated buffer and install it as the read buffer.
667 char *dest =
new char[chunk_size];
669 inflate(dest, chunk_size, get_rbuf(), get_rbuf_size());
671 #if DMRPP_USE_SUPER_CHUNKS
672 set_read_buffer(dest, chunk_size, chunk_size,
true);
674 set_rbuf(dest, chunk_size);
// Unshuffle into a newly allocated buffer of the same size.
685 char *dest =
new char[get_rbuf_size()];
687 unshuffle(dest, get_rbuf(), get_rbuf_size(), elem_width);
688 #if DMRPP_USE_SUPER_CHUNKS
689 set_read_buffer(dest,get_rbuf_size(),get_rbuf_size(),
true);
691 set_rbuf(dest, get_rbuf_size());
702 #if ACTUALLY_USE_FLETCHER32_CHECKSUM
// NOTE(review): the first assert below is always true because get_rbuf_size()
// is unsigned. The uint32_t read is an unaligned access through a cast (undefined
// behavior on strict-alignment targets), hence the -Wcast-align suppression;
// memcpy into a uint32_t would be safe. It also reads in host byte order —
// confirm that matches the stored checksum on big-endian hosts. With
// NDEBUG and a buffer of 4 bytes or fewer, this read happens before the size
// check below.
705 #pragma GCC diagnostic push
706 #pragma GCC diagnostic ignored "-Wcast-align"
707 assert(get_rbuf_size() - FLETCHER32_CHECKSUM >= 0);
708 assert((get_rbuf_size() - FLETCHER32_CHECKSUM) % 4 == 0);
709 auto f_checksum = *(uint32_t *)(get_rbuf() + get_rbuf_size() - FLETCHER32_CHECKSUM);
710 #pragma GCC diagnostic pop
715 if (f_checksum != checksum_fletcher32((
const void *)get_rbuf(), get_rbuf_size() - FLETCHER32_CHECKSUM)) {
716 throw BESInternalError(
"Data read from the DMR++ handler did not match the Fletcher32 checksum.",
// Drop the trailing checksum from the logical buffer size.
720 if (d_read_buffer_size > FLETCHER32_CHECKSUM)
721 d_read_buffer_size -= FLETCHER32_CHECKSUM;
723 throw BESInternalError(
"Data filtered with fletcher32 don't include the four-byte checksum.",
728 d_is_inflated =
true;
// NOTE(review): the debug-dump lines below reference os, i and prototype(), none
// of which are declared in the visible code. Presumably this is compiled-out
// debugging code copied from DmrppArray — confirm.
732 unsigned long long chunk_buf_size = get_rbuf_size();
733 dods_float32 *vals = (dods_float32 *) get_rbuf();
735 (*os) << std::fixed << std::setfill(
'_') << std::setw(10) << std::setprecision(0);
736 (*os) <<
"DmrppArray::"<< __func__ <<
"() - Chunk[" << i <<
"]: " << endl;
737 for(
unsigned long long k=0; k< chunk_buf_size/prototype()->width(); k++) {
738 (*os) << vals[k] <<
", " << ((k==0)|((k+1)%10)?
"":
"\n");
// Reverse the filters named in `filters` on this chunk's read buffer, then mark
// the chunk as inflated.
//
// The names are iterated from last to first (filter_array.rbegin()), presumably
// because they are listed in the order they were applied when the data was
// written. The construction of filter_array is on lines not shown. The names
// handled here are "deflate", "shuffle" and "fletcher32".
// @param filters    Filter names for this chunk.
// @param chunk_size Presumably the element count; multiplied by elem_width for bytes.
// @param elem_width Element size in bytes.
// @throws BESInternalError on a Fletcher-32 mismatch or a missing checksum.
755 void Chunk::filter_chunk(
const string &filters,
unsigned long long chunk_size,
unsigned long long elem_width) {
760 chunk_size *= elem_width;
764 for (
auto i = filter_array.rbegin(), e = filter_array.rend(); i != e; ++i){
// deflate: inflate into a new buffer of chunk_size bytes.
767 if (filter ==
"deflate"){
768 char *dest =
new char[chunk_size];
770 inflate(dest, chunk_size, get_rbuf(), get_rbuf_size());
772 #if DMRPP_USE_SUPER_CHUNKS
773 set_read_buffer(dest, chunk_size, chunk_size,
true);
775 set_rbuf(dest, chunk_size);
// shuffle: unshuffle into a new buffer of the same size.
783 else if (filter ==
"shuffle"){
785 char *dest =
new char[get_rbuf_size()];
787 unshuffle(dest, get_rbuf(), get_rbuf_size(), elem_width);
788 #if DMRPP_USE_SUPER_CHUNKS
789 set_read_buffer(dest,get_rbuf_size(),get_rbuf_size(),
true);
791 set_rbuf(dest, get_rbuf_size());
// fletcher32: verify the trailing checksum, then drop it from the buffer size.
799 else if (filter ==
"fletcher32"){
801 #if ACTUALLY_USE_FLETCHER32_CHECKSUM
// NOTE(review): with NDEBUG the assert vanishes, and a buffer of 4 bytes or fewer
// underflows the offset below before the size check further down. The uint32_t
// read is also an unaligned access through a cast; memcpy would be safe.
804 #pragma GCC diagnostic push
805 #pragma GCC diagnostic ignored "-Wcast-align"
806 assert(get_rbuf_size() > FLETCHER32_CHECKSUM);
808 auto f_checksum = *(uint32_t *)(get_rbuf() + get_rbuf_size() - FLETCHER32_CHECKSUM);
809 #pragma GCC diagnostic pop
814 uint32_t calc_checksum = checksum_fletcher32((
const void *)get_rbuf(), get_rbuf_size() - FLETCHER32_CHECKSUM);
815 if (f_checksum != calc_checksum) {
816 throw BESInternalError(
"Data read from the DMR++ handler did not match the Fletcher32 checksum.",
820 if (d_read_buffer_size > FLETCHER32_CHECKSUM)
821 d_read_buffer_size -= FLETCHER32_CHECKSUM;
823 throw BESInternalError(
"Data filtered with fletcher32 don't include the four-byte checksum.",
828 d_is_inflated =
true;
// Read this chunk's bytes from its data URL using a libcurl handle from the pool.
//
// Returns immediately if the chunk has already been read (the test is on a line
// not shown). After the transfer, the number of bytes read must equal get_size().
// @throws BESInternalError if the pool has no free libcurl handle.
// NOTE(review): the statement raised on a byte-count mismatch is on lines not
// shown. The handle is released on two separate lines, presumably on the error
// and normal paths; an RAII guard would make the release unconditional.
840 void Chunk::read_chunk() {
842 BESDEBUG(MODULE, prolog <<
"Already been read! Returning." << endl);
848 dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(
this);
850 throw BESInternalError(prolog +
"No more libcurl handles.", __FILE__, __LINE__);
854 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
857 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
// A short or long read is an error.
862 if (get_size() != get_bytes_read()) {
864 oss <<
"Wrong number of bytes read for chunk; read: " << get_bytes_read() <<
", expected: " << get_size();
// Write a bracketed, single-line description of this chunk to oss, for debugging.
// The fields written are: object address, data URL, offset, size, position in
// the array, is_read and is_inflated.
// NOTE(review): d_data_url is dereferenced without a null check, although
// add_tracking_query_param() treats a null d_data_url as possible.
880 void Chunk::dump(ostream &oss)
const {
882 oss <<
"[ptr='" << (
void *)
this <<
"']";
883 oss <<
"[data_url='" << d_data_url->str() <<
"']";
884 oss <<
"[offset=" << d_offset <<
"]";
885 oss <<
"[size=" << d_size <<
"]";
886 oss <<
"[chunk_position_in_array=(";
// The separator and closing-parenthesis lines of this list are not shown.
887 for (
unsigned long i = 0; i < d_chunk_position_in_array.size(); i++) {
889 oss << d_chunk_position_in_array[i];
892 oss <<
"[is_read=" << d_is_read <<
"]";
893 oss <<
"[is_inflated=" << d_is_inflated <<
"]";
// Return a text description of this chunk.
// NOTE(review): only the stream declaration is visible. The remaining statements
// are presumably dump(oss) followed by return oss.str() — confirm.
896 string Chunk::to_string()
const {
897 std::ostringstream oss;
// Return the URL to use when reading this chunk.
//
// Looks up the effective URL for d_data_url in EffectiveUrlCache. If
// d_query_marker is non-empty, it is appended to the effective URL and a new
// http::url is returned; the handling for URLs that already contain '?' is on
// lines not shown. Otherwise the cached effective URL is returned as-is.
903 std::shared_ptr<http::url> Chunk::get_data_url()
const {
905 std::shared_ptr<http::EffectiveUrl> effective_url = EffectiveUrlCache::TheCache()->get_effective_url(d_data_url);
906 BESDEBUG(MODULE, prolog <<
"Using data_url: " << effective_url->str() << endl);
// Append the query marker (e.g. the tracking parameter) when one is set.
910 if (!d_query_marker.empty()) {
911 string url_str = effective_url->str();
912 if(url_str.find(
"?") != string::npos){
918 url_str += d_query_marker;
919 shared_ptr<http::url> query_marker_url(
new http::url(url_str));
920 return query_marker_url;
923 return effective_url;
virtual std::string get_context(const std::string &name, bool &found)
retrieve the value of the specified context from the BES
static std::ostream * GetStrm()
return the debug stream
static bool IsSet(const std::string &flagName)
see if the debug context flagName is set to true
Abstract exception class for the BES with basic string message.
error thrown if the BES is not allowed to access the resource requested
exception thrown if internal error encountered
Regular expression matching.
int match(const char *s, int len, int pos=0) const
Does the pattern match.
error thrown if there is a user syntax error in the request or any other user error
static std::vector< std::string > split(const std::string &s, char delim='/', bool skip_empty=true)
Splits the string s into the return vector of tokens using the delimiter delim, skipping empty values when skip_empty is true.
Bundle a libcurl easy handle with other information.
void read_data()
This is the read_data() method for all transfers.
GenericValue< UTF8<> > Value
GenericValue with UTF8 encoding.
GenericDocument< UTF8<> > Document
GenericDocument with UTF8 encoding.