//------------------------------------------------------------------------------
// Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
// Author: Michal Simon <michal.simon@cern.ch>
//------------------------------------------------------------------------------
// This file is part of the XRootD software suite.
//
// XRootD is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// XRootD is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with XRootD.  If not, see <http://www.gnu.org/licenses/>.
//
// In applying this licence, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
00023 //------------------------------------------------------------------------------ 00024 00025 #ifndef SRC_XRDCL_XRDCLXCPCTX_HH_ 00026 #define SRC_XRDCL_XRDCLXCPCTX_HH_ 00027 00028 #include "XrdCl/XrdClSyncQueue.hh" 00029 #include "XrdCl/XrdClXRootDResponses.hh" 00030 #include "XrdSys/XrdSysPthread.hh" 00031 00032 #include <stdint.h> 00033 #include <iostream> 00034 00035 namespace XrdCl 00036 { 00037 00038 class XCpSrc; 00039 00040 class XCpCtx 00041 { 00042 public: 00043 00056 XCpCtx( const std::vector<std::string> &urls, uint64_t blockSize, uint8_t parallelSrc, uint64_t chunkSize, uint64_t parallelChunks, int64_t fileSize ); 00057 00061 void Delete() 00062 { 00063 XrdSysMutexHelper lck( pMtx ); 00064 --pRefCount; 00065 if( !pRefCount ) 00066 { 00067 lck.UnLock(); 00068 delete this; 00069 } 00070 } 00071 00077 XCpCtx* Self() 00078 { 00079 XrdSysMutexHelper lck( pMtx ); 00080 ++pRefCount; 00081 return this; 00082 } 00083 00091 bool GetNextUrl( std::string & url ); 00092 00100 XCpSrc* WeakestLink( XCpSrc *exclude ); 00101 00107 void PutChunk( ChunkInfo* chunk ); 00108 00114 std::pair<uint64_t, uint64_t> GetBlock(); 00115 00123 void SetFileSize( int64_t size ); 00124 00129 int64_t GetSize() 00130 { 00131 XrdSysCondVarHelper lck( pFileSizeCV ); 00132 while( pFileSize < 0 && GetRunning() > 0 ) pFileSizeCV.Wait(); 00133 return pFileSize; 00134 } 00135 00144 XRootDStatus Initialize(); 00145 00160 XRootDStatus GetChunk( XrdCl::ChunkInfo &ci ); 00161 00167 void RemoveSrc( XCpSrc *src ) 00168 { 00169 XrdSysMutexHelper lck( pMtx ); 00170 pSources.remove( src ); 00171 } 00172 00180 void NotifyIdleSrc(); 00181 00190 bool AllDone(); 00191 00197 void NotifyInitExpectant() 00198 { 00199 pFileSizeCV.Broadcast(); 00200 } 00201 00202 00203 private: 00204 00210 size_t GetRunning(); 00211 00217 virtual ~XCpCtx(); 00218 00223 std::queue<std::string> pUrls; 00224 00228 uint64_t pBlockSize; 00229 00233 uint8_t pParallelSrc; 00234 00238 uint32_t pChunkSize; 00239 00243 uint8_t 
pParallelChunks; 00244 00250 uint64_t pOffset; 00251 00255 int64_t pFileSize; 00256 00261 XrdSysCondVar pFileSizeCV; 00262 00267 std::list<XCpSrc*> pSources; 00268 00273 SyncQueue<ChunkInfo*> pSink; 00274 00278 uint64_t pDataReceived; 00279 00284 bool pDone; 00285 00290 XrdSysCondVar pDoneCV; 00291 00295 XrdSysMutex pMtx; 00296 00300 size_t pRefCount; 00301 }; 00302 00303 } /* namespace XrdCl */ 00304 00305 #endif /* SRC_XRDCL_XRDCLXCPCTX_HH_ */