45 #include <libdap/D4Enum.h>
46 #include <libdap/D4Attributes.h>
47 #include <libdap/D4Maps.h>
48 #include <libdap/D4Group.h>
50 #include "BESInternalError.h"
53 #include "BESStopWatch.h"
55 #include "byteswap_compat.h"
56 #include "CurlHandlePool.h"
58 #include "DmrppArray.h"
59 #include "DmrppRequestHandler.h"
60 #include "DmrppNames.h"
64 #define dmrpp_3 "dmrpp:3"
65 #define dmrpp_4 "dmrpp:4"
70 #define MB (1024*1024)
71 #define prolog std::string("DmrppArray::").append(__func__).append("() - ")
77 std::mutex transfer_thread_pool_mtx;
78 atomic_uint transfer_thread_counter(0);
96 bool get_next_future(list<std::future<bool>> &futures, atomic_uint &thread_counter,
unsigned long timeout,
string debug_prefix) {
97 bool future_finished =
false;
99 std::chrono::milliseconds timeout_ms (timeout);
102 auto futr = futures.begin();
103 auto fend = futures.end();
104 bool future_is_valid =
true;
105 while(!future_finished && future_is_valid && futr != fend){
106 future_is_valid = (*futr).valid();
114 if((*futr).wait_for(timeout_ms) != std::future_status::timeout){
116 bool success = (*futr).get();
117 future_finished =
true;
118 BESDEBUG(dmrpp_3, debug_prefix << prolog <<
"Called future::get() on a ready future."
119 <<
" success: " << (success?
"true":
"false") << endl);
122 msg << debug_prefix << prolog <<
"The std::future has failed!";
123 msg <<
" thread_counter: " << thread_counter;
139 BESDEBUG(dmrpp_3, debug_prefix << prolog <<
"future::wait_for() timed out. (timeout: " <<
140 timeout <<
" ms) There are currently " << futures.size() <<
" futures in process."
141 <<
" thread_counter: " << thread_counter << endl);
145 BESDEBUG(dmrpp_3, debug_prefix << prolog <<
"The future was not valid. Dumping... " << endl);
146 future_finished =
true;
150 if (futr!=fend && future_finished) {
153 BESDEBUG(dmrpp_3, debug_prefix << prolog <<
"Erased future from futures list. (Erased future was "
154 << (future_is_valid?
"":
"not ") <<
"valid at start.) There are currently " <<
155 futures.size() <<
" futures in process. thread_counter: " << thread_counter << endl);
158 done = future_finished || futures.empty();
161 return future_finished;
176 bool one_child_chunk_thread_new(unique_ptr<one_child_chunk_args_new> args)
179 args->child_chunk->read_chunk();
181 assert(args->the_one_chunk->get_rbuf());
182 assert(args->child_chunk->get_rbuf());
183 assert(args->child_chunk->get_bytes_read() == args->child_chunk->get_size());
196 unsigned long long offset_within_the_one_chunk = args->child_chunk->get_offset() - args->the_one_chunk->get_offset();
198 memcpy(args->the_one_chunk->get_rbuf() + offset_within_the_one_chunk, args->child_chunk->get_rbuf(),
199 args->child_chunk->get_bytes_read());
211 bool one_super_chunk_transfer_thread(unique_ptr<one_super_chunk_args> args)
214 #if DMRPP_ENABLE_THREAD_TIMERS
215 stringstream timer_tag;
216 timer_tag << prolog <<
"tid: 0x" << std::hex << std::this_thread::get_id() <<
217 " parent_tid: 0x" << std::hex << args->parent_thread_id <<
" sc_id: " << args->super_chunk->id();
219 sw.start(timer_tag.str());
222 args->super_chunk->read();
231 bool one_super_chunk_unconstrained_transfer_thread(unique_ptr<one_super_chunk_args> args)
234 #if DMRPP_ENABLE_THREAD_TIMERS
235 stringstream timer_tag;
236 timer_tag << prolog <<
"tid: 0x" << std::hex << std::this_thread::get_id() <<
237 " parent_tid: 0x" << std::hex << args->parent_thread_id <<
" sc_id: " << args->super_chunk->id();
239 sw.start(timer_tag.str());
242 args->super_chunk->read_unconstrained();
247 bool start_one_child_chunk_thread(list<std::future<bool>> &futures, unique_ptr<one_child_chunk_args_new> args) {
249 std::unique_lock<std::mutex> lck (transfer_thread_pool_mtx);
250 if (transfer_thread_counter < DmrppRequestHandler::d_max_transfer_threads) {
251 transfer_thread_counter++;
252 futures.push_back( std::async(std::launch::async, one_child_chunk_thread_new, std::move(args)));
254 BESDEBUG(dmrpp_3, prolog <<
"Got std::future '" << futures.size() <<
255 "' from std::async for " << args->child_chunk->to_string() << endl);
268 bool start_super_chunk_transfer_thread(list<std::future<bool>> &futures, unique_ptr<one_super_chunk_args> args) {
270 std::unique_lock<std::mutex> lck (transfer_thread_pool_mtx);
271 if (transfer_thread_counter < DmrppRequestHandler::d_max_transfer_threads) {
272 transfer_thread_counter++;
273 futures.push_back(std::async(std::launch::async, one_super_chunk_transfer_thread, std::move(args)));
275 BESDEBUG(dmrpp_3, prolog <<
"Got std::future '" << futures.size() <<
276 "' from std::async for " << args->super_chunk->to_string(
false) << endl);
288 bool start_super_chunk_unconstrained_transfer_thread(list<std::future<bool>> &futures, unique_ptr<one_super_chunk_args> args) {
290 std::unique_lock<std::mutex> lck (transfer_thread_pool_mtx);
291 if(transfer_thread_counter < DmrppRequestHandler::d_max_transfer_threads) {
292 transfer_thread_counter++;
293 futures.push_back(std::async(std::launch::async, one_super_chunk_unconstrained_transfer_thread, std::move(args)));
295 BESDEBUG(dmrpp_3, prolog <<
"Got std::future '" << futures.size() <<
296 "' from std::async, transfer_thread_counter: " << transfer_thread_counter << endl);
322 void read_super_chunks_unconstrained_concurrent(queue<shared_ptr<SuperChunk>> &super_chunks, DmrppArray *array)
332 list<future<bool>> futures;
335 bool future_finished =
true;
339 future_finished = get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
343 BESDEBUG(dmrpp_3, prolog <<
"future_finished: " << (future_finished ?
"true" :
"false") << endl);
345 if (!super_chunks.empty()){
347 bool thread_started =
true;
348 while(thread_started && !super_chunks.empty()) {
349 auto super_chunk = super_chunks.front();
350 BESDEBUG(dmrpp_3, prolog <<
"Starting thread for " << super_chunk->to_string(
false) << endl);
352 auto args = unique_ptr<one_super_chunk_args>(
new one_super_chunk_args(super_chunk, array));
353 thread_started = start_super_chunk_unconstrained_transfer_thread(futures, std::move(args));
355 if (thread_started) {
357 BESDEBUG(dmrpp_3, prolog <<
"STARTED thread for " << super_chunk->to_string(
false) << endl);
360 BESDEBUG(dmrpp_3, prolog <<
"Thread not started. args deleted, Chunk remains in queue.)" <<
361 " transfer_thread_counter: " << transfer_thread_counter <<
362 " futures.size(): " << futures.size() << endl);
371 future_finished =
false;
376 while(!futures.empty()){
377 if(futures.back().valid())
378 futures.back().get();
409 void read_super_chunks_concurrent(queue<shared_ptr<SuperChunk>> &super_chunks, DmrppArray *array)
419 list<future<bool>> futures;
422 bool future_finished =
true;
426 future_finished = get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
430 BESDEBUG(dmrpp_3, prolog <<
"future_finished: " << (future_finished ?
"true" :
"false") << endl);
432 if (!super_chunks.empty()){
434 bool thread_started =
true;
435 while(thread_started && !super_chunks.empty()) {
436 auto super_chunk = super_chunks.front();
437 BESDEBUG(dmrpp_3, prolog <<
"Starting thread for " << super_chunk->to_string(
false) << endl);
439 auto args = unique_ptr<one_super_chunk_args>(
new one_super_chunk_args(super_chunk, array));
440 thread_started = start_super_chunk_transfer_thread(futures, std::move(args));
442 if (thread_started) {
444 BESDEBUG(dmrpp_3, prolog <<
"STARTED thread for " << super_chunk->to_string(
false) << endl);
447 BESDEBUG(dmrpp_3, prolog <<
"Thread not started. args deleted, Chunk remains in queue.)" <<
448 " transfer_thread_counter: " << transfer_thread_counter <<
449 " futures.size(): " << futures.size() << endl);
458 future_finished =
false;
463 while(!futures.empty()){
464 if(futures.back().valid())
465 futures.back().get();
/**
 * @brief Compute the linear offset of an element in a multi-dimensional array.
 *
 * The last entry of each vector is the fastest-varying dimension: the offset starts as
 * the last address component, and every earlier component is scaled by the product of
 * the sizes of all dimensions that follow it.
 *
 * @param address_in_target The N-dimensional index of the element.
 * @param target_shape The size of each dimension; must have the same rank as the address.
 * @return The linear element offset; 0 for a rank-0 (empty) address.
 */
static unsigned long long
get_index(const std::vector<unsigned long long> &address_in_target,
          const std::vector<unsigned long long> &target_shape)
{
    assert(address_in_target.size() == target_shape.size());    // ranks must be equal

    // A rank-0 address names the only element there is. Without this guard the
    // reverse iterators below would be dereferenced on an empty vector.
    if (address_in_target.empty() || target_shape.empty())
        return 0;

    auto shape_index = target_shape.rbegin();
    auto index = address_in_target.rbegin(), index_end = address_in_target.rend();

    // Start with the innermost dimension, whose multiplier is 1.
    unsigned long long multiplier_var = *shape_index++;
    unsigned long long offset = *index++;

    while (index != index_end) {
        assert(*index < *shape_index); // index < shape for each dim

        offset += multiplier_var * *index++;
        multiplier_var *= *shape_index++;
    }

    return offset;
}
528 static unsigned long multiplier(
const vector<unsigned long long> &shape,
unsigned int k)
530 assert(shape.size() > 1);
531 assert(shape.size() > k + 1);
533 vector<unsigned long long>::const_iterator i = shape.begin(), e = shape.end();
535 unsigned long multiplier = *i++;
550 DmrppArray::operator=(
const DmrppArray &rhs)
552 if (
this == &rhs)
return *
this;
554 dynamic_cast<Array &
>(*this) = rhs;
556 dynamic_cast<DmrppCommon &
>(*this) = rhs;
566 bool DmrppArray::is_projected()
568 for (Dim_iter p = dim_begin(), e = dim_end(); p != e; ++p)
569 if (dimension_size(p,
true) != dimension_size(p,
false))
return true;
580 unsigned long long DmrppArray::get_size(
bool constrained)
583 unsigned long long size = 1;
584 for (Dim_iter dim = dim_begin(), end = dim_end(); dim != end; dim++) {
585 size *= dimension_size(dim, constrained);
596 vector<unsigned long long> DmrppArray::get_shape(
bool constrained)
598 auto dim = dim_begin(), edim = dim_end();
599 vector<unsigned long long> shape;
603 shape.reserve(edim - dim);
605 for (; dim != edim; dim++) {
606 shape.push_back(dimension_size(dim, constrained));
617 DmrppArray::dimension DmrppArray::get_dimension(
unsigned int i)
619 assert(i <= (dim_end() - dim_begin()));
620 return *(dim_begin() + i);
636 void DmrppArray::insert_constrained_contiguous(Dim_iter dim_iter,
unsigned long *target_index,
637 vector<unsigned long long> &subset_addr,
638 const vector<unsigned long long> &array_shape,
char *src_buf)
640 BESDEBUG(
"dmrpp",
"DmrppArray::" << __func__ <<
"() - subsetAddress.size(): " << subset_addr.size() << endl);
642 unsigned int bytes_per_elem = prototype()->width();
644 char *dest_buf = get_buf();
646 unsigned int start = this->dimension_start(dim_iter,
true);
647 unsigned int stop = this->dimension_stop(dim_iter,
true);
648 unsigned int stride = this->dimension_stride(dim_iter,
true);
654 if (dim_iter == dim_end() && stride == 1) {
656 subset_addr.push_back(start);
657 unsigned long long start_index = get_index(subset_addr, array_shape);
658 subset_addr.pop_back();
660 subset_addr.push_back(stop);
661 unsigned long long stop_index = get_index(subset_addr, array_shape);
662 subset_addr.pop_back();
666 for (
unsigned long source_index = start_index; source_index <= stop_index; source_index++) {
667 unsigned long target_byte = *target_index * bytes_per_elem;
668 unsigned long source_byte = source_index * bytes_per_elem;
670 for (
unsigned long i = 0; i < bytes_per_elem; i++) {
671 dest_buf[target_byte++] = src_buf[source_byte++];
678 for (
unsigned int myDimIndex = start; myDimIndex <= stop; myDimIndex += stride) {
681 if (dim_iter != dim_end()) {
683 subset_addr.push_back(myDimIndex);
684 insert_constrained_contiguous(dim_iter, target_index, subset_addr, array_shape, src_buf);
685 subset_addr.pop_back();
689 subset_addr.push_back(myDimIndex);
690 unsigned int sourceIndex = get_index(subset_addr, array_shape);
691 subset_addr.pop_back();
694 unsigned long target_byte = *target_index * bytes_per_elem;
695 unsigned long source_byte = sourceIndex * bytes_per_elem;
697 for (
unsigned int i = 0; i < bytes_per_elem; i++) {
698 dest_buf[target_byte++] = src_buf[source_byte++];
722 void DmrppArray::read_contiguous()
728 if (get_chunks_size() != 1)
729 throw BESInternalError(
string(
"Expected only a single chunk for variable ") + name(), __FILE__, __LINE__);
732 auto the_one_chunk = get_immutable_chunks()[0];
734 unsigned long long the_one_chunk_offset = the_one_chunk->get_offset();
735 unsigned long long the_one_chunk_size = the_one_chunk->get_size();
740 if (!DmrppRequestHandler::d_use_transfer_threads || the_one_chunk_size <= DmrppRequestHandler::d_contiguous_concurrent_threshold) {
742 the_one_chunk->read_chunk();
748 the_one_chunk->set_rbuf_to_size();
753 unsigned long long num_chunks = floor(the_one_chunk_size / MB);
754 if (num_chunks >= DmrppRequestHandler::d_max_transfer_threads)
755 num_chunks = DmrppRequestHandler::d_max_transfer_threads;
758 unsigned long long chunk_size = the_one_chunk_size / num_chunks;
759 std::string chunk_byteorder = the_one_chunk->get_byte_order();
763 unsigned long long chunk_remainder = the_one_chunk_size % num_chunks;
765 auto chunk_url = the_one_chunk->get_data_url();
768 queue<shared_ptr<Chunk>> chunks_to_read;
771 unsigned long long chunk_offset = the_one_chunk_offset;
772 for (
unsigned int i = 0; i < num_chunks - 1; i++) {
773 chunks_to_read.push(shared_ptr<Chunk>(
new Chunk(chunk_url, chunk_byteorder, chunk_size, chunk_offset)));
774 chunk_offset += chunk_size;
777 chunks_to_read.push(shared_ptr<Chunk>(
new Chunk(chunk_url, chunk_byteorder, chunk_size + chunk_remainder, chunk_offset)));
780 list<future<bool>> futures;
783 bool future_finished =
true;
786 if (!futures.empty())
787 future_finished = get_next_future(futures, transfer_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
791 BESDEBUG(dmrpp_3, prolog <<
"future_finished: " << (future_finished ?
"true" :
"false") << endl);
793 if (!chunks_to_read.empty()) {
795 bool thread_started =
true;
796 while (thread_started && !chunks_to_read.empty()) {
797 auto current_chunk = chunks_to_read.front();
798 BESDEBUG(dmrpp_3, prolog <<
"Starting thread for " << current_chunk->to_string() << endl);
800 auto args = unique_ptr<one_child_chunk_args_new>(
new one_child_chunk_args_new(current_chunk, the_one_chunk));
801 thread_started = start_one_child_chunk_thread(futures, std::move(args));
803 if (thread_started) {
804 chunks_to_read.pop();
805 BESDEBUG(dmrpp_3, prolog <<
"STARTED thread for " << current_chunk->to_string() << endl);
808 BESDEBUG(dmrpp_3, prolog <<
"Thread not started. args deleted, Chunk remains in queue.)" <<
809 " transfer_thread_counter: " << transfer_thread_counter <<
810 " futures.size(): " << futures.size() << endl);
818 future_finished =
false;
823 while (!futures.empty()) {
824 if (futures.back().valid())
825 futures.back().get();
834 if (!is_filters_empty()){
835 the_one_chunk->filter_chunk(get_filters(), get_chunk_size_in_elements(),var()->width());
839 if (!is_projected()) {
840 reserve_value_capacity(get_size(
false));
841 val2buf(the_one_chunk->get_rbuf());
844 vector<unsigned long long> array_shape = get_shape(
false);
847 reserve_value_capacity(get_size(
true));
848 unsigned long target_index = 0;
849 vector<unsigned long long> subset;
851 insert_constrained_contiguous(dim_begin(), &target_index, subset, array_shape, the_one_chunk->get_rbuf());
876 void DmrppArray::insert_chunk_unconstrained(shared_ptr<Chunk> chunk,
unsigned int dim,
unsigned long long array_offset,
877 const vector<unsigned long long> &array_shape,
878 unsigned long long chunk_offset,
const vector<unsigned long long> &chunk_shape,
879 const vector<unsigned long long> &chunk_origin)
884 dimension thisDim = this->get_dimension(dim);
885 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
886 if ((
unsigned) thisDim.stop < end_element) {
887 end_element = thisDim.stop;
890 unsigned long long chunk_end = end_element - chunk_origin[dim];
892 unsigned int last_dim = chunk_shape.size() - 1;
893 if (dim == last_dim) {
894 unsigned int elem_width = prototype()->width();
896 array_offset += chunk_origin[dim];
899 unsigned long long chunk_bytes = (end_element - chunk_origin[dim] + 1) * elem_width;
900 char *source_buffer = chunk->get_rbuf();
901 char *target_buffer = get_buf();
902 memcpy(target_buffer + (array_offset * elem_width), source_buffer + (chunk_offset * elem_width), chunk_bytes);
905 unsigned long mc = multiplier(chunk_shape, dim);
906 unsigned long ma = multiplier(array_shape, dim);
909 for (
unsigned int chunk_index = 0 ; chunk_index <= chunk_end; ++chunk_index) {
910 unsigned long long next_chunk_offset = chunk_offset + (mc * chunk_index);
911 unsigned long long next_array_offset = array_offset + (ma * (chunk_index + chunk_origin[dim]));
914 insert_chunk_unconstrained(chunk, dim + 1, next_array_offset, array_shape, next_chunk_offset, chunk_shape,
931 void DmrppArray::read_chunks_unconstrained()
933 if (get_chunks_size() < 2)
934 throw BESInternalError(
string(
"Expected chunks for variable ") + name(), __FILE__, __LINE__);
939 unsigned long long sc_count=0;
941 sc_id << name() <<
"-" << sc_count++;
942 queue<shared_ptr<SuperChunk>> super_chunks;
943 auto current_super_chunk = shared_ptr<SuperChunk>(
new SuperChunk(sc_id.str(),
this)) ;
944 super_chunks.push(current_super_chunk);
947 for(
const auto& chunk: get_immutable_chunks()){
948 bool added = current_super_chunk->add_chunk(chunk);
950 sc_id.str(std::string());
951 sc_id << name() <<
"-" << sc_count++;
952 current_super_chunk = shared_ptr<SuperChunk>(
new SuperChunk(sc_id.str(),
this));
953 super_chunks.push(current_super_chunk);
954 if(!current_super_chunk->add_chunk(chunk)){
956 msg << prolog <<
"Failed to add Chunk to new SuperChunk. chunk: " << chunk->to_string();
961 reserve_value_capacity(get_size());
963 const vector<unsigned long long> array_shape = get_shape(
true);
965 const vector<unsigned long long> chunk_shape = get_chunk_dimension_sizes();
968 BESDEBUG(dmrpp_3, prolog <<
"d_use_transfer_threads: " << (DmrppRequestHandler::d_use_transfer_threads ?
"true" :
"false") << endl);
969 BESDEBUG(dmrpp_3, prolog <<
"d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads << endl);
971 if (!DmrppRequestHandler::d_use_transfer_threads) {
972 #if DMRPP_ENABLE_THREAD_TIMERS
974 sw.
start(prolog +
"Serial SuperChunk Processing.");
976 while(!super_chunks.empty()) {
977 auto super_chunk = super_chunks.front();
979 BESDEBUG(dmrpp_3, prolog << super_chunk->to_string(
true) << endl );
986 #if DMRPP_ENABLE_THREAD_TIMERS
987 stringstream timer_name;
988 timer_name << prolog <<
"Concurrent SuperChunk Processing. d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads;
990 sw.
start(timer_name.str());
992 read_super_chunks_unconstrained_concurrent(super_chunks,
this);
1013 unsigned long long DmrppArray::get_chunk_start(
const dimension &thisDim,
unsigned int chunk_origin)
1016 unsigned long long first_element_offset = 0;
1017 if ((
unsigned) (thisDim.start) < chunk_origin) {
1019 if (thisDim.stride != 1) {
1021 first_element_offset = (chunk_origin - thisDim.start) % thisDim.stride;
1023 if (first_element_offset != 0) {
1025 first_element_offset = thisDim.stride - first_element_offset;
1030 first_element_offset = thisDim.start - chunk_origin;
1033 return first_element_offset;
1058 DmrppArray::find_needed_chunks(
unsigned int dim, vector<unsigned long long> *target_element_address, shared_ptr<Chunk> chunk)
1060 BESDEBUG(dmrpp_3, prolog <<
" BEGIN, dim: " << dim << endl);
1063 const vector<unsigned long long> &chunk_shape = get_chunk_dimension_sizes();
1066 const vector<unsigned long long> &chunk_origin = chunk->get_position_in_array();
1068 dimension thisDim = this->get_dimension(dim);
1071 if ((
unsigned) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) ||
1072 (
unsigned) thisDim.stop < chunk_origin[dim]) {
1077 unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);
1080 if (chunk_start > chunk_shape[dim]) {
1085 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
1086 if ((
unsigned) thisDim.stop < end_element) {
1087 end_element = thisDim.stop;
1090 unsigned long long chunk_end = end_element - chunk_origin[dim];
1092 unsigned int last_dim = chunk_shape.size() - 1;
1093 if (dim == last_dim) {
1094 BESDEBUG(dmrpp_3, prolog <<
" END, This is the last_dim. chunk: " << chunk->to_string() << endl);
1099 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1100 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1103 auto needed = find_needed_chunks(dim + 1, target_element_address, chunk);
1105 BESDEBUG(dmrpp_3, prolog <<
" END, Found chunk: " << needed->to_string() << endl);
1111 BESDEBUG(dmrpp_3, prolog <<
" END, dim: " << dim << endl);
1135 void DmrppArray::insert_chunk(
1137 vector<unsigned long long> *target_element_address,
1138 vector<unsigned long long> *chunk_element_address,
1139 shared_ptr<Chunk> chunk,
1140 const vector<unsigned long long> &constrained_array_shape){
1143 const vector<unsigned long long> &chunk_shape = get_chunk_dimension_sizes();
1146 const vector<unsigned long long> &chunk_origin = chunk->get_position_in_array();
1148 dimension thisDim = this->get_dimension(dim);
1151 unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);
1154 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
1155 if ((
unsigned) thisDim.stop < end_element) {
1156 end_element = thisDim.stop;
1159 unsigned long long chunk_end = end_element - chunk_origin[dim];
1161 unsigned int last_dim = chunk_shape.size() - 1;
1162 if (dim == last_dim) {
1163 char *source_buffer = chunk->get_rbuf();
1164 char *target_buffer = get_buf();
1165 unsigned int elem_width = prototype()->width();
1167 if (thisDim.stride == 1) {
1169 unsigned long long start_element = chunk_origin[dim] + chunk_start;
1171 unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;
1174 (*target_element_address)[dim] = (start_element - thisDim.start);
1176 (*chunk_element_address)[dim] = chunk_start;
1179 unsigned long long target_char_start_index =
1180 get_index(*target_element_address, constrained_array_shape) * elem_width;
1181 unsigned long long chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
1183 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index,
1184 chunk_constrained_inner_dim_bytes);
1188 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1190 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1193 (*chunk_element_address)[dim] = chunk_index;
1196 unsigned int target_char_start_index =
1197 get_index(*target_element_address, constrained_array_shape) * elem_width;
1198 unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
1200 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
1206 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1207 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1208 (*chunk_element_address)[dim] = chunk_index;
1211 insert_chunk(dim + 1, target_element_address, chunk_element_address, chunk, constrained_array_shape);
1222 void DmrppArray::read_chunks()
1224 if (get_chunks_size() < 2)
1225 throw BESInternalError(
string(
"Expected chunks for variable ") + name(), __FILE__, __LINE__);
1229 unsigned long long sc_count=0;
1231 sc_id << name() <<
"-" << sc_count++;
1232 queue<shared_ptr<SuperChunk>> super_chunks;
1233 auto current_super_chunk = shared_ptr<SuperChunk>(
new SuperChunk(sc_id.str(),
this)) ;
1234 super_chunks.push(current_super_chunk);
1241 for(
const auto& chunk: get_immutable_chunks()){
1242 vector<unsigned long long> target_element_address = chunk->get_position_in_array();
1243 auto needed = find_needed_chunks(0 , &target_element_address, chunk);
1245 bool added = current_super_chunk->add_chunk(chunk);
1247 sc_id.str(std::string());
1248 sc_id << name() <<
"-" << sc_count++;
1249 current_super_chunk = shared_ptr<SuperChunk>(
new SuperChunk(sc_id.str(),
this));
1250 super_chunks.push(current_super_chunk);
1251 if(!current_super_chunk->add_chunk(chunk)){
1253 msg << prolog <<
"Failed to add Chunk to new SuperChunk. chunk: " << chunk->to_string();
1260 reserve_value_capacity(get_size(
true));
1262 BESDEBUG(dmrpp_3, prolog <<
"d_use_transfer_threads: " << (DmrppRequestHandler::d_use_transfer_threads ?
"true" :
"false") << endl);
1263 BESDEBUG(dmrpp_3, prolog <<
"d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads << endl);
1264 BESDEBUG(dmrpp_3, prolog <<
"d_use_compute_threads: " << (DmrppRequestHandler::d_use_compute_threads ?
"true" :
"false") << endl);
1265 BESDEBUG(dmrpp_3, prolog <<
"d_max_compute_threads: " << DmrppRequestHandler::d_max_compute_threads << endl);
1266 BESDEBUG(dmrpp_3, prolog <<
"SuperChunks.size(): " << super_chunks.size() << endl);
1268 if (!DmrppRequestHandler::d_use_transfer_threads) {
1271 #if DMRPP_ENABLE_THREAD_TIMERS
1273 sw.
start(prolog +
"Serial SuperChunk Processing.");
1275 while (!super_chunks.empty()) {
1276 auto super_chunk = super_chunks.front();
1278 BESDEBUG(dmrpp_3, prolog << super_chunk->to_string(
true) << endl );
1279 super_chunk->read();
1283 #if DMRPP_ENABLE_THREAD_TIMERS
1284 stringstream timer_name;
1285 timer_name << prolog <<
"Concurrent SuperChunk Processing. d_max_transfer_threads: " << DmrppRequestHandler::d_max_transfer_threads;
1287 sw.
start(timer_name.str());
1289 read_super_chunks_concurrent(super_chunks,
this);
1295 #ifdef USE_READ_SERIAL
1317 void DmrppArray::insert_chunk_serial(
unsigned int dim, vector<unsigned int> *target_element_address, vector<unsigned int> *chunk_element_address,
1320 BESDEBUG(
"dmrpp", __func__ <<
" dim: "<< dim <<
" BEGIN "<< endl);
1323 const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();
1326 const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();
1328 dimension thisDim = this->get_dimension(dim);
1331 if ((
unsigned) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) || (
unsigned) thisDim.stop < chunk_origin[dim]) {
1336 unsigned int first_element_offset = get_chunk_start(dim, chunk_origin);
1339 if (first_element_offset > chunk_shape[dim]) {
1344 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
1345 if ((
unsigned) thisDim.stop < end_element) {
1346 end_element = thisDim.stop;
1349 unsigned long long chunk_start = first_element_offset;
1350 unsigned long long chunk_end = end_element - chunk_origin[dim];
1351 vector<unsigned int> constrained_array_shape = get_shape(
true);
1353 unsigned int last_dim = chunk_shape.size() - 1;
1354 if (dim == last_dim) {
1356 chunk->read_chunk();
1358 chunk->inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(), var()->width());
1360 char *source_buffer = chunk->get_rbuf();
1361 char *target_buffer = get_buf();
1362 unsigned int elem_width = prototype()->width();
1364 if (thisDim.stride == 1) {
1366 unsigned long long start_element = chunk_origin[dim] + first_element_offset;
1368 unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;
1371 (*target_element_address)[dim] = (start_element - thisDim.start) / thisDim.stride;
1373 (*chunk_element_address)[dim] = first_element_offset;
1375 unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
1376 unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
1378 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, chunk_constrained_inner_dim_bytes);
1382 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1384 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1387 (*chunk_element_address)[dim] = chunk_index;
1389 unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
1390 unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
1392 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
1398 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1399 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1400 (*chunk_element_address)[dim] = chunk_index;
1403 insert_chunk_serial(dim + 1, target_element_address, chunk_element_address, chunk);
1408 void DmrppArray::read_chunks_serial()
1410 BESDEBUG(
"dmrpp", __func__ <<
" for variable '" << name() <<
"' - BEGIN" << endl);
1412 vector<Chunk> &chunk_refs = get_chunk_vec();
1413 if (chunk_refs.size() == 0)
throw BESInternalError(
string(
"Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);
1416 reserve_value_capacity(get_size(
true));
1424 for (
unsigned long i = 0; i < chunk_refs.size(); i++) {
1425 Chunk &chunk = chunk_refs[i];
1427 vector<unsigned int> chunk_source_address(dimensions(), 0);
1428 vector<unsigned int> target_element_address = chunk.get_position_in_array();
1431 insert_chunk_serial(0, &target_element_address, &chunk_source_address, &chunk);
1436 BESDEBUG(
"dmrpp",
"DmrppArray::"<< __func__ <<
"() for " << name() <<
" END"<< endl);
1441 DmrppArray::set_send_p(
bool state)
1443 if (!get_attributes_loaded())
1444 load_attributes(
this);
1446 Array::set_send_p(state);
1460 bool DmrppArray::read()
1466 if (!get_chunks_loaded())
1469 if (read_p())
return true;
1473 if (get_chunks_size() == 1) {
1474 BESDEBUG(dmrpp_4,
"Calling read_contiguous() for " << name() << endl);
1478 if (!is_projected()) {
1479 BESDEBUG(dmrpp_4,
"Calling read_chunks_unconstrained() for " << name() << endl);
1480 read_chunks_unconstrained();
1483 BESDEBUG(dmrpp_4,
"Calling read_chunks() for " << name() << endl);
1488 if (this->twiddle_bytes()) {
1489 int num = this->length();
1490 Type var_type = this->var()->type();
1494 case dods_uint16_c: {
1495 dods_uint16 *local =
reinterpret_cast<dods_uint16*
>(this->get_buf());
1497 *local = bswap_16(*local);
1503 case dods_uint32_c: {
1504 dods_uint32 *local =
reinterpret_cast<dods_uint32*
>(this->get_buf());;
1506 *local = bswap_32(*local);
1512 case dods_uint64_c: {
1513 dods_uint64 *local =
reinterpret_cast<dods_uint64*
>(this->get_buf());;
1515 *local = bswap_64(*local);
1531 class PrintD4ArrayDimXMLWriter :
public unary_function<Array::dimension &, void> {
1538 PrintD4ArrayDimXMLWriter(XMLWriter &xml,
bool c) :
1539 xml(xml), d_constrained(c)
1543 void operator()(Array::dimension &d)
1549 if (xmlTextWriterStartElement(xml.get_writer(), (
const xmlChar *)
"Dim") < 0)
1550 throw InternalErr(__FILE__, __LINE__,
"Could not write Dim element");
1552 string name = (d.dim) ? d.dim->fully_qualified_name() : d.name;
1555 if (!d_constrained && !name.empty()) {
1556 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar *)
"name",
1557 (
const xmlChar *) name.c_str()) < 0)
1558 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
1560 else if (d.use_sdim_for_slice) {
1561 assert(!name.empty());
1562 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar *)
"name",
1563 (
const xmlChar *) name.c_str()) < 0)
1564 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
1568 size << (d_constrained ? d.c_size : d.size);
1569 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar *)
"size",
1570 (
const xmlChar *) size.str().c_str()) < 0)
1571 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
1574 if (xmlTextWriterEndElement(xml.get_writer()) < 0)
1575 throw InternalErr(__FILE__, __LINE__,
"Could not end Dim element");
1579 class PrintD4ConstructorVarXMLWriter :
public unary_function<BaseType *, void> {
1583 PrintD4ConstructorVarXMLWriter(XMLWriter &xml,
bool c) :
1584 xml(xml), d_constrained(c)
1588 void operator()(BaseType *btp)
1590 btp->print_dap4(xml, d_constrained);
1594 class PrintD4MapXMLWriter :
public unary_function<D4Map *, void> {
1598 PrintD4MapXMLWriter(XMLWriter &xml) :
1603 void operator()(D4Map *m)
1633 void DmrppArray::print_dap4(XMLWriter &xml,
bool constrained )
1635 if (constrained && !send_p())
return;
1637 if (xmlTextWriterStartElement(xml.get_writer(), (
const xmlChar *) var()->type_name().c_str()) < 0)
1638 throw InternalErr(__FILE__, __LINE__,
"Could not write " + type_name() +
" element");
1640 if (!name().empty())
1641 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar *)
"name", (
const xmlChar *) name().c_str()) <
1643 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
1646 if (var()->type() == dods_enum_c) {
1647 D4Enum *e =
static_cast<D4Enum *
>(var());
1648 string path = e->enumeration()->name();
1649 if (e->enumeration()->parent()) {
1651 path =
static_cast<D4Group *
>(e->enumeration()->parent()->parent())->FQN() + path;
1653 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar *)
"enum", (
const xmlChar *) path.c_str()) < 0)
1654 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for enum");
1657 if (prototype()->is_constructor_type()) {
1658 Constructor &c =
static_cast<Constructor &
>(*prototype());
1659 for_each(c.var_begin(), c.var_end(), PrintD4ConstructorVarXMLWriter(xml, constrained));
1664 for_each(dim_begin(), dim_end(), PrintD4ArrayDimXMLWriter(xml, constrained));
1666 attributes()->print_dap4(xml);
1668 for_each(maps()->map_begin(), maps()->map_end(), PrintD4MapXMLWriter(xml));
1672 if (DmrppCommon::d_print_chunks && get_chunks_size() > 0)
1673 print_chunks_element(xml, DmrppCommon::d_ns_prefix);
1682 if (DmrppCommon::d_print_chunks && is_compact_layout() && read_p()) {
1683 switch (var()->type()) {
1697 case dods_float32_c:
1698 case dods_float64_c: {
1699 u_int8_t *values = 0;
1701 size_t size = buf2val(
reinterpret_cast<void **
>(&values));
1702 string encoded = base64::Base64::encode(values, size);
1703 print_compact_element(xml, DmrppCommon::d_ns_prefix, encoded);
1718 buf2val(
reinterpret_cast<void **
>(&values));
1720 for (
int i = 0; i < length(); ++i) {
1721 str = (*(
static_cast<string *
> (values) + i));
1722 string encoded = base64::Base64::encode(
reinterpret_cast<const u_int8_t *
>(str.c_str()), str.size());
1723 print_compact_element(xml, DmrppCommon::d_ns_prefix, encoded);
1735 throw InternalErr(__FILE__, __LINE__,
"Vector::val2buf: bad type");
1738 if (xmlTextWriterEndElement(xml.get_writer()) < 0)
1739 throw InternalErr(__FILE__, __LINE__,
"Could not end " + type_name() +
" element");
1742 void DmrppArray::dump(ostream &strm)
const
1744 strm << BESIndent::LMarg <<
"DmrppArray::" << __func__ <<
"(" << (
void *)
this <<
")" << endl;
1745 BESIndent::Indent();
1746 DmrppCommon::dump(strm);
1748 strm << BESIndent::LMarg <<
"value: " <<
"----" << endl;
1749 BESIndent::UnIndent();
static bool IsSet(const std::string &flagName)
see if the debug context flagName is set to true
exception thrown if internal error encountered
virtual bool start(std::string name)