00001 #ifndef XRDCPMTHRQ__HH 00002 #define XRDCPMTHRQ__HH 00003 /******************************************************************************/ 00004 /* */ 00005 /* X r d C p M t h r Q u e u e . h h */ 00006 /* */ 00007 /* Author: Fabrizio Furano (INFN Padova, 2004) */ 00008 /* */ 00009 /* This file is part of the XRootD software suite. */ 00010 /* */ 00011 /* XRootD is free software: you can redistribute it and/or modify it under */ 00012 /* the terms of the GNU Lesser General Public License as published by the */ 00013 /* Free Software Foundation, either version 3 of the License, or (at your */ 00014 /* option) any later version. */ 00015 /* */ 00016 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */ 00017 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ 00018 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ 00019 /* License for more details. */ 00020 /* */ 00021 /* You should have received a copy of the GNU Lesser General Public License */ 00022 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ 00023 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */ 00024 /* */ 00025 /* The copyright holder's institutional names and contributor's names may not */ 00026 /* be used to endorse or promote products derived from this software without */ 00027 /* specific prior written permission of the institution or contributor. */ 00028 /******************************************************************************/ 00029 00031 // // 00032 // A thread safe queue to be used for multithreaded producers-consumers // 00033 // // 00035 00036 #include "XrdSys/XrdSysPthread.hh" 00037 #include "XrdClient/XrdClientVector.hh" 00038 #include "XrdSys/XrdSysSemWait.hh" 00039 #include "XrdSys/XrdSysHeaders.hh" 00040 00041 using namespace std; 00042 00043 struct XrdCpMessage { 00044 void *buf; 00045 long long offs; 00046 int len; 00047 }; 00048 00049 // The max allowed size for this queue 00050 // If this value is reached, then the writer has to wait... 00051 #define CPMTQ_BUFFSIZE 50000000 00052 00053 class XrdCpMthrQueue { 00054 private: 00055 long fTotSize; 00056 XrdClientVector<XrdCpMessage*> fMsgQue; // queue for incoming messages 00057 int fMsgIter; // an iterator on it 00058 int fWrWait; // Write waiters 00059 00060 XrdSysRecMutex fMutex; // mutex to protect data structures 00061 00062 XrdSysSemWait fReadSem; // variable to make the reader wait 00063 // until some data is available 00064 XrdSysSemaphore fWriteSem; // variable to make the writer wait 00065 // if the queue is full 00066 public: 00067 00068 XrdCpMthrQueue(); 00069 ~XrdCpMthrQueue() {} 00070 00071 int PutBuffer(void *buf, long long offs, int len); 00072 int GetBuffer(void **buf, long long &offs, int &len); 00073 int GetLength() { return fMsgQue.GetSize(); } 00074 void Clear(); 00075 }; 00076 #endif