xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
XrdClStream.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #ifndef __XRD_CL_STREAM_HH__
20 #define __XRD_CL_STREAM_HH__
21 
22 #include "XrdCl/XrdClPoller.hh"
23 #include "XrdCl/XrdClStatus.hh"
24 #include "XrdCl/XrdClURL.hh"
27 #include "XrdCl/XrdClJobManager.hh"
28 #include "XrdCl/XrdClInQueue.hh"
29 #include "XrdCl/XrdClUtils.hh"
30 
31 #include "XrdSys/XrdSysPthread.hh"
32 #include "XrdNet/XrdNetAddr.hh"
33 #include <list>
34 #include <vector>
35 #include <functional>
36 
37 namespace XrdCl
38 {
39  class Message;
40  class Channel;
41  class TransportHandler;
42  class TaskManager;
43  struct SubStreamData;
44 
45  //----------------------------------------------------------------------------
47  //----------------------------------------------------------------------------
48  class Stream
49  {
50  public:
51  //------------------------------------------------------------------------
53  //------------------------------------------------------------------------
55  {
57  Connected = 1,
58  Connecting = 2,
59  Error = 3
60  };
61 
62  //------------------------------------------------------------------------
64  //------------------------------------------------------------------------
65  Stream( const URL *url, uint16_t streamNum );
66 
67  //------------------------------------------------------------------------
69  //------------------------------------------------------------------------
70  ~Stream();
71 
72  //------------------------------------------------------------------------
74  //------------------------------------------------------------------------
76 
77  //------------------------------------------------------------------------
79  //------------------------------------------------------------------------
80  Status Send( Message *msg,
81  OutgoingMsgHandler *handler,
82  bool stateful,
83  time_t expires );
84 
85  //------------------------------------------------------------------------
87  //------------------------------------------------------------------------
88  void SetTransport( TransportHandler *transport )
89  {
90  pTransport = transport;
91  }
92 
93  //------------------------------------------------------------------------
95  //------------------------------------------------------------------------
96  void SetPoller( Poller *poller )
97  {
98  pPoller = poller;
99  }
100 
101  //------------------------------------------------------------------------
103  //------------------------------------------------------------------------
104  void SetIncomingQueue( InQueue *incomingQueue )
105  {
106  pIncomingQueue = incomingQueue;
107  delete pQueueIncMsgJob;
108  pQueueIncMsgJob = new QueueIncMsgJob( incomingQueue );
109  }
110 
111  //------------------------------------------------------------------------
113  //------------------------------------------------------------------------
114  void SetChannelData( AnyObject *channelData )
115  {
116  pChannelData = channelData;
117  }
118 
119  //------------------------------------------------------------------------
121  //------------------------------------------------------------------------
122  void SetTaskManager( TaskManager *taskManager )
123  {
124  pTaskManager = taskManager;
125  }
126 
127  //------------------------------------------------------------------------
129  //------------------------------------------------------------------------
130  void SetJobManager( JobManager *jobManager )
131  {
132  pJobManager = jobManager;
133  }
134 
135  //------------------------------------------------------------------------
139  //------------------------------------------------------------------------
140  Status EnableLink( PathID &path );
141 
142  //------------------------------------------------------------------------
144  //------------------------------------------------------------------------
145  void Disconnect( bool force = false );
146 
147  //------------------------------------------------------------------------
150  //------------------------------------------------------------------------
151  void Tick( time_t now );
152 
153  //------------------------------------------------------------------------
155  //------------------------------------------------------------------------
156  const URL *GetURL() const
157  {
158  return pUrl;
159  }
160 
161  //------------------------------------------------------------------------
163  //------------------------------------------------------------------------
164  uint16_t GetStreamNumber() const
165  {
166  return pStreamNum;
167  }
168 
169  //------------------------------------------------------------------------
171  //------------------------------------------------------------------------
172  void ForceConnect();
173 
174  //------------------------------------------------------------------------
176  //------------------------------------------------------------------------
177  const std::string &GetName() const
178  {
179  return pStreamName;
180  }
181 
182  //------------------------------------------------------------------------
184  //------------------------------------------------------------------------
185  void DisableIfEmpty( uint16_t subStream );
186 
187  //------------------------------------------------------------------------
189  //------------------------------------------------------------------------
190  void OnIncoming( uint16_t subStream,
191  Message *msg,
192  uint32_t bytesReceived );
193 
194  //------------------------------------------------------------------------
195  // Call when one of the sockets is ready to accept a new message
196  //------------------------------------------------------------------------
197  std::pair<Message *, OutgoingMsgHandler *>
198  OnReadyToWrite( uint16_t subStream );
199 
200  //------------------------------------------------------------------------
201  // Call when a message is written to the socket
202  //------------------------------------------------------------------------
203  void OnMessageSent( uint16_t subStream,
204  Message *msg,
205  uint32_t bytesSent );
206 
207  //------------------------------------------------------------------------
209  //------------------------------------------------------------------------
210  void OnConnect( uint16_t subStream );
211 
212  //------------------------------------------------------------------------
214  //------------------------------------------------------------------------
215  void OnConnectError( uint16_t subStream, Status status );
216 
217  //------------------------------------------------------------------------
219  //------------------------------------------------------------------------
220  void OnError( uint16_t subStream, Status status );
221 
222  //------------------------------------------------------------------------
224  //------------------------------------------------------------------------
225  void ForceError( Status status );
226 
227  //------------------------------------------------------------------------
229  //------------------------------------------------------------------------
230  void OnReadTimeout( uint16_t subStream, bool &isBroken );
231 
232  //------------------------------------------------------------------------
234  //------------------------------------------------------------------------
235  void OnWriteTimeout( uint16_t subStream );
236 
237  //------------------------------------------------------------------------
239  //------------------------------------------------------------------------
240  void RegisterEventHandler( ChannelEventHandler *handler );
241 
242  //------------------------------------------------------------------------
244  //------------------------------------------------------------------------
245  void RemoveEventHandler( ChannelEventHandler *handler );
246 
247  //------------------------------------------------------------------------
256  //------------------------------------------------------------------------
257  std::pair<IncomingMsgHandler *, bool>
258  InstallIncHandler( Message *msg, uint16_t stream );
259 
260  //------------------------------------------------------------------------
262  //------------------------------------------------------------------------
263  void SetOnConnectHandler( Job *onConnJob )
264  {
265  delete pOnConnJob;
266  pOnConnJob = onConnJob;
267  }
268 
269  private:
270 
271  //------------------------------------------------------------------------
272  // Job queuing the incoming messages
273  //------------------------------------------------------------------------
274  class QueueIncMsgJob: public Job
275  {
276  public:
277  QueueIncMsgJob( InQueue *queue ): pQueue( queue ) {};
278  virtual ~QueueIncMsgJob() {};
279  virtual void Run( void *arg )
280  {
281  Message *msg = (Message *)arg;
282  pQueue->AddMessage( msg );
283  }
284  private:
286  };
287 
288  //------------------------------------------------------------------------
289  // Job handling the incoming messages
290  //------------------------------------------------------------------------
291  class HandleIncMsgJob: public Job
292  {
293  public:
294  HandleIncMsgJob( IncomingMsgHandler *handler ): pHandler( handler ) {};
295  virtual ~HandleIncMsgJob() {};
296  virtual void Run( void *arg )
297  {
298  Message *msg = (Message *)arg;
299  pHandler->Process( msg );
300  delete this;
301  }
302  private:
304  };
305 
306  //------------------------------------------------------------------------
308  //------------------------------------------------------------------------
309  void OnFatalError( uint16_t subStream,
310  Status status,
311  XrdSysMutexHelper &lock );
312 
313  //------------------------------------------------------------------------
315  //------------------------------------------------------------------------
316  void MonitorDisconnection( Status status );
317 
318  //------------------------------------------------------------------------
320  //------------------------------------------------------------------------
321  Status RequestClose( Message *resp );
322 
323  typedef std::vector<SubStreamData*> SubStreamList;
324 
325  //------------------------------------------------------------------------
326  // Data members
327  //------------------------------------------------------------------------
328  const URL *pUrl;
329  uint16_t pStreamNum;
330  std::string pStreamName;
346  std::vector<XrdNetAddr> pAddresses;
349  uint64_t pSessionId;
350 
351  //------------------------------------------------------------------------
352  // Jobs
353  //------------------------------------------------------------------------
355 
356  //------------------------------------------------------------------------
357  // Monitoring info
358  //------------------------------------------------------------------------
361  uint64_t pBytesSent;
362  uint64_t pBytesReceived;
363 
364  //------------------------------------------------------------------------
365  // Data stream on-connect handler
366  //------------------------------------------------------------------------
368  };
369 }
370 
371 #endif // __XRD_CL_STREAM_HH__
std::pair< Message *, OutgoingMsgHandler * > OnReadyToWrite(uint16_t subStream)
A synchronized queue.
Definition: XrdClJobManager.hh:50
Definition: XrdClStream.hh:291
Definition: XrdClAnyObject.hh:32
uint32_t pLastStreamError
Definition: XrdClStream.hh:338
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
std::vector< XrdNetAddr > pAddresses
Definition: XrdClStream.hh:346
Status Initialize()
Initializer.
void OnReadTimeout(uint16_t subStream, bool &isBroken)
On read timeout.
Definition: XrdSysPthread.hh:239
Interface for socket pollers.
Definition: XrdClPoller.hh:86
Definition: XrdClStream.hh:274
virtual void Run(void *arg)
The job logic.
Definition: XrdClStream.hh:296
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
void OnWriteTimeout(uint16_t subStream)
On write timeout.
The message representation used throughout the system.
Definition: XrdClMessage.hh:29
void OnError(uint16_t subStream, Status status)
On error.
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
timeval pConnectionDone
Definition: XrdClStream.hh:360
IncomingMsgHandler * pHandler
Definition: XrdClStream.hh:303
uint16_t GetStreamNumber() const
Get the stream number.
Definition: XrdClStream.hh:164
QueueIncMsgJob(InQueue *queue)
Definition: XrdClStream.hh:277
Definition: XrdClPostMasterInterfaces.hh:282
InQueue * pIncomingQueue
Definition: XrdClStream.hh:336
Status RequestClose(Message *resp)
Send close after an open request timed out.
In the process of being connected.
Definition: XrdClStream.hh:58
void SetIncomingQueue(InQueue *incomingQueue)
Set the incoming queue.
Definition: XrdClStream.hh:104
ChannelHandlerList pChannelEvHandlers
Definition: XrdClStream.hh:348
void SetPoller(Poller *poller)
Set the poller.
Definition: XrdClStream.hh:96
Stream(const URL *url, uint16_t streamNum)
Constructor.
A helper for handling channel event handlers.
Definition: XrdClChannelHandlerList.hh:33
std::vector< SubStreamData * > SubStreamList
Definition: XrdClStream.hh:323
void SetOnConnectHandler(Job *onConnJob)
Set the on-connect handler for data streams.
Definition: XrdClStream.hh:263
void ForceError(Status status)
Force error.
Procedure execution status.
Definition: XrdClStatus.hh:109
void SetTaskManager(TaskManager *taskManager)
Set task manager.
Definition: XrdClStream.hh:122
uint64_t pSessionId
Definition: XrdClStream.hh:349
uint16_t pConnectionWindow
Definition: XrdClStream.hh:344
void SetTransport(TransportHandler *transport)
Set the transport.
Definition: XrdClStream.hh:88
AddressType
Address type.
Definition: XrdClUtils.hh:93
uint16_t pConnectionCount
Definition: XrdClStream.hh:341
Broken.
Definition: XrdClStream.hh:59
AnyObject * pChannelData
Definition: XrdClStream.hh:337
void MonitorDisconnection(Status status)
Inform the monitoring about disconnection.
Status pLastFatalError
Definition: XrdClStream.hh:339
HandleIncMsgJob(IncomingMsgHandler *handler)
Definition: XrdClStream.hh:294
TransportHandler * pTransport
Definition: XrdClStream.hh:331
TaskManager * pTaskManager
Definition: XrdClStream.hh:333
uint16_t pConnectionRetry
Definition: XrdClStream.hh:342
timeval pConnectionStarted
Definition: XrdClStream.hh:359
uint16_t pStreamNum
Definition: XrdClStream.hh:329
XrdSysRecMutex pMutex
Definition: XrdClStream.hh:335
void OnIncoming(uint16_t subStream, Message *msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
QueueIncMsgJob * pQueueIncMsgJob
Definition: XrdClStream.hh:354
Perform the handshake and the authentication for each physical stream.
Definition: XrdClPostMasterInterfaces.hh:302
virtual ~HandleIncMsgJob()
Definition: XrdClStream.hh:295
JobManager * pJobManager
Definition: XrdClStream.hh:334
void SetJobManager(JobManager *jobManager)
Set job manager.
Definition: XrdClStream.hh:130
Status Send(Message *msg, OutgoingMsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
Channel event handler.
Definition: XrdClPostMasterInterfaces.hh:220
Poller * pPoller
Definition: XrdClStream.hh:332
Message handler.
Definition: XrdClPostMasterInterfaces.hh:68
uint64_t pBytesSent
Definition: XrdClStream.hh:361
A synchronize queue for incoming data.
Definition: XrdClInQueue.hh:35
Utils::AddressType pAddressType
Definition: XrdClStream.hh:347
~Stream()
Destructor.
void Disconnect(bool force=false)
Disconnect the stream.
Connected.
Definition: XrdClStream.hh:57
std::pair< IncomingMsgHandler *, bool > InstallIncHandler(Message *msg, uint16_t stream)
void ForceConnect()
Force connection.
void SetChannelData(AnyObject *channelData)
Set the channel data.
Definition: XrdClStream.hh:114
uint64_t pBytesReceived
Definition: XrdClStream.hh:362
SubStreamList pSubStreams
Definition: XrdClStream.hh:345
void OnConnectError(uint16_t subStream, Status status)
On connect error.
URL representation.
Definition: XrdClURL.hh:30
Job * pOnConnJob
Definition: XrdClStream.hh:367
bool AddMessage(Message *msg)
Add a fully reconstructed message to the queue.
virtual void Process(Message *msg)
Definition: XrdClPostMasterInterfaces.hh:126
Status EnableLink(PathID &path)
std::string pStreamName
Definition: XrdClStream.hh:330
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
virtual ~QueueIncMsgJob()
Definition: XrdClStream.hh:278
Message status handler.
Definition: XrdClPostMasterInterfaces.hh:167
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
Stream.
Definition: XrdClStream.hh:48
const URL * GetURL() const
Get the URL.
Definition: XrdClStream.hh:156
void Tick(time_t now)
InQueue * pQueue
Definition: XrdClStream.hh:285
const std::string & GetName() const
Return stream name.
Definition: XrdClStream.hh:177
Interface for a job to be run by the job manager.
Definition: XrdClJobManager.hh:33
const URL * pUrl
Definition: XrdClStream.hh:328
void OnFatalError(uint16_t subStream, Status status, XrdSysMutexHelper &lock)
On fatal error - unlocks the stream.
time_t pConnectionInitTime
Definition: XrdClStream.hh:343
virtual void Run(void *arg)
The job logic.
Definition: XrdClStream.hh:279
Definition: XrdClTaskManager.hh:75
Definition: XrdSysPthread.hh:260
StreamStatus
Status of the stream.
Definition: XrdClStream.hh:54
uint16_t pStreamErrorWindow
Definition: XrdClStream.hh:340
Not connected.
Definition: XrdClStream.hh:56