xrootd
XrdClParallelOperation.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3 // Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
4 // Michal Simon <michal.simon@cern.ch>
5 //------------------------------------------------------------------------------
6 // This file is part of the XRootD software suite.
7 //
8 // XRootD is free software: you can redistribute it and/or modify
9 // it under the terms of the GNU Lesser General Public License as published by
10 // the Free Software Foundation, either version 3 of the License, or
11 // (at your option) any later version.
12 //
13 // XRootD is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 // GNU General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public License
19 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
20 //
21 // In applying this licence, CERN does not waive the privileges and immunities
22 // granted to it by virtue of its status as an Intergovernmental Organization
23 // or submit itself to any jurisdiction.
24 //------------------------------------------------------------------------------
25 
26 #ifndef __XRD_CL_PARALLELOPERATION_HH__
27 #define __XRD_CL_PARALLELOPERATION_HH__
28 
29 #include "XrdCl/XrdClOperations.hh"
31 
32 #include <atomic>
33 
34 namespace XrdCl
35 {
36 
38  {
39 
40  };
41 
42  //----------------------------------------------------------------------------
48  //----------------------------------------------------------------------------
49  template<bool HasHndl>
50  class ParallelOperation: public ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>
51  {
52  template<bool> friend class ParallelOperation;
53 
54  public:
55 
56  //------------------------------------------------------------------------
58  //------------------------------------------------------------------------
59  template<bool from>
61  ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>( std::move( obj ) ),
62  pipelines( std::move( obj.pipelines ) )
63  {
64  }
65 
66  //------------------------------------------------------------------------
72  //------------------------------------------------------------------------
73  template<class Container>
74  ParallelOperation( Container &&container )
75  {
76  static_assert( !HasHndl, "Constructor is available only operation without handler");
77 
78  pipelines.reserve( container.size() );
79  auto begin = std::make_move_iterator( container.begin() );
80  auto end = std::make_move_iterator( container.end() );
81  std::copy( begin, end, std::back_inserter( pipelines ) );
82  container.clear(); // there's junk inside so we clear it
83  }
84 
85  //------------------------------------------------------------------------
87  //------------------------------------------------------------------------
88  std::string ToString()
89  {
90  std::ostringstream oss;
91  oss << "Parallel(";
92  for( size_t i = 0; i < pipelines.size(); i++ )
93  {
94  oss << pipelines[i]->ToString();
95  if( i + 1 != pipelines.size() )
96  {
97  oss << " && ";
98  }
99  }
100  oss << ")";
101  return oss.str();
102  }
103 
104  private:
105 
106  //------------------------------------------------------------------------
111  //------------------------------------------------------------------------
112  struct Ctx
113  {
114  //----------------------------------------------------------------------
118  //----------------------------------------------------------------------
120  {
121 
122  }
123 
124  //----------------------------------------------------------------------
126  //----------------------------------------------------------------------
128  {
129  Handle( XRootDStatus() );
130  }
131 
132  //----------------------------------------------------------------------
137  //----------------------------------------------------------------------
138  void Handle( const XRootDStatus &st )
139  {
140  PipelineHandler* hdlr = handler.exchange( nullptr );
141  if( hdlr )
142  hdlr->HandleResponse( new XRootDStatus( st ), nullptr );
143  }
144 
145  //----------------------------------------------------------------------
147  //----------------------------------------------------------------------
148  std::atomic<PipelineHandler*> handler;
149  };
150 
151  //------------------------------------------------------------------------
157  //------------------------------------------------------------------------
159  {
160  std::shared_ptr<Ctx> ctx( new Ctx( this->handler.release() ) );
161 
162  try
163  {
164  for( size_t i = 0; i < pipelines.size(); ++i )
165  {
166  pipelines[i].Run( [ctx]( const XRootDStatus &st ){ if( !st.IsOK() ) ctx->Handle( st ); } );
167  }
168  }
169  catch( const PipelineException& ex )
170  {
171  return ex.GetError();
172  }
173  catch( const std::exception& ex )
174  {
175  return XRootDStatus( stError, ex.what() );
176  }
177 
178  return XRootDStatus();
179  }
180 
181  std::vector<Pipeline> pipelines;
182  };
183 
184  //----------------------------------------------------------------------------
186  //----------------------------------------------------------------------------
187  template<class Container>
188  ParallelOperation<false> Parallel( Container &container )
189  {
190  return ParallelOperation<false>( container );
191  }
192 
193  //----------------------------------------------------------------------------
195  //----------------------------------------------------------------------------
196  inline void PipesToVec( std::vector<Pipeline>& )
197  {
198  // base case
199  }
200 
201  //----------------------------------------------------------------------------
202  // Declare PipesToVec (we need to do declare those functions ahead of
203  // definitions, as they may call each other.
204  //----------------------------------------------------------------------------
205  template<typename ... Others>
206  inline void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
207  Others&... others );
208 
209  template<typename ... Others>
210  inline void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
211  Others&... others );
212 
213  template<typename ... Others>
214  inline void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
215  Others&... others );
216 
217  //----------------------------------------------------------------------------
218  // Define PipesToVec
219  //----------------------------------------------------------------------------
220  template<typename ... Others>
221  void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
222  Others&... others )
223  {
224  v.emplace_back( operation );
225  PipesToVec( v, others... );
226  }
227 
228  template<typename ... Others>
229  void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
230  Others&... others )
231  {
232  v.emplace_back( operation );
233  PipesToVec( v, others... );
234  }
235 
236  template<typename ... Others>
237  void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
238  Others&... others )
239  {
240  v.emplace_back( std::move( pipeline ) );
241  PipesToVec( v, others... );
242  }
243 
244  //----------------------------------------------------------------------------
249  //----------------------------------------------------------------------------
250  template<typename ... Operations>
251  ParallelOperation<false> Parallel( Operations&& ... operations )
252  {
253  constexpr size_t size = sizeof...( operations );
254  std::vector<Pipeline> v;
255  v.reserve( size );
256  PipesToVec( v, operations... );
257  return Parallel( v );
258  }
259 }
260 
261 #endif // __XRD_CL_PARALLELOPERATION_HH__
XrdClOperationHandlers.hh
XrdCl::Resp
Definition: XrdClOperationHandlers.hh:553
XrdCl::ParallelOperation::ToString
std::string ToString()
Definition: XrdClParallelOperation.hh:88
XrdCl::PipelineException::GetError
const XRootDStatus & GetError() const
Definition: XrdClOperationHandlers.hh:351
XrdCl::ParallelOperation::Ctx::Handle
void Handle(const XRootDStatus &st)
Definition: XrdClParallelOperation.hh:138
XrdCl::ParallelOperation::pipelines
std::vector< Pipeline > pipelines
Definition: XrdClParallelOperation.hh:181
XrdCl::PipelineHandler
Definition: XrdClOperations.hh:50
XrdCl::Operation::handler
std::unique_ptr< PipelineHandler > handler
Operation handler.
Definition: XrdClOperations.hh:280
XrdCl::PipelineException
Pipeline exception, wraps an XRootDStatus.
Definition: XrdClOperationHandlers.hh:311
XrdCl::PipesToVec
void PipesToVec(std::vector< Pipeline > &)
Helper function for converting parameter pack into a vector.
Definition: XrdClParallelOperation.hh:196
XrdCl::ConcreteOperation
Definition: XrdClOperations.hh:467
XrdCl::ParallelOperation
Definition: XrdClParallelOperation.hh:50
XrdCl::ParallelOperation::Ctx
Definition: XrdClParallelOperation.hh:112
XrdCl::PipelineHandler::HandleResponse
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback function.
XrdCl::ParallelOperation::RunImpl
XRootDStatus RunImpl()
Definition: XrdClParallelOperation.hh:158
XrdClOperations.hh
XrdCl::XRootDStatus
Request status.
Definition: XrdClXRootDResponses.hh:212
XrdCl::ParallelOperation::Ctx::handler
std::atomic< PipelineHandler * > handler
PipelineHandler of the ParallelOperation.
Definition: XrdClParallelOperation.hh:148
XrdCl::ParallelHandler
Definition: XrdClParallelOperation.hh:37
XrdCl::Pipeline
Definition: XrdClOperations.hh:295
XrdCl::Status::IsOK
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:119
XrdCl
Definition: XrdClAnyObject.hh:25
XrdCl::Operation
Definition: XrdClOperations.hh:42
XrdCl::stError
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
XrdCl::ParallelOperation::ParallelOperation
ParallelOperation(Container &&container)
Definition: XrdClParallelOperation.hh:74
XrdCl::ParallelOperation::ParallelOperation
ParallelOperation(ParallelOperation< from > &&obj)
Constructor: copy-move a ParallelOperation in different state.
Definition: XrdClParallelOperation.hh:60
XrdCl::Parallel
ParallelOperation< false > Parallel(Container &container)
Factory function for creating parallel operation from a vector.
Definition: XrdClParallelOperation.hh:188
XrdCl::ParallelOperation::Ctx::~Ctx
~Ctx()
Destructor.
Definition: XrdClParallelOperation.hh:127
XrdCl::ParallelOperation::Ctx::Ctx
Ctx(PipelineHandler *handler)
Definition: XrdClParallelOperation.hh:119