//------------------------------------------------------------------------------
// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
// Author: Lukasz Janyst <ljanyst@cern.ch>
//------------------------------------------------------------------------------
// XRootD is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// XRootD is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
00017 //------------------------------------------------------------------------------ 00018 00019 #ifndef __XRD_CL_STREAM_HH__ 00020 #define __XRD_CL_STREAM_HH__ 00021 00022 #include "XrdCl/XrdClPoller.hh" 00023 #include "XrdCl/XrdClStatus.hh" 00024 #include "XrdCl/XrdClURL.hh" 00025 #include "XrdCl/XrdClPostMasterInterfaces.hh" 00026 #include "XrdCl/XrdClChannelHandlerList.hh" 00027 #include "XrdCl/XrdClJobManager.hh" 00028 #include "XrdCl/XrdClInQueue.hh" 00029 #include "XrdCl/XrdClUtils.hh" 00030 00031 #include "XrdSys/XrdSysPthread.hh" 00032 #include "XrdNet/XrdNetAddr.hh" 00033 #include <list> 00034 #include <vector> 00035 #include <functional> 00036 00037 namespace XrdCl 00038 { 00039 class Message; 00040 class Channel; 00041 class TransportHandler; 00042 class TaskManager; 00043 struct SubStreamData; 00044 00045 //---------------------------------------------------------------------------- 00047 //---------------------------------------------------------------------------- 00048 class Stream 00049 { 00050 public: 00051 //------------------------------------------------------------------------ 00053 //------------------------------------------------------------------------ 00054 enum StreamStatus 00055 { 00056 Disconnected = 0, 00057 Connected = 1, 00058 Connecting = 2, 00059 Error = 3 00060 }; 00061 00062 //------------------------------------------------------------------------ 00064 //------------------------------------------------------------------------ 00065 Stream( const URL *url, uint16_t streamNum ); 00066 00067 //------------------------------------------------------------------------ 00069 //------------------------------------------------------------------------ 00070 ~Stream(); 00071 00072 //------------------------------------------------------------------------ 00074 //------------------------------------------------------------------------ 00075 Status Initialize(); 00076 00077 
//------------------------------------------------------------------------ 00079 //------------------------------------------------------------------------ 00080 Status Send( Message *msg, 00081 OutgoingMsgHandler *handler, 00082 bool stateful, 00083 time_t expires ); 00084 00085 //------------------------------------------------------------------------ 00087 //------------------------------------------------------------------------ 00088 void SetTransport( TransportHandler *transport ) 00089 { 00090 pTransport = transport; 00091 } 00092 00093 //------------------------------------------------------------------------ 00095 //------------------------------------------------------------------------ 00096 void SetPoller( Poller *poller ) 00097 { 00098 pPoller = poller; 00099 } 00100 00101 //------------------------------------------------------------------------ 00103 //------------------------------------------------------------------------ 00104 void SetIncomingQueue( InQueue *incomingQueue ) 00105 { 00106 pIncomingQueue = incomingQueue; 00107 delete pQueueIncMsgJob; 00108 pQueueIncMsgJob = new QueueIncMsgJob( incomingQueue ); 00109 } 00110 00111 //------------------------------------------------------------------------ 00113 //------------------------------------------------------------------------ 00114 void SetChannelData( AnyObject *channelData ) 00115 { 00116 pChannelData = channelData; 00117 } 00118 00119 //------------------------------------------------------------------------ 00121 //------------------------------------------------------------------------ 00122 void SetTaskManager( TaskManager *taskManager ) 00123 { 00124 pTaskManager = taskManager; 00125 } 00126 00127 //------------------------------------------------------------------------ 00129 //------------------------------------------------------------------------ 00130 void SetJobManager( JobManager *jobManager ) 00131 { 00132 pJobManager = jobManager; 00133 } 00134 00135 
//------------------------------------------------------------------------ 00139 //------------------------------------------------------------------------ 00140 Status EnableLink( PathID &path ); 00141 00142 //------------------------------------------------------------------------ 00144 //------------------------------------------------------------------------ 00145 void Disconnect( bool force = false ); 00146 00147 //------------------------------------------------------------------------ 00150 //------------------------------------------------------------------------ 00151 void Tick( time_t now ); 00152 00153 //------------------------------------------------------------------------ 00155 //------------------------------------------------------------------------ 00156 const URL *GetURL() const 00157 { 00158 return pUrl; 00159 } 00160 00161 //------------------------------------------------------------------------ 00163 //------------------------------------------------------------------------ 00164 uint16_t GetStreamNumber() const 00165 { 00166 return pStreamNum; 00167 } 00168 00169 //------------------------------------------------------------------------ 00171 //------------------------------------------------------------------------ 00172 void ForceConnect(); 00173 00174 //------------------------------------------------------------------------ 00176 //------------------------------------------------------------------------ 00177 const std::string &GetName() const 00178 { 00179 return pStreamName; 00180 } 00181 00182 //------------------------------------------------------------------------ 00184 //------------------------------------------------------------------------ 00185 void DisableIfEmpty( uint16_t subStream ); 00186 00187 //------------------------------------------------------------------------ 00189 //------------------------------------------------------------------------ 00190 void OnIncoming( uint16_t subStream, 00191 Message *msg, 00192 
uint32_t bytesReceived ); 00193 00194 //------------------------------------------------------------------------ 00195 // Call when one of the sockets is ready to accept a new message 00196 //------------------------------------------------------------------------ 00197 std::pair<Message *, OutgoingMsgHandler *> 00198 OnReadyToWrite( uint16_t subStream ); 00199 00200 //------------------------------------------------------------------------ 00201 // Call when a message is written to the socket 00202 //------------------------------------------------------------------------ 00203 void OnMessageSent( uint16_t subStream, 00204 Message *msg, 00205 uint32_t bytesSent ); 00206 00207 //------------------------------------------------------------------------ 00209 //------------------------------------------------------------------------ 00210 void OnConnect( uint16_t subStream ); 00211 00212 //------------------------------------------------------------------------ 00214 //------------------------------------------------------------------------ 00215 void OnConnectError( uint16_t subStream, Status status ); 00216 00217 //------------------------------------------------------------------------ 00219 //------------------------------------------------------------------------ 00220 void OnError( uint16_t subStream, Status status ); 00221 00222 //------------------------------------------------------------------------ 00224 //------------------------------------------------------------------------ 00225 void ForceError( Status status ); 00226 00227 //------------------------------------------------------------------------ 00229 //------------------------------------------------------------------------ 00230 void OnReadTimeout( uint16_t subStream, bool &isBroken ); 00231 00232 //------------------------------------------------------------------------ 00234 //------------------------------------------------------------------------ 00235 void OnWriteTimeout( uint16_t subStream ); 
00236 00237 //------------------------------------------------------------------------ 00239 //------------------------------------------------------------------------ 00240 void RegisterEventHandler( ChannelEventHandler *handler ); 00241 00242 //------------------------------------------------------------------------ 00244 //------------------------------------------------------------------------ 00245 void RemoveEventHandler( ChannelEventHandler *handler ); 00246 00247 //------------------------------------------------------------------------ 00256 //------------------------------------------------------------------------ 00257 std::pair<IncomingMsgHandler *, bool> 00258 InstallIncHandler( Message *msg, uint16_t stream ); 00259 00260 //------------------------------------------------------------------------ 00262 //------------------------------------------------------------------------ 00263 void SetOnConnectHandler( Job *onConnJob ) 00264 { 00265 delete pOnConnJob; 00266 pOnConnJob = onConnJob; 00267 } 00268 00269 private: 00270 00271 //------------------------------------------------------------------------ 00272 // Job queuing the incoming messages 00273 //------------------------------------------------------------------------ 00274 class QueueIncMsgJob: public Job 00275 { 00276 public: 00277 QueueIncMsgJob( InQueue *queue ): pQueue( queue ) {}; 00278 virtual ~QueueIncMsgJob() {}; 00279 virtual void Run( void *arg ) 00280 { 00281 Message *msg = (Message *)arg; 00282 pQueue->AddMessage( msg ); 00283 } 00284 private: 00285 InQueue *pQueue; 00286 }; 00287 00288 //------------------------------------------------------------------------ 00289 // Job handling the incoming messages 00290 //------------------------------------------------------------------------ 00291 class HandleIncMsgJob: public Job 00292 { 00293 public: 00294 HandleIncMsgJob( IncomingMsgHandler *handler ): pHandler( handler ) {}; 00295 virtual ~HandleIncMsgJob() {}; 00296 virtual void Run( void 
*arg ) 00297 { 00298 Message *msg = (Message *)arg; 00299 pHandler->Process( msg ); 00300 delete this; 00301 } 00302 private: 00303 IncomingMsgHandler *pHandler; 00304 }; 00305 00306 //------------------------------------------------------------------------ 00308 //------------------------------------------------------------------------ 00309 void OnFatalError( uint16_t subStream, 00310 Status status, 00311 XrdSysMutexHelper &lock ); 00312 00313 //------------------------------------------------------------------------ 00315 //------------------------------------------------------------------------ 00316 void MonitorDisconnection( Status status ); 00317 00318 //------------------------------------------------------------------------ 00320 //------------------------------------------------------------------------ 00321 Status RequestClose( Message *resp ); 00322 00323 typedef std::vector<SubStreamData*> SubStreamList; 00324 00325 //------------------------------------------------------------------------ 00326 // Data members 00327 //------------------------------------------------------------------------ 00328 const URL *pUrl; 00329 uint16_t pStreamNum; 00330 std::string pStreamName; 00331 TransportHandler *pTransport; 00332 Poller *pPoller; 00333 TaskManager *pTaskManager; 00334 JobManager *pJobManager; 00335 XrdSysRecMutex pMutex; 00336 InQueue *pIncomingQueue; 00337 AnyObject *pChannelData; 00338 uint32_t pLastStreamError; 00339 Status pLastFatalError; 00340 uint16_t pStreamErrorWindow; 00341 uint16_t pConnectionCount; 00342 uint16_t pConnectionRetry; 00343 time_t pConnectionInitTime; 00344 uint16_t pConnectionWindow; 00345 SubStreamList pSubStreams; 00346 std::vector<XrdNetAddr> pAddresses; 00347 Utils::AddressType pAddressType; 00348 ChannelHandlerList pChannelEvHandlers; 00349 uint64_t pSessionId; 00350 00351 //------------------------------------------------------------------------ 00352 // Jobs 00353 
//------------------------------------------------------------------------ 00354 QueueIncMsgJob *pQueueIncMsgJob; 00355 00356 //------------------------------------------------------------------------ 00357 // Monitoring info 00358 //------------------------------------------------------------------------ 00359 timeval pConnectionStarted; 00360 timeval pConnectionDone; 00361 uint64_t pBytesSent; 00362 uint64_t pBytesReceived; 00363 00364 //------------------------------------------------------------------------ 00365 // Data stream on-connect handler 00366 //------------------------------------------------------------------------ 00367 Job *pOnConnJob; 00368 }; 00369 } 00370 00371 #endif // __XRD_CL_STREAM_HH__