Changeset 3663

Show
Ignore:
Timestamp:
16/11/08 20:28:02 (8 weeks ago)
Author:
tack
Message:

Rename IODescriptor to IOChannel; fix TLSSocket to confirm to Socket
changes, and also fix a bug where tlssocket.read() might miss data.

Location:
trunk/base/src
Files:
3 modified

Legend:

Unmodified
Added
Removed
  • trunk/base/src/net/tls.py

    r3646 r3663  
    105105        return self._iterate_handshake(handshake) 
    106106 
     107 
    107108    def fileno(self): 
    108109        """ 
     
    111112        """ 
    112113        return self.sock.fileno() 
     114 
    113115 
    114116    def close(self): 
     
    135137    def __init__(self): 
    136138        kaa.Socket.__init__(self) 
    137         self.signals['tls'] = kaa.Signal() 
     139        self.signals += ('tls',) 
    138140        self._handshake = False 
    139141 
    140     def _update_read_monitor(self, signal = None, change = None): 
    141         # FIXME: This function is broken in TLSSocket for two reasons: 
    142         # 1. auto-reconnect while doing a tls handshake is wrong 
    143         #    This could be fixed using self._handshake 
    144         # 2. Passing self._socket to register does not work, 
    145         #    self._socket.fileno() is needed. Always using fileno() 
    146         #    does not work for some strange reason. 
    147         return 
    148  
    149     def wrap(self, sock, addr = None): 
    150         """ 
    151         Wraps an existing low-level socket object.  addr specifies the address 
    152         corresponding to the socket. 
    153         """ 
    154         super(TLSSocket, self).wrap(sock, addr) 
    155         # since _update_read_monitor is deactivated we need to always register 
    156         # the rmon to the notifier. 
    157         if not self._rmon.active(): 
    158             self._rmon.register(self._socket.fileno(), kaa.IO_READ) 
    159  
    160     def write(self, data): 
    161         """ 
    162         Write data to the socket. The data will be delayed while the socket 
    163         is doing the TLS handshake. 
    164         """ 
     142    def _is_read_connected(self): 
     143        """ 
     144        Returns True if we're interested in read events. 
     145        """ 
     146        # During the handshake stage, we handle all reads internally.  So 
     147        # if self._handshake is True, we are always interested in read 
     148        # events.  If it's False, we defer to the default behaviour. 
     149        # 
     150        # We can't simply always return True, because then read() and 
     151        # readline() may not work correctly (due to a race condition 
     152        # described in IOChannel._handle_read) 
     153        return self._handshake or super(TLSSocket, self)._is_read_connected() 
     154 
     155 
     156    def _handle_write(self): 
    165157        if self._handshake: 
    166             # do not send data while doing a handshake 
    167             return self._write_buffer.append(data) 
    168         return super(TLSSocket, self).write(data) 
     158            # During the handshake stage, we don't want to write user data 
     159            # to the socket.  It's still queued; we return immediately 
     160            # and retry later. 
     161            return 
     162        return super(TLSSocket, self)._handle_write() 
     163 
    169164 
    170165    def _handle_read(self): 
     
    180175            return self.close(immediate=True, expected=False) 
    181176 
     177 
    182178    @kaa.coroutine() 
    183179    def starttls_client(self, session=None, key=None, srp=None, checker=None): 
     
    194190        @param checker: callback to check the credentials from the server 
    195191        """ 
     192        if not self._rmon: 
     193            raise RuntimeError('Socket not connected') 
    196194        try: 
    197195            self._handshake = True 
    198196            if session is None: 
    199197                session = tlslite.api.Session() 
    200             c = TLSConnection(self._socket) 
     198            c = TLSConnection(self._channel) 
    201199            c.ignoreAbruptClose = True 
    202200            self._rmon.unregister() 
     
    210208            else: 
    211209                yield c.handshakeClientCert(session=session, checker=checker) 
    212             self._socket = c 
     210            self._channel = c 
    213211            self.signals['tls'].emit() 
    214             self._rmon.register(self._socket.fileno(), kaa.IO_READ) 
     212            self._update_read_monitor() 
     213        finally: 
    215214            self._handshake = False 
    216         except: 
    217             self._handshake = False 
    218             type, value, tb = sys.exc_info() 
    219             raise type, value, tb 
     215 
    220216 
    221217    @kaa.coroutine() 
     
    236232        try: 
    237233            self._handshake = True 
    238             c = TLSConnection(self._socket) 
     234            c = TLSConnection(self._channel) 
    239235            c.ignoreAbruptClose = True 
    240236            self._rmon.unregister() 
     
    248244                kwargs['reqCert'] = True 
    249245            yield c.handshakeServer(checker=checker, **kwargs) 
    250             self._socket = c 
     246            self._channel = c 
    251247            self.signals['tls'].emit() 
    252             self._rmon.register(self._socket.fileno(), kaa.IO_READ) 
     248            self._update_read_monitor() 
     249        finally: 
    253250            self._handshake = False 
    254         except: 
    255             self._handshake = False 
    256             type, value, tb = sys.exc_info() 
    257             raise type, value, tb 
    258251 
    259252 
  • trunk/base/src/notifier/io.py

    r3662 r3663  
    3131# ----------------------------------------------------------------------------- 
    3232 
    33 __all__ = [ 'IO_READ', 'IO_WRITE', 'IOMonitor', 'WeakIOMonitor', 'IODescriptor' ] 
     33__all__ = [ 'IO_READ', 'IO_WRITE', 'IOMonitor', 'WeakIOMonitor', 'IOChannel' ] 
    3434 
    3535import sys 
     
    106106from kaa.notifier import main 
    107107 
    108  
    109 class IODescriptor(object): 
     108class IOChannel(object): 
    110109    """ 
    111110    Base class for read-only, write-only or read-write descriptors such as 
    112111    Socket and Process.  Implements logic common to communication over 
    113     descriptors such as async read/writes and read/write buffering. 
     112    such channels such as async read/writes and read/write buffering. 
    114113    """ 
    115     def __init__(self, fd=None, mode=IO_READ|IO_WRITE, chunk_size=1024*1024): 
     114    def __init__(self, channel=None, mode=IO_READ|IO_WRITE, chunk_size=1024*1024): 
    116115        self.signals = Signals('closed', 'read', 'readline', 'write') 
    117116        self._write_queue = [] 
     
    130129 
    131130        # These variables hold the IOMonitors for monitoring; we only allocate 
    132         # a monitor when the fd is connected to avoid a ref cycle so that 
    133         # disconnected fds will get properly deleted when they are not 
     131        # a monitor when the channel is connected to avoid a ref cycle so that 
     132        # disconnected channels will get properly deleted when they are not 
    134133        # referenced. 
    135134        self._rmon = None 
    136135        self._wmon = None 
    137136 
    138         self.wrap(fd, mode) 
     137        self.wrap(channel, mode) 
    139138 
    140139 
     
    142141    def alive(self): 
    143142        """ 
    144         Returns True if the fd exists and is open. 
    145         """ 
    146         # If the fd is closed, self._fd will be None. 
    147         return self._fd != None 
     143        Returns True if the channel exists and is open. 
     144        """ 
     145        # If the channel is closed, self._channel will be None. 
     146        return self._channel != None 
    148147 
    149148 
     
    151150    def fileno(self): 
    152151        """ 
    153         Returns the integer for this file descriptor, or None if no descriptor 
    154         has been set. 
     152        Returns the file descriptor (integer) for this channel, or None if no 
     153        channel has been set. 
    155154        """ 
    156155        try: 
    157             return self._fd.fileno() 
     156            return self._channel.fileno() 
    158157        except AttributeError: 
    159             return self._fd 
     158            return self._channel 
    160159 
    161160 
     
    163162    def chunk_size(self): 
    164163        """ 
    165         Number of bytes to attempt to read from the fd at a time.  The 
     164        Number of bytes to attempt to read from the channel at a time.  The 
    166165        default is 1M.  A 'read' signal is emitted for each chunk read from the 
    167         fd.  (The number of bytes read at a time may be less than the chunk 
     166        channel.  (The number of bytes read at a time may be less than the chunk 
    168167        size, but will never be more.) 
    169168        """ 
     
    182181        """ 
    183182        Returns the number of bytes queued in memory to be written to the 
    184         descriptor. 
     183        channel. 
    185184        """ 
    186185        # XXX: this is not terribly efficient when the write queue has 
     
    201200        Update read IOMonitor to register or unregister based on if there are 
    202201        any handlers attached to the read signals.  If there are no handlers, 
    203         there is no point in reading data from the descriptor since it will go  
     202        there is no point in reading data from the channel since it will go  
    204203        nowhere.  This also allows us to push back the read buffer to the OS. 
    205204 
     
    218217    def _set_non_blocking(self): 
    219218        """ 
    220         Low-level call to set the fd non-blocking.  Can be overridden by 
     219        Low-level call to set the channel non-blocking.  Can be overridden by 
    221220        subclasses. 
    222221        """ 
     
    225224 
    226225 
    227     def wrap(self, fd, mode): 
    228         """ 
    229         Wraps an existing file descriptor. 
    230         """ 
    231         self._fd = fd 
     226    def wrap(self, channel, mode): 
     227        """ 
     228        Wraps an existing channel.  Assumes a file-like object or a file 
     229        descriptor (int). 
     230        """ 
     231        self._channel = channel 
    232232        self._mode = mode 
    233         if not fd: 
     233        if not channel: 
    234234            return 
    235235        self._set_non_blocking() 
     
    256256    def _is_readable(self): 
    257257        """ 
    258         Low-level call to read from fd.  Can be overridden by subclasses. 
    259         """ 
    260         return self._fd != None 
     258        Low-level call to read from channel.  Can be overridden by subclasses. 
     259        """ 
     260        return self._channel != None 
    261261 
    262262 
     
    266266        """ 
    267267        if not (self._mode & IO_READ): 
    268             raise IOError(9, 'Cannot read on a write-only descriptor') 
     268            raise IOError(9, 'Cannot read on a write-only channel') 
    269269        if not self._is_readable(): 
    270             # fd is not readable.  Return an InProgress pre-finished 
     270            # channel is not readable.  Return an InProgress pre-finished 
    271271            # with None 
    272272            return InProgress().finish(None) 
     
    277277    def read(self): 
    278278        """ 
    279         Reads a chunk of data from the fd.  This function returns an  
     279        Reads a chunk of data from the channel.  This function returns an  
    280280        InProgress object.  If the InProgress is finished with None, it 
    281         means that no data was collected and the fd closed. 
     281        means that no data was collected and the channel closed. 
    282282 
    283283        It is therefore possible to busy-loop by reading on a closed 
    284         fd:: 
     284        channel:: 
    285285 
    286286            while True: 
    287                 fd.read().wait() 
     287                channel.read().wait() 
    288288 
    289289        So the return value of read() should be checked.  Alternatively, 
    290         fd.alive could be tested:: 
    291  
    292             while fd.alive: 
    293                 fd.read().wait() 
     290        channel.alive could be tested:: 
     291 
     292            while channel.alive: 
     293                channel.read().wait() 
    294294 
    295295        """ 
     
    299299    def readline(self): 
    300300        """ 
    301         Reads a line from the fd (with newline stripped).  The function 
     301        Reads a line from the channel (with newline stripped).  The function 
    302302        returns an InProgress object.  If the InProgress is finished with None 
    303303        or the empty string, it means that no data was collected and the socket 
     
    309309    def _read(self, size): 
    310310        """ 
    311         Low-level call to read from fd.  Can be overridden by subclasses. 
     311        Low-level call to read from channel.  Can be overridden by subclasses. 
    312312        Must return a string of at most size bytes, or the empty string or 
    313313        None if no data is available. 
     
    318318    def _handle_read(self): 
    319319        """ 
    320         IOMonitor callback when there is data to be read from the fd. 
     320        IOMonitor callback when there is data to be read from the channel. 
    321321         
    322322        This callback is only registered when we know the user is interested in 
     
    334334            data = None 
    335335        except: 
    336             log.exception('%s._handle_read failed with unknown exception, closing socket', self.__class__.__name__) 
     336            log.exception('%s._handle_read failed, closing socket', self.__class__.__name__) 
    337337            data = None 
    338338 
     
    359359    def _write(self, data): 
    360360        """ 
    361         Low-level call to write to the fd  Can be overridden by subclasses. 
    362         Must return number of bytes written to the file descriptor. 
     361        Low-level call to write to the channel  Can be overridden by subclasses. 
     362        Must return number of bytes written to the channel. 
    363363        """ 
    364364        return os.write(self.fileno, data) 
     
    367367    def write(self, data): 
    368368        """ 
    369         Writes the given data to the fd.  This method returns an InProgress 
     369        Writes the given data to the channel.  This method returns an InProgress 
    370370        object which is finished when the given data is fully written to the 
    371         file descriptor. 
    372  
    373         It is not required that the descriptor be open in order to write to it. 
    374         Written data is queued until the descriptor open and then flushed.  As 
     371        channel. 
     372 
     373        It is not required that the channel be open in order to write to it. 
     374        Written data is queued until the channel open and then flushed.  As 
    375375        writes are asynchronous, all written data is queued.  It is the 
    376376        caller's responsibility to ensure the internal write queue does not 
     
    378378        finish before writing more data. 
    379379 
    380         If a write does not complete because the file descriptor was closed 
     380        If a write does not complete because the channel was closed 
    381381        prematurely, an IOError is thrown to the InProgress. 
    382382        """ 
    383383        if not (self._mode & IO_WRITE): 
    384             raise IOError(9, 'Cannot write to a read-only descriptor') 
     384            raise IOError(9, 'Cannot write to a read-only channel') 
    385385 
    386386        inprogress = InProgress() 
    387387        if data: 
    388388            self._write_queue.append((data, inprogress)) 
    389             if self._fd and self._wmon and not self._wmon.active(): 
     389            if self._channel and self._wmon and not self._wmon.active(): 
    390390                self._wmon.register(self.fileno, IO_WRITE) 
    391391        else: 
     
    398398    def _handle_write(self): 
    399399        """ 
    400         IOMonitor callback when the fd is writable.  This callback is not 
     400        IOMonitor callback when the channel is writable.  This callback is not 
    401401        registered then the write queue is empty, so we only get called when 
    402402        there is something to write. 
     
    453453    def _close(self): 
    454454        """ 
    455         Low-level call to close the fd  Can be overridden by subclasses. 
     455        Low-level call to close the channel.  Can be overridden by subclasses. 
    456456        """ 
    457457        try: 
    458             self._fd.close() 
     458            self._channel.close() 
    459459        except AttributeError: 
    460460            os.close(self.fileno) 
     
    463463    def close(self, immediate=False, expected=True): 
    464464        """ 
    465         Closes the fd.  If immediate is False and there is data in the 
    466         write buffer, the fd is closed once the write buffer is emptied. 
    467         Otherwise the fd is closed immediately and the 'closed' signal 
     465        Closes the channel.  If immediate is False and there is data in the 
     466        write buffer, the channel is closed once the write buffer is emptied. 
     467        Otherwise the channel is closed immediately and the 'closed' signal 
    468468        is emitted. 
    469469        """ 
     
    492492                # Somebody cares about this InProgress, so we need to finish 
    493493                # it. 
    494                 inprogress.throw(IOError, IOError(9, "Descriptor closed prematurely"), None) 
     494                inprogress.throw(IOError, IOError(9, 'Channel closed prematurely'), None) 
    495495        del self._write_queue[:] 
    496496 
     
    498498            self._close() 
    499499        except (IOError, socket.error), (errno, msg): 
    500             # Descriptor may already be closed, which is ok. 
     500            # Channel may already be closed, which is ok. 
    501501            if errno != 9: 
    502502                # It isn't, this is some other error, so reraise exception. 
    503503                raise 
    504504        finally: 
    505             self._fd = None 
     505            self._channel = None 
    506506 
    507507            self.signals['closed'].emit(expected) 
  • trunk/base/src/notifier/sockets.py

    r3662 r3663  
    4242from thread import threaded 
    4343from async import InProgress 
    44 from io import IO_READ, IO_WRITE, IODescriptor 
     44from io import IO_READ, IO_WRITE, IOChannel 
    4545from kaa.utils import property 
    4646from kaa.tmpfile import tempfile 
     
    5252    pass 
    5353 
    54 class Socket(IODescriptor): 
     54class Socket(IOChannel): 
    5555    """ 
    5656    Notifier-aware socket class, implementing fully asynchronous reads 
     
    6969 
    7070    def __repr__(self): 
    71         if not self._fd: 
    72             return '<kaa.Socket - disconnected>' 
    73         return '<kaa.Socket fd=%d>' % self.fileno 
     71        clsname = self.__class__.__name__ 
     72        if not self._channel: 
     73            return '<kaa.%s - disconnected>' % clsname 
     74        return '<kaa.%s fd=%d>' % (clsname, self.fileno) 
    7475 
    7576 
     
    113114        try: 
    114115            # Will raise exception if socket is not connected. 
    115             self._fd.getpeername() 
     116            self._channel.getpeername() 
    116117            return True 
    117118        except: 
     
    143144    def buffer_size(self, size): 
    144145        self._buffer_size = size 
    145         if self._fd and size: 
    146             self._set_buffer_size(self._fd, size) 
     146        if self._channel and size: 
     147            self._set_buffer_size(self._channel, size) 
    147148 
    148149 
     
    299300 
    300301    def _set_non_blocking(self): 
    301         self._fd.setblocking(False) 
     302        self._channel.setblocking(False) 
    302303 
    303304 
    304305    def _is_readable(self): 
    305         return self._fd and not self._connecting 
     306        return self._channel and not self._connecting 
    306307 
    307308 
    308309    def _read(self, size): 
    309         return self._fd.recv(size) 
     310        return self._channel.recv(size) 
    310311 
    311312 
    312313    def _write(self, data): 
    313         return self._fd.send(data) 
     314        return self._channel.send(data) 
    314315 
    315316 
     
    318319        Accept a new connection and return a new Socket object. 
    319320        """ 
    320         sock, addr = self._fd.accept() 
     321        sock, addr = self._channel.accept() 
    321322        # create new Socket from the same class this object is 
    322323        client_socket = self.__class__()