Changeset 3663
- Timestamp:
- 16/11/08 20:28:02 (8 weeks ago)
- Location:
- trunk/base/src
- Files:
-
- 3 modified
-
net/tls.py (modified) (8 diffs)
-
notifier/io.py (modified) (25 diffs)
-
notifier/sockets.py (modified) (7 diffs)
Legend:
- Unmodified
- Added
- Removed
-
trunk/base/src/net/tls.py
r3646 r3663 105 105 return self._iterate_handshake(handshake) 106 106 107 107 108 def fileno(self): 108 109 """ … … 111 112 """ 112 113 return self.sock.fileno() 114 113 115 114 116 def close(self): … … 135 137 def __init__(self): 136 138 kaa.Socket.__init__(self) 137 self.signals ['tls'] = kaa.Signal()139 self.signals += ('tls',) 138 140 self._handshake = False 139 141 140 def _update_read_monitor(self, signal = None, change = None): 141 # FIXME: This function is broken in TLSSocket for two reasons: 142 # 1. auto-reconnect while doing a tls handshake is wrong 143 # This could be fixed using self._handshake 144 # 2. Passing self._socket to register does not work, 145 # self._socket.fileno() is needed. Always using fileno() 146 # does not work for some strange reason. 147 return 148 149 def wrap(self, sock, addr = None): 150 """ 151 Wraps an existing low-level socket object. addr specifies the address 152 corresponding to the socket. 153 """ 154 super(TLSSocket, self).wrap(sock, addr) 155 # since _update_read_monitor is deactivated we need to always register 156 # the rmon to the notifier. 157 if not self._rmon.active(): 158 self._rmon.register(self._socket.fileno(), kaa.IO_READ) 159 160 def write(self, data): 161 """ 162 Write data to the socket. The data will be delayed while the socket 163 is doing the TLS handshake. 164 """ 142 def _is_read_connected(self): 143 """ 144 Returns True if we're interested in read events. 145 """ 146 # During the handshake stage, we handle all reads internally. So 147 # if self._handshake is True, we are always interested in read 148 # events. If it's False, we defer to the default behaviour. 149 # 150 # We can't simply always return True, because then read() and 151 # readline() may not work correctly (due to a race condition 152 # described in IOChannel._handle_read) 153 return self._handshake or super(TLSSocket, self)._is_read_connected() 154 155 156 def _handle_write(self): 165 157 if self._handshake: 166 # do not send data while doing a handshake 167 return self._write_buffer.append(data) 168 return super(TLSSocket, self).write(data) 158 # During the handshake stage, we don't want to write user data 159 # to the socket. It's still queued; we return immediately 160 # and retry later. 161 return 162 return super(TLSSocket, self)._handle_write() 163 169 164 170 165 def _handle_read(self): … … 180 175 return self.close(immediate=True, expected=False) 181 176 177 182 178 @kaa.coroutine() 183 179 def starttls_client(self, session=None, key=None, srp=None, checker=None): … … 194 190 @param checker: callback to check the credentials from the server 195 191 """ 192 if not self._rmon: 193 raise RuntimeError('Socket not connected') 196 194 try: 197 195 self._handshake = True 198 196 if session is None: 199 197 session = tlslite.api.Session() 200 c = TLSConnection(self._ socket)198 c = TLSConnection(self._channel) 201 199 c.ignoreAbruptClose = True 202 200 self._rmon.unregister() … … 210 208 else: 211 209 yield c.handshakeClientCert(session=session, checker=checker) 212 self._ socket= c210 self._channel = c 213 211 self.signals['tls'].emit() 214 self._rmon.register(self._socket.fileno(), kaa.IO_READ) 212 self._update_read_monitor() 213 finally: 215 214 self._handshake = False 216 except: 217 self._handshake = False 218 type, value, tb = sys.exc_info() 219 raise type, value, tb 215 220 216 221 217 @kaa.coroutine() … … 236 232 try: 237 233 self._handshake = True 238 c = TLSConnection(self._ socket)234 c = TLSConnection(self._channel) 239 235 c.ignoreAbruptClose = True 240 236 self._rmon.unregister() … … 248 244 kwargs['reqCert'] = True 249 245 yield c.handshakeServer(checker=checker, **kwargs) 250 self._ socket= c246 self._channel = c 251 247 self.signals['tls'].emit() 252 self._rmon.register(self._socket.fileno(), kaa.IO_READ) 248 self._update_read_monitor() 249 finally: 253 250 self._handshake = False 254 except:255 self._handshake = False256 type, value, tb = sys.exc_info()257 raise type, value, tb258 251 259 252 -
trunk/base/src/notifier/io.py
r3662 r3663 31 31 # ----------------------------------------------------------------------------- 32 32 33 __all__ = [ 'IO_READ', 'IO_WRITE', 'IOMonitor', 'WeakIOMonitor', 'IO Descriptor' ]33 __all__ = [ 'IO_READ', 'IO_WRITE', 'IOMonitor', 'WeakIOMonitor', 'IOChannel' ] 34 34 35 35 import sys … … 106 106 from kaa.notifier import main 107 107 108 109 class IODescriptor(object): 108 class IOChannel(object): 110 109 """ 111 110 Base class for read-only, write-only or read-write descriptors such as 112 111 Socket and Process. Implements logic common to communication over 113 descriptors such as async read/writes and read/write buffering.112 such channels such as async read/writes and read/write buffering. 114 113 """ 115 def __init__(self, fd=None, mode=IO_READ|IO_WRITE, chunk_size=1024*1024):114 def __init__(self, channel=None, mode=IO_READ|IO_WRITE, chunk_size=1024*1024): 116 115 self.signals = Signals('closed', 'read', 'readline', 'write') 117 116 self._write_queue = [] … … 130 129 131 130 # These variables hold the IOMonitors for monitoring; we only allocate 132 # a monitor when the fdis connected to avoid a ref cycle so that133 # disconnected fds will get properly deleted when they are not131 # a monitor when the channel is connected to avoid a ref cycle so that 132 # disconnected channels will get properly deleted when they are not 134 133 # referenced. 135 134 self._rmon = None 136 135 self._wmon = None 137 136 138 self.wrap( fd, mode)137 self.wrap(channel, mode) 139 138 140 139 … … 142 141 def alive(self): 143 142 """ 144 Returns True if the fdexists and is open.145 """ 146 # If the fd is closed, self._fdwill be None.147 return self._ fd!= None143 Returns True if the channel exists and is open. 144 """ 145 # If the channel is closed, self._channel will be None. 146 return self._channel != None 148 147 149 148 … … 151 150 def fileno(self): 152 151 """ 153 Returns the integer for this file descriptor, or None if no descriptor154 has been set.152 Returns the file descriptor (integer) for this channel, or None if no 153 channel has been set. 155 154 """ 156 155 try: 157 return self._ fd.fileno()156 return self._channel.fileno() 158 157 except AttributeError: 159 return self._ fd158 return self._channel 160 159 161 160 … … 163 162 def chunk_size(self): 164 163 """ 165 Number of bytes to attempt to read from the fdat a time. The164 Number of bytes to attempt to read from the channel at a time. The 166 165 default is 1M. A 'read' signal is emitted for each chunk read from the 167 fd. (The number of bytes read at a time may be less than the chunk166 channel. (The number of bytes read at a time may be less than the chunk 168 167 size, but will never be more.) 169 168 """ … … 182 181 """ 183 182 Returns the number of bytes queued in memory to be written to the 184 descriptor.183 channel. 185 184 """ 186 185 # XXX: this is not terribly efficient when the write queue has … … 201 200 Update read IOMonitor to register or unregister based on if there are 202 201 any handlers attached to the read signals. If there are no handlers, 203 there is no point in reading data from the descriptorsince it will go202 there is no point in reading data from the channel since it will go 204 203 nowhere. This also allows us to push back the read buffer to the OS. 205 204 … … 218 217 def _set_non_blocking(self): 219 218 """ 220 Low-level call to set the fdnon-blocking. Can be overridden by219 Low-level call to set the channel non-blocking. Can be overridden by 221 220 subclasses. 222 221 """ … … 225 224 226 225 227 def wrap(self, fd, mode): 228 """ 229 Wraps an existing file descriptor. 230 """ 231 self._fd = fd 226 def wrap(self, channel, mode): 227 """ 228 Wraps an existing channel. Assumes a file-like object or a file 229 descriptor (int). 230 """ 231 self._channel = channel 232 232 self._mode = mode 233 if not fd:233 if not channel: 234 234 return 235 235 self._set_non_blocking() … … 256 256 def _is_readable(self): 257 257 """ 258 Low-level call to read from fd. Can be overridden by subclasses.259 """ 260 return self._ fd!= None258 Low-level call to read from channel. Can be overridden by subclasses. 259 """ 260 return self._channel != None 261 261 262 262 … … 266 266 """ 267 267 if not (self._mode & IO_READ): 268 raise IOError(9, 'Cannot read on a write-only descriptor')268 raise IOError(9, 'Cannot read on a write-only channel') 269 269 if not self._is_readable(): 270 # fdis not readable. Return an InProgress pre-finished270 # channel is not readable. Return an InProgress pre-finished 271 271 # with None 272 272 return InProgress().finish(None) … … 277 277 def read(self): 278 278 """ 279 Reads a chunk of data from the fd. This function returns an279 Reads a chunk of data from the channel. This function returns an 280 280 InProgress object. If the InProgress is finished with None, it 281 means that no data was collected and the fdclosed.281 means that no data was collected and the channel closed. 282 282 283 283 It is therefore possible to busy-loop by reading on a closed 284 fd::284 channel:: 285 285 286 286 while True: 287 fd.read().wait()287 channel.read().wait() 288 288 289 289 So the return value of read() should be checked. Alternatively, 290 fd.alive could be tested::291 292 while fd.alive:293 fd.read().wait()290 channel.alive could be tested:: 291 292 while channel.alive: 293 channel.read().wait() 294 294 295 295 """ … … 299 299 def readline(self): 300 300 """ 301 Reads a line from the fd(with newline stripped). The function301 Reads a line from the channel (with newline stripped). The function 302 302 returns an InProgress object. If the InProgress is finished with None 303 303 or the empty string, it means that no data was collected and the socket … … 309 309 def _read(self, size): 310 310 """ 311 Low-level call to read from fd. Can be overridden by subclasses.311 Low-level call to read from channel. Can be overridden by subclasses. 312 312 Must return a string of at most size bytes, or the empty string or 313 313 None if no data is available. … … 318 318 def _handle_read(self): 319 319 """ 320 IOMonitor callback when there is data to be read from the fd.320 IOMonitor callback when there is data to be read from the channel. 321 321 322 322 This callback is only registered when we know the user is interested in … … 334 334 data = None 335 335 except: 336 log.exception('%s._handle_read failed with unknown exception, closing socket', self.__class__.__name__)336 log.exception('%s._handle_read failed, closing socket', self.__class__.__name__) 337 337 data = None 338 338 … … 359 359 def _write(self, data): 360 360 """ 361 Low-level call to write to the fdCan be overridden by subclasses.362 Must return number of bytes written to the file descriptor.361 Low-level call to write to the channel Can be overridden by subclasses. 362 Must return number of bytes written to the channel. 363 363 """ 364 364 return os.write(self.fileno, data) … … 367 367 def write(self, data): 368 368 """ 369 Writes the given data to the fd. This method returns an InProgress369 Writes the given data to the channel. This method returns an InProgress 370 370 object which is finished when the given data is fully written to the 371 file descriptor.372 373 It is not required that the descriptorbe open in order to write to it.374 Written data is queued until the descriptoropen and then flushed. As371 channel. 372 373 It is not required that the channel be open in order to write to it. 374 Written data is queued until the channel open and then flushed. As 375 375 writes are asynchronous, all written data is queued. It is the 376 376 caller's responsibility to ensure the internal write queue does not … … 378 378 finish before writing more data. 379 379 380 If a write does not complete because the file descriptorwas closed380 If a write does not complete because the channel was closed 381 381 prematurely, an IOError is thrown to the InProgress. 382 382 """ 383 383 if not (self._mode & IO_WRITE): 384 raise IOError(9, 'Cannot write to a read-only descriptor')384 raise IOError(9, 'Cannot write to a read-only channel') 385 385 386 386 inprogress = InProgress() 387 387 if data: 388 388 self._write_queue.append((data, inprogress)) 389 if self._ fdand self._wmon and not self._wmon.active():389 if self._channel and self._wmon and not self._wmon.active(): 390 390 self._wmon.register(self.fileno, IO_WRITE) 391 391 else: … … 398 398 def _handle_write(self): 399 399 """ 400 IOMonitor callback when the fdis writable. This callback is not400 IOMonitor callback when the channel is writable. This callback is not 401 401 registered then the write queue is empty, so we only get called when 402 402 there is something to write. … … 453 453 def _close(self): 454 454 """ 455 Low-level call to close the fdCan be overridden by subclasses.455 Low-level call to close the channel. Can be overridden by subclasses. 456 456 """ 457 457 try: 458 self._ fd.close()458 self._channel.close() 459 459 except AttributeError: 460 460 os.close(self.fileno) … … 463 463 def close(self, immediate=False, expected=True): 464 464 """ 465 Closes the fd. If immediate is False and there is data in the466 write buffer, the fdis closed once the write buffer is emptied.467 Otherwise the fdis closed immediately and the 'closed' signal465 Closes the channel. If immediate is False and there is data in the 466 write buffer, the channel is closed once the write buffer is emptied. 467 Otherwise the channel is closed immediately and the 'closed' signal 468 468 is emitted. 469 469 """ … … 492 492 # Somebody cares about this InProgress, so we need to finish 493 493 # it. 494 inprogress.throw(IOError, IOError(9, "Descriptor closed prematurely"), None)494 inprogress.throw(IOError, IOError(9, 'Channel closed prematurely'), None) 495 495 del self._write_queue[:] 496 496 … … 498 498 self._close() 499 499 except (IOError, socket.error), (errno, msg): 500 # Descriptormay already be closed, which is ok.500 # Channel may already be closed, which is ok. 501 501 if errno != 9: 502 502 # It isn't, this is some other error, so reraise exception. 503 503 raise 504 504 finally: 505 self._ fd= None505 self._channel = None 506 506 507 507 self.signals['closed'].emit(expected) -
trunk/base/src/notifier/sockets.py
r3662 r3663 42 42 from thread import threaded 43 43 from async import InProgress 44 from io import IO_READ, IO_WRITE, IO Descriptor44 from io import IO_READ, IO_WRITE, IOChannel 45 45 from kaa.utils import property 46 46 from kaa.tmpfile import tempfile … … 52 52 pass 53 53 54 class Socket(IO Descriptor):54 class Socket(IOChannel): 55 55 """ 56 56 Notifier-aware socket class, implementing fully asynchronous reads … … 69 69 70 70 def __repr__(self): 71 if not self._fd: 72 return '<kaa.Socket - disconnected>' 73 return '<kaa.Socket fd=%d>' % self.fileno 71 clsname = self.__class__.__name__ 72 if not self._channel: 73 return '<kaa.%s - disconnected>' % clsname 74 return '<kaa.%s fd=%d>' % (clsname, self.fileno) 74 75 75 76 … … 113 114 try: 114 115 # Will raise exception if socket is not connected. 115 self._ fd.getpeername()116 self._channel.getpeername() 116 117 return True 117 118 except: … … 143 144 def buffer_size(self, size): 144 145 self._buffer_size = size 145 if self._ fdand size:146 self._set_buffer_size(self._ fd, size)146 if self._channel and size: 147 self._set_buffer_size(self._channel, size) 147 148 148 149 … … 299 300 300 301 def _set_non_blocking(self): 301 self._ fd.setblocking(False)302 self._channel.setblocking(False) 302 303 303 304 304 305 def _is_readable(self): 305 return self._ fdand not self._connecting306 return self._channel and not self._connecting 306 307 307 308 308 309 def _read(self, size): 309 return self._ fd.recv(size)310 return self._channel.recv(size) 310 311 311 312 312 313 def _write(self, data): 313 return self._ fd.send(data)314 return self._channel.send(data) 314 315 315 316 … … 318 319 Accept a new connection and return a new Socket object. 319 320 """ 320 sock, addr = self._ fd.accept()321 sock, addr = self._channel.accept() 321 322 # create new Socket from the same class this object is 322 323 client_socket = self.__class__()
